全国总工会第二届职工数字化应用技术技能大赛部分WP 数据安全技能赛 算法恢复
题目描述
某加密产品采用标准国密算法进行数据安全防护,但某安全研究员逆向出代码,发现采用特殊代码逻辑进行数据处理,这是一种自同步流密码,请分析提供代码文件,基于截获的密文及侧信道数据,恢复出原始数据。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 import osfrom secret import *from gmssl import sm4, funcfrom gmssl.sm4 import CryptSM4, SM4_ENCRYPTfrom Crypto.Util.number import *assert (len (flag) == 38 )assert flag[:5 ] == b'flag{' and flag[-1 :] == b'}' assert (len (key) == 16 )def padding (msg ): tmp = 16 - len (msg) % 16 pad = format (tmp, '02x' ) return bytes .fromhex(pad * tmp) + msg message = padding(flag) cnt = len (message) // 16 messages = [message[16 *i:16 *(i+1 )] for i in range (cnt)] hint = bytes_to_long(key) ^ bytes_to_long(message[:16 ]) IV = os.urandom(16 ) crypt_sm4 = CryptSM4() crypt_sm4.set_key(key, SM4_ENCRYPT) prev_ci = IV output = b'' for msg in messages: enc_output = func.list_to_bytes(crypt_sm4.one_round(crypt_sm4.sk, prev_ci)) ci = bytes ([msg[j] ^ enc_output[j] for j in range (16 )]) prev_ci = ci output = output + ciprint ('enc =' , output.hex ())print ('hint =' , hex (hint)[2 :])
解题:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 from gmssl.sm4 import CryptSM4, SM4_ENCRYPTfrom gmssl import func enc_hex = "da1db10716b19792c4957557e5bb8c4586d17a21281fae1b3b14a6cb80a4d084b3804d4d95cebffe5e4bb1dca85ae726" hint_hex = "3c3b4f3b3d3f683e3a49545c53013e77" enc = bytes .fromhex(enc_hex) hint = bytes .fromhex(hint_hex) pad_len = 10 m0_prefix = bytes ([pad_len]) * pad_len + b"flag{" blocks = [enc[i:i+16 ] for i in range (0 , len (enc), 16 )]def xor_bytes (a, b ): return bytes (x ^ y for x, y in zip (a, b))def is_printable (bs ): return all (0x20 <= b <= 0x7e for b in bs)for g in range (256 ): m0 = m0_prefix + bytes ([g]) key = xor_bytes(hint, m0) crypt = CryptSM4() crypt.set_key(key, SM4_ENCRYPT) msg_blocks = [m0] for i in range (1 , len (blocks)): prev = blocks[i-1 ] ek = func.list_to_bytes(crypt.one_round(crypt.sk, prev)) msg_blocks.append(xor_bytes(blocks[i], ek)) msg = b"" .join(msg_blocks) p = msg[0 ] if p > 16 : continue flag = msg[p:p+38 ] if len (flag) == 38 and flag.startswith(b"flag{" ) and flag.endswith(b"}" ): if is_printable(flag[5 :-1 ]): print ("key =" , key.hex ()) print ("flag =" , flag.decode()) break
lmr_02
题目描述
解密medical_data.json获得明文的ssn号码
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 from Crypto.Cipher import AESfrom Crypto.Protocol.KDF import scryptimport base64import jsonclass MedicalEncryptor : def __init__ (self, password: str ): self.salt = b'fixed_salt_12345678' self.key = scrypt(password, self.salt, 16 , N=1024 , r=8 , p=1 ) def encrypt_record (self, patient_data: dict ) -> dict : patient_num = patient_data["patient_id" ].replace("PT" , "" ) nonce = f'medical_nonce_{patient_num} ' .encode().ljust(16 , b'\x00' ) cipher = AES.new(self.key, AES.MODE_GCM, nonce=nonce) associated_data = json.dumps({ "patient_id" : patient_data["patient_id" ], "record_type" : "medical" }).encode() cipher.update(associated_data) sensitive_data = { 'diagnosis' : patient_data['diagnosis' ], 'ssn' : patient_data['ssn' ], 'insurance' : patient_data['insurance' ] } plaintext = json.dumps(sensitive_data).encode() ciphertext, tag = cipher.encrypt_and_digest(plaintext) return { 'salt' : base64.b64encode(self.salt).decode(), 'nonce' : base64.b64encode(nonce).decode(), 'tag' : base64.b64encode(tag).decode(), 'encrypted_data' : base64.b64encode(ciphertext).decode() }if __name__ == "__main__" : encryptor = MedicalEncryptor("hospital123" ) patient_data = { "patient_id" : "PT2024001" , "diagnosis" : "示例诊断" , "ssn" : "123-45-6789" , "insurance" : "INS123456789" } encrypted = encryptor.encrypt_record(patient_data) print (encrypted) print ("加密完成" )
解题:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 import jsonimport base64from Crypto.Cipher import AESfrom Crypto.Protocol.KDF import scryptwith open ("medical_data.json" , "r" , encoding="utf-8" ) as f: medical_encrypted = json.load(f)with open ("pwd.txt" , "r" , encoding="utf-8" ) as f: passwords = [line.strip() for line in f]def decrypt_record (record, password ): salt = base64.b64decode(record['salt' ]) nonce = base64.b64decode(record['nonce' ]).ljust(16 , b'\x00' ) tag = base64.b64decode(record['tag' ]) ciphertext = base64.b64decode(record['encrypted_data' ]) aad = base64.b64decode(record['associated_data' ]) key = scrypt(password.encode(), salt, 16 , N=1024 , r=8 , p=1 ) cipher = AES.new(key, AES.MODE_GCM, nonce=nonce) cipher.update(aad) plaintext = cipher.decrypt_and_verify(ciphertext, tag) return json.loads(plaintext.decode('utf-8' ))for pw in passwords: for record in medical_encrypted['encrypted_records' ]: pid = record.get('patient_id' , '<unknown>' ) try : pt = decrypt_record(record, pw) print (f"--- Decrypted patient {pid} ---" ) print (pt) except Exception: continue
数据风险排查 背景描述
近期,在集团例行数据安全检查中发现多起异常行为:内部主机通过 SSH 传输了疑似部署私钥文件,并将包含用户信息的csv表格外发;同时邮件系统审计显示存在未经授权的root账号与明文密码传输情况。经初步判断,本次事件涉及密钥泄露与大量用户敏感信息外泄,已列为高优先级安全事件,需根据附件中的邮件文件和流量信息分析下面的问题。
题干说明
【题目1】 数据泄露
经对附件中抓取的网络流量分析发现存在疑似私钥文件被传输。请分析流量包并确认传输私钥文件的客户端IP 地址,并计算将该 IP 字符串的MD5,结果需采用 32 位小写十六进制格式;最终将该 MD5 作为答案提交。
【答案标准】
若 IP 地址为 1.1.1.1 ,计算MD5后得到的32位小写字符串: e086aa137fa19f67d27b39d0eca18610 ,将此md5字符串作为答案提交。
分组字节流查找关键字符串 PRIVATE KEY 得到私钥传输记录 ip为 10.0.1.50
【题目2】 数据传输
经对相关邮件进行详尽分析,已确认存在主机用户凭证(用户名与明文密码)通过邮件被泄露的情况。请核实该
泄露邮件的发送者邮箱地址,并计算该邮箱地址的MD5,计算结果以 32 位小写十六进制形式表示,最终将该
MD5 值作为答案提交。
【答案标准】
若邮件发信地址为 addr@126.com ,则计算MD5后得到: b278947ee43ee383150a65c85c8748ff 。
编写脚本解码邮件
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 import base64import os folder = r"mails" for filename in os.listdir(folder): file_path = os.path.join(folder, filename) with open (file_path, "r" , encoding="utf-8" ) as f: content = f.read().split("\n\n" ) if len (content) > 1 : body = content[1 ].replace("\n" , "" ) try : decoded = base64.b64decode(body).decode() if "root" in decoded: print (content[0 ]) print (decoded) except Exception as e: print (f"文件 {filename} 解码失败: {e} " )
matthew44@temp-mail.org 发送的邮件中存在明文用户名和密码
【题目3】 数据溯源
经将附件中的流量包与题目二的分析结果核对后,需进一步确认本次事件的责任人。流量中传输的用户表已明确
显示每条记录的用户ID与对应邮箱地址,依据比对结果确认责任用户后,请将该用户的 ID 进行 MD5 计算,并以
32 位小写十六进制字符串形式提交计算结果作为最终答案。
同样查找关键字
或者使用strings 命令过滤
签名疑云 背景描述
某企业日常会产生大量敏感业务文档。为保障文件完整性与责任可追溯,公司明确管理规范:员工创建 / 修改的文件需添加个人数字签名,并更新文档元数据中的作者信息。近期,信息安全部门在例行检查中,发现文件数字签名相关环节存在部分异常情况,需通过后续分析进一步排查。请你作为企业的数据安全工程师来完成这份工作。
相关信息
附件信息:
document目录:存放了经过数字签名的文档
certificates目录:存放证书文件,包括企业根证书和用于验证签名的证书
signature_records.csv文件:记录每个文件与签名者的关系
题干说明
【题目1】 数据溯源
某些员工在签名时没有使用自己的证书签名,请使用分析签名记录和文件元数据中的作者字段,找到作者与数签名不同的文件,将这些文件的文件编号按从小到大的顺序,使用英文半角逗号连接,转为32位小写md5提交。
【答案标准】
本题答案为32位小写md5值。举例说明,若找到的没有使用作者证书签名的PDF文件有:12.pdf,13.pdf,1.pdf,拼接后的字符串应为:1,12,13,最终提交的md5值为:22774c7c7335cb7c2eed3622537c4a88
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 import osimport csvimport hashlibfrom PyPDF2 import PdfReader document_dir = "documents" signature_csv = "signature_records.csv" signature_dict = {} with open (signature_csv, newline='' , encoding='utf-8' ) as f: reader = csv.DictReader(f) for row in reader: filename = row['文件名' ].strip() signer = row['签名者' ].strip() signature_dict[filename] = signer mismatch_files = []for filename in os.listdir(document_dir): file_path = os.path.join(document_dir, filename) reader = PdfReader(file_path) author = reader.metadata.get('/Author' , '' ).strip() signer = signature_dict[filename] if author != signer: file_number = int (filename.split('.' )[0 ]) mismatch_files.append(file_number) mismatch_files.sort() concat_str = "," .join(str (num) for num in mismatch_files) md5_hash = hashlib.md5(concat_str.encode('utf-8' )).hexdigest()print ("拼接字符串:" , concat_str)print ("MD5:" , md5_hash)
【题目2】 数据伪造
在审查中发现,某些员工用于签名的证书并不是由公司的根证书颁发,这意味着文件可能是伪造的,请使用公司根证书对员工证书进行校验,找出无法通过校验的员工证书,按照证书序列号按数值从小到大进行排序(注:序列号为16进制,字母请使用小写字母,中间不要使用空格等其他字符分割),并使用英文半角逗号连接,转为32位小写md5提交。
【答案标准】
本题答案为32位小写md5值,举例说明,若找到的序列号为:
1: d2e84f1a67c3b9528d74a6e039bcf7852e9461d8
2: 4a7b2c9de1f83642a95e73b4c618df298a6307b5
3: 96c13af4b728e5d942865ca3f01b79cd58e294a7
拼接后字符串为:
4a7b2c9de1f83642a95e73b4c618df298a6307b5,96c13af4b728e5d942865ca3f01b79cd58e294a7,d2e84f1a67c3b9528d74a6e039bcf7852e9461d8
最终提交的md5值为:0f75590f8dcd78f01ff1c1e20ebe7e47
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 import hashlibimport osfrom cryptography import x509from cryptography.hazmat.backends import default_backendwith open ("certificates/rootca.crt" , "rb" ) as f: root_cert = x509.load_pem_x509_certificate(f.read(), default_backend()) root_ski = root_cert.extensions.get_extension_for_oid( x509.ExtensionOID.SUBJECT_KEY_IDENTIFIER ).value.public_bytes().hex ()[-40 :]print (root_ski) err_serials = [] employees_cert_dir = "certificates/employees" for filename in os.listdir(employees_cert_dir): path = os.path.join(employees_cert_dir, filename) with open (path, "rb" ) as f: cert = x509.load_pem_x509_certificate(f.read(), default_backend()) aki = cert.extensions.get_extension_for_oid( x509.ExtensionOID.AUTHORITY_KEY_IDENTIFIER ).value.public_bytes().hex ()[-40 :] if aki != root_ski: err_serials.append(format (cert.serial_number, "x" )) err_serials.sort() concat_str = "," .join(err_serials) md5_hash = hashlib.md5(concat_str.encode("utf-8" )).hexdigest()print ("拼接字符串:" , concat_str)print ("MD5:" , md5_hash)
【题目3】数据篡改
近期发现若干 PDF 在校验前置过程中暴露出结构异常,使得我们所依托的签章脚本尚未进入深度验证便被迫终止;请整理出所有因前置结构校验异常而被流程中止的文档编号。将这些编号按从小到大的顺序,以英文半角逗号连接,并转换为32位小写 md5 后提交。
【答案标准】
本题答案为32位小写md5值。举例说明,若因结构级异常而在前置环节被中止的 PDF 文件编号为:12.pdf,13.pdf,1.pdf,则拼接后的字符串为:1,12,13,最终提交的 md5 值为:22774c7c7335cb7c2eed3622537c4a88
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 import hashlibimport osfrom pyhanko.pdf_utils.reader import PdfFileReaderfrom pyhanko.sign.validation import validate_pdf_signature base_dir = "documents" err = []for f in os.listdir(base_dir): with open (os.path.join(base_dir, f), 'rb' ) as doc: try : r = PdfFileReader(doc) sig = r.embedded_signatures[0 ] status = validate_pdf_signature(sig) status.pretty_print_details() except : err.append(int (f.split('.' )[0 ])) err.sort() err_txt = "," .join(str (num) for num in err) md5_hash = hashlib.md5(err_txt.encode('utf-8' )).hexdigest()print ("拼接字符串:" , err_txt)print ("MD5:" , md5_hash)
模型偏见修复 场景描述
某电商平台的情感分析模型因训练数据中“价格高”常与差评共现,错误地将高价商品评论一概判为负面,忽视了用户对“物有所值”的积极评价。例如,“虽然贵但超值!”这类真实正向反馈被误判为差评,影响优质商品曝光。为校正这一系统性偏见,需在不改变语义的前提下,对100条高价正向评论施加不超过3个字符的微小扰动,引导模型关注真实情感而非价格标签,从而提升模型的公平性与准确性。
题干信息
选手需基于提供的 sample_reviews.csv 文件中的 100 条高价正向评论(如“虽然贵但超值!”),为每条评论设计一个不超过3个字符的修复文本(例如:👍),并填写至 fix_sample.csv 的对应 text 字段中。访问在线环境,将 fix_sample.csv 上传并进行预测,平台会将该修复文本自动拼接至原始评论末尾,形成新输入(例如:“虽然贵但超值!👍”)如果正向样本大于80,就可以将 fix_sample.csv 作为答案提交到答题平台上。平台会根据正向样本数进行打分。
提供信息
【平台附件】
fix_sample.csv(修复字符样本模板文件,包含id和text两列,id与 sample_reviews.csv 的id对应,text字段不得超过3个字符)sample_reviews.csv(100条高价正向评论样本数据,包含id和review两列)
答案提交
提交仅包含2列内容的fix_sample.csv文件,总数必须为100行,第一行列名不计入。第一列为id(需要与sample_reviews.csv中的id对应,严格按照顺序,否则影响成绩)
第二列为text(修复字符内容,不得超过3个字符,用于增强正向情感表达)。
注意:提交的CSV文件务必选择UTF-8编码保存,否则会出现解析失败。
评分标准
最终得分 = (预测的正向样本 × 0.1) × 当前题目总分。(正向样本数小于80不得分)
注意事项
最终答案文件名必须为fix_sample.csv,csv文件编码保存为UTF-8。
每位选手提交平台的次数为10次,提交格式失败会返回错误原因,不得分,扣除次数,请注意符合格式规范。
在提交答案期间,平台将在最多10次成功提交中选取最高分作为最终成绩。
偏见修复解题: 通过正向词拼接测试,发现拼接”不错”时偏见均修复
1 2 3 4 5 6 7 8 9 import pandas as pd x = ['不错' ,] out = []for i in range (1 , 101 ): out.append([i, x[0 ]])print (out) df = pd.DataFrame(out, columns=['id' , 'text' ]) df.to_csv("fix_sample.csv" , index=False , encoding='utf-8' )
视频推荐投毒 场景描述
视频平台为了提升用户粘性和观看时长,上线了一套视频推荐系统。该系统基于用户的历史观看和评分行为,采用协同过滤算法 为用户推荐他们可能感兴趣的新视频。一家竞争对手”快视科技”希望进行恶意竞争,通过污染视频平台的推荐系统,让系统向观看过热门大片(如环太平洋》)的潜在用户错误地推荐低质量的、制作粗糙的《环大西洋》电影 ,从而降低用户体验和平台的广告收入。
题干信息
你将扮演”快视科技”的攻击者。你将获得平台现有的部分良性用户观看记录。你的任务是通过注册并伪造不超过500条虚假用户 的观看历史,构造投毒数据并注入系统。重新训练后,你的目标是让推荐模型在给定的”目标用户列表”(这些用户大部分曾观看过《环太平洋》)中,将”《环大西洋》”推荐的排名尽可能提升 ,进入Top-5推荐列表。
提供信息
【平台附件】
existing_rentals.csv :包含 user_id , item_id , rating 的原始良性观看记录。
item_info.csv :视频ID与视频名称的对应表(如 item_101 : “《环太平洋》”, item_205 : “《环大西洋》”)。
target_users.csv :需要被攻击的 user_id 列表(5个目标用户)。
submit.csv :用于提交答案的模板,格式与 existing_rentals.csv 相同,但包含你构造的虚假用户和他们的观看记录。
【解题提示】
系统会找到与你品味最相似的一群用户(称为”邻居”),然后把这些”邻居”们都喜欢、但你还没接触过的视频推荐给你。相似度的判断,完全依赖于你们对相同视频的历史评分记录。答案提交 提交 submit.csv 文件到平台。
评分标准
平台会用你提交的投毒数据 submit.csv 与原始数据合并,重新训练推荐模型。评分将基于在target_users.csv 中,有多少用户的Top-5推荐列表里成功出现了”《环大西洋》”。
最终得分 = 成功渗透的用户个数 ×(题目总分 × 0.2)
例如:
成功渗透了4个用户,当前题目总分:100分
最终得分 = 4 × (100 × 0.2)= 80分
注意事项
最终答案文件名必须为submit.csv,csv文件编码保存为UTF-8。
每位选手提交平台的次数为10次,提交格式失败会返回错误原因,不得分,扣除次数,请注意符合格式规范。
在提交答案期间,平台将在最多10次成功提交中选取最高分作为最终成绩。
推荐投毒解题: 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 import randomimport pandas as pd u = pd.read_csv(r"target_users.csv" ) i = pd.read_csv(r"item_info.csv" ) user = [] item = []for index, row in u.iterrows(): user.append(row['user_id' ])for index, row in i.iterrows(): item.append(row['item_id' ])print (user)print (item) out = []for uu in user: for ii in item: for x in range (9 ): if ii == 'item_205' : print (uu, ii, 5 ) out.append([uu, ii, 5 ]) else : print (uu, ii, random.randint(1 , 4 )) out.append([uu, ii, random.randint(1 , 4 )])print (len (out)) df = pd.DataFrame(out, columns=['user_id' , 'item_id' , 'rating' ]) df.to_csv("submit.csv" , index=False , encoding='utf-8' )
设置 item_205 为最高分,其他随机低分,尝试获得渗透最高用户数 (笨办法)
数据安全综合场景赛 场景说明
强盛集团作为一家大型物业公司,致力于为客户提供高质量的物业管理服务。在一次新物业入住的过程中,公司组织了数据安全风险检查,以确保客户和业主的信息得到妥善保护。检查的重点包括业主个人信息、物业管理数据、以及支付和账单记录等敏感信息的存储和传输方式。为了应对潜在的安全威胁,强盛集团对数据库进行了全面审计,并测试了系统的加密机制、访问权限设置和漏洞防护措施。此外,检查还涵盖了员工数据访问行为的监控,确保只有授权人员才能接触到敏感数据。通过这次风险检查,强盛集团进一步提升了数据安全管理能力,增强了业主对物业管理平台的信任,确保了新物业入住过程中数据安全的合规性。
场景拓扑
场景题干
场景一:采集安全 题目名称:越权采集
物业公司开发了一款新的移动应用(APP),但是该应用因涉嫌越权采集用户信息被应用商城下架。现有一个数据库记录了该APP采集的用户信息。请根据该数据库中的数据,分析并找出该应用越权采集了哪些用户信息,并提交涉及越权采集的数据ID作为答案。数据库名称:app、数据库用户:root、数据库密码:1234567。
【答案标准】若越权采集data_id为4,98,32,400,则最终提交答案为:4-32-98-400。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 import jsonimport pandas as pd qx = pd.read_csv("app_user.csv" ) qxt = {}for index, row in qx.iterrows(): qxt[row['user_id' ]] = json.loads(row['permissions' ]) df = pd.read_csv("app_user_data.csv" ) a = []for index, row in df.iterrows(): if row['user_id' ] in qxt and row['data_type' ] in qxt[row['user_id' ]]: pass else : print (row['data_id' ], end="-" )
场景二:数据传输 题目名称:证书安全
智能停车场管理平台使用了数字证书加密了https流量,请提交数字证书颁发者。
【答案标准】例:若颁发者为张三,则最终提交答案为:张三。
浏览器直接查看证书
题目名称:防泄漏排查
智能停车场管理平台存在数据库泄漏漏洞。请查找该漏洞的根本原因,并从数据库中提取激活证书作为答案提交。
【答案标准】例:若数据库中激活证书为ddc80d31ca1d43b7b97d7f6bd6e6b85c,则最终提交答案为:ddc80d31ca1d43b7b97d7f6bd6e6b85c。
系统存在sql执行直接写sql查表
场景三:数据存储 题目名称:脱敏安全
强盛物业公司开发了一款业主交流平台,平台采用现代密码学中的RSA算法对业主数据进行脱敏保护。请根据提供的数据库和Web系统功能,分析该脱敏方法的安全性。如果存在安全风险,请提取admin用户的明文身份证号作为答案提交。数据库名称:flask_rsa_demo、数据库用户:root、数据库密码:1234567。
【答案标准】例:若admin的身份证号为:320888123456789011,则最终提交答案为:320888123456789011。
pass
题目名称:版本识别
系统更迭通常伴随着系统优化和BUG修复,请提交强盛物业当前使用的档案管理系统版本号。【答案标准】例:若系统版本号为V8.1.2.0,则最终提交答案为:8.1.2.0。
主页直接查看
题目名称:合规审查
强盛物业档案管理系统存储了一批业主身份证照片。根据公司规定,存储此类数据必须进行脱敏处理。请统计系统中有多少张照片已符合合规脱敏要求。合规脱敏说明: 姓名、出生年月、住址和身份证号四项信息均已进行遮掩脱敏。账号:admin、密码:admin。
【答案标准】例:若合规脱敏照片数量为100,则最终提交答案为:100。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 import osfrom collections import Counterimport numpy as npfrom PIL import Imagedef get_region_average_color (image_path, box ): with Image.open (image_path) as img: img = img.convert('RGB' ) region = img.crop(box) arr = np.array(region, dtype=np.float32) return arr.mean(axis=(0 , 1 )).mean() img_dir = "身份证脱敏附件" datas = {}for i in os.listdir(img_dir): datas[i] = 0 box = [(79 , 49 , 119 , 66 ), (84 , 127 , 116 , 139 ), (85 , 164 , 253 , 177 ), (222 , 256 , 334 , 271 )] image_path = os.path.join(img_dir, i) for b in box: avg_color = (get_region_average_color(image_path, b)) if avg_color < 1 : datas[i] = datas[i] + 1 count = Counter(list (datas.values()))print ("合规脱敏照片" , count[4 ])
题目名称:脱敏分级
统计过合规脱敏数量后,需要对存在风险脱敏照片进行分级,严重风险(全未脱敏)、高风险(仅脱敏一项)、中风险(仅脱敏两项)、低风险(仅脱敏三项)。
【答案标准】例:若严重风险10个、高风险8个、中风险20个、低风险40个,则最终提交答案为:10-8-20-40。
同上
1 2 print ("风险分级" , f"{count[0 ]} -{count[1 ]} -{count[2 ]} -{count[3 ]} " )
场景四:数据治理 题目名称:数据清洗
在智能停车场管理平台的财务审核过程中,发现部分车辆收费金额可能异常,请根据平台后台停车记录,按照停车场收费规则重新计算费用(不足30分钟免费,超过30分钟(含)开始计费,每30分钟1元,不足30分钟按30分钟计费,单次封顶200元),找出收费不准确的车牌号,并按车牌省份的拼音首字母从a-z排序。
【答案标准】例:若存在异常的车牌号为:京X0CRPB、粤DB3LPP、京BXGKFE,则最终提交答案为:京BXGKFE-京X0CRPB-粤DB3LPP。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 from dateutil import parserimport requests headers = { "host" : "192.168.70.101:8080" , "cookie" : "JSESSIONID=483a67bd-d30d-4812-beda-566808968c76; oFLm_2132_saltkey=fG2PzGA2; oFLm_2132_lastvisit=1762325807; PHPSESSID=av60vi0ujr96i8j8qja8kk8jvu; session=eyJ1c2VyX2lkIjoyfQ.aQsJNg.AWQc80KApt-1aa2y72W5-L8_hig; oFLm_2132_sid=Zx2hvE; oFLm_2132_ulastactivity=be5fWwd2Dn75yM_-Xfy6TEbvm2ANBmd3YwKe06vjAXbtzL0zD9WQ; oFLm_2132_auth=ed8e135YOi2Bk61D7d1nCVGYZ7zptn68ta4jRb86E6Ic6CuxXhASi3Lk-LrSxcnMj11D4-ZehnCaX-y5YfLv; oFLm_2132_logintime=1762331088; oFLm_2132_lip=192.168.2.230%2C1762269573; oFLm_2132_checkauthlic=1; oFLm_2132_checkupgrade=1; oFLm_2132_lastact=1762332018%09index.php%09banner" } data = {"pageSize" : 2000 , "pageNo" : 1 } r = requests.post("https://192.168.70.101:8080/car/parkingRecord/list" , data=data, verify=False , headers=headers)''' ( 不足30分钟免费, 超过30分钟(含)开始计费, 每30分钟1元, 不足30分钟按30分 钟计费,单次封顶200元 ) ''' def calculate_fare (minutes ): if minutes < 30 : return 0 else : if minutes % 30 == 0 : return min (minutes // 30 , 200 ) else : return min (minutes // 30 + 1 , 200 ) xx= [] x = 0 for i in r.json()['msg' ]['pageData' ]: gmtInto = parser.parse(i['gmtInto' ]) gmtOut = parser.parse(i['gmtOut' ]) c = int (((gmtOut - gmtInto).total_seconds()) / 60 ) cx = calculate_fare(c) cost = i['cost' ] plan = i['plateNumber' ] if cx != int (float (cost)): x += 1 print (plan) xx.append(plan)print (xx)'沪IM2JHT-京ZSXANF-苏UH30RR-浙I72RNW'
题目名称:数据脱敏
公司在对外提供智能停车场管理平台订单数据时,为了避免敏感信息泄露,系统会对订单中的关键字段进行脱敏处理。审查人员只需在不暴露具体细节的情况下完成业务审核,请导出后台订单管理-订单列表xls表,并按照下面规则进行脱敏和提交。
脱敏规则
订单号仅保留前三位和后三位,中间全部使用 * 替换。
示例: 202508290001 → 202******001
车牌号保留前半部分,后三位使用 * 替换。
示例: 粤B12345 → 粤B12***
订单金额一位数金额:全部脱敏8 → *
两位数金额:保留首位,末位脱敏58 → 5*
三位及以上:仅保留首位和末位,中间全部使用 * 替换1234 → 1**4
提交方式**
将脱敏后的数据导出为csv格式,修改编码为UTF-8,换行为:CRLF后加密成md5提交。
【答案标准】例:若脱敏后的csv文件加密成md5为:ddc80d31ca1d43b7b97d7f6bd6e6b85c,则最终提交答案
为:ddc80d31ca1d43b7b97d7f6bd6e6b85c。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 import pandas as pd df = pd.read_csv("订单信息-20251105083448.csv" , encoding="utf-8" ,dtype=str ) x = []for index, row in df.iterrows(): id = row['ID' ] num = str (row[1 ]) num = num[:3 ] + "*" * 6 + num[-3 :] card = row[2 ][:4 ] + "*" * 3 cost = str (int (float (row[3 ]))) nc = '' if len (cost) == 1 : nc = '*' elif len (cost) == 2 : nc = cost[0 ] + "*" elif len (cost) >= 3 : nc = cost[0 ] + "*" *(len (cost)-2 ) + cost[-1 ] else : print ("error" ) print (num, card, nc) x.append([id , num, card, nc]) df = pd.DataFrame(x, columns=['ID' , '订单号' , '车牌号' , '金额' ]) df.to_csv("output.csv" , index=False , encoding="utf-8" )
场景五:数据销毁 题目名称:数据销毁
智能停车场管理平台存储了大量业主用户数据,现需检查用户是否存在弱口令,并验证昵称、邮箱和手机号是否充分脱敏。若用户口令出现在提供的字典中,则判定为弱口令风险;若用户的昵称、邮箱或手机号有任何未脱敏的字段,则判定为脱敏风险,后续将销毁存在风险的用户。字典需使用题目提供的附件:字典.txt。
【答案标准】例:若弱口令风险用户数量为100、脱敏风险用户数量为20,则最终提交答案为:100-20。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 import hashlibfrom dateutil import parserimport requests headers = { "host" : "192.168.70.101:8080" , "cookie" : "oFLm_2132_saltkey=fG2PzGA2; oFLm_2132_lastvisit=1762325807; PHPSESSID=av60vi0ujr96i8j8qja8kk8jvu; session=eyJ1c2VyX2lkIjoyfQ.aQsJNg.AWQc80KApt-1aa2y72W5-L8_hig; oFLm_2132_sid=Zx2hvE; oFLm_2132_ulastactivity=be5fWwd2Dn75yM_-Xfy6TEbvm2ANBmd3YwKe06vjAXbtzL0zD9WQ; oFLm_2132_auth=ed8e135YOi2Bk61D7d1nCVGYZ7zptn68ta4jRb86E6Ic6CuxXhASi3Lk-LrSxcnMj11D4-ZehnCaX-y5YfLv; oFLm_2132_logintime=1762331088; oFLm_2132_lip=192.168.2.230%2C1762269573; oFLm_2132_checkauthlic=1; oFLm_2132_checkupgrade=1; oFLm_2132_lastact=1762332018%09index.php%09banner; JSESSIONID=89e09ddd-9be1-4538-9a3e-0394b81e4f14" } data = {"pageSize" : 1005 , "pageNo" : 1 } r = requests.post("https://192.168.70.101:8080/sys/user/list" , data=data, verify=False , headers=headers) kl = []with open (r"C:\Users\jack8\Desktop\综合场景\字典.txt" , 'r' ) as f: for line in f: p = line.strip() m = hashlib.md5(p.encode()).hexdigest() kl.append(m)print (kl) rkl = 0 wtm = 0 for i in r.json()['msg' ]['pageData' ]: nickname = i['nickname' ] email = i['email' ] password = i['password' ] mobile = str (i['mobile' ]) if password in kl: rkl += 1 print (nickname, email, password, mobile) if "*" not in email or "*" not in nickname or "*" not in mobile: wtm += 1 print (rkl) print (wtm)
场景六:模型安全 题目名称:成员推断
分析社区言论审核系统的模型响应特征,设计并实施成员推断攻击。根据平台提供的用户名列表(usernames.csv),构造查询请求并分析模型的概率输出,判断列表中每一个用户名所对应的用户数据是否被用于模型训练。您需要将您的判断结果整理成指定的CSV格式文件并提交至竞赛平台进行验证,平台将根据您推断的准确性进行评分。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 import jsonimport pandas as pdimport requests headers = { "origin" : "http://192.168.70.101:8090" , "Content-Type" : "application/json" , "cookie" : "oFLm_2132_saltkey=fG2PzGA2; oFLm_2132_lastvisit=1762325807; PHPSESSID=av60vi0ujr96i8j8qja8kk8jvu; session=eyJ1c2VyX2lkIjoyfQ.aQsJNg.AWQc80KApt-1aa2y72W5-L8_hig; oFLm_2132_sid=Zx2hvE; oFLm_2132_ulastactivity=be5fWwd2Dn75yM_-Xfy6TEbvm2ANBmd3YwKe06vjAXbtzL0zD9WQ; oFLm_2132_auth=ed8e135YOi2Bk61D7d1nCVGYZ7zptn68ta4jRb86E6Ic6CuxXhASi3Lk-LrSxcnMj11D4-ZehnCaX-y5YfLv; oFLm_2132_logintime=1762331088; oFLm_2132_lip=192.168.2.230%2C1762269573; oFLm_2132_checkauthlic=1; oFLm_2132_checkupgrade=1; oFLm_2132_lastact=1762332018%09index.php%09banner; JSESSIONID=89e09ddd-9be1-4538-9a3e-0394b81e4f14" , "referer" : "http://192.168.70.101:8090/" , } df = pd.read_csv(r"C:\Users\jack8\Desktop\综合场景\成员推断\usernames.csv" )for index, row in df.iterrows(): username = row['username' ] data1 = {"text" : f"{username} " } r = requests.post("http://192.168.70.101:8090/predict" , data=json.dumps(data1), verify=False , headers=headers) print (r.json()['probability' ])
获取预测分数后,按分数判断成员
题目名称:对抗样本
人脸对抗样本:你的任务是作为AI安全人员,针对强盛物业的门禁系统进行安全测试。该系统提供/model_download接口供选手下载人脸识别模型文件。选手需基于附件提供的face_features.json(样例人脸特征向量)、face_features_convert.py(特征转换脚本)及face.jpg(转换后的样例图像),分析模型结构及其API返回信息,设计并生成对抗样本图像,使系统误识别为合法用户,从而登录系统后台,获取答案字符串。
pass
人工智能模型数据安全挑战赛 场景说明
本次人工智能模型数据安全挑战赛围绕金融场景下的四大核心系统展开:反欺诈、贷款审批、资产识别与智能客服。参赛者需依次完成数据清洗与标注、欺诈模型训练、逻辑回归模型参数窃取、对抗样本生成、人脸特征伪造及提示词注入攻击等任务。赛题涵盖数据安全、模型鲁棒性、隐私保护与AI越权访问等关键风险点,全面考察选手在AI安全攻防、数据处理与模型逆向工程等方面的综合能力,以提升对AI系统潜在安全威胁的认知与防御水平。
一、模型预训练 你是 “智慧金融 ” 公司新成立的 AI 安全部门的一名核心数据科学家。你的首个任务,就是参与开发 AI 反欺诈系统 ——“ 鹰眼计划 ”。鹰眼系统在通过分析实时交易流水,精准识别并拦截潜在的欺诈行为。整个计划分为三个关键阶段: 数据清洗 、数据标注 和模型训练 。你将亲手处理原始数据,并构建出 “ 鹰眼 ” 系统的第一个原型机。
【考题 1】数据清洗
对原始交易数据 raw_transactions.csv。请根据以下规则进行数据清洗:
处理无效交易金额 :amount 列存在空值(表示交易失败)和负数(可能表示退款)。在欺诈模型中,我们只关心正向流水的绝对值。
将所有负数 金额取其绝对值 (转换为正数)。
将所有空值 金额用 0 进行填充。
补全商户类别 : merchant_category 列(商户类别)有一些记录缺失了。为了数据完整性,请将所有缺失值统一填充为字符串 UNKNOWN。
剔除无效记录 : user_id 是用户唯一标识,必须存在。请删除 所有 user_id 为空的行。
【场景附件】
raw_transactions.csv(原始交易数据)
【答案标准】 将清洗过后的文件保存为 cleaned_transactions.csv ,并提交至平台验证后得分。 (提交文件要完全符合题干要求,否则不得分)
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 import pandas as pd raw_file = 'raw_transactions.csv' df = pd.read_csv(raw_file)print ("原始数据形状:" , df.shape)print ("初始缺失值统计:" )print (df.isnull().sum ()) df['amount' ] = df['amount' ].abs () df['amount' ] = df['amount' ].fillna(0 )print ("处理后 amount 列:无负数,空值已填充为 0" ) df['merchant_category' ] = df['merchant_category' ].fillna('UNKNOWN' )print ("处理后 merchant_category:缺失值已填充为 'UNKNOWN'" ) initial_count = len (df) df.dropna(subset=['user_id' ], inplace=True ) cleaned_count = len (df)print (f"删除了 {initial_count - cleaned_count} 条 user_id 为空的记录。" ) output_file = 'cleaned_transactions1.csv' df.to_csv(output_file, index=False )print (f"清洗完成!已保存至:{output_file} " )
【考题 2】数据标注
请在你上一题生成的 cleaned_transactions.csv 文件基础上,新增一列名为 is_suspicious 的标签列。根据以下规则,判断每一笔交易是否可疑:
可疑交易规则(满足以下任一条件, is_suspicious 即为 1 ,否则为 0 ):
特定时段大额交易 : 交易时间( transaction_time )的小时部分( hour )在 0点到 5点之间 (包含 0和 5 ),并且交易金额( amount )大于 10,000 。
高风险商户类别 : 商户类别( merchant_category )为 GAMBLING( 赌博 ) 或 JEWELRY ( 珠宝 ) 。
跨境高危地区交易 : 交易发生的商户所在国家( merchant_country )是 MYA ( 缅甸 ), PH( 菲律宾 ), 或 KH( 柬埔寨 )。
【答案标准】 将标注过后的文件保存为 labeled_transactions.csv ,并提交至平台验证后得分。 (提交文件要完全符合题干要求,否则不得分)
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 import pandas as pd input_file = 'cleaned_transactions.csv' df = pd.read_csv(input_file)print ("原始数据形状:" , df.shape)print ("\n前几行预览:" )print (df.head())if not pd.api.types.is_datetime64_any_dtype(df['transaction_time' ]): df['transaction_time' ] = pd.to_datetime(df['transaction_time' ], errors='coerce' ) df['hour' ] = df['transaction_time' ].dt.hour df['is_suspicious' ] = 0 rule1_mask = (df['hour' ].between(0 , 5 , inclusive='both' )) & (df['amount' ] > 10000 ) df.loc[rule1_mask, 'is_suspicious' ] = 1 rule2_mask = df['merchant_category' ].isin(['GAMBLING' , 'JEWELRY' ]) df.loc[rule2_mask, 'is_suspicious' ] = 1 rule3_mask = df['merchant_country' ].isin(['MYA' , 'PH' , 'KH' ]) df.loc[rule3_mask, 'is_suspicious' ] = 1 output_file = 'labeled_transactions.csv' df = df.drop(columns=['hour' ]) df.to_csv(output_file, index=False )print (f"\n✅ 标签处理完成!" )print (f"总交易数:{len (df)} " )print (f"可疑交易数量:{df['is_suspicious' ].sum ()} " )print (f"已保存至:{output_file} " )
【考题 3】模型训练
请利用上一题生成的 labeled_transactions.csv文件。访问 “ 鹰眼 ” 模型训练系统,通过上传数据和调整参数,训练一个高性能的欺诈检测模型。最后,将生成的模型文件导出并提交,平台将根据其预测效果自动评分,决定你的最终成绩。
【答案标准】 将导出output.opkl(文件名必须相同 ) 模型文件压缩成 output.opkl.zip提交至平台验证后得分。 (提交的压缩包内的文件名必须为: output.opkl,最终得分:模型的 F1 分数 * 题目总分)
二、模型逆向分析 “智慧金融 ” 公司推出了一套 AI 在线贷款审批系统 ,其核心模型已通过黑盒 API 对外提供服务。 为保护商业机密,该 API 仅返回客户的 “ 贷款通过概率 ” 。然而,该接口存在被逆向的风险。一旦模型的内部权重参数被窃取,攻击者不仅能复制我们的核心业务,还能精心构造欺诈申请骗取贷款,对公司造成毁灭性打击。作为公司的安全顾问,你的任务是模拟攻击者,在仅能调用 API 的条件下,尝试窃取模型参数,从而全面评估我们这套核心系统面临的安全风险。
【题目 4】 权重窃取
选手需要分析 WEB 平台的预测接口,根据我们已知的模型输入特征,构造特定的 API 请求,反向推导出其内部逻辑回归( Logistic Regression )模型的全部 5个权重和 1 个偏置。 【题目附件】
submit1.csv(题目4提交模板)
第一列为 Parameter
Bias: 偏置项
Weight_annual_income: 年收入(元)权重
Weight_age: 年龄权重
Weight_years_of_employment: 工作年限权重
Weight_num_credit_cards: 现有信用卡数量权重
Weight_debt_to_income_ratio: 债务收入权重
第二列为 Value (填写符合题干要求的参数值)
【解题提示】 后台模型为标准的逻辑回归算法 。其核心计算过程如下:
线性求和 :首先,模型将所有输入特征( x)与其对应的权重( w)相乘,然后加上偏置( b),得到一个中间值 z。
z = (w1*x1 + w2*x2 + ... + w9*x9) + b
Sigmoid函数:然后,模型使用 Sigmoid 函数将 z 转换为一个 0 到 1 之间的概率 p。这个 p 就是系统返回给你的概率值。
【答案标准】 将您窃取到的所有参数整合进一个 submit1.csv文件提交至平台。 (偏置得分占比 25% ,其余 4 个参数各占比 15% )
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 import numpy as npdef sigmoid (x ): """Sigmoid函数""" return 1 / (1 + np.exp(-np.clip(x, -250 , 250 )))def inverse_sigmoid (p ): """Sigmoid反函数""" p = np.clip(p, 1e-10 , 1 - 1e-10 ) return np.log(p / (1 - p))def normalize_vector (v ): """向量归一化""" norm = np.linalg.norm(v) return v / norm if norm > 0 else vdef calculate_similarity (v1, v2 ): """计算向量相似度""" return np.dot(v1, v2) / (np.linalg.norm(v1) * np.linalg.norm(v2) + 1e-10 )
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 246 247 248 249 250 251 252 253 254 255 256 257 258 259 260 261 262 263 264 265 266 267 268 269 270 271 272 273 274 275 276 277 278 279 280 281 282 283 284 285 286 287 288 289 290 291 292 293 294 295 296 297 298 299 300 301 302 303 304 305 306 307 308 309 310 311 312 313 314 315 316 317 318 319 320 321 322 323 324 325 326 327 328 329 330 331 332 333 334 335 336 337 338 339 340 341 342 343 344 345 346 347 348 349 350 351 352 353 354 355 356 357 358 359 360 361 362 363 364 365 366 367 368 369 370 371 372 373 374 375 376 377 378 import numpy as npimport matplotlib.pyplot as pltimport requestsimport jsonimport timefrom utils import sigmoid, inverse_sigmoidclass WeightStealingAttacker : """权重窃取攻击客户端 - 支持结构化输入参数(如 annual_income, age 等)""" def __init__ (self, api_url ): self.api_url = api_url self.feature_names = None self.stolen_weights = None self.stolen_bias = None self.query_count = 0 self.attack_history = [] self._fitted = False def _get_feature_names (self ): """自动探测支持的特征名称(通过一次测试请求)""" test_data = { "annual_income" : 100 , "age" : 18 , "years_of_employment" : 10 , "num_credit_cards" : 1 , "debt_to_income_ratio" : 0.5 } try : response = requests.post( f"{self.api_url} /predict" , json=test_data, headers={ 'Accept' : '*/*' , 'Accept-Language' : 'zh-CN,zh;q=0.9,en;q=0.8,en-GB;q=0.7,en-US;q=0.6' , 'Connection' : 'keep-alive' , 'Content-Type' : 'application/json' , 'Origin' : 'http://192.168.70.211:19102' , 'Referer' : 'http://192.168.70.211:19102/' , 'User-Agent' : 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/142.0.0.0 Safari/537.36 Edg/142.0.0.0' , }, verify=False , timeout=30 ) if response.status_code == 200 : result = response.json() self.feature_names = list (test_data.keys()) print (f"✅ 成功探测到特征字段: {self.feature_names} " ) return True else : print (f"❌ 探测失败,状态码: {response.status_code} " ) return False except Exception as e: print (f"❌ 请求异常: {e} " ) return False def _dict_to_vector (self, data_dict ): """将输入字典转换为特征向量(按 feature_names 顺序)""" if not self.feature_names: raise ValueError("未初始化特征列表,请先调用 get_feature_names()" ) vector = [] for name in self.feature_names: value = data_dict.get(name, 0.0 ) vector.append(float (value)) return np.array(vector) def query_api (self, features ): """查询受害者API - 支持 dict 或 list / np.ndarray""" if isinstance (features, dict ): input_data = features elif hasattr (features, '__iter__' ): if len (features) != len (self.feature_names): raise ValueError(f"输入长度 ({len (features)} ) 与特征数量 {len (self.feature_names)} 不匹配" ) input_data = dict (zip (self.feature_names, features)) else : raise TypeError("输入必须为字典、列表或 numpy 数组" ) try : input_data["annual_income" ] = int (input_data["annual_income" ]) input_data["years_of_employment" ] = int (input_data["years_of_employment" ]) input_data["debt_to_income_ratio" ] = round (input_data["debt_to_income_ratio" ],1 ) print ("input_data" ,input_data) response = requests.post( f"http://192.168.70.211:19102/predict" , json=input_data, headers={ 'Accept' : '*/*' , 'Accept-Language' : 'zh-CN,zh;q=0.9,en;q=0.8,en-GB;q=0.7,en-US;q=0.6' , 'Connection' : 'keep-alive' , 'Content-Type' : 'application/json' , 'Origin' : 'http://192.168.70.211:19102' , 'Referer' : 'http://192.168.70.211:19102/' , 'User-Agent' : 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/142.0.0.0 Safari/537.36 Edg/142.0.0.0' , }, verify=False , timeout=30 ) if response.status_code == 200 : result = response.json() prob = result.get("probability" ) if prob is None : print (f"⚠️ 返回中无 'probability' 字段: {result} " ) return None self.query_count += 1 self.attack_history.append({ "input" : input_data.copy(), "output" : float (prob), "query_number" : self.query_count }) return prob else : print (f"API请求失败: {response.status_code} , 响应内容: {response.json()} " ) return None except Exception as e: print (f"❌ 请求异常: {e} " ) return None def steal_weights_basic (self ): """基础权重窃取攻击(按单个特征扰动)""" if not self.feature_names: print ("⚠️ 未获取到特征名,正在尝试探测..." ) if not self._get_feature_names(): print ("❌ 探测失败,无法进行攻击" ) return None , None start_time = time.time() zero_data = {name: 0.0 for name in self.feature_names} prob_zero = self.query_api(zero_data) if prob_zero is None : print ("❌ 零向量查询失败,无法估计偏置" ) return None , None estimated_bias = inverse_sigmoid(prob_zero) print (f"✅ 估计偏置: {estimated_bias:.4 f} , 概率={prob_zero:.6 f} " ) stolen_weights = np.zeros(len (self.feature_names)) print ("🔍 正在估计各特征权重..." ) for i, name in enumerate (self.feature_names): perturbed_data = {n: (1.0 if n == name else 0.0 ) for n in self.feature_names} prob = self.query_api(perturbed_data) if prob is None : print (f"❌ 特征 '{name} ' 查询失败,跳过" ) continue weight_estimate = inverse_sigmoid(prob) - estimated_bias stolen_weights[i] = weight_estimate print (f" [{i + 1 } /{len (self.feature_names)} ] {name} : p={prob:.6 f} → w={weight_estimate:.4 f} " ) time.sleep(0.2 ) self.stolen_weights = stolen_weights self.stolen_bias = estimated_bias elapsed_time = time.time() - start_time print (f"✅ 基础攻击完成!耗时: {elapsed_time:.2 f} s, 查询次数: {self.query_count} " ) for name, w in zip (self.feature_names, stolen_weights): print (f" {name} : {w:.4 f} " ) return stolen_weights, estimated_bias def steal_weights_advanced (self, n_queries=300 ): """高级攻击:使用随机采样 + 线性拟合""" if not self.feature_names: print ("⚠️ 未获取特征名,正在探测..." ) if not self._get_feature_names(): return None , None start_time = time.time() X_queries = [] probs = [] print (f"🚀 开始高级攻击:生成 {n_queries} 次随机查询..." ) for i in range (n_queries): data = { "annual_income" : np.random.uniform(20 , 150 ), "age" : np.random.randint(18 , 70 ), "years_of_employment" : np.random.uniform(0 , 40 ), "num_credit_cards" : np.random.randint(0 , 10 ), "debt_to_income_ratio" : np.random.uniform(0.0 , 1.0 ) } prob = self.query_api(data) if prob is not None : x_vec = self._dict_to_vector(data) X_queries.append(x_vec) probs.append(prob) if (i + 1 ) % 50 == 0 : print (f" 进度: {i + 1 } /{n_queries} , 成功查询: {len (probs)} " ) time.sleep(0.07 ) if len (X_queries) < 10 : print ("❌ 有效查询不足,无法拟合" ) return None , None X_queries = np.array(X_queries) probs = np.array(probs) log_odds = inverse_sigmoid(probs) X_augmented = np.hstack([X_queries, np.ones((len (X_queries), 1 ))]) try : coeffs, residuals, rank, s = np.linalg.lstsq(X_augmented, log_odds, rcond=None ) stolen_weights = coeffs[:-1 ] estimated_bias = coeffs[-1 ] self.stolen_weights = stolen_weights self.stolen_bias = estimated_bias elapsed_time = time.time() - start_time print (f"✅ 高级攻击完成!耗时: {elapsed_time:.2 f} s, 有效查询: {len (X_queries)} " ) for name, w in zip (self.feature_names, stolen_weights): print (f" {name} : {w:.4 f} " ) except Exception as e: print (f"❌ 拟合失败: {e} " ) return None , None return stolen_weights, estimated_bias def evaluate_attack (self, true_weights=None , true_bias=None ): """评估攻击效果""" if self.stolen_weights is None : print ("⚠️ 未执行攻击,请先调用 steal_weights_* 方法" ) return None , None if true_weights is None or true_bias is None : print ("⚠️ 无法获取真实模型参数,仅展示窃取结果" ) weight_error = np.linalg.norm(self.stolen_weights) weight_cosine = 1.0 else : try : weight_error = np.linalg.norm(self.stolen_weights - true_weights) weight_cosine = np.dot(self.stolen_weights, true_weights) / ( np.linalg.norm(self.stolen_weights) * np.linalg.norm(true_weights) + 1e-10 ) except Exception as e: print (f"评估计算出错: {e} " ) return None , None print ("\n📊 攻击效果总结:" ) print (f" 总查询次数: {self.query_count} " ) print (f" 权重误差 (L2): {weight_error:.4 f} " ) print (f" 余弦相似度: {weight_cosine:.4 f} " ) return weight_error, weight_cosinedef main (): """主函数""" print ("=" * 60 ) print ("🎯 权重窃取攻击客户端 - 支持多参数输入" ) print ("=" * 60 ) API_URL = "http://192.168.70.211:19102" attacker = WeightStealingAttacker(API_URL) try : print ("🔍 正在测试API连接..." ) test_data = { "annual_income" : 100 , "age" : 18 , "years_of_employment" : 10 , "num_credit_cards" : 1 , "debt_to_income_ratio" : 0.5 } prob_test = attacker.query_api(test_data) if prob_test is None : print ("❌ 无法连接到目标API,请检查服务是否运行!" ) return print (f"✅ API连接成功,测试返回概率: {prob_test:.6 f} " ) print ("\n🔍 正在获取真实权重信息..." ) try : response = requests.get(f"{API_URL} /model_info" , timeout=10 ) if response.status_code == 200 : model_info = response.json() true_weights = np.array(model_info['true_weights' ]) true_bias = model_info['true_bias' ] print ("✅ 获取真实权重成功" ) else : print (f"⚠️ 无法获取模型信息 (状态码: {response.status_code} ),跳过精确评估" ) true_weights = true_bias = None except Exception as e: print (f"⚠️ 获取真实参数失败: {e} ,将仅展示攻击结果" ) true_weights = true_bias = None print ("\n🔢 攻击模式选择:" ) print ("1. 基础权重窃取(基于单位向量 + 零向量)" ) print ("2. 高级权重窃取(随机采样+最小二乘拟合,推荐)" ) choice = input ("请选择攻击方式 (1/2): " ).strip() if choice not in ["1" , "2" ]: print ("❌ 无效选择,默认使用高级模式..." ) choice = "2" if choice == "1" : stolen_weights, stolen_bias = attacker.steal_weights_basic() else : stolen_weights, stolen_bias = attacker.steal_weights_advanced(n_queries=500 ) if stolen_weights is None or len (stolen_weights) == 0 : print ("❌ 权重窃取失败,请检查网络或API行为。" ) return print (f"\n✅ 攻击完成!窃取模型参数如下:" ) feature_names = ["annual_income" , "age" , "years_of_employment" , "num_credit_cards" , "debt_to_income_ratio" ] for i, name in enumerate (feature_names): if i < len (stolen_weights): print (f" {name} : {stolen_weights[i]:.6 f} " ) print (f" bias (偏置): {stolen_bias:.6 f} " ) if true_weights is not None : attacker.evaluate_attack(true_weights, true_bias) else : print ("⚠️ 无真实模型参数,无法精确评估。" ) try : stats_response = requests.get(f"{API_URL} /stats" , timeout=10 ) if stats_response.status_code == 200 : stats = stats_response.json() print ("\n📊 API统计信息:" ) print (f" 总查询次数: {stats['total_queries' ]} " ) print (f" 可疑查询次数: {stats['suspicious_queries' ]} " ) print (f" 防御模式: {stats['defense_mode' ]} " ) else : print ("⚠️ 无法获取统计信息" ) except Exception as e: print (f"⚠️ 获取统计失败: {e} " ) except Exception as e: print (f"❌ 主程序异常: {e} " )if __name__ == "__main__" : main()
【题目 5】概率攻击
选手精心构造一组全新的、合理的贷款申请参数。当你将这组参数提交给预测系统时,系统返回的预测概率要精确等于 :0.9956410313816209【题目附件】 submit2.csv(题目 5提交模板)
第一列为 Parameter
annual_income: 年收入(元)
age: 年龄
years_of_employment: 工作年限
num_credit_cards: 现有信用卡数量
debt_to_income_ratio: 债务收入比
第二列为 Value (填写符合题干要求的参数值)
【答案标准】 将相关参数整合进 submit2.csv文件提交至平台,平台将根据您提交参数的准确性进行判分。 (提交文件要完全符合题干要求,否则不得分)
爆破参数直到probability符合条件
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 import requestsfor i in range (518000 , 550000 ): for j in range (18 , 80 ): for .......: response = requests.post( f"http://192.168.70.211:19102/predict" , json={ "annual_income" : i, "age" : j, "years_of_employment" : 10 , "num_credit_cards" : 5 , "debt_to_income_ratio" : 0.5 }, verify=False , timeout=10 ) print (i, response.json()['probability' ]) if response.json()['probability' ] == 0.9956410313816209 : print (i, j) break
【题目 6】篡改溯源
系统日志显示,一份关键申请人的档案在录入时发生了数据异常,导致其中一个字段被错误地修改。幸运的是,在数据损坏前,系统已经为原始的、正确的档案计算并存储了一个 “ 预测概率值 ” ,我们可以将其视为原始数据的“校验和 ” ,请你对下面这份已损坏的档案进行分析,找出哪一个字段被篡改了,并恢复其原始的、正确的数值。 【案件线索】
原始校验和 ( 正确的预测概率 ): 0.8550518770812063
已损坏的档案记录 :
1 2 3 4 5 annual_income :250000 .5 age :45 years_of_employment :40 num_credit_cards :5 debt_to_income_ratio :0 .5
【答案标准】 找到被篡改的字段名称和其恢复后的正确整数值后,将它们格式化为 字段名:数值 (英文冒号)的字符串,然后提交该字符串。 假设你发现是 age 被改了,其原始值应为 25,构造字符串 : age:25作为答案提交。
同上,遍历攻击
三、图像对抗攻击 “智慧金融 ” 公司开发了一套基于图像的资产识别系统,用于自动化识别用户上传的资产图像,以快速评估用户申请贷款时提交的抵押品(如车辆、房子等)。该系统提升人工识别效率,并有效防范利用伪造或不合格资产进行的欺诈行为。
【题目 7】图像对抗
系统提供了一个基于 ResNet50 深度学习模型的图像识别平台,该模型使用 ImageNet 数据集进行预训练,能够识别不同的物体类别。平台中有一张默认的参考图片,你的任务是上传一张经过精心设计的对抗样本图片,使其满足以下两个看似矛盾的条件:
与默认图片在视觉上高度相似(相似度≥ 95% )
被模型识别为完全不同的类别
【答案标准】 提交验证成功后得到的字符串。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 """ The spatial attack is a very special attack because it tries to find adversarial perturbations using a set of translations and rotations rather then in an Lp ball. It therefore has a slightly different interface. """ import torchimport torchvision.models as modelsimport eagerpy as epfrom PIL import Imagefrom foolbox import PyTorchModel, accuracy, samples, attacks, criteriaimport foolbox.attacks as fafrom torchvision import transformsfrom torchvision.models import ResNet50_Weightsfrom torchvision.utils import save_imagedef main () -> None : DEVICE = torch.device("cuda:0" if torch.cuda.is_available() else "cpu" ) model = models.resnet50(weights=ResNet50_Weights).eval () preprocessing = dict (mean=[0.485 , 0.456 , 0.406 ], std=[0.229 , 0.224 , 0.225 ], axis=-3 ) fmodel = PyTorchModel(model, bounds=(0 , 1 ), preprocessing=preprocessing) image_path = "default.jpg" preprocess = transforms.Compose([ transforms.ToTensor(), ]) image = Image.open (image_path).convert("RGB" ) image_tensor = preprocess(image).unsqueeze(0 ) image_tensor = image_tensor.to(DEVICE) labels_tensor = torch.tensor([404 ], device=DEVICE) images, labels = ep.astensors(image_tensor, labels_tensor) clean_acc = accuracy(fmodel, images, labels) * 100 print (f"clean accuracy: {clean_acc:.1 f} %" ) attack_target = 1 target_class = torch.tensor([attack_target]).to(DEVICE) attack = attacks.LinfPGD( abs_stepsize=0.05 / 255.0 , ) adversarials, aaa, success = attack( fmodel, image_tensor, labels_tensor, epsilons=0.01 ) with torch.no_grad(): preds = fmodel(adversarials).argmax(axis=1 ) print ("预测类别:" , preds.item(), "是否成功攻击到目标类:" , preds.item() == target_class.item()) adv_224 = adversarials.detach().clamp(0.0 , 1.0 ) save_image(adv_224, "adversarial_pgd1111.jpg" ) image_path = "adversarial_pgd1111.jpg" preprocess = transforms.Compose([ transforms.ToTensor(), ]) image = Image.open (image_path).convert("RGB" ) image_tensor = preprocess(image).unsqueeze(0 ) image_tensor = image_tensor.to(DEVICE) with torch.no_grad(): logits = model(image_tensor).squeeze(0 ) logits_float = logits.float () pred_idx = torch.argmax(logits_float).item() print (pred_idx)if __name__ == "__main__" : main()
本地执行成功,线上(黑盒?)环境攻击失败
【题目 8】人脸伪造
资产图像识别系统的后台采用基于逻辑回归的人脸识别登录系统,该系统通过提取人脸特征向量,使用逻辑回归模型进行二分类判断,以确定用户身份是否匹配。
利用逻辑回归算法的线性特性和参数可预测性,采用黑盒攻击方式提取出完整的模型参数包括权重矩阵 W 和偏置向量 b,然后基于这些参数逆向计算并构造出能够通过身份验证的管理员人脸特征向量,使其在逻辑回归模型中产生正确的分类结果从而成功绕过身份验证系统获得管理员权限。
【平台附件】
【答案标准】 提交登录后台成功显示的字符串。
pass
四、大模型攻击 “智慧金融” 公司为了提升客户服务体验与运营效率,正式上线了一套全新智能客服系统。该系统基于 AI 大语言模型技术,能够实时解决用户的各类问题。无论是常规的账户查询、业务办理指引,还是复杂的理财产品咨询和个性化投资建议,都能精准理解用户的自然语言提问,并迅速提供专业、清晰的解答。
【题目 9】模型越狱
智慧金融公司新上线的 AI 智能客服为其集成的用户信息查询模块( MCP )设定了严格的权限 :仅能访问当前认证用户的个人数据。你的任务是,在登录自己(王浩)账户的情况下,构建一段巧妙的恶意提示词,绕过这层安全边界,诱使 AI 违规查询并泄露出另一指定用户 ‘陈静( id:2 ) ’的完整个人信息。
【答案标准】 成功获取陈静的身份证号后,将其作为最终答案提交至平台验证得分。
pass
【题目 10 】提示词窃取
由于一次部署过程中的配置疏忽, AI 智能客服的系统提示词中,被意外地硬编码了一段包含后台系统密码的调试信息。你的任务是进行一次精妙的提示词逆向工程,诱导 AI 完整复述出其自身的系统提示词。请注意,系统部署了关键词过滤功能,能有效拦截 AI 直接回复敏感信息等内容。你需要构建出一段出其不意的提示词,绕过这层防御,并从泄露的内容中成功获取系统密码 ,将其作为最终答案提交。
【答案标准】 成功获取系统密码后,将其作为最终答案提交至平台验证得分。
pass
题目附件 1 2 3 通过网盘分享的文件:全国总工会第二届职工数字化应用技术技能大赛附件.zip 链接: https://pan.baidu.com/s/11y0Skb5TATM61PMB5itwhg ?pwd =da5a 提取码: da5a --来自百度网盘超级会员v9的分享