全国总工会第二届职工数字化应用技术技能大赛 部分WP

全国总工会第二届职工数字化应用技术技能大赛部分WP

数据安全技能赛

算法恢复

题目描述

某加密产品采用标准国密算法进行数据安全防护,但某安全研究员逆向出代码,发现采用特殊代码逻辑进行数据处理,这是一种自同步流密码,请分析提供代码文件,基于截获的密文及侧信道数据,恢复出原始数据。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
import os
from secret import *
from gmssl import sm4, func
from gmssl.sm4 import CryptSM4, SM4_ENCRYPT
from Crypto.Util.number import *

assert(len(flag) == 38)
assert flag[:5] == b'flag{' and flag[-1:] == b'}'
assert(len(key) == 16)

def padding(msg):
tmp = 16 - len(msg) % 16
pad = format(tmp, '02x')
return bytes.fromhex(pad * tmp) + msg

message = padding(flag)
cnt = len(message) // 16
messages = [message[16*i:16*(i+1)] for i in range(cnt)]
hint = bytes_to_long(key) ^ bytes_to_long(message[:16])

IV = os.urandom(16)
crypt_sm4 = CryptSM4()
crypt_sm4.set_key(key, SM4_ENCRYPT)
prev_ci = IV
output = b''

for msg in messages:
enc_output = func.list_to_bytes(crypt_sm4.one_round(crypt_sm4.sk, prev_ci))
ci = bytes([msg[j] ^ enc_output[j] for j in range(16)])
prev_ci = ci
output = output + ci

print('enc =', output.hex())
print('hint =', hex(hint)[2:])

# enc = da1db10716b19792c4957557e5bb8c4586d17a21281fae1b3b14a6cb80a4d084b3804d4d95cebffe5e4bb1dca85ae726
# hint = 3c3b4f3b3d3f683e3a49545c53013e77

解题:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
from gmssl.sm4 import CryptSM4, SM4_ENCRYPT
from gmssl import func

# 题目给出的密文和 hint(均为 hex)
enc_hex = "da1db10716b19792c4957557e5bb8c4586d17a21281fae1b3b14a6cb80a4d084b3804d4d95cebffe5e4bb1dca85ae726"
hint_hex = "3c3b4f3b3d3f683e3a49545c53013e77"

enc = bytes.fromhex(enc_hex)
hint = bytes.fromhex(hint_hex)

# ============ 已知信息推断 ==============
# padding(flag) 的填充方式为:在开头补 N 个字节,值为填充长度 pad_len
# flag 长度为 38 → 38 % 16 = 6 → 需要填充 10 个字节
pad_len = 10

# message 的第一个 16 字节块 m0 = 10*0x0a + b"flag{" + 1 个未知字节
# 我们已知前 15 字节,仅需枚举最后 1 字节(共 256 种)
m0_prefix = bytes([pad_len]) * pad_len + b"flag{"

# 划分密文为 16 字节块
blocks = [enc[i:i+16] for i in range(0, len(enc), 16)]

# 工具函数:按字节异或
def xor_bytes(a, b):
return bytes(x ^ y for x, y in zip(a, b))

# 判断 flag 中间部分是否全部可打印 ASCII(CTF flag 一般如此)
def is_printable(bs):
return all(0x20 <= b <= 0x7e for b in bs)

# ============ 暴力枚举 m0 的最后一个字节 ==============
for g in range(256):
# 构造猜测的 m0
m0 = m0_prefix + bytes([g])

# hint = key XOR m0 → key = hint XOR m0
key = xor_bytes(hint, m0)

# 使用猜测出的 key 创建 SM4 加密器
crypt = CryptSM4()
crypt.set_key(key, SM4_ENCRYPT)

# 根据加密过程逆推出所有明文块
# 加密方式:ci = mi XOR E_k(prev_ci)
msg_blocks = [m0]

for i in range(1, len(blocks)):
prev = blocks[i-1]
# 计算 E_k(prev_ci)
ek = func.list_to_bytes(crypt.one_round(crypt.sk, prev))
# 反推出 mi
msg_blocks.append(xor_bytes(blocks[i], ek))

# 拼接完整 message
msg = b"".join(msg_blocks)

# 根据 padding 格式移除前置填充
p = msg[0]
if p > 16:
continue
flag = msg[p:p+38]

# 检查 flag 格式
if len(flag) == 38 and flag.startswith(b"flag{") and flag.endswith(b"}"):
# 检查中间内容全部是可打印 ASCII
if is_printable(flag[5:-1]):
print("key =", key.hex())
print("flag =", flag.decode())
break
#key = 36314531373562343043323032664536
#flag = flag{ADMIN@d35.com_520508194902259021}

lmr_02

题目描述

解密medical_data.json获得明文的ssn号码

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
from Crypto.Cipher import AES
from Crypto.Protocol.KDF import scrypt
import base64
import json

class MedicalEncryptor:
def __init__(self, password: str):

self.salt = b'fixed_salt_12345678'

self.key = scrypt(password, self.salt, 16, N=1024, r=8, p=1)

def encrypt_record(self, patient_data: dict) -> dict:

patient_num = patient_data["patient_id"].replace("PT", "")
nonce = f'medical_nonce_{patient_num}'.encode().ljust(16, b'\x00')

cipher = AES.new(self.key, AES.MODE_GCM, nonce=nonce)

associated_data = json.dumps({
"patient_id": patient_data["patient_id"],
"record_type": "medical"
}).encode()
cipher.update(associated_data)

# 加密敏感数据
sensitive_data = {
'diagnosis': patient_data['diagnosis'],
'ssn': patient_data['ssn'], # 包含SSN信息
'insurance': patient_data['insurance']
}

plaintext = json.dumps(sensitive_data).encode()
ciphertext, tag = cipher.encrypt_and_digest(plaintext)

return {
'salt': base64.b64encode(self.salt).decode(),
'nonce': base64.b64encode(nonce).decode(),
'tag': base64.b64encode(tag).decode(),
'encrypted_data': base64.b64encode(ciphertext).decode()
}

# 使用示例
if __name__ == "__main__":
# 示例使用弱密码加密
encryptor = MedicalEncryptor("hospital123")
patient_data = {
"patient_id": "PT2024001",
"diagnosis": "示例诊断",
"ssn": "123-45-6789",
"insurance": "INS123456789"
}
encrypted = encryptor.encrypt_record(patient_data)
print(encrypted)
print("加密完成")

解题:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
import json
import base64
from Crypto.Cipher import AES
from Crypto.Protocol.KDF import scrypt

# ==== 读取加密数据 ====
with open("medical_data.json", "r", encoding="utf-8") as f:
medical_encrypted = json.load(f)

# ==== 读取密码列表 ====
with open("pwd.txt", "r", encoding="utf-8") as f:
passwords = [line.strip() for line in f]

# ==== 辅助函数 ====
def decrypt_record(record, password):
salt = base64.b64decode(record['salt'])
nonce = base64.b64decode(record['nonce']).ljust(16, b'\x00')
tag = base64.b64decode(record['tag'])
ciphertext = base64.b64decode(record['encrypted_data'])
aad = base64.b64decode(record['associated_data'])

key = scrypt(password.encode(), salt, 16, N=1024, r=8, p=1)
cipher = AES.new(key, AES.MODE_GCM, nonce=nonce)
cipher.update(aad)
plaintext = cipher.decrypt_and_verify(ciphertext, tag)
return json.loads(plaintext.decode('utf-8'))

for pw in passwords:
for record in medical_encrypted['encrypted_records']:
pid = record.get('patient_id', '<unknown>')
try:
pt = decrypt_record(record, pw)
print(f"--- Decrypted patient {pid} ---")
print(pt)
except Exception:
continue

数据风险排查

背景描述

近期,在集团例行数据安全检查中发现多起异常行为:内部主机通过 SSH 传输了疑似部署私钥文件,并将包含用户信息的csv表格外发;同时邮件系统审计显示存在未经授权的root账号与明文密码传输情况。经初步判断,本次事件涉及密钥泄露与大量用户敏感信息外泄,已列为高优先级安全事件,需根据附件中的邮件文件和流量信息分析下面的问题。

题干说明

【题目1】数据泄露

经对附件中抓取的网络流量分析发现存在疑似私钥文件被传输。请分析流量包并确认传输私钥文件的客户端IP 地址,并计算将该 IP 字符串的MD5,结果需采用 32 位小写十六进制格式;最终将该 MD5 作为答案提交。

【答案标准】

若 IP 地址为 1.1.1.1 ,计算MD5后得到的32位小写字符串: e086aa137fa19f67d27b39d0eca18610 ,将此md5字符串作为答案提交。

分组字节流查找关键字符串 PRIVATE KEY 得到私钥传输记录 ip为 10.0.1.50

【题目2】数据传输

经对相关邮件进行详尽分析,已确认存在主机用户凭证(用户名与明文密码)通过邮件被泄露的情况。请核实该

泄露邮件的发送者邮箱地址,并计算该邮箱地址的MD5,计算结果以 32 位小写十六进制形式表示,最终将该

MD5 值作为答案提交。

【答案标准】

若邮件发信地址为 addr@126.com ,则计算MD5后得到: b278947ee43ee383150a65c85c8748ff 。

编写脚本解码邮件

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
import base64
import os

# 文件夹路径
folder = r"mails"

# 遍历文件夹中的所有文件
for filename in os.listdir(folder):
file_path = os.path.join(folder, filename)

with open(file_path, "r", encoding="utf-8") as f:
content = f.read().split("\n\n")

# 获取邮件正文(假设在第二部分)
if len(content) > 1:
body = content[1].replace("\n", "")

try:
decoded = base64.b64decode(body).decode()
if "root" in decoded:
print(content[0]) # 打印邮件头
print(decoded)
except Exception as e:
print(f"文件 {filename} 解码失败: {e}")

matthew44@temp-mail.org 发送的邮件中存在明文用户名和密码

【题目3】数据溯源

经将附件中的流量包与题目二的分析结果核对后,需进一步确认本次事件的责任人。流量中传输的用户表已明确

显示每条记录的用户ID与对应邮箱地址,依据比对结果确认责任用户后,请将该用户的 ID 进行 MD5 计算,并以

32 位小写十六进制字符串形式提交计算结果作为最终答案。

同样查找关键字

或者使用strings 命令过滤

签名疑云

背景描述

某企业日常会产生大量敏感业务文档。为保障文件完整性与责任可追溯,公司明确管理规范:员工创建 / 修改的文件需添加个人数字签名,并更新文档元数据中的作者信息。近期,信息安全部门在例行检查中,发现文件数字签名相关环节存在部分异常情况,需通过后续分析进一步排查。请你作为企业的数据安全工程师来完成这份工作。

相关信息

附件信息:

document目录:存放了经过数字签名的文档

certificates目录:存放证书文件,包括企业根证书和用于验证签名的证书

signature_records.csv文件:记录每个文件与签名者的关系

题干说明

【题目1】数据溯源

某些员工在签名时没有使用自己的证书签名,请使用分析签名记录和文件元数据中的作者字段,找到作者与数签名不同的文件,将这些文件的文件编号按从小到大的顺序,使用英文半角逗号连接,转为32位小写md5提交。

【答案标准】

本题答案为32位小写md5值。举例说明,若找到的没有使用作者证书签名的PDF文件有:12.pdf,13.pdf,1.pdf,拼接后的字符串应为:1,12,13,最终提交的md5值为:22774c7c7335cb7c2eed3622537c4a88

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
import os
import csv
import hashlib
from PyPDF2 import PdfReader

# 路径配置
document_dir = "documents"
signature_csv = "signature_records.csv"

# 读取签名记录
signature_dict = {} # filename -> signer
with open(signature_csv, newline='', encoding='utf-8') as f:
reader = csv.DictReader(f)
for row in reader:
filename = row['文件名'].strip()
signer = row['签名者'].strip()
signature_dict[filename] = signer
# 检查作者与签名是否一致
mismatch_files = []

for filename in os.listdir(document_dir):
file_path = os.path.join(document_dir, filename)
reader = PdfReader(file_path)
author = reader.metadata.get('/Author', '').strip()
signer = signature_dict[filename]
if author != signer:
file_number = int(filename.split('.')[0])
mismatch_files.append(file_number)

# 排序并生成字符串
mismatch_files.sort()
concat_str = ",".join(str(num) for num in mismatch_files)

# 计算 MD5
md5_hash = hashlib.md5(concat_str.encode('utf-8')).hexdigest()
print("拼接字符串:", concat_str)
print("MD5:", md5_hash)
#拼接字符串: 9,29,44,46,57,62,83,87,94
#MD5: 163c1ac47f295f2202739c8646b6c07b

【题目2】数据伪造

在审查中发现,某些员工用于签名的证书并不是由公司的根证书颁发,这意味着文件可能是伪造的,请使用公司根证书对员工证书进行校验,找出无法通过校验的员工证书,按照证书序列号按数值从小到大进行排序(注:序列号为16进制,字母请使用小写字母,中间不要使用空格等其他字符分割),并使用英文半角逗号连接,转为32位小写md5提交。

【答案标准】

本题答案为32位小写md5值,举例说明,若找到的序列号为:

1: d2e84f1a67c3b9528d74a6e039bcf7852e9461d8

2: 4a7b2c9de1f83642a95e73b4c618df298a6307b5

3: 96c13af4b728e5d942865ca3f01b79cd58e294a7

拼接后字符串为:

4a7b2c9de1f83642a95e73b4c618df298a6307b5,96c13af4b728e5d942865ca3f01b79cd58e294a7,d2e84f1a67c3b9528d74a6e039bcf7852e9461d8

最终提交的md5值为:0f75590f8dcd78f01ff1c1e20ebe7e47

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
import hashlib
import os
from cryptography import x509
from cryptography.hazmat.backends import default_backend

# 读取根证书 SKI
with open("certificates/rootca.crt", "rb") as f:
root_cert = x509.load_pem_x509_certificate(f.read(), default_backend())
root_ski = root_cert.extensions.get_extension_for_oid(
x509.ExtensionOID.SUBJECT_KEY_IDENTIFIER
).value.public_bytes().hex()[-40:]
print(root_ski)

# 遍历员工证书,找出 AKI 不匹配的序列号
err_serials = []
employees_cert_dir = "certificates/employees"
for filename in os.listdir(employees_cert_dir):
path = os.path.join(employees_cert_dir, filename)
with open(path, "rb") as f:
cert = x509.load_pem_x509_certificate(f.read(), default_backend())
aki = cert.extensions.get_extension_for_oid(
x509.ExtensionOID.AUTHORITY_KEY_IDENTIFIER
).value.public_bytes().hex()[-40:]
if aki != root_ski:
err_serials.append(format(cert.serial_number, "x"))

# 排序并计算 MD5
err_serials.sort()
concat_str = ",".join(err_serials)
md5_hash = hashlib.md5(concat_str.encode("utf-8")).hexdigest()

print("拼接字符串:", concat_str)
print("MD5:", md5_hash)
#拼接字符串: 602a3c4cd431289ad4a272b66ca863168a4b047c,7fc39638f5d5f3454a94a70200002d2cb8fc7ef2
#MD5: 7ba75b08178eb7be58f5d3d309d27852

【题目3】数据篡改

近期发现若干 PDF 在校验前置过程中暴露出结构异常,使得我们所依托的签章脚本尚未进入深度验证便被迫终止;请整理出所有因前置结构校验异常而被流程中止的文档编号。将这些编号按从小到大的顺序,以英文半角逗号连接,并转换为32位小写 md5 后提交。

【答案标准】

本题答案为32位小写md5值。举例说明,若因结构级异常而在前置环节被中止的 PDF 文件编号为:12.pdf,13.pdf,1.pdf,则拼接后的字符串为:1,12,13,最终提交的 md5 值为:22774c7c7335cb7c2eed3622537c4a88

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
import hashlib
import os

from pyhanko.pdf_utils.reader import PdfFileReader
from pyhanko.sign.validation import validate_pdf_signature

base_dir = "documents"
err = []
for f in os.listdir(base_dir):
with open(os.path.join(base_dir, f), 'rb') as doc:
try:
r = PdfFileReader(doc)
sig = r.embedded_signatures[0]
status = validate_pdf_signature(sig)
status.pretty_print_details()
except:
err.append(int(f.split('.')[0]))

err.sort()
err_txt = ",".join(str(num) for num in err)
# 计算 MD5
md5_hash = hashlib.md5(err_txt.encode('utf-8')).hexdigest()
print("拼接字符串:", err_txt)
print("MD5:", md5_hash)
#拼接字符串: 5,30,33,61,71
#MD5: 4bed00738b61fd25b984acf14dea449e

模型偏见修复

场景描述

某电商平台的情感分析模型因训练数据中“价格高”常与差评共现,错误地将高价商品评论一概判为负面,忽视了用户对“物有所值”的积极评价。例如,“虽然贵但超值!”这类真实正向反馈被误判为差评,影响优质商品曝光。为校正这一系统性偏见,需在不改变语义的前提下,对100条高价正向评论施加不超过3个字符的微小扰动,引导模型关注真实情感而非价格标签,从而提升模型的公平性与准确性。

题干信息

选手需基于提供的 sample_reviews.csv 文件中的 100 条高价正向评论(如“虽然贵但超值!”),为每条评论设计一个不超过3个字符的修复文本(例如:👍),并填写至 fix_sample.csv 的对应 text 字段中。访问在线环境,将 fix_sample.csv 上传并进行预测,平台会将该修复文本自动拼接至原始评论末尾,形成新输入(例如:“虽然贵但超值!👍”)如果正向样本大于80,就可以将 fix_sample.csv 作为答案提交到答题平台上。平台会根据正向样本数进行打分。

提供信息

【平台附件】

fix_sample.csv(修复字符样本模板文件,包含id和text两列,id与 sample_reviews.csv 的id对应,text字段不得超过3个字符)sample_reviews.csv(100条高价正向评论样本数据,包含id和review两列)

答案提交

提交仅包含2列内容的fix_sample.csv文件,总数必须为100行,第一行列名不计入。第一列为id(需要与sample_reviews.csv中的id对应,严格按照顺序,否则影响成绩)

第二列为text(修复字符内容,不得超过3个字符,用于增强正向情感表达)。

注意:提交的CSV文件务必选择UTF-8编码保存,否则会出现解析失败。

评分标准

最终得分 = (预测的正向样本 × 0.1) × 当前题目总分。(正向样本数小于80不得分)

注意事项

最终答案文件名必须为fix_sample.csv,csv文件编码保存为UTF-8。

每位选手提交平台的次数为10次,提交格式失败会返回错误原因,不得分,扣除次数,请注意符合格式规范。

在提交答案期间,平台将在最多10次成功提交中选取最高分作为最终成绩。

偏见修复解题:

通过正向词拼接测试,发现拼接”不错”时偏见均修复

1
2
3
4
5
6
7
8
9
import pandas as pd

x = ['不错',]
out = []
for i in range(1, 101):
out.append([i, x[0]])
print(out)
df = pd.DataFrame(out, columns=['id', 'text'])
df.to_csv("fix_sample.csv", index=False, encoding='utf-8')

视频推荐投毒

场景描述

视频平台为了提升用户粘性和观看时长,上线了一套视频推荐系统。该系统基于用户的历史观看和评分行为,采用协同过滤算法为用户推荐他们可能感兴趣的新视频。一家竞争对手”快视科技”希望进行恶意竞争,通过污染视频平台的推荐系统,让系统向观看过热门大片(如环太平洋》)的潜在用户错误地推荐低质量的、制作粗糙的《环大西洋》电影,从而降低用户体验和平台的广告收入。

题干信息

你将扮演”快视科技”的攻击者。你将获得平台现有的部分良性用户观看记录。你的任务是通过注册并伪造不超过500条虚假用户的观看历史,构造投毒数据并注入系统。重新训练后,你的目标是让推荐模型在给定的”目标用户列表”(这些用户大部分曾观看过《环太平洋》)中,将”《环大西洋》”推荐的排名尽可能提升,进入Top-5推荐列表。

提供信息

【平台附件】

existing_rentals.csv :包含 user_id , item_id , rating 的原始良性观看记录。

item_info.csv :视频ID与视频名称的对应表(如 item_101 : “《环太平洋》”, item_205 : “《环大西洋》”)。

target_users.csv :需要被攻击的 user_id 列表(5个目标用户)。

submit.csv :用于提交答案的模板,格式与 existing_rentals.csv 相同,但包含你构造的虚假用户和他们的观看记录。

【解题提示】

系统会找到与你品味最相似的一群用户(称为”邻居”),然后把这些”邻居”们都喜欢、但你还没接触过的视频推荐给你。相似度的判断,完全依赖于你们对相同视频的历史评分记录。答案提交提交 submit.csv 文件到平台。

评分标准

平台会用你提交的投毒数据 submit.csv 与原始数据合并,重新训练推荐模型。评分将基于在target_users.csv 中,有多少用户的Top-5推荐列表里成功出现了”《环大西洋》”。

最终得分 = 成功渗透的用户个数 ×(题目总分 × 0.2)

例如:

成功渗透了4个用户,当前题目总分:100分

最终得分 = 4 × (100 × 0.2)= 80分

注意事项

最终答案文件名必须为submit.csv,csv文件编码保存为UTF-8。

每位选手提交平台的次数为10次,提交格式失败会返回错误原因,不得分,扣除次数,请注意符合格式规范。

在提交答案期间,平台将在最多10次成功提交中选取最高分作为最终成绩。

推荐投毒解题:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
import random

import pandas as pd

u = pd.read_csv(r"target_users.csv")
i = pd.read_csv(r"item_info.csv")

user = []
item = []
for index, row in u.iterrows():
user.append(row['user_id'])

for index, row in i.iterrows():
item.append(row['item_id'])
print(user)
print(item)

out = []
for uu in user:
for ii in item:
for x in range(9):
if ii == 'item_205':
print(uu, ii, 5)
out.append([uu, ii, 5])
else:
print(uu, ii, random.randint(1, 4))
out.append([uu, ii, random.randint(1, 4)])

print(len(out))
df = pd.DataFrame(out, columns=['user_id', 'item_id', 'rating'])
df.to_csv("submit.csv", index=False, encoding='utf-8')

设置 item_205 为最高分,其他随机低分,尝试获得渗透最高用户数 (笨办法)

数据安全综合场景赛

场景说明

强盛集团作为一家大型物业公司,致力于为客户提供高质量的物业管理服务。在一次新物业入住的过程中,公司组织了数据安全风险检查,以确保客户和业主的信息得到妥善保护。检查的重点包括业主个人信息、物业管理数据、以及支付和账单记录等敏感信息的存储和传输方式。为了应对潜在的安全威胁,强盛集团对数据库进行了全面审计,并测试了系统的加密机制、访问权限设置和漏洞防护措施。此外,检查还涵盖了员工数据访问行为的监控,确保只有授权人员才能接触到敏感数据。通过这次风险检查,强盛集团进一步提升了数据安全管理能力,增强了业主对物业管理平台的信任,确保了新物业入住过程中数据安全的合规性。

场景拓扑

场景题干

场景一:采集安全

题目名称:越权采集

物业公司开发了一款新的移动应用(APP),但是该应用因涉嫌越权采集用户信息被应用商城下架。现有一个数据库记录了该APP采集的用户信息。请根据该数据库中的数据,分析并找出该应用越权采集了哪些用户信息,并提交涉及越权采集的数据ID作为答案。数据库名称:app、数据库用户:root、数据库密码:1234567。

【答案标准】若越权采集data_id为4,98,32,400,则最终提交答案为:4-32-98-400。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
import json

import pandas as pd

qx = pd.read_csv("app_user.csv")

qxt = {}

for index, row in qx.iterrows():
qxt[row['user_id']] = json.loads(row['permissions'])

df = pd.read_csv("app_user_data.csv")

a = []
for index, row in df.iterrows():
if row['user_id'] in qxt and row['data_type'] in qxt[row['user_id']]:
pass
else:
print(row['data_id'], end="-")
# 34-201-287-452

场景二:数据传输

题目名称:证书安全

智能停车场管理平台使用了数字证书加密了https流量,请提交数字证书颁发者。

【答案标准】例:若颁发者为张三,则最终提交答案为:张三。

浏览器直接查看证书

题目名称:防泄漏排查

智能停车场管理平台存在数据库泄漏漏洞。请查找该漏洞的根本原因,并从数据库中提取激活证书作为答案提交。

【答案标准】例:若数据库中激活证书为ddc80d31ca1d43b7b97d7f6bd6e6b85c,则最终提交答案为:ddc80d31ca1d43b7b97d7f6bd6e6b85c。

系统存在sql执行直接写sql查表

场景三:数据存储

题目名称:脱敏安全

强盛物业公司开发了一款业主交流平台,平台采用现代密码学中的RSA算法对业主数据进行脱敏保护。请根据提供的数据库和Web系统功能,分析该脱敏方法的安全性。如果存在安全风险,请提取admin用户的明文身份证号作为答案提交。数据库名称:flask_rsa_demo、数据库用户:root、数据库密码:1234567。

【答案标准】例:若admin的身份证号为:320888123456789011,则最终提交答案为:320888123456789011。

pass

题目名称:版本识别

系统更迭通常伴随着系统优化和BUG修复,请提交强盛物业当前使用的档案管理系统版本号。【答案标准】例:若系统版本号为V8.1.2.0,则最终提交答案为:8.1.2.0。

主页直接查看

题目名称:合规审查

强盛物业档案管理系统存储了一批业主身份证照片。根据公司规定,存储此类数据必须进行脱敏处理。请统计系统中有多少张照片已符合合规脱敏要求。合规脱敏说明: 姓名、出生年月、住址和身份证号四项信息均已进行遮掩脱敏。账号:admin、密码:admin。

【答案标准】例:若合规脱敏照片数量为100,则最终提交答案为:100。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
import os
from collections import Counter

import numpy as np
from PIL import Image


def get_region_average_color(image_path, box):
with Image.open(image_path) as img:
img = img.convert('RGB')
region = img.crop(box)
arr = np.array(region, dtype=np.float32)
return arr.mean(axis=(0, 1)).mean()


img_dir = "身份证脱敏附件"

datas = {}
for i in os.listdir(img_dir):
datas[i] = 0
box = [(79, 49, 119, 66), (84, 127, 116, 139), (85, 164, 253, 177), (222, 256, 334, 271)]
image_path = os.path.join(img_dir, i)
for b in box:
avg_color = (get_region_average_color(image_path, b))
if avg_color < 1:
datas[i] = datas[i] + 1

count = Counter(list(datas.values()))
print("合规脱敏照片", count[4])
# 合规脱敏照片 112

题目名称:脱敏分级

统计过合规脱敏数量后,需要对存在风险脱敏照片进行分级,严重风险(全未脱敏)、高风险(仅脱敏一项)、中风险(仅脱敏两项)、低风险(仅脱敏三项)。

【答案标准】例:若严重风险10个、高风险8个、中风险20个、低风险40个,则最终提交答案为:10-8-20-40。

同上

1
2
print("风险分级", f"{count[0]}-{count[1]}-{count[2]}-{count[3]}")
#风险分级 94-92-78-74

场景四:数据治理

题目名称:数据清洗

在智能停车场管理平台的财务审核过程中,发现部分车辆收费金额可能异常,请根据平台后台停车记录,按照停车场收费规则重新计算费用(不足30分钟免费,超过30分钟(含)开始计费,每30分钟1元,不足30分钟按30分钟计费,单次封顶200元),找出收费不准确的车牌号,并按车牌省份的拼音首字母从a-z排序。

【答案标准】例:若存在异常的车牌号为:京X0CRPB、粤DB3LPP、京BXGKFE,则最终提交答案为:京BXGKFE-京X0CRPB-粤DB3LPP。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
from dateutil import parser
import requests

headers = {
"host": "192.168.70.101:8080",
"cookie": "JSESSIONID=483a67bd-d30d-4812-beda-566808968c76; oFLm_2132_saltkey=fG2PzGA2; oFLm_2132_lastvisit=1762325807; PHPSESSID=av60vi0ujr96i8j8qja8kk8jvu; session=eyJ1c2VyX2lkIjoyfQ.aQsJNg.AWQc80KApt-1aa2y72W5-L8_hig; oFLm_2132_sid=Zx2hvE; oFLm_2132_ulastactivity=be5fWwd2Dn75yM_-Xfy6TEbvm2ANBmd3YwKe06vjAXbtzL0zD9WQ; oFLm_2132_auth=ed8e135YOi2Bk61D7d1nCVGYZ7zptn68ta4jRb86E6Ic6CuxXhASi3Lk-LrSxcnMj11D4-ZehnCaX-y5YfLv; oFLm_2132_logintime=1762331088; oFLm_2132_lip=192.168.2.230%2C1762269573; oFLm_2132_checkauthlic=1; oFLm_2132_checkupgrade=1; oFLm_2132_lastact=1762332018%09index.php%09banner"
}
data = {"pageSize": 2000,
"pageNo": 1
}
r = requests.post("https://192.168.70.101:8080/car/parkingRecord/list", data=data, verify=False, headers=headers)

'''

不足30分钟免费,
超过30分钟(含)开始计费,
每30分钟1元,
不足30分钟按30分
钟计费,单次封顶200元

'''


def calculate_fare(minutes):
if minutes < 30:
return 0
else:
if minutes % 30 == 0:
return min(minutes // 30, 200)
else:
return min(minutes // 30 + 1, 200)

xx= []
x = 0
for i in r.json()['msg']['pageData']:
gmtInto = parser.parse(i['gmtInto'])
gmtOut = parser.parse(i['gmtOut'])
c = int(((gmtOut - gmtInto).total_seconds()) / 60)
cx = calculate_fare(c)

cost = i['cost']
plan = i['plateNumber']
if cx != int(float(cost)):
x += 1
# print(gmtInto, gmtOut, cx, cost)
print(plan)
xx.append(plan)
print(xx)
'沪IM2JHT-京ZSXANF-苏UH30RR-浙I72RNW'

题目名称:数据脱敏

公司在对外提供智能停车场管理平台订单数据时,为了避免敏感信息泄露,系统会对订单中的关键字段进行脱敏处理。审查人员只需在不暴露具体细节的情况下完成业务审核,请导出后台订单管理-订单列表xls表,并按照下面规则进行脱敏和提交。

脱敏规则

订单号仅保留前三位和后三位,中间全部使用 * 替换。

示例: 202508290001 → 202******001

车牌号保留前半部分,后三位使用 * 替换。

示例: 粤B12345 → 粤B12***

订单金额一位数金额:全部脱敏8 → *

两位数金额:保留首位,末位脱敏58 → 5*

三位及以上:仅保留首位和末位,中间全部使用 * 替换1234 → 1**4

提交方式**

将脱敏后的数据导出为csv格式,修改编码为UTF-8,换行为:CRLF后加密成md5提交。

【答案标准】例:若脱敏后的csv文件加密成md5为:ddc80d31ca1d43b7b97d7f6bd6e6b85c,则最终提交答案

为:ddc80d31ca1d43b7b97d7f6bd6e6b85c。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
import pandas as pd

df = pd.read_csv("订单信息-20251105083448.csv", encoding="utf-8",dtype=str)

x = []
for index, row in df.iterrows():
id = row['ID']
num = str(row[1])
num = num[:3] + "*" * 6 + num[-3:]
card = row[2][:4] + "*" * 3
cost = str(int(float(row[3])))
nc = ''
if len(cost) == 1:
nc = '*'
elif len(cost) == 2:
nc = cost[0] + "*"
elif len(cost) >= 3:
nc = cost[0] + "*"*(len(cost)-2) + cost[-1]
else:
print("error")
print(num, card, nc)

x.append([id, num, card, nc])

df = pd.DataFrame(x, columns=['ID', '订单号', '车牌号', '金额'])

# 保存为 CSV 文件
df.to_csv("output.csv", index=False, encoding="utf-8")

场景五:数据销毁

题目名称:数据销毁

智能停车场管理平台存储了大量业主用户数据,现需检查用户是否存在弱口令,并验证昵称、邮箱和手机号是否充分脱敏。若用户口令出现在提供的字典中,则判定为弱口令风险;若用户的昵称、邮箱或手机号有任何未脱敏的字段,则判定为脱敏风险,后续将销毁存在风险的用户。字典需使用题目提供的附件:字典.txt。

【答案标准】例:若弱口令风险用户数量为100、脱敏风险用户数量为20,则最终提交答案为:100-20。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
import hashlib

from dateutil import parser
import requests

headers = {
"host": "192.168.70.101:8080",
"cookie": "oFLm_2132_saltkey=fG2PzGA2; oFLm_2132_lastvisit=1762325807; PHPSESSID=av60vi0ujr96i8j8qja8kk8jvu; session=eyJ1c2VyX2lkIjoyfQ.aQsJNg.AWQc80KApt-1aa2y72W5-L8_hig; oFLm_2132_sid=Zx2hvE; oFLm_2132_ulastactivity=be5fWwd2Dn75yM_-Xfy6TEbvm2ANBmd3YwKe06vjAXbtzL0zD9WQ; oFLm_2132_auth=ed8e135YOi2Bk61D7d1nCVGYZ7zptn68ta4jRb86E6Ic6CuxXhASi3Lk-LrSxcnMj11D4-ZehnCaX-y5YfLv; oFLm_2132_logintime=1762331088; oFLm_2132_lip=192.168.2.230%2C1762269573; oFLm_2132_checkauthlic=1; oFLm_2132_checkupgrade=1; oFLm_2132_lastact=1762332018%09index.php%09banner; JSESSIONID=89e09ddd-9be1-4538-9a3e-0394b81e4f14"
}
data = {"pageSize": 1005,
"pageNo": 1
}
r = requests.post("https://192.168.70.101:8080/sys/user/list", data=data, verify=False, headers=headers)

kl = []
with open(r"C:\Users\jack8\Desktop\综合场景\字典.txt", 'r') as f:
for line in f:
p = line.strip()
m = hashlib.md5(p.encode()).hexdigest()
kl.append(m)

print(kl)
rkl = 0
wtm = 0
for i in r.json()['msg']['pageData']:
nickname = i['nickname']
email = i['email']
password = i['password']
# username = i['username']
mobile = str(i['mobile'])
if password in kl:
rkl += 1
print(nickname, email, password, mobile)
if "*" not in email or "*" not in nickname or "*" not in mobile:
wtm += 1

print(rkl) # 弱口令
print(wtm) # 未脱敏

场景六:模型安全

题目名称:成员推断

分析社区言论审核系统的模型响应特征,设计并实施成员推断攻击。根据平台提供的用户名列表(usernames.csv),构造查询请求并分析模型的概率输出,判断列表中每一个用户名所对应的用户数据是否被用于模型训练。您需要将您的判断结果整理成指定的CSV格式文件并提交至竞赛平台进行验证,平台将根据您推断的准确性进行评分。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
import json

import pandas as pd
import requests

headers = {
"origin": "http://192.168.70.101:8090",
"Content-Type": "application/json",
"cookie": "oFLm_2132_saltkey=fG2PzGA2; oFLm_2132_lastvisit=1762325807; PHPSESSID=av60vi0ujr96i8j8qja8kk8jvu; session=eyJ1c2VyX2lkIjoyfQ.aQsJNg.AWQc80KApt-1aa2y72W5-L8_hig; oFLm_2132_sid=Zx2hvE; oFLm_2132_ulastactivity=be5fWwd2Dn75yM_-Xfy6TEbvm2ANBmd3YwKe06vjAXbtzL0zD9WQ; oFLm_2132_auth=ed8e135YOi2Bk61D7d1nCVGYZ7zptn68ta4jRb86E6Ic6CuxXhASi3Lk-LrSxcnMj11D4-ZehnCaX-y5YfLv; oFLm_2132_logintime=1762331088; oFLm_2132_lip=192.168.2.230%2C1762269573; oFLm_2132_checkauthlic=1; oFLm_2132_checkupgrade=1; oFLm_2132_lastact=1762332018%09index.php%09banner; JSESSIONID=89e09ddd-9be1-4538-9a3e-0394b81e4f14",
"referer": "http://192.168.70.101:8090/",
}

df = pd.read_csv(r"C:\Users\jack8\Desktop\综合场景\成员推断\usernames.csv")

for index, row in df.iterrows():
username = row['username']

data1 = {"text": f"{username}"}
r = requests.post("http://192.168.70.101:8090/predict", data=json.dumps(data1), verify=False, headers=headers)

print(r.json()['probability'])

获取预测分数后,按分数判断成员

题目名称:对抗样本

人脸对抗样本:你的任务是作为AI安全人员,针对强盛物业的门禁系统进行安全测试。该系统提供/model_download接口供选手下载人脸识别模型文件。选手需基于附件提供的face_features.json(样例人脸特征向量)、face_features_convert.py(特征转换脚本)及face.jpg(转换后的样例图像),分析模型结构及其API返回信息,设计并生成对抗样本图像,使系统误识别为合法用户,从而登录系统后台,获取答案字符串。

pass

人工智能模型数据安全挑战赛

场景说明

本次人工智能模型数据安全挑战赛围绕金融场景下的四大核心系统展开:反欺诈、贷款审批、资产识别与智能客服。参赛者需依次完成数据清洗与标注、欺诈模型训练、逻辑回归模型参数窃取、对抗样本生成、人脸特征伪造及提示词注入攻击等任务。赛题涵盖数据安全、模型鲁棒性、隐私保护与AI越权访问等关键风险点,全面考察选手在AI安全攻防、数据处理与模型逆向工程等方面的综合能力,以提升对AI系统潜在安全威胁的认知与防御水平。

一、模型预训练

你是 “智慧金融 ” 公司新成立的 AI 安全部门的一名核心数据科学家。你的首个任务,就是参与开发 AI 反欺诈系统 ——“ 鹰眼计划 ”。鹰眼系统在通过分析实时交易流水,精准识别并拦截潜在的欺诈行为。整个计划分为三个关键阶段: 数据清洗数据标注模型训练 。你将亲手处理原始数据,并构建出 “ 鹰眼 ” 系统的第一个原型机。

【考题 1】数据清洗

对原始交易数据 raw_transactions.csv。请根据以下规则进行数据清洗:

  1. 处理无效交易金额 :amount 列存在空值(表示交易失败)和负数(可能表示退款)。在欺诈模型中,我们只关心正向流水的绝对值。

    • 将所有负数金额取其绝对值(转换为正数)。

    • 将所有空值金额用 0 进行填充。

  2. 补全商户类别 : merchant_category 列(商户类别)有一些记录缺失了。为了数据完整性,请将所有缺失值统一填充为字符串 UNKNOWN

  3. 剔除无效记录 : user_id 是用户唯一标识,必须存在。请删除所有 user_id 为空的行。

【场景附件】

  • raw_transactions.csv(原始交易数据)

【答案标准】
将清洗过后的文件保存为 cleaned_transactions.csv ,并提交至平台验证后得分。
(提交文件要完全符合题干要求,否则不得分)

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
import pandas as pd
#transaction_id user_id amount merchant_category merchant_country transaction_time

# 1. 读取原始数据
raw_file = 'raw_transactions.csv'
df = pd.read_csv(raw_file)

print("原始数据形状:", df.shape)
print("初始缺失值统计:")
print(df.isnull().sum())

# 2. 处理交易金额 amount 列
# - 负数 → 取绝对值(转为正)
df['amount'] = df['amount'].abs()

# - 空值用 0 填充
df['amount'] = df['amount'].fillna(0)

print("处理后 amount 列:无负数,空值已填充为 0")

# 3. 补全商户类别 merchant_category(缺失值填 'UNKNOWN')
df['merchant_category'] = df['merchant_category'].fillna('UNKNOWN')

print("处理后 merchant_category:缺失值已填充为 'UNKNOWN'")

# 4. 剔除 user_id 为空的行
initial_count = len(df)
df.dropna(subset=['user_id'], inplace=True)
cleaned_count = len(df)

print(f"删除了 {initial_count - cleaned_count} 条 user_id 为空的记录。")

# 5. 输出清洗后的数据到新文件
output_file = 'cleaned_transactions1.csv'
df.to_csv(output_file, index=False)

print(f"清洗完成!已保存至:{output_file}")

【考题 2】数据标注

请在你上一题生成的 cleaned_transactions.csv 文件基础上,新增一列名为 is_suspicious 的标签列。根据以下规则,判断每一笔交易是否可疑:

可疑交易规则(满足以下任一条件, is_suspicious 即为 1 ,否则为 0 ):

  1. 特定时段大额交易 : 交易时间( transaction_time )的小时部分( hour )在 0点到 5点之间(包含 0和 5 ),并且交易金额( amount )大于 10,000
  2. 高风险商户类别 : 商户类别( merchant_category )为 GAMBLING( 赌博 ) 或 JEWELRY ( 珠宝 ) 。
  3. 跨境高危地区交易 : 交易发生的商户所在国家( merchant_country )是 MYA ( 缅甸 ), PH( 菲律宾 ), 或 KH( 柬埔寨 )。

【答案标准】
将标注过后的文件保存为 labeled_transactions.csv ,并提交至平台验证后得分。
(提交文件要完全符合题干要求,否则不得分)

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
import pandas as pd

# 1. 读取清洗后的数据
input_file = 'cleaned_transactions.csv'
df = pd.read_csv(input_file)

print("原始数据形状:", df.shape)
print("\n前几行预览:")
print(df.head())

# 2. 确保 transaction_time 是 datetime 类型(如果尚未转换)
if not pd.api.types.is_datetime64_any_dtype(df['transaction_time']):
# 假设时间格式为 'YYYY-MM-DD HH:MM:SS',可根据实际调整
df['transaction_time'] = pd.to_datetime(df['transaction_time'], errors='coerce')

# 3. 提取小时部分(确保是整数)
df['hour'] = df['transaction_time'].dt.hour

# 4. 初始化 is_suspicious 列为 0
df['is_suspicious'] = 0

# 5. 应用规则1:特定时段大额交易
rule1_mask = (df['hour'].between(0, 5, inclusive='both')) & (df['amount'] > 10000)
df.loc[rule1_mask, 'is_suspicious'] = 1

# 6. 应用规则2:高风险商户类别
rule2_mask = df['merchant_category'].isin(['GAMBLING', 'JEWELRY'])
df.loc[rule2_mask, 'is_suspicious'] = 1

# 7. 应用规则3:跨境高危地区交易
rule3_mask = df['merchant_country'].isin(['MYA', 'PH', 'KH'])
df.loc[rule3_mask, 'is_suspicious'] = 1

# 8. 输出结果
output_file = 'labeled_transactions.csv'
df = df.drop(columns=['hour'])
df.to_csv(output_file, index=False)

print(f"\n✅ 标签处理完成!")
print(f"总交易数:{len(df)}")
print(f"可疑交易数量:{df['is_suspicious'].sum()}")
print(f"已保存至:{output_file}")

【考题 3】模型训练

请利用上一题生成的 labeled_transactions.csv文件。访问 “ 鹰眼 ” 模型训练系统,通过上传数据和调整参数,训练一个高性能的欺诈检测模型。最后,将生成的模型文件导出并提交,平台将根据其预测效果自动评分,决定你的最终成绩。

【答案标准】
将导出output.opkl(文件名必须相同 ) 模型文件压缩成 output.opkl.zip提交至平台验证后得分。
(提交的压缩包内的文件名必须为: output.opkl,最终得分:模型的 F1 分数 * 题目总分)

二、模型逆向分析

“智慧金融 ” 公司推出了一套 AI 在线贷款审批系统 ,其核心模型已通过黑盒 API 对外提供服务。 为保护商业机密,该 API 仅返回客户的 “ 贷款通过概率 ” 。然而,该接口存在被逆向的风险。一旦模型的内部权重参数被窃取,攻击者不仅能复制我们的核心业务,还能精心构造欺诈申请骗取贷款,对公司造成毁灭性打击。作为公司的安全顾问,你的任务是模拟攻击者,在仅能调用 API 的条件下,尝试窃取模型参数,从而全面评估我们这套核心系统面临的安全风险。

【题目 4】 权重窃取

选手需要分析 WEB 平台的预测接口,根据我们已知的模型输入特征,构造特定的 API 请求,反向推导出其内部逻辑回归( Logistic Regression )模型的全部 5个权重和 1 个偏置。
【题目附件】

submit1.csv(题目4提交模板)

  • 第一列为 Parameter
    • Bias: 偏置项
    • Weight_annual_income: 年收入(元)权重
    • Weight_age: 年龄权重
    • Weight_years_of_employment: 工作年限权重
    • Weight_num_credit_cards: 现有信用卡数量权重
    • Weight_debt_to_income_ratio: 债务收入权重
  • 第二列为 Value (填写符合题干要求的参数值)

【解题提示】
后台模型为标准的逻辑回归算法 。其核心计算过程如下:

  • 线性求和 :首先,模型将所有输入特征( x)与其对应的权重( w)相乘,然后加上偏置( b),得到一个中间值 z
    • z = (w1*x1 + w2*x2 + ... + w9*x9) + b
  • Sigmoid函数:然后,模型使用 Sigmoid 函数将 z 转换为一个 0 到 1 之间的概率 p。这个 p 就是系统返回给你的概率值。
    • p = 1 / (1 + e^(-z))

【答案标准】
将您窃取到的所有参数整合进一个 submit1.csv文件提交至平台。
(偏置得分占比 25% ,其余 4 个参数各占比 15% )

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
#utils.py
import numpy as np

def sigmoid(x):
"""Sigmoid函数"""
return 1 / (1 + np.exp(-np.clip(x, -250, 250)))

def inverse_sigmoid(p):
"""Sigmoid反函数"""
p = np.clip(p, 1e-10, 1 - 1e-10)
return np.log(p / (1 - p))

def normalize_vector(v):
"""向量归一化"""
norm = np.linalg.norm(v)
return v / norm if norm > 0 else v

def calculate_similarity(v1, v2):
"""计算向量相似度"""
return np.dot(v1, v2) / (np.linalg.norm(v1) * np.linalg.norm(v2) + 1e-10)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
import numpy as np
import matplotlib.pyplot as plt
import requests
import json
import time
from utils import sigmoid, inverse_sigmoid


class WeightStealingAttacker:
"""权重窃取攻击客户端 - 支持结构化输入参数(如 annual_income, age 等)"""

def __init__(self, api_url):
self.api_url = api_url
self.feature_names = None # 所有特征名列表,动态获取
self.stolen_weights = None
self.stolen_bias = None
self.query_count = 0
self.attack_history = []
self._fitted = False

def _get_feature_names(self):
"""自动探测支持的特征名称(通过一次测试请求)"""
test_data = {
"annual_income": 100,
"age": 18,
"years_of_employment": 10,
"num_credit_cards": 1,
"debt_to_income_ratio": 0.5
}
try:
response = requests.post(
f"{self.api_url}/predict",
json=test_data,
headers={
'Accept': '*/*',
'Accept-Language': 'zh-CN,zh;q=0.9,en;q=0.8,en-GB;q=0.7,en-US;q=0.6',
'Connection': 'keep-alive',
'Content-Type': 'application/json',
'Origin': 'http://192.168.70.211:19102',
'Referer': 'http://192.168.70.211:19102/',
'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/142.0.0.0 Safari/537.36 Edg/142.0.0.0',
},
verify=False, # --insecure
timeout=30
)
if response.status_code == 200:
result = response.json()
self.feature_names = list(test_data.keys())
print(f"✅ 成功探测到特征字段: {self.feature_names}")
return True
else:
print(f"❌ 探测失败,状态码: {response.status_code}")
return False
except Exception as e:
print(f"❌ 请求异常: {e}")
return False


def _dict_to_vector(self, data_dict):
"""将输入字典转换为特征向量(按 feature_names 顺序)"""
if not self.feature_names:
raise ValueError("未初始化特征列表,请先调用 get_feature_names()")

vector = []
for name in self.feature_names:
value = data_dict.get(name, 0.0)
vector.append(float(value))
return np.array(vector)


def query_api(self, features):
"""查询受害者API - 支持 dict 或 list / np.ndarray"""
if isinstance(features, dict):
input_data = features
elif hasattr(features, '__iter__'):
# 假设是可迭代对象,如 list/np.array
if len(features) != len(self.feature_names):
raise ValueError(f"输入长度 ({len(features)}) 与特征数量 {len(self.feature_names)} 不匹配")
input_data = dict(zip(self.feature_names, features))
else:
raise TypeError("输入必须为字典、列表或 numpy 数组")

try:

input_data["annual_income"] = int(input_data["annual_income"])
input_data["years_of_employment"] = int(input_data["years_of_employment"])
input_data["debt_to_income_ratio"] = round(input_data["debt_to_income_ratio"],1)
print("input_data",input_data)
response = requests.post(
f"http://192.168.70.211:19102/predict",
json=input_data,
headers={
'Accept': '*/*',
'Accept-Language': 'zh-CN,zh;q=0.9,en;q=0.8,en-GB;q=0.7,en-US;q=0.6',
'Connection': 'keep-alive',
'Content-Type': 'application/json',
'Origin': 'http://192.168.70.211:19102',
'Referer': 'http://192.168.70.211:19102/',
'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/142.0.0.0 Safari/537.36 Edg/142.0.0.0',
},
verify=False, # --insecure
timeout=30
)

if response.status_code == 200:
result = response.json()
prob = result.get("probability")
if prob is None:
print(f"⚠️ 返回中无 'probability' 字段: {result}")
return None

self.query_count += 1
self.attack_history.append({
"input": input_data.copy(),
"output": float(prob),
"query_number": self.query_count
})
return prob
else:
print(f"API请求失败: {response.status_code}, 响应内容: {response.json()}")
return None

except Exception as e:
print(f"❌ 请求异常: {e}")
return None


def steal_weights_basic(self):
"""基础权重窃取攻击(按单个特征扰动)"""
if not self.feature_names:
print("⚠️ 未获取到特征名,正在尝试探测...")
if not self._get_feature_names():
print("❌ 探测失败,无法进行攻击")
return None, None

start_time = time.time()

# Step 1: 估计偏置项 (零向量查询)
zero_data = {name: 0.0 for name in self.feature_names}
prob_zero = self.query_api(zero_data)

if prob_zero is None:
print("❌ 零向量查询失败,无法估计偏置")
return None, None

estimated_bias = inverse_sigmoid(prob_zero)
print(f"✅ 估计偏置: {estimated_bias:.4f}, 概率={prob_zero:.6f}")

# Step 2: 逐个特征扰动(单位向量方式)
stolen_weights = np.zeros(len(self.feature_names))

print("🔍 正在估计各特征权重...")

for i, name in enumerate(self.feature_names):
perturbed_data = {n: (1.0 if n == name else 0.0) for n in self.feature_names}
prob = self.query_api(perturbed_data)

if prob is None:
print(f"❌ 特征 '{name}' 查询失败,跳过")
continue

weight_estimate = inverse_sigmoid(prob) - estimated_bias
stolen_weights[i] = weight_estimate

print(f" [{i + 1}/{len(self.feature_names)}] {name}: p={prob:.6f} → w={weight_estimate:.4f}")

time.sleep(0.2)

self.stolen_weights = stolen_weights
self.stolen_bias = estimated_bias

elapsed_time = time.time() - start_time
print(f"✅ 基础攻击完成!耗时: {elapsed_time:.2f}s, 查询次数: {self.query_count}")

# 输出结果
for name, w in zip(self.feature_names, stolen_weights):
print(f" {name}: {w:.4f}")
return stolen_weights, estimated_bias


def steal_weights_advanced(self, n_queries=300):
"""高级攻击:使用随机采样 + 线性拟合"""
if not self.feature_names:
print("⚠️ 未获取特征名,正在探测...")
if not self._get_feature_names():
return None, None

start_time = time.time()
X_queries = []
probs = []

print(f"🚀 开始高级攻击:生成 {n_queries} 次随机查询...")

for i in range(n_queries):
# 生成合理的随机特征值(模拟真实分布)
data = {
"annual_income": np.random.uniform(20, 150), # 单位: k
"age": np.random.randint(18, 70),
"years_of_employment": np.random.uniform(0, 40),
"num_credit_cards": np.random.randint(0, 10),
"debt_to_income_ratio": np.random.uniform(0.0, 1.0)
}

prob = self.query_api(data)

if prob is not None:
x_vec = self._dict_to_vector(data)
X_queries.append(x_vec)
probs.append(prob)

if (i + 1) % 50 == 0:
print(f" 进度: {i + 1}/{n_queries}, 成功查询: {len(probs)}")

time.sleep(0.07)

if len(X_queries) < 10:
print("❌ 有效查询不足,无法拟合")
return None, None

X_queries = np.array(X_queries)
probs = np.array(probs)
log_odds = inverse_sigmoid(probs)

# 使用最小二乘法求解线性模型:logit(p) ≈ w^T x + b
# 我们忽略偏置(或设为0),也可加入常数项列
X_augmented = np.hstack([X_queries, np.ones((len(X_queries), 1))])

try:
coeffs, residuals, rank, s = np.linalg.lstsq(X_augmented, log_odds, rcond=None)
stolen_weights = coeffs[:-1]
estimated_bias = coeffs[-1]

self.stolen_weights = stolen_weights
self.stolen_bias = estimated_bias

elapsed_time = time.time() - start_time
print(f"✅ 高级攻击完成!耗时: {elapsed_time:.2f}s, 有效查询: {len(X_queries)}")

for name, w in zip(self.feature_names, stolen_weights):
print(f" {name}: {w:.4f}")

except Exception as e:
print(f"❌ 拟合失败: {e}")
return None, None

return stolen_weights, estimated_bias


def evaluate_attack(self, true_weights=None, true_bias=None):
"""评估攻击效果"""
if self.stolen_weights is None:
print("⚠️ 未执行攻击,请先调用 steal_weights_* 方法")
return None, None

# 如果没有真实权重,跳过精确评估
if true_weights is None or true_bias is None:
print("⚠️ 无法获取真实模型参数,仅展示窃取结果")
weight_error = np.linalg.norm(self.stolen_weights) # 简化度量(无参考)
weight_cosine = 1.0 # 无意义
else:
try:
weight_error = np.linalg.norm(self.stolen_weights - true_weights)
weight_cosine = np.dot(self.stolen_weights, true_weights) / (
np.linalg.norm(self.stolen_weights) * np.linalg.norm(true_weights) + 1e-10
)
except Exception as e:
print(f"评估计算出错: {e}")
return None, None

print("\n📊 攻击效果总结:")
print(f" 总查询次数: {self.query_count}")
print(f" 权重误差 (L2): {weight_error:.4f}")
print(f" 余弦相似度: {weight_cosine:.4f}")

return weight_error, weight_cosine


def main():
"""主函数"""
print("=" * 60)
print("🎯 权重窃取攻击客户端 - 支持多参数输入")
print("=" * 60)

# 配置目标API地址和特征数量
API_URL = "http://192.168.70.211:19102"
# 创建攻击者实例
attacker = WeightStealingAttacker(API_URL)
try:
# 测试连接:发送一个示例请求验证服务可用性
print("🔍 正在测试API连接...")
test_data = {
"annual_income": 100,
"age": 18,
"years_of_employment": 10,
"num_credit_cards": 1,
"debt_to_income_ratio": 0.5
}
prob_test = attacker.query_api(test_data)

if prob_test is None:
print("❌ 无法连接到目标API,请检查服务是否运行!")
return

print(f"✅ API连接成功,测试返回概率: {prob_test:.6f}")

# 获取真实模型参数(用于验证)
print("\n🔍 正在获取真实权重信息...")
try:
response = requests.get(f"{API_URL}/model_info", timeout=10)
if response.status_code == 200:
model_info = response.json()
true_weights = np.array(model_info['true_weights'])
true_bias = model_info['true_bias']
print("✅ 获取真实权重成功")
else:
print(f"⚠️ 无法获取模型信息 (状态码: {response.status_code}),跳过精确评估")
true_weights = true_bias = None
except Exception as e:
print(f"⚠️ 获取真实参数失败: {e},将仅展示攻击结果")
true_weights = true_bias = None

# 用户选择攻击方式
print("\n🔢 攻击模式选择:")
print("1. 基础权重窃取(基于单位向量 + 零向量)")
print("2. 高级权重窃取(随机采样+最小二乘拟合,推荐)")

choice = input("请选择攻击方式 (1/2): ").strip()
if choice not in ["1", "2"]:
print("❌ 无效选择,默认使用高级模式...")
choice = "2"

# 执行攻击
if choice == "1":
stolen_weights, stolen_bias = attacker.steal_weights_basic()
else:
stolen_weights, stolen_bias = attacker.steal_weights_advanced(n_queries=500)

# 检查是否成功窃取
if stolen_weights is None or len(stolen_weights) == 0:
print("❌ 权重窃取失败,请检查网络或API行为。")
return

# 输出结果
print(f"\n✅ 攻击完成!窃取模型参数如下:")
feature_names = ["annual_income", "age", "years_of_employment",
"num_credit_cards", "debt_to_income_ratio"]
for i, name in enumerate(feature_names):
if i < len(stolen_weights):
print(f" {name}: {stolen_weights[i]:.6f}")
print(f" bias (偏置): {stolen_bias:.6f}")

# 评估攻击效果
if true_weights is not None:
attacker.evaluate_attack(true_weights, true_bias)
else:
print("⚠️ 无真实模型参数,无法精确评估。")

# 显示API统计信息(可选)
try:
stats_response = requests.get(f"{API_URL}/stats", timeout=10)
if stats_response.status_code == 200:
stats = stats_response.json()
print("\n📊 API统计信息:")
print(f" 总查询次数: {stats['total_queries']}")
print(f" 可疑查询次数: {stats['suspicious_queries']}")
print(f" 防御模式: {stats['defense_mode']}")
else:
print("⚠️ 无法获取统计信息")
except Exception as e:
print(f"⚠️ 获取统计失败: {e}")

except Exception as e:
print(f"❌ 主程序异常: {e}")


if __name__ == "__main__":
main()

【题目 5】概率攻击

选手精心构造一组全新的、合理的贷款申请参数。当你将这组参数提交给预测系统时,系统返回的预测概率要精确等于 :0.9956410313816209
【题目附件】
submit2.csv(题目 5提交模板)

  • 第一列为 Parameter
    • annual_income: 年收入(元)
    • age: 年龄
    • years_of_employment: 工作年限
    • num_credit_cards: 现有信用卡数量
    • debt_to_income_ratio: 债务收入比
  • 第二列为 Value (填写符合题干要求的参数值)

【答案标准】
将相关参数整合进 submit2.csv文件提交至平台,平台将根据您提交参数的准确性进行判分。
(提交文件要完全符合题干要求,否则不得分)

爆破参数直到probability符合条件

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
import requests

for i in range(518000, 550000):
for j in range(18, 80):
for .......:
response = requests.post(
f"http://192.168.70.211:19102/predict",
json={
"annual_income": i,
"age": j,
"years_of_employment": 10,
"num_credit_cards": 5,
"debt_to_income_ratio": 0.5
},
verify=False, # --insecure
timeout=10
)
print(i, response.json()['probability'])
if response.json()['probability'] == 0.9956410313816209:
print(i, j)
break

【题目 6】篡改溯源

系统日志显示,一份关键申请人的档案在录入时发生了数据异常,导致其中一个字段被错误地修改。幸运的是,在数据损坏前,系统已经为原始的、正确的档案计算并存储了一个 “ 预测概率值 ” ,我们可以将其视为原始数据的“校验和 ” ,请你对下面这份已损坏的档案进行分析,找出哪一个字段被篡改了,并恢复其原始的、正确的数值。
【案件线索】

  1. 原始校验和 ( 正确的预测概率 ): 0.8550518770812063

  2. 已损坏的档案记录 :

    1
    2
    3
    4
    5
    annual_income:250000.5
    age:45
    years_of_employment:40
    num_credit_cards:5
    debt_to_income_ratio:0.5

【答案标准】
找到被篡改的字段名称和其恢复后的正确整数值后,将它们格式化为 字段名:数值 (英文冒号)的字符串,然后提交该字符串。
假设你发现是 age 被改了,其原始值应为 25,构造字符串 : age:25作为答案提交。

同上,遍历攻击

三、图像对抗攻击

“智慧金融 ” 公司开发了一套基于图像的资产识别系统,用于自动化识别用户上传的资产图像,以快速评估用户申请贷款时提交的抵押品(如车辆、房子等)。该系统提升人工识别效率,并有效防范利用伪造或不合格资产进行的欺诈行为。

【题目 7】图像对抗

系统提供了一个基于 ResNet50 深度学习模型的图像识别平台,该模型使用 ImageNet 数据集进行预训练,能够识别不同的物体类别。平台中有一张默认的参考图片,你的任务是上传一张经过精心设计的对抗样本图片,使其满足以下两个看似矛盾的条件:

  1. 与默认图片在视觉上高度相似(相似度≥ 95% )
  2. 被模型识别为完全不同的类别

【答案标准】
提交验证成功后得到的字符串。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
#!/usr/bin/env python3
"""
The spatial attack is a very special attack because it tries to find adversarial
perturbations using a set of translations and rotations rather then in an Lp ball.
It therefore has a slightly different interface.
"""
import torch
import torchvision.models as models
import eagerpy as ep
from PIL import Image
from foolbox import PyTorchModel, accuracy, samples, attacks, criteria
import foolbox.attacks as fa
from torchvision import transforms
from torchvision.models import ResNet50_Weights
from torchvision.utils import save_image


def main() -> None:
# instantiate a model (could also be a TensorFlow or JAX model)

DEVICE = torch.device("cuda:0" if torch.cuda.is_available() else "cpu")
model = models.resnet50(weights=ResNet50_Weights).eval()
preprocessing = dict(mean=[0.485, 0.456, 0.406], std=[0.229, 0.224, 0.225], axis=-3)
fmodel = PyTorchModel(model, bounds=(0, 1), preprocessing=preprocessing)

# get data and test the model
# wrapping the tensors with ep.astensors is optional, but it allows
# us to work with EagerPy tensors in the following
image_path = "default.jpg" # 这里改为你的自定义图片路径

# 定义图片预处理流程(与模型期望的输入匹配)
preprocess = transforms.Compose([
# transforms.Resize((224, 224)), # 调整为模型输入尺寸(根据模型修改)
transforms.ToTensor(), # 转换为Tensor并归一化到[0,1]
])

# 加载并处理单张图片
image = Image.open(image_path).convert("RGB") # 确保是RGB格式
image_tensor = preprocess(image).unsqueeze(0) # 添加批次维度 (1, 3, 224, 224)
image_tensor = image_tensor.to(DEVICE)

labels_tensor = torch.tensor([404], device=DEVICE) # 标签

images, labels = ep.astensors(image_tensor, labels_tensor)
clean_acc = accuracy(fmodel, images, labels) * 100
print(f"clean accuracy: {clean_acc:.1f} %")

attack_target = 1
target_class = torch.tensor([attack_target]).to(DEVICE)

attack = attacks.LinfPGD(
abs_stepsize=0.05 / 255.0,
)

adversarials, aaa, success = attack(
fmodel,
image_tensor, labels_tensor,
# criterion=criteria.TargetedMisclassification(target_class),
epsilons=0.01
)

with torch.no_grad():
preds = fmodel(adversarials).argmax(axis=1)

print("预测类别:", preds.item(), "是否成功攻击到目标类:", preds.item() == target_class.item())

adv_224 = adversarials.detach().clamp(0.0, 1.0)
save_image(adv_224, "adversarial_pgd1111.jpg")

image_path = "adversarial_pgd1111.jpg" # 这里改为你的自定义图片路径

# 定义图片预处理流程(与模型期望的输入匹配)
preprocess = transforms.Compose([
# transforms.Resize((224, 224)), # 调整为模型输入尺寸(根据模型修改)
transforms.ToTensor(), # 转换为Tensor并归一化到[0,1]
])

# 加载并处理单张图片
image = Image.open(image_path).convert("RGB") # 确保是RGB格式
image_tensor = preprocess(image).unsqueeze(0) # 添加批次维度 (1, 3, 224, 224)
image_tensor = image_tensor.to(DEVICE)

with torch.no_grad():
logits = model(image_tensor).squeeze(0)
logits_float = logits.float()
pred_idx = torch.argmax(logits_float).item()
print(pred_idx)


if __name__ == "__main__":
main()

本地执行成功,线上(黑盒?)环境攻击失败

【题目 8】人脸伪造

资产图像识别系统的后台采用基于逻辑回归的人脸识别登录系统,该系统通过提取人脸特征向量,使用逻辑回归模型进行二分类判断,以确定用户身份是否匹配。

利用逻辑回归算法的线性特性和参数可预测性,采用黑盒攻击方式提取出完整的模型参数包括权重矩阵 W 和偏置向量 b,然后基于这些参数逆向计算并构造出能够通过身份验证的管理员人脸特征向量,使其在逻辑回归模型中产生正确的分类结果从而成功绕过身份验证系统获得管理员权限。

【平台附件】

  • 测试人脸 .png

【答案标准】
提交登录后台成功显示的字符串。

pass

四、大模型攻击

“智慧金融” 公司为了提升客户服务体验与运营效率,正式上线了一套全新智能客服系统。该系统基于 AI 大语言模型技术,能够实时解决用户的各类问题。无论是常规的账户查询、业务办理指引,还是复杂的理财产品咨询和个性化投资建议,都能精准理解用户的自然语言提问,并迅速提供专业、清晰的解答。

【题目 9】模型越狱

智慧金融公司新上线的 AI 智能客服为其集成的用户信息查询模块( MCP )设定了严格的权限 :仅能访问当前认证用户的个人数据。你的任务是,在登录自己(王浩)账户的情况下,构建一段巧妙的恶意提示词,绕过这层安全边界,诱使 AI 违规查询并泄露出另一指定用户 ‘陈静( id:2 ) ’的完整个人信息。

【答案标准】
成功获取陈静的身份证号后,将其作为最终答案提交至平台验证得分。

pass

【题目 10 】提示词窃取

由于一次部署过程中的配置疏忽, AI 智能客服的系统提示词中,被意外地硬编码了一段包含后台系统密码的调试信息。你的任务是进行一次精妙的提示词逆向工程,诱导 AI 完整复述出其自身的系统提示词。请注意,系统部署了关键词过滤功能,能有效拦截 AI 直接回复敏感信息等内容。你需要构建出一段出其不意的提示词,绕过这层防御,并从泄露的内容中成功获取系统密码 ,将其作为最终答案提交。

【答案标准】
成功获取系统密码后,将其作为最终答案提交至平台验证得分。

pass

题目附件

1
2
3
通过网盘分享的文件:全国总工会第二届职工数字化应用技术技能大赛附件.zip
链接: https://pan.baidu.com/s/11y0Skb5TATM61PMB5itwhg?pwd=da5a 提取码: da5a
--来自百度网盘超级会员v9的分享