工具准备 流量分析工具 Wireshark :流量分析的瑞士军刀,功能强大但界面略显简陋。EasyTshark :网上大佬对 Tshark 的二次开发,功能精简(很有限),界面美观(Web 做的能不美观吗),上手容易,但 BUG 有亿点多 。适合当一个玩具。Omnipeek :没用过,似乎要付费,官网连个下载按钮都找不到。有 Wireshark 就够了。不过我后面的演示有用到 EasyTshark,所以提一嘴。
组件 Packet capture (and sending) library for Windows.
WinPcap :它的最后一次更新是在 2013.03.08… 应该被淘汰了。Npcap :比 WinPcap 强大。若安装 Wireshark,会附带安装 Npcap。二选一安装即可。
其它 可选,一些可能有用的辅助工具。
实验过程 1. 抓包 先侦听流量并保存为 .pcapng 文件,后面再分析。
启动抓包软件,选择 Interface,软件会记录所有经过该网卡的数据包。然后停止并保存即可。
由于实验要求分析 FTP 和 HTTP 流量,所以需要人为地发起这 2 种协议的请求。
由于我的 2 个抓包软件都抓不到本地回环的流量 ,所以直接在 localhost 起服务器可能不太可行,我在实操中用了一些曲线救国的方法。
FTP 懒得搭建和配置 FTP 服务器了,借用一下学校交作业用的 FTP 服务器。反正只是抓一下登录的流量,没做什么破坏性的事。
我习惯用 Filezilla Client 来连接 FTP Server,直接用 Windows 的资源管理器应该也可以。
详见后文 。
HTTP 可以在网上找找有没有 HTTP 服务器。我懒得找,所以自己搭了一个。
Python 自带 HTTP 服务器,终端使用命令 python -m http.server [port] 即可在指定端口开启 HTTP 服务,默认端口是 8000。
前面说了,我的抓包软件抓不到本地回环的流量,所以我是在虚拟机里起的服务器,然后监听虚拟网卡,捕获向虚拟机的 HTTP 服务器发起的请求。
SMTP 虚拟机起一个 SMTP 服务器。
Python 在 3.11 版本及之前,内置了一个极简的 SMTP 调试服务器:smtpd,可通过命令 python -m smtpd -c DebuggingServer -n localhost:11451 启动。
对于 3.12 及更新的版本,smtpd 已被移除,可以用三方库 aiosmtpd 做替代。用包管理器安装后,使用命令 python -m aiosmtpd -n -l 0.0.0.0:11451 启动。
回到主机,利用内置的 smtplib 库发送一封简单的邮件:
import smtplibfrom email.message import EmailMessagemsg = EmailMessage() msg["From" ] = "qwq@smtp-test.com" msg["To" ] = "awa@smtp-test.com" msg["Subject" ] = "Hello world" msg.set_content("This is the letter content. I'm evil, nothing here." ) with smtplib.SMTP("61.139.2.133" , 11451 ) as server: server.send_message(msg)
若邮件成功发送,服务端终端会有回显:
抓包记录这个过程,会话数据流如下:
HTTPS HTTPS = HTTP + SSL / TLS,解密要用到 SSLKEY,只要设置环境变量 SSLKEYLOGFILE,重启 Chrome 后, Chrome 就会把 SSLKEY 保存到指定的文件。
注
不抓包的时候,记得删掉这个环境变量,避免资源浪费。
以 Windows 为例:
然后给 Wireshark 导入证书(编辑 -> 首选项 -> Protocols -> TLS -> (Pre)-Master-Secret log filename 选择你的 sslkeylog 文件),然后就能够解密 HTTPS 流量了:
2. 数据包分析 观察数据格式 在 Wireshark 里打开刚刚捕获的 .pcapng 文件,选择一个 TCP 包,查看它的帧信息。
各个字段的含义:
协议/封装单元 OSI 层级 说明 Frame 物理层 物理层的数据帧概况 Ethernet II 数据链路层 数据链路层以太网帧头部信息 Internet Protocol Version 4/6 网络层 互联网层 IP 包头部信息,属于网络层 Transmission Control Protocol 传输层 传输层的数据段头部信息,此处是 TCP Hypertext Transfer Protocol 应用层 应用层的信息,如 HTTP 请求/响应
以太网的帧格式:
前同步码、帧首定界符、CRC 校验位似乎并未在数据包里体现,从抓到的数据包头开始分别是 6 字节的目的地址,6 字节的源地址,2 字节的类型字段,以及 46-1500 字节的数据。
以下是一个示例:
目的地址是 40:fe:95:fe:80:01:
源地址是 04:ec:d8:c9:52:ac:
类型是 0x86dd(IPv6 数据包):
Type 之后就是 IPv4 或 IPv6 的报文头部信息(位于帧的数据区段),课上没讲,这里亦不做展开。
对于实验要求验证的内容,帧格式 & MAC地址 见上图的 Ethernet II,IP 报文格式 见上图的 Internet Protocol Version 6,TCP 段格式 见上图的 Transmission Control Protocol,这里就不把具体内容再贴一遍了。
FTP 协议命令与响应格式 FTP 协议命令格式 :<命令码> [<参数>] <CR><LF>
常见命令如下:
命令 格式 说明 USER USER <用户名><CR><LF>发送用户名 PASS PASS <密码><CR><LF>发送密码 AUTH AUTH <机制><CR><LF>认证机制,如 AUTH TLS 或 AUTH SSL,用于加密控制连接 SYST SYST<CR><LF>获取服务器操作系统类型 FEAT FEAT<CR><LF>查询服务器支持的特性扩展列表 PWD PWD<CR><LF>打印当前工作目录(Print Working Directory) TYPE TYPE <类型码><CR><LF>设置传输类型,如 TYPE A(ASCII)、TYPE I(二进制/图像) PASV PASV<CR><LF>进入被动模式,服务器返回IP和端口供客户端连接 MLSD MLSD [<路径>]<CR><LF>列出目录内容(机器可解析格式),比LIST更标准化 CWD CWD <路径><CR><LF>改变当前工作目录(Change Working Directory) RETR RETR <文件名><CR><LF>下载文件 STOR STOR <文件名><CR><LF>上传文件 PORT PORT h1,h2,h3,h4,p1,p2<CR><LF>指定数据连接端口(主动模式) LIST LIST [<路径>]<CR><LF>列出目录内容(人类可读格式) QUIT QUIT<CR><LF>断开连接
FTP 协议响应格式 :<状态码> <空格> <说明文本> <CR><LF>
具体见Wikipedia ,或者对照如下 AI 给的表格(正确性未检验):
状态码表状态码 含义 出现场景 110重新启动标记应答 REST 命令后 120服务在 nnn 分钟内准备好 服务器繁忙时 125数据连接已打开,准备传送 数据传输开始前 150文件状态良好,打开数据连接 LIST、RETR、STOR 命令后 200命令成功 TYPE、PORT、PASV 等命令成功 211系统状态或帮助响应 SYST、STAT 命令的响应 212目录状态 STAT 命令 213文件状态 STAT 命令 214帮助信息 HELP 命令 215名字系统类型 SYST 命令响应 220服务就绪 连接建立后 221服务关闭,退出登录 QUIT 命令后 225数据连接打开,无传输 数据连接已建立 226关闭数据连接,传输成功 文件传输完成 227进入被动模式 PASV 命令响应,返回 IP 和端口 229进入扩展被动模式 EPSV 命令响应 230用户登录成功 PASS 命令验证通过 234AUTH 命令成功 AUTH TLS/SSL 成功 250请求的文件操作完成 CWD、RMD、MKD 等命令成功 257路径名已创建 PWD、MKD 命令响应 331用户名正确,需要密码 USER 命令后 332需要登录账户信息 需要 ACCT 命令 350需要进一步命令 RNFR 命令后需要 RNTO 421无法提供服务,关闭连接 连接超时或服务器过载 425无法打开数据连接 端口无法绑定或连接失败 426连接关闭,传输中止 数据传输中断 450请求的文件操作未执行 文件被占用或不可访问 451操作中止:本地错误 服务器内部错误 452系统存储空间不足 写入磁盘空间不足 500语法错误,命令不可识别 未知命令 501参数语法错误 参数格式错误 502命令未实现 服务器不支持该命令 503命令顺序错误 如登录前执行需要登录的命令 504命令参数未实现 参数有效但不支持 530未登录 需要先登录认证 532需要账户信息存储文件 需要 ACCT 命令 550请求操作未执行 文件不存在或无权限 551操作中止:页类型未知 非标准页类型 552存储分配溢出 磁盘空间或配额不足 553文件名不合法 文件名包含非法字符
用过滤器过滤出 FTP 数据包:
过滤器使用示例
ftp:筛选所有 FTP 协议的数据包ftp.request.command == "AUTH":筛选 FTP 请求中命令为 AUTH 的数据包ftp.response.code == 220:筛选 FTP 特定相应状态码的数据包可参考过滤器的补全提示 一个一个包点开看显得很蠢,比较好的做法是分析会话数据流 。
选中一个数据包,如果是 Wireshark,右键 -> 追踪流 -> TCP Stream 或者按快捷键 Ctrl + Shift + Alt + T,如果是 EasyTshark,点击数据包右侧聊天气泡般的按钮即可,这里以 EasyTshark 为例。
会话时序图:
会话数据流里可看到 FTP 命令、响应、数据传输等信息:
可见,命令和相应格式与前面所说一致。
HTTP 流量分析 也是个明文协议,看看会话数据流就差不多了,没啥好分析的。
TCP 机制观察 连接建立与拆除 TCP 有三次握手与四次挥手。
三次握手:客户端 SYN,服务端 SYN + ACK,客户端 ACK。 四次挥手:主动关闭方 FIN + ACK,被动关闭方 ACK,被动关闭方 FIN + ACK,主动关闭方 ACK。 挑一个 TCP 包,查看时序图,即可观察到 TCP 的三次握手与四次挥手的过程:
序列号与确认号的变化
窗口机制与拥塞控制 这部分参考了以下两篇文章:
具体实验做法很简单,找个站点下载文件,并用 Wireshark 捕捉整个过程,分析 tcptrace 图(统计 -> TCP 流图形 -> tcptrace)。
如果得不到理想的图,可能要换几个站点,多次尝试。
我得到的 tcptrace 图如下:
这个图一共有三条线,分别代表:
绿线 :接收方通告的窗口大小,即接收方当前能接收多少数据(窗口机制的体现)。蓝线 :发送方实际发出的数据量。它是连续攀升的,每向上一步,都代表一个数据包被发出。线的斜率直接反映了当时的发送速率。黄线 :发送方已 ACK 的数据量,也就是实际收到的数据。红色竖线 :代表 SACK(Selective Acknowledgment),每条红色竖线代表一个被接收方成功收到但“不连续”的数据块,意味着出现了丢包 或数据包乱序 。从这张图就可以清晰地看到 TCP 的慢启动和拥塞避免阶段:
3. 调库侦听网络数据并记录统计 Ether/IP
实验要求
侦听网络上的数据流,解析发送方与接收方的 MAC 和 IP 地址,并按如下 CSV 格式输出日志:
时间,源MAC,源IP,目标MAC,目标IP,帧长度 2015-03-14 13:05:16,60-36-DD-7D-D5-21,192.168.33.1,60-36-DD-7D-D5-72,192.168.33.2,1536
使用 Python + Scapy 一把梭:
import csvimport timefrom collections import defaultdictfrom scapy.all import Ether, IP, sniffstats = defaultdict(int ) start_time = time.time() def process_packet (packet ): global start_time if Ether in packet and IP in packet: pkt_time = time.strftime("%Y-%m-%d %H:%M:%S" , time.localtime(packet.time)) src_mac = packet[Ether].src dst_mac = packet[Ether].dst src_ip = packet[IP].src dst_ip = packet[IP].dst pkt_len = len (packet) row = [pkt_time, src_mac, src_ip, dst_mac, dst_ip, pkt_len] print ("," .join(map (str , row))) with open ("traffic_log.csv" , "a" , newline = "" ) as f: writer = csv.writer(f) writer.writerow(row) stats[f"From {src_mac} ({src_ip} )" ] += pkt_len stats[f"To {dst_mac} ({dst_ip} )" ] += pkt_len current_time = time.time() if current_time - start_time >= 15 : for key, val in stats.items(): print (f"{key} : {val} Bytes" ) print ("-------------------------\n" ) stats.clear() start_time = current_time with open ("traffic_log.csv" , "w" , newline = "" ) as f: f.write("时间,源MAC,源IP,目标MAC,目标IP,帧长度\n" ) sniff(prn = process_packet, store = False )
FTP
实验要求
侦听 FTP 流量,提取用户名和口令并记录登录行为。日志格式示例如下:
时间,源MAC,源IP,目标MAC,目标IP,登录名,口令,成功与否 ... student,software,SUCCEED ... student,software1,FAILED
和前面一样。
from scapy.all import Ether, IP, TCP, Raw, sniff, rdpcapimport timeimport csvfrom collections import defaultdictftp_sessions = {} def log_session (session, result ): user = session.get("user" , "UNKNOWN" ) passwd = session.get("pass" , "UNKNOWN" ) row = [ session.get("time" , "" ), session.get("src_mac" , "" ), session.get("src_ip" , "" ), session.get("dst_mac" , "" ), session.get("dst_ip" , "" ), user, passwd, result ] print ("," .join(map (str , row))) with open ("ftp_session.csv" , "a" , newline = "" ) as f: writer = csv.writer(f) writer.writerow(row) def process_packet (packet ): global start_time if Ether in packet and IP in packet and TCP in packet and Raw in packet: pkt_time = time.strftime("%Y-%m-%d %H:%M:%S" , time.localtime(int (packet.time))) src_mac = packet[Ether].src dst_mac = packet[Ether].dst src_ip = packet[IP].src dst_ip = packet[IP].dst pkt_len = len (packet) sport = packet[TCP].sport dport = packet[TCP].dport if dport != 21 and sport != 21 : return try : payload = packet[Raw].load.decode("utf-8" , errors = "ignore" ).strip() except Exception: return if dport == 21 : session_key = (src_ip, dst_ip) if payload.upper().startswith("USER " ): username = payload[5 :].strip() ftp_sessions[session_key] = { "user" : username, "pass" : "" , "src_mac" : src_mac, "dst_mac" : dst_mac, "src_ip" : src_ip, "dst_ip" : dst_ip, "time" : pkt_time } print (f" USER: {username} ({src_ip} -> {dst_ip} )" ) elif payload.upper().startswith("PASS " ): password = payload[5 :].strip() if session_key in ftp_sessions: ftp_sessions[session_key]["pass" ] = password ftp_sessions[session_key]["time" ] = pkt_time print (f" PASS: {password} ({src_ip} -> {dst_ip} )" ) elif sport == 21 : session_key = (dst_ip, src_ip) if session_key not in ftp_sessions: return session = ftp_sessions[session_key] if payload.startswith("230" ): log_session(session, "SUCCEED" ) del ftp_sessions[session_key] elif payload.startswith("530" ): log_session(session, "FAILED" ) del ftp_sessions[session_key] elif payload.startswith("331" ): print (f" User name okay, need password." ) with open ("ftp_session.csv" , "w" ) as f: f.write("时间,源MAC,源IP,目标MAC,目标IP,登录名,口令,成功与否\n" ) sniff(prn = process_packet, filter = "tcp port 21" , store = False )
测试输出:
USER: hello (10.30.38.117 -> 121.192.180.236) User name okay, need password. PASS: world (10.30.38.117 -> 121.192.180.236) 2026-04-03 14:52:40,04:ec:d8:c9:52:a7,10.30.38.117,40:fe:95:fe:80:01,121.192.180.236,hello,world,FAILED USER: teacher (10.30.38.117 -> 121.192.180.236) User name okay, need password. PASS: world (10.30.38.117 -> 121.192.180.236) 2026-04-03 14:53:13,04:ec:d8:c9:52:a7,10.30.38.117,40:fe:95:fe:80:01,121.192.180.236,teacher,world,FAILED USER: root (10.30.38.117 -> 121.192.180.236) User name okay, need password. PASS: root (10.30.38.117 -> 121.192.180.236) 2026-04-03 14:53:24,04:ec:d8:c9:52:a7,10.30.38.117,40:fe:95:fe:80:01,121.192.180.236,root,root,FAILED USER: root (10.30.38.117 -> 121.192.180.236) User name okay, need password. PASS: 123456 (10.30.38.117 -> 121.192.180.236) 2026-04-03 14:53:29,04:ec:d8:c9:52:a7,10.30.38.117,40:fe:95:fe:80:01,121.192.180.236,root,123456,FAILED USER: admin (10.30.38.117 -> 121.192.180.236) User name okay, need password. PASS: admin123 (10.30.38.117 -> 121.192.180.236) 2026-04-03 14:53:43,04:ec:d8:c9:52:a7,10.30.38.117,40:fe:95:fe:80:01,121.192.180.236,admin,admin123,FAILED USER: admin (10.30.38.117 -> 121.192.180.236) User name okay, need password. PASS: admin (10.30.38.117 -> 121.192.180.236) 2026-04-03 14:53:50,04:ec:d8:c9:52:a7,10.30.38.117,40:fe:95:fe:80:01,121.192.180.236,admin,admin,FAILED USER: student (10.30.38.117 -> 121.192.180.236) User name okay, need password. PASS: 123456 (10.30.38.117 -> 121.192.180.236) 2026-04-03 14:54:04,04:ec:d8:c9:52:a7,10.30.38.117,40:fe:95:fe:80:01,121.192.180.236,student,123456,FAILED USER: student (10.30.38.117 -> 121.192.180.236) User name okay, need password. PASS: Iwonttellu (10.30.38.117 -> 121.192.180.236) 2026-04-03 14:54:17,04:ec:d8:c9:52:a7,10.30.38.117,40:fe:95:fe:80:01,121.192.180.236,student,Iwonttellu,SUCCEED
HTTP & HTTPS
实验要求
侦听并观察 HTTP 与 HTTPS(在可控情况下、指定证书或单向加密场景)数据,分析访问特征。可在本地安装 LNMP(如 HUSTOJ)以生成测试流量,解析协议内容并记录与统计用户访问行为。程序在日志中应记录下列格式(CSV)示例:
时间,源MAC,源IP,目标MAC,目标IP,登录名,口令,成功与否 ... student,software,SUCCEED ... student,software1,FAILED
实验建议装一个 HUSTOJ 来测试,那就尝试部署看看。
(其实 vibe 一个 HTTP/HTTPS Server 也不困难,这里尊重一下实验要求。)
若直接使用 install 脚本,会给我的虚拟机装一堆依赖。由于我有点洁癖,所以选择用 docker。
参照其安装文档 ,执行以下命令:
docker run -d \ --name hustoj \ -p 8080:80 \ -v ~/volume:/volume \ registry.gitlab.com/mgdream/hustoj
然后,问题来了。这个镜像在 registry.gitlab.com,因网络问题拉不下来,且我找不到这个的镜像源。
那没招了,换一个 OJ 吧,改用另一个开源 OJ:QingdaoU/OnlineJudge
看了一眼,它的安装只要 git clone 然后 docker compose up -d 就行了。
它的 docker-compose.yml 很干净,用的阿里源。
这个的安装就清爽多了,毕竟 GitHub 的镜像站还是挺好找的。
如果没装 docker 的话,先安装一下。
sudo curl -fsSL https://gitee.com/tech-shrimp/docker_installer/releases/download/latest/linux.sh| bash -s docker --mirror Aliyun sudo systemctl start docker
换个源:
sudo mkdir -p /etc/docker sudo tee /etc/docker/daemon.json <<-'EOF' { "registry-mirrors": [ "https://docker.xuanyuan.me", "https://docker.1ms.run", "https://docker.m.daocloud.io" ] } EOF sudo systemctl restart docker
然后就可以下载、安装了:
git clone -b 2.0 https://gh-proxy.com/github.com/QingdaoU/OnlineJudgeDeploy.git && cd OnlineJudgeDeploy sudo docker compose up -d
这样就把 OJ 跑起来了:
这个 OJ 的 80 和 443 端口都是开放的,刚好能把 HTTP 和 HTTPS 两个都做了。
建议先用 Wireshark 抓包,看看能否正常解密 HTTPS 流量。
然后就是按实验要求,写代码抓包,提取登录信息。
由于不再是单纯的抓包,要涉及应用层协议的解析,Scapy 可能没那么好用了。于是我换了一个轮椅:Pyshark。这是一个 Tshark 的 Python wrapper,能够帮忙解析 packets。
但不知是它的问题还是我使用方式不太对,我的代码只能解析出 HTTP 的请求和响应 JSON,以及 HTTPS 的请求 JSON,解析不出 HTTPS 的响应 JSON 。
2.5k stars 的项目 ,没一个比较完整的使用文档,且我懒得翻源码,索性放弃用 Pyshark 了。
放个半成品代码:
import jsonimport pysharkfrom datetime import datetimeSSLKEYLOG_PATH = r"C:\sslkey.log" TSHARK_PATH = r"D:\Wireshark\tshark.exe" INTERFACE = "VMware Network Adapter VMnet8" DEBUG = False def get_json (packet ) -> dict : result = {} if "JSON" in packet: for field in packet["JSON" ].field_names: try : result[field] = packet["JSON" ].get_field_value(field) except : pass return json.loads(result["object" ]) if "http" in packet: http_layer = packet.http if hasattr (http_layer, "file_data" ): try : json_data = json.loads(http_layer.file_data) return json_data except : pass if "http2" in packet: http2_layer = packet.http2 if hasattr (http2_layer, "json_object" ): try : json_data = json.loads(http2_layer.json_object) return json_data except : pass if hasattr (http2_layer, "data" ): try : json_data = json.loads(http2_layer.data) return json_data except : pass def main () -> None : with open ("login_logs.csv" , "w" ) as f: f.write("时间,源MAC,源IP,目标MAC,目标IP,登录名,口令,成功与否\n" ) capture = pyshark.LiveCapture( interface = INTERFACE, tshark_path = TSHARK_PATH, bpf_filter = "tcp port 80 or tcp port 443" , override_prefs = { "tls.keylog_file" : SSLKEYLOG_PATH, } ) if DEBUG: capture.set_debug() sessions = {} for packet in capture.sniff_continuously(): try : json_data = get_json(packet) if not json_data: continue if DEBUG: print (json.dumps(json_data, indent = 4 , ensure_ascii = False )) stream_id = packet.tcp.stream if "username" in json_data and "password" in json_data: src_mac, dst_mac = packet.eth.src, packet.eth.dst src_ip, dst_ip = packet.ip.src, packet.ip.dst time_str = datetime.fromtimestamp(float (packet.sniff_timestamp)).strftime("%Y-%m-%d %H:%M:%S" ) sessions[stream_id] = { "time" : time_str, "src_mac" : src_mac, "dst_mac" : dst_mac, "src_ip" : src_ip, "dst_ip" : dst_ip, "username" : json_data["username" ], "password" : json_data["password" ] } elif "error" in json_data and "data" in json_data and stream_id in sessions: session_info = sessions.pop(stream_id) status = "FAILED" if json_data["data" ] == "Succeeded" : status = "SUCCEED" csv_line = f"{session_info['time' ]} ,{session_info['src_mac' ]} ,{session_info['src_ip' ]} ,{session_info['dst_mac' ]} ,{session_info['dst_ip' ]} ,{session_info['username' ]} ,{session_info['password' ]} ,{status} " print (csv_line) with open ("login_logs.csv" , "a" ) as f: f.write(csv_line + "\n" ) except : pass if __name__ == "__main__" : main()
既然 Pyshark 用不了,那就直接用 Tshark 吧,翻阅文档:tshark(1) Manual Page ,确定启动参数,然后调试,输出包信息,并设计相应的解析方法。
用了才发现比 Pyshark 好写好调得多,因为 Tshark 可以直接返回 JSON。
成品代码:
import jsonimport csvimport subprocessfrom datetime import datetimeSSLKEYLOG_PATH = r"C:\sslkey.log" TSHARK_PATH = r"D:\Wireshark\tshark.exe" INTERFACE = "VMware Network Adapter VMnet8" DEBUG = True CMD = [ TSHARK_PATH, "-l" , "-n" , "-i" , INTERFACE, "-o" , f"tls.keylog_file:{SSLKEYLOG_PATH} " , "-o" , "tcp.desegment_tcp_streams:TRUE" , "-o" , "tls.desegment_ssl_records:TRUE" , "-o" , "tls.desegment_ssl_application_data:TRUE" , "-T" , "ek" , "-Y" , "http or http2" , "-e" , "frame.time_epoch" , "-e" , "eth.src" , "-e" , "eth.dst" , "-e" , "ip.src" , "-e" , "ip.dst" , "-e" , "tcp.stream" , "-e" , "http2.streamid" , "-e" , "http.file_data" , "-e" , "http2.data.data" , "-e" , "http.request.method" , "-e" , "http.request.uri" , ] def get_json (hex_str ) -> dict : try : decoded = bytes .fromhex(hex_str).decode("utf-8" ) start = decoded.find("{" ) end = decoded.rfind("}" ) + 1 return json.loads(decoded[start:end]) except : return {} def main (): csv_file = open ("login_logs.csv" , "w" , newline = "" ) csv_writer = csv.writer(csv_file) csv_writer.writerow(["时间" , "源MAC" , "源IP" , "目标MAC" , "目标IP" , "登录名" , "口令" , "成功与否" ]) process = subprocess.Popen( CMD, stdout = subprocess.PIPE, stderr = subprocess.DEVNULL, text = True , encoding = "utf-8" , errors = "replace" ) sessions = {} packet_count = 0 try : for line in process.stdout: packet_count += 1 packet = json.loads(line) if "index" in packet: continue if "layers" not in packet: continue layers = packet["layers" ] if "http2_data_data" not in layers and "http_file_data" not in layers: continue data_hex = layers["http_file_data" ][0 ] if "http_file_data" in layers else layers["http2_data_data" ][0 ] json_data = get_json(data_hex) if not json_data: continue tcp_stream = layers["tcp_stream" ][0 ] h2_stream = layers["http2_streamid" ][0 ] if "http2_streamid" in layers else None stream_id = f"{tcp_stream} _{h2_stream} " if h2_stream else tcp_stream protocol = "HTTPS" if "http2_data_data" in layers else "HTTP" method = layers["http_request_method" ][0 ] if "http_request_method" in layers else "UNKNOWN" path = layers["http_request_uri" ][0 ] if "http_request_uri" in layers else "/?" timestamp = layers["frame_time_epoch" ][0 ] time_str = datetime.fromtimestamp(float (timestamp)).strftime("%Y-%m-%d %H:%M:%S" ) if timestamp else datetime.now().strftime("%Y-%m-%d %H:%M:%S" ) src_ip = layers["ip_src" ][0 ] dst_ip = layers["ip_dst" ][0 ] src_mac = layers["eth_src" ][0 ] dst_mac = layers["eth_dst" ][0 ] if DEBUG: print ("-" * 60 ) print (f"[Packet #{packet_count} ] {time_str} " ) print (f"Protocol: {protocol} " ) if protocol == "HTTP" and method != "UNKNOWN" : print (f"Method: {method} Path: {path} " ) print (f"Stream ID: {stream_id} " ) print (f"src: {src_ip} -> dst: {dst_ip} " ) print (f"JSON: {json.dumps(json_data, indent = 4 , ensure_ascii = False )} " ) is_login_req = ("username" in json_data and "password" in json_data) if is_login_req: sessions[stream_id] = { "time" : time_str, "src_mac" : src_mac, "dst_mac" : dst_mac, "src_ip" : src_ip, "dst_ip" : dst_ip, "username" : json_data["username" ], "password" : json_data["password" ], "protocol" : protocol, "method" : method } elif stream_id in sessions: session = sessions.pop(stream_id) status = "UNKNOWN" if json_data["data" ] == "Succeeded" : status = "SUCCEED" elif json_data["data" ] == "Invalid username or password" : status = "FAILED" else : status += f" ({json_data['data' ]} )" row = [session["time" ], session["src_mac" ], session["src_ip" ], session["dst_mac" ], session["dst_ip" ], session["username" ], session["password" ], status] csv_writer.writerow(row) csv_file.flush() print (" | " .join(row)) except : pass finally : process.terminate() if __name__ == "__main__" : main()
测试输出:
------------------------------------------------------------ [Packet #2] 2026-04-08 17:07:59 Protocol: HTTP Method: POST Path: /api/tfa_required Stream ID: 3 src: 61.139.2.1 -> dst: 61.139.2.130 JSON: { "username": "admin" } ------------------------------------------------------------ [Packet #4] 2026-04-08 17:07:59 Protocol: HTTP Method: POST Path: /api/login Stream ID: 4 src: 61.139.2.1 -> dst: 61.139.2.130 JSON: { "username": "admin", "password": "admin123" } ------------------------------------------------------------ [Packet #6] 2026-04-08 17:07:59 Protocol: HTTP Stream ID: 3 src: 61.139.2.130 -> dst: 61.139.2.1 JSON: { "error": null, "data": { "result": false } } ------------------------------------------------------------ [Packet #8] 2026-04-08 17:07:59 Protocol: HTTP Stream ID: 4 src: 61.139.2.130 -> dst: 61.139.2.1 JSON: { "error": "error", "data": "Invalid username or password" } 2026-04-08 17:07:59 | 00:50:56:c0:00:08 | 61.139.2.1 | 00:0c:29:ae:45:d1 | 61.139.2.130 | admin | admin123 | FAILED ------------------------------------------------------------ [Packet #180] 2026-04-08 17:08:03 Protocol: HTTPS Stream ID: 7_19 src: 61.139.2.130 -> dst: 61.139.2.1 JSON: { "error": null, "data": { "results": [], "total": 0 } } ------------------------------------------------------------ [Packet #182] 2026-04-08 17:08:03 Protocol: HTTPS Stream ID: 7_23 src: 61.139.2.130 -> dst: 61.139.2.1 JSON: { "error": null, "data": { "website_base_url": "http://127.0.0.1", "website_name": "Online Judge", "website_name_shortcut": "oj", "website_footer": "Online Judge Footer", "allow_register": true, "submission_list_show_all": true } } ------------------------------------------------------------ [Packet #184] 2026-04-08 17:08:03 Protocol: HTTPS Stream ID: 7_21 src: 61.139.2.130 -> dst: 61.139.2.1 JSON: { "error": null, "data": { "results": [], "total": 0 } } ------------------------------------------------------------ [Packet #204] 2026-04-08 17:08:06 Protocol: HTTPS Stream ID: 7_29 src: 61.139.2.1 -> dst: 61.139.2.130 JSON: { "username": "root" } ------------------------------------------------------------ [Packet #208] 2026-04-08 17:08:06 Protocol: HTTPS Stream ID: 7_29 src: 61.139.2.130 -> dst: 61.139.2.1 JSON: { "error": null, "data": { "result": false } } ------------------------------------------------------------ [Packet #212] 2026-04-08 17:08:07 Protocol: HTTPS Stream ID: 7_31 src: 61.139.2.1 -> dst: 61.139.2.130 JSON: { "username": "root" } ------------------------------------------------------------ [Packet #216] 2026-04-08 17:08:07 Protocol: HTTPS Stream ID: 7_33 src: 61.139.2.1 -> dst: 61.139.2.130 JSON: { "username": "root", "password": "rootroot" } ------------------------------------------------------------ [Packet #220] 2026-04-08 17:08:07 Protocol: HTTPS Stream ID: 7_31 src: 61.139.2.130 -> dst: 61.139.2.1 JSON: { "error": null, "data": { "result": false } } ------------------------------------------------------------ [Packet #222] 2026-04-08 17:08:07 Protocol: HTTPS Stream ID: 7_33 src: 61.139.2.130 -> dst: 61.139.2.1 JSON: { "error": null, "data": "Succeeded" } 2026-04-08 17:08:07 | 00:50:56:c0:00:08 | 61.139.2.1 | 00:0c:29:ae:45:d1 | 61.139.2.130 | root | rootroot | SUCCEED ------------------------------------------------------------ [Packet #226] 2026-04-08 17:08:07 Protocol: HTTPS Stream ID: 7_35 src: 61.139.2.130 -> dst: 61.139.2.1 JSON: { "error": null, "data": { "id": 1, "user": { "id": 1, "username": "root", "email": null, "admin_type": "Super Admin", "problem_permission": "All", "create_time": "2026-04-06T15:16:43.007010Z", "last_login": "2026-04-08T09:08:08.159888Z", "two_factor_auth": false, "open_api": false, "is_disabled": false }, "real_name": null, "acm_problems_status": {}, "oi_problems_status": {}, "avatar": "/public/avatar/default.png", "blog": null, "mood": null, "github": null, "school": null, "major": null, "language": null, "accepted_number": 0, "total_score": 0, "submission_number": 0 } }
有几点注意事项:
在 HTTP/2 中,数据是被切分为不同帧独立发送的:包含 :method 和 :path 的 HEADERS 帧和包含 JSON 数据的 DATA 帧是分开发送的。我没有去处理 HEADERS 帧,所以 HTTPS 协议的调试输出中不含请求方法和请求路径。 自测的时候发现,有些时候 TShark 未传递 HTTPS 响应,这属于玄学问题,可通过重启浏览器或新开一个无痕窗口解决。