计网实验 - PCAP 侦听与分析

计网实验 - PCAP 侦听与分析

Coast23

实验要求

工具准备

流量分析工具

  • Wireshark:流量分析的瑞士军刀,功能强大但界面略显简陋。
  • EasyTshark:网上大佬对 Tshark 的二次开发,功能精简(很有限),界面美观(Web 做的能不美观吗),上手容易,但 BUG 有亿点多。适合当一个玩具。
  • Omnipeek:没用过,似乎要付费,官网连个下载按钮都找不到。

有 Wireshark 就够了。不过我后面的演示有用到 EasyTshark,所以提一嘴。

组件

Packet capture (and sending) library for Windows.

  • WinPcap:它的最后一次更新是在 2013.03.08… 应该被淘汰了。
  • Npcap:比 WinPcap 强大。若安装 Wireshark,会附带安装 Npcap。

二选一安装即可。

其它

可选,一些可能有用的辅助工具。

  • Python-Scapy:数据包处理一把梭。可使用 pip install scapy 安装。
  • Filezilla:FTP Client & Server,需要自取。

实验过程

1. 抓包

先侦听流量并保存为 .pcapng 文件,后面再分析。

启动抓包软件,选择 Interface,软件会记录所有经过该网卡的数据包。然后停止并保存即可。

由于实验要求分析 FTP 和 HTTP 流量,所以需要人为地发起这 2 种协议的请求。

由于我的 2 个抓包软件都抓不到本地回环的流量,所以直接在 localhost 起服务器可能不太可行,我在实操中用了一些曲线救国的方法。

FTP

懒得搭建和配置 FTP 服务器了,借用一下学校交作业用的 FTP 服务器。反正只是抓一下登录的流量,没做什么破坏性的事。

我习惯用 Filezilla Client 来连接 FTP Server,直接用 Windows 的资源管理器应该也可以。

详见后文

HTTP

可以在网上找找有没有 HTTP 服务器。我懒得找,所以自己搭了一个。

Python 自带 HTTP 服务器,终端使用命令 python -m http.server [port] 即可在指定端口开启 HTTP 服务,默认端口是 8000。

前面说了,我的抓包软件抓不到本地回环的流量,所以我是在虚拟机里起的服务器,然后监听虚拟网卡,捕获向虚拟机的 HTTP 服务器发起的请求。

SMTP

虚拟机起一个 SMTP 服务器。

Python 在 3.11 版本及之前,内置了一个极简的 SMTP 调试服务器:smtpd,可通过命令 python -m smtpd -c DebuggingServer -n localhost:11451 启动。

对于 3.12 及更新的版本,smtpd 已被移除,可以用三方库 aiosmtpd 做替代。用包管理器安装后,使用命令 python -m aiosmtpd -n -l 0.0.0.0:11451 启动。

回到主机,利用内置的 smtplib 库发送一封简单的邮件:

import smtplib
from email.message import EmailMessage

msg = EmailMessage()
msg["From"] = "qwq@smtp-test.com"
msg["To"] = "awa@smtp-test.com"
msg["Subject"] = "Hello world"
msg.set_content("This is the letter content. I'm evil, nothing here.")

# 61.139.2.133:11451 改为你的 SMTP 服务器地址和端口
with smtplib.SMTP("61.139.2.133", 11451) as server:
server.send_message(msg)

若邮件成功发送,服务端终端会有回显:

smtp_server

抓包记录这个过程,会话数据流如下:

smtp_pcap

HTTPS

HTTPS = HTTP + SSL / TLS,解密要用到 SSLKEY,只要设置环境变量 SSLKEYLOGFILE重启 Chrome 后, Chrome 就会把 SSLKEY 保存到指定的文件。

不抓包的时候,记得删掉这个环境变量,避免资源浪费。

以 Windows 为例:

val

然后给 Wireshark 导入证书(编辑 -> 首选项 -> Protocols -> TLS -> (Pre)-Master-Secret log filename 选择你的 sslkeylog 文件),然后就能够解密 HTTPS 流量了:

wireshark_https

2. 数据包分析

观察数据格式

  • 帧格式 / IP 报文

在 Wireshark 里打开刚刚捕获的 .pcapng 文件,选择一个 TCP 包,查看它的帧信息。

各个字段的含义:

协议/封装单元OSI 层级说明
Frame物理层物理层的数据帧概况
Ethernet II数据链路层数据链路层以太网帧头部信息
Internet Protocol Version 4/6网络层互联网层 IP 包头部信息,属于网络层
Transmission Control Protocol传输层传输层的数据段头部信息,此处是 TCP
Hypertext Transfer Protocol应用层应用层的信息,如 HTTP 请求/响应

以太网的帧格式:

frame_format

前同步码、帧首定界符、CRC 校验位似乎并未在数据包里体现,从抓到的数据包头开始分别是 6 字节的目的地址,6 字节的源地址,2 字节的类型字段,以及 46-1500 字节的数据。

以下是一个示例:

目的地址是 40:fe:95:fe:80:01

dst_addr

源地址是 04:ec:d8:c9:52:ac

src_addr

类型是 0x86dd(IPv6 数据包):

type_field

Type 之后就是 IPv4 或 IPv6 的报文头部信息(位于帧的数据区段),课上没讲,这里亦不做展开。

对于实验要求验证的内容,帧格式 & MAC地址 见上图的 Ethernet IIIP 报文格式 见上图的 Internet Protocol Version 6TCP 段格式 见上图的 Transmission Control Protocol,这里就不把具体内容再贴一遍了。

FTP 协议命令与响应格式

FTP 协议命令格式<命令码> [<参数>] <CR><LF>

常见命令如下:

命令格式说明
USERUSER <用户名><CR><LF>发送用户名
PASSPASS <密码><CR><LF>发送密码
AUTHAUTH <机制><CR><LF>认证机制,如 AUTH TLSAUTH SSL,用于加密控制连接
SYSTSYST<CR><LF>获取服务器操作系统类型
FEATFEAT<CR><LF>查询服务器支持的特性扩展列表
PWDPWD<CR><LF>打印当前工作目录(Print Working Directory)
TYPETYPE <类型码><CR><LF>设置传输类型,如 TYPE A(ASCII)、TYPE I(二进制/图像)
PASVPASV<CR><LF>进入被动模式,服务器返回IP和端口供客户端连接
MLSDMLSD [<路径>]<CR><LF>列出目录内容(机器可解析格式),比LIST更标准化
CWDCWD <路径><CR><LF>改变当前工作目录(Change Working Directory)
RETRRETR <文件名><CR><LF>下载文件
STORSTOR <文件名><CR><LF>上传文件
PORTPORT h1,h2,h3,h4,p1,p2<CR><LF>指定数据连接端口(主动模式)
LISTLIST [<路径>]<CR><LF>列出目录内容(人类可读格式)
QUITQUIT<CR><LF>断开连接

FTP 协议响应格式<状态码> <空格> <说明文本> <CR><LF>

具体见Wikipedia,或者对照如下 AI 给的表格(正确性未检验):

状态码表
状态码含义出现场景
110重新启动标记应答REST 命令后
120服务在 nnn 分钟内准备好服务器繁忙时
125数据连接已打开,准备传送数据传输开始前
150文件状态良好,打开数据连接LIST、RETR、STOR 命令后
200命令成功TYPE、PORT、PASV 等命令成功
211系统状态或帮助响应SYST、STAT 命令的响应
212目录状态STAT 命令
213文件状态STAT 命令
214帮助信息HELP 命令
215名字系统类型SYST 命令响应
220服务就绪连接建立后
221服务关闭,退出登录QUIT 命令后
225数据连接打开,无传输数据连接已建立
226关闭数据连接,传输成功文件传输完成
227进入被动模式PASV 命令响应,返回 IP 和端口
229进入扩展被动模式EPSV 命令响应
230用户登录成功PASS 命令验证通过
234AUTH 命令成功AUTH TLS/SSL 成功
250请求的文件操作完成CWD、RMD、MKD 等命令成功
257路径名已创建PWD、MKD 命令响应
331用户名正确,需要密码USER 命令后
332需要登录账户信息需要 ACCT 命令
350需要进一步命令RNFR 命令后需要 RNTO
421无法提供服务,关闭连接连接超时或服务器过载
425无法打开数据连接端口无法绑定或连接失败
426连接关闭,传输中止数据传输中断
450请求的文件操作未执行文件被占用或不可访问
451操作中止:本地错误服务器内部错误
452系统存储空间不足写入磁盘空间不足
500语法错误,命令不可识别未知命令
501参数语法错误参数格式错误
502命令未实现服务器不支持该命令
503命令顺序错误如登录前执行需要登录的命令
504命令参数未实现参数有效但不支持
530未登录需要先登录认证
532需要账户信息存储文件需要 ACCT 命令
550请求操作未执行文件不存在或无权限
551操作中止:页类型未知非标准页类型
552存储分配溢出磁盘空间或配额不足
553文件名不合法文件名包含非法字符

用过滤器过滤出 FTP 数据包:

ftp_list

过滤器使用示例
  • ftp:筛选所有 FTP 协议的数据包
  • ftp.request.command == "AUTH":筛选 FTP 请求中命令为 AUTH 的数据包
  • ftp.response.code == 220:筛选 FTP 特定相应状态码的数据包
  • 可参考过滤器的补全提示

一个一个包点开看显得很蠢,比较好的做法是分析会话数据流

选中一个数据包,如果是 Wireshark,右键 -> 追踪流 -> TCP Stream 或者按快捷键 Ctrl + Shift + Alt + T,如果是 EasyTshark,点击数据包右侧聊天气泡般的按钮即可,这里以 EasyTshark 为例。

会话时序图:

dialog

会话数据流里可看到 FTP 命令、响应、数据传输等信息:

stream1

stream2

可见,命令和相应格式与前面所说一致。

HTTP 流量分析

也是个明文协议,看看会话数据流就差不多了,没啥好分析的。

stream3

TCP 机制观察

连接建立与拆除

TCP 有三次握手与四次挥手。

  • 三次握手:客户端 SYN,服务端 SYN + ACK,客户端 ACK
  • 四次挥手:主动关闭方 FIN + ACK,被动关闭方 ACK,被动关闭方 FIN + ACK,主动关闭方 ACK

挑一个 TCP 包,查看时序图,即可观察到 TCP 的三次握手与四次挥手的过程:

tcp

序列号与确认号的变化

seq

窗口机制与拥塞控制

这部分参考了以下两篇文章:

具体实验做法很简单,找个站点下载文件,并用 Wireshark 捕捉整个过程,分析 tcptrace 图(统计 -> TCP 流图形 -> tcptrace)。

如果得不到理想的图,可能要换几个站点,多次尝试。

我得到的 tcptrace 图如下:

tcptrace

这个图一共有三条线,分别代表:

  • 绿线:接收方通告的窗口大小,即接收方当前能接收多少数据(窗口机制的体现)。
  • 蓝线:发送方实际发出的数据量。它是连续攀升的,每向上一步,都代表一个数据包被发出。线的斜率直接反映了当时的发送速率。
  • 黄线:发送方已 ACK 的数据量,也就是实际收到的数据。
  • 红色竖线:代表 SACK(Selective Acknowledgment),每条红色竖线代表一个被接收方成功收到但“不连续”的数据块,意味着出现了丢包数据包乱序

从这张图就可以清晰地看到 TCP 的慢启动和拥塞避免阶段:

tcptrace2

3. 调库侦听网络数据并记录统计

Ether/IP

实验要求

侦听网络上的数据流,解析发送方与接收方的 MAC 和 IP 地址,并按如下 CSV 格式输出日志:

时间,源MAC,源IP,目标MAC,目标IP,帧长度
2015-03-14 13:05:16,60-36-DD-7D-D5-21,192.168.33.1,60-36-DD-7D-D5-72,192.168.33.2,1536

使用 Python + Scapy 一把梭:

import csv
import time
from collections import defaultdict
from scapy.all import Ether, IP, sniff

stats = defaultdict(int)
start_time = time.time()

def process_packet(packet):
global start_time

# 确保是以太网帧且是 IP 包
if Ether in packet and IP in packet:
pkt_time = time.strftime("%Y-%m-%d %H:%M:%S", time.localtime(packet.time))
src_mac = packet[Ether].src
dst_mac = packet[Ether].dst
src_ip = packet[IP].src
dst_ip = packet[IP].dst
pkt_len = len(packet)

# 写入 CSV
row = [pkt_time, src_mac, src_ip, dst_mac, dst_ip, pkt_len]
print(",".join(map(str, row)))
with open("traffic_log.csv", "a", newline = "") as f:
writer = csv.writer(f)
writer.writerow(row)

# 长度记录
stats[f"From {src_mac}({src_ip})"] += pkt_len
stats[f"To {dst_mac}({dst_ip})"] += pkt_len

# 每 15 秒输出一次统计
current_time = time.time()
if current_time - start_time >= 15:
for key, val in stats.items():
print(f"{key}: {val} Bytes")
print("-------------------------\n")
stats.clear()
start_time = current_time

with open("traffic_log.csv", "w", newline = "") as f:
f.write("时间,源MAC,源IP,目标MAC,目标IP,帧长度\n")

sniff(prn = process_packet, store = False)

FTP

实验要求

侦听 FTP 流量,提取用户名和口令并记录登录行为。日志格式示例如下:

时间,源MAC,源IP,目标MAC,目标IP,登录名,口令,成功与否
... student,software,SUCCEED
... student,software1,FAILED

和前面一样。

from scapy.all import Ether, IP, TCP, Raw, sniff, rdpcap
import time
import csv
from collections import defaultdict

# key = (client_ip, server_ip),value = {"user":..., "pass":..., "src_mac":..., ...}
ftp_sessions = {}

def log_session(session, result):
user = session.get("user", "UNKNOWN")
passwd = session.get("pass", "UNKNOWN")
row = [
session.get("time", ""),
session.get("src_mac", ""),
session.get("src_ip", ""),
session.get("dst_mac", ""),
session.get("dst_ip", ""),
user,
passwd,
result
]
print(",".join(map(str, row)))
with open("ftp_session.csv", "a", newline = "") as f:
writer = csv.writer(f)
writer.writerow(row)

def process_packet(packet):
global start_time

if Ether in packet and IP in packet and TCP in packet and Raw in packet:
pkt_time = time.strftime("%Y-%m-%d %H:%M:%S", time.localtime(int(packet.time)))
src_mac = packet[Ether].src
dst_mac = packet[Ether].dst
src_ip = packet[IP].src
dst_ip = packet[IP].dst
pkt_len = len(packet)

sport = packet[TCP].sport
dport = packet[TCP].dport

# 正常来说,得做一个协议分析来判断是否为 FTP 流量,这里我选择简化处理,默认 21 号端口的 TCP 数据包都是 FTP 流量。
if dport != 21 and sport != 21: return

try:
payload = packet[Raw].load.decode("utf-8", errors = "ignore").strip()
except Exception: return

# Client -> Server
if dport == 21:
session_key = (src_ip, dst_ip)
if payload.upper().startswith("USER "):
username = payload[5:].strip()
ftp_sessions[session_key] = {
"user": username,
"pass": "",
"src_mac": src_mac,
"dst_mac": dst_mac,
"src_ip": src_ip,
"dst_ip": dst_ip,
"time": pkt_time
}
print(f" USER: {username} ({src_ip} -> {dst_ip})")

elif payload.upper().startswith("PASS "):
password = payload[5:].strip()
if session_key in ftp_sessions:
ftp_sessions[session_key]["pass"] = password
ftp_sessions[session_key]["time"] = pkt_time
print(f" PASS: {password} ({src_ip} -> {dst_ip})")

# Server -> Client
elif sport == 21:
session_key = (dst_ip, src_ip)

if session_key not in ftp_sessions: return

session = ftp_sessions[session_key]

# 230 Login successful
if payload.startswith("230"):
log_session(session, "SUCCEED")
del ftp_sessions[session_key]

# 530 Login incorrect / not logged in
elif payload.startswith("530"):
log_session(session, "FAILED")
del ftp_sessions[session_key]

# 331 User name okay, need password.
elif payload.startswith("331"):
print(f" User name okay, need password.")


with open("ftp_session.csv", "w") as f:
f.write("时间,源MAC,源IP,目标MAC,目标IP,登录名,口令,成功与否\n")

sniff(prn = process_packet, filter = "tcp port 21", store = False)

测试输出:

USER: hello  (10.30.38.117 -> 121.192.180.236)
User name okay, need password.
PASS: world (10.30.38.117 -> 121.192.180.236)
2026-04-03 14:52:40,04:ec:d8:c9:52:a7,10.30.38.117,40:fe:95:fe:80:01,121.192.180.236,hello,world,FAILED
USER: teacher (10.30.38.117 -> 121.192.180.236)
User name okay, need password.
PASS: world (10.30.38.117 -> 121.192.180.236)
2026-04-03 14:53:13,04:ec:d8:c9:52:a7,10.30.38.117,40:fe:95:fe:80:01,121.192.180.236,teacher,world,FAILED
USER: root (10.30.38.117 -> 121.192.180.236)
User name okay, need password.
PASS: root (10.30.38.117 -> 121.192.180.236)
2026-04-03 14:53:24,04:ec:d8:c9:52:a7,10.30.38.117,40:fe:95:fe:80:01,121.192.180.236,root,root,FAILED
USER: root (10.30.38.117 -> 121.192.180.236)
User name okay, need password.
PASS: 123456 (10.30.38.117 -> 121.192.180.236)
2026-04-03 14:53:29,04:ec:d8:c9:52:a7,10.30.38.117,40:fe:95:fe:80:01,121.192.180.236,root,123456,FAILED
USER: admin (10.30.38.117 -> 121.192.180.236)
User name okay, need password.
PASS: admin123 (10.30.38.117 -> 121.192.180.236)
2026-04-03 14:53:43,04:ec:d8:c9:52:a7,10.30.38.117,40:fe:95:fe:80:01,121.192.180.236,admin,admin123,FAILED
USER: admin (10.30.38.117 -> 121.192.180.236)
User name okay, need password.
PASS: admin (10.30.38.117 -> 121.192.180.236)
2026-04-03 14:53:50,04:ec:d8:c9:52:a7,10.30.38.117,40:fe:95:fe:80:01,121.192.180.236,admin,admin,FAILED
USER: student (10.30.38.117 -> 121.192.180.236)
User name okay, need password.
PASS: 123456 (10.30.38.117 -> 121.192.180.236)
2026-04-03 14:54:04,04:ec:d8:c9:52:a7,10.30.38.117,40:fe:95:fe:80:01,121.192.180.236,student,123456,FAILED
USER: student (10.30.38.117 -> 121.192.180.236)
User name okay, need password.
PASS: Iwonttellu (10.30.38.117 -> 121.192.180.236)
2026-04-03 14:54:17,04:ec:d8:c9:52:a7,10.30.38.117,40:fe:95:fe:80:01,121.192.180.236,student,Iwonttellu,SUCCEED

HTTP & HTTPS

实验要求

侦听并观察 HTTP 与 HTTPS(在可控情况下、指定证书或单向加密场景)数据,分析访问特征。可在本地安装 LNMP(如 HUSTOJ)以生成测试流量,解析协议内容并记录与统计用户访问行为。程序在日志中应记录下列格式(CSV)示例:

时间,源MAC,源IP,目标MAC,目标IP,登录名,口令,成功与否
... student,software,SUCCEED
... student,software1,FAILED

实验建议装一个 HUSTOJ 来测试,那就尝试部署看看。

(其实 vibe 一个 HTTP/HTTPS Server 也不困难,这里尊重一下实验要求。)

若直接使用 install 脚本,会给我的虚拟机装一堆依赖。由于我有点洁癖,所以选择用 docker。

参照其安装文档,执行以下命令:

docker run -d           \
--name hustoj \
-p 8080:80 \
-v ~/volume:/volume \
registry.gitlab.com/mgdream/hustoj

然后,问题来了。这个镜像在 registry.gitlab.com,因网络问题拉不下来,且我找不到这个的镜像源。

那没招了,换一个 OJ 吧,改用另一个开源 OJ:QingdaoU/OnlineJudge

看了一眼,它的安装只要 git clone 然后 docker compose up -d 就行了。

它的 docker-compose.yml 很干净,用的阿里源。

这个的安装就清爽多了,毕竟 GitHub 的镜像站还是挺好找的。

如果没装 docker 的话,先安装一下。

sudo curl -fsSL https://gitee.com/tech-shrimp/docker_installer/releases/download/latest/linux.sh| bash -s docker --mirror Aliyun
sudo systemctl start docker

换个源:

sudo mkdir -p /etc/docker
sudo tee /etc/docker/daemon.json <<-'EOF'
{
"registry-mirrors": [
"https://docker.xuanyuan.me",
"https://docker.1ms.run",
"https://docker.m.daocloud.io"
]
}
EOF

sudo systemctl restart docker

然后就可以下载、安装了:

git clone -b 2.0 https://gh-proxy.com/github.com/QingdaoU/OnlineJudgeDeploy.git && cd OnlineJudgeDeploy

sudo docker compose up -d

这样就把 OJ 跑起来了:

OJ

这个 OJ 的 80 和 443 端口都是开放的,刚好能把 HTTP 和 HTTPS 两个都做了。

建议先用 Wireshark 抓包,看看能否正常解密 HTTPS 流量。

然后就是按实验要求,写代码抓包,提取登录信息。

由于不再是单纯的抓包,要涉及应用层协议的解析,Scapy 可能没那么好用了。于是我换了一个轮椅:Pyshark。这是一个 Tshark 的 Python wrapper,能够帮忙解析 packets。

但不知是它的问题还是我使用方式不太对,我的代码只能解析出 HTTP 的请求和响应 JSON,以及 HTTPS 的请求 JSON,解析不出 HTTPS 的响应 JSON

2.5k stars 的项目,没一个比较完整的使用文档,且我懒得翻源码,索性放弃用 Pyshark 了。

放个半成品代码:

import json
import pyshark
from datetime import datetime

SSLKEYLOG_PATH = r"C:\sslkey.log" # 注意修改为你的 SSLKEYLOGFILE 路径
TSHARK_PATH = r"D:\Wireshark\tshark.exe" # 注意修改为你的 tshark 路径
INTERFACE = "VMware Network Adapter VMnet8" # 注意修改为你的网卡名
DEBUG = False # 调试用

def get_json(packet) -> dict:
result = {}

# 明文 HTTP JSON
if "JSON" in packet:
for field in packet["JSON"].field_names:
try:
result[field] = packet["JSON"].get_field_value(field)
except: pass
return json.loads(result["object"])

# HTTP/1.1 (没用到)
if "http" in packet:
http_layer = packet.http
if hasattr(http_layer, "file_data"):
try:
json_data = json.loads(http_layer.file_data)
return json_data
except: pass

# HTTP/2
if "http2" in packet:
http2_layer = packet.http2

if hasattr(http2_layer, "json_object"):
try:
json_data = json.loads(http2_layer.json_object)
return json_data
except: pass

if hasattr(http2_layer, "data"):
try:
json_data = json.loads(http2_layer.data)
return json_data
except: pass

def main() -> None:

with open("login_logs.csv", "w") as f:
f.write("时间,源MAC,源IP,目标MAC,目标IP,登录名,口令,成功与否\n")

capture = pyshark.LiveCapture(
interface = INTERFACE,
tshark_path = TSHARK_PATH,
bpf_filter = "tcp port 80 or tcp port 443",
override_prefs = {
"tls.keylog_file": SSLKEYLOG_PATH,
}
)

if DEBUG: capture.set_debug()

sessions = {}

for packet in capture.sniff_continuously():
try:
# if DEBUG:
# if "http" in packet:
# print(f"[HTTP] {packet.ip.src} -> {packet.ip.dst} | {packet.http.request_method} {packet.http.request_uri if hasattr(packet.http, 'request_uri') else ''}")
# elif "http2" in packet and ("POST" in packet.http2.stream) or "GET" in packet.http2.stream:
# print(f"[HTTP/2] {packet.ip.src} -> {packet.ip.dst} | {packet.http2.stream}")

json_data = get_json(packet)
if not json_data: continue

if DEBUG: print(json.dumps(json_data, indent = 4, ensure_ascii = False))

stream_id = packet.tcp.stream

if "username" in json_data and "password" in json_data:
src_mac, dst_mac = packet.eth.src, packet.eth.dst
src_ip, dst_ip = packet.ip.src, packet.ip.dst
time_str = datetime.fromtimestamp(float(packet.sniff_timestamp)).strftime("%Y-%m-%d %H:%M:%S")

sessions[stream_id] = {
"time": time_str,
"src_mac": src_mac,
"dst_mac": dst_mac,
"src_ip": src_ip,
"dst_ip": dst_ip,
"username": json_data["username"],
"password": json_data["password"]
}

elif "error" in json_data and "data" in json_data and stream_id in sessions:
session_info = sessions.pop(stream_id)
status = "FAILED"
if json_data["data"] == "Succeeded":
status = "SUCCEED"
csv_line = f"{session_info['time']},{session_info['src_mac']},{session_info['src_ip']},{session_info['dst_mac']},{session_info['dst_ip']},{session_info['username']},{session_info['password']},{status}"
print(csv_line)
with open("login_logs.csv", "a") as f:
f.write(csv_line + "\n")

except: pass

if __name__ == "__main__":
main()

既然 Pyshark 用不了,那就直接用 Tshark 吧,翻阅文档:tshark(1) Manual Page,确定启动参数,然后调试,输出包信息,并设计相应的解析方法。

用了才发现比 Pyshark 好写好调得多,因为 Tshark 可以直接返回 JSON。

成品代码:

import json
import csv
import subprocess
from datetime import datetime

SSLKEYLOG_PATH = r"C:\sslkey.log" # 注意修改为你的 SSLKEYLOGFILE 路径
TSHARK_PATH = r"D:\Wireshark\tshark.exe" # 注意修改为你的 tshark 路径
INTERFACE = "VMware Network Adapter VMnet8" # 注意修改为你的网卡名
DEBUG = True # 调试用

CMD = [
TSHARK_PATH,
"-l", "-n",
"-i", INTERFACE,
"-o", f"tls.keylog_file:{SSLKEYLOG_PATH}",
"-o", "tcp.desegment_tcp_streams:TRUE",
"-o", "tls.desegment_ssl_records:TRUE",
"-o", "tls.desegment_ssl_application_data:TRUE",
"-T", "ek",
"-Y", "http or http2",
"-e", "frame.time_epoch",
"-e", "eth.src",
"-e", "eth.dst",
"-e", "ip.src",
"-e", "ip.dst",
"-e", "tcp.stream",
"-e", "http2.streamid",
"-e", "http.file_data",
"-e", "http2.data.data",
"-e", "http.request.method",
"-e", "http.request.uri",
]

def get_json(hex_str) -> dict:
try:
decoded = bytes.fromhex(hex_str).decode("utf-8")
start = decoded.find("{")
end = decoded.rfind("}") + 1
return json.loads(decoded[start:end])
except: return {}

def main():
csv_file = open("login_logs.csv", "w", newline = "")
csv_writer = csv.writer(csv_file)
csv_writer.writerow(["时间", "源MAC", "源IP", "目标MAC", "目标IP", "登录名", "口令", "成功与否"])

process = subprocess.Popen(
CMD,
stdout = subprocess.PIPE,
stderr = subprocess.DEVNULL,
text = True,
encoding = "utf-8",
errors = "replace"
)

sessions = {}
packet_count = 0

try:
for line in process.stdout:
packet_count += 1
packet = json.loads(line)
if "index" in packet: continue
if "layers" not in packet: continue
layers = packet["layers"]
if "http2_data_data" not in layers and "http_file_data" not in layers: continue

# if DEBUG: print(layers)

data_hex = layers["http_file_data"][0] if "http_file_data" in layers else layers["http2_data_data"][0]
json_data = get_json(data_hex)
if not json_data: continue
# print(json_data)

tcp_stream = layers["tcp_stream"][0]
h2_stream = layers["http2_streamid"][0] if "http2_streamid" in layers else None
stream_id = f"{tcp_stream}_{h2_stream}" if h2_stream else tcp_stream

protocol = "HTTPS" if "http2_data_data" in layers else "HTTP"
method = layers["http_request_method"][0] if "http_request_method" in layers else "UNKNOWN"
path = layers["http_request_uri"][0] if "http_request_uri" in layers else "/?"
timestamp = layers["frame_time_epoch"][0]
time_str = datetime.fromtimestamp(float(timestamp)).strftime("%Y-%m-%d %H:%M:%S") if timestamp else datetime.now().strftime("%Y-%m-%d %H:%M:%S")

src_ip = layers["ip_src"][0]
dst_ip = layers["ip_dst"][0]
src_mac = layers["eth_src"][0]
dst_mac = layers["eth_dst"][0]

if DEBUG:
print("-" * 60)
print(f"[Packet #{packet_count}] {time_str}")
print(f"Protocol: {protocol}")
if protocol == "HTTP" and method != "UNKNOWN": print(f"Method: {method} Path: {path}")
print(f"Stream ID: {stream_id}")
print(f"src: {src_ip} -> dst: {dst_ip}")
print(f"JSON: {json.dumps(json_data, indent = 4, ensure_ascii = False)}")

is_login_req = ("username" in json_data and "password" in json_data)

if is_login_req:
sessions[stream_id] = {
"time": time_str,
"src_mac": src_mac,
"dst_mac": dst_mac,
"src_ip": src_ip,
"dst_ip": dst_ip,
"username": json_data["username"],
"password": json_data["password"],
"protocol": protocol,
"method": method
}
elif stream_id in sessions:
session = sessions.pop(stream_id)
status = "UNKNOWN"
if json_data["data"] == "Succeeded":
status = "SUCCEED"
elif json_data["data"] == "Invalid username or password":
status = "FAILED"
else: status += f" ({json_data['data']})"

row = [session["time"], session["src_mac"], session["src_ip"],
session["dst_mac"], session["dst_ip"], session["username"],
session["password"], status]
csv_writer.writerow(row)
csv_file.flush()

print(" | ".join(row))
except: pass
finally: process.terminate()

if __name__ == "__main__":
main()

测试输出:

------------------------------------------------------------
[Packet #2] 2026-04-08 17:07:59
Protocol: HTTP
Method: POST Path: /api/tfa_required
Stream ID: 3
src: 61.139.2.1 -> dst: 61.139.2.130
JSON: {
"username": "admin"
}
------------------------------------------------------------
[Packet #4] 2026-04-08 17:07:59
Protocol: HTTP
Method: POST Path: /api/login
Stream ID: 4
src: 61.139.2.1 -> dst: 61.139.2.130
JSON: {
"username": "admin",
"password": "admin123"
}
------------------------------------------------------------
[Packet #6] 2026-04-08 17:07:59
Protocol: HTTP
Stream ID: 3
src: 61.139.2.130 -> dst: 61.139.2.1
JSON: {
"error": null,
"data": {
"result": false
}
}
------------------------------------------------------------
[Packet #8] 2026-04-08 17:07:59
Protocol: HTTP
Stream ID: 4
src: 61.139.2.130 -> dst: 61.139.2.1
JSON: {
"error": "error",
"data": "Invalid username or password"
}
2026-04-08 17:07:59 | 00:50:56:c0:00:08 | 61.139.2.1 | 00:0c:29:ae:45:d1 | 61.139.2.130 | admin | admin123 | FAILED
------------------------------------------------------------
[Packet #180] 2026-04-08 17:08:03
Protocol: HTTPS
Stream ID: 7_19
src: 61.139.2.130 -> dst: 61.139.2.1
JSON: {
"error": null,
"data": {
"results": [],
"total": 0
}
}
------------------------------------------------------------
[Packet #182] 2026-04-08 17:08:03
Protocol: HTTPS
Stream ID: 7_23
src: 61.139.2.130 -> dst: 61.139.2.1
JSON: {
"error": null,
"data": {
"website_base_url": "http://127.0.0.1",
"website_name": "Online Judge",
"website_name_shortcut": "oj",
"website_footer": "Online Judge Footer",
"allow_register": true,
"submission_list_show_all": true
}
}
------------------------------------------------------------
[Packet #184] 2026-04-08 17:08:03
Protocol: HTTPS
Stream ID: 7_21
src: 61.139.2.130 -> dst: 61.139.2.1
JSON: {
"error": null,
"data": {
"results": [],
"total": 0
}
}
------------------------------------------------------------
[Packet #204] 2026-04-08 17:08:06
Protocol: HTTPS
Stream ID: 7_29
src: 61.139.2.1 -> dst: 61.139.2.130
JSON: {
"username": "root"
}
------------------------------------------------------------
[Packet #208] 2026-04-08 17:08:06
Protocol: HTTPS
Stream ID: 7_29
src: 61.139.2.130 -> dst: 61.139.2.1
JSON: {
"error": null,
"data": {
"result": false
}
}
------------------------------------------------------------
[Packet #212] 2026-04-08 17:08:07
Protocol: HTTPS
Stream ID: 7_31
src: 61.139.2.1 -> dst: 61.139.2.130
JSON: {
"username": "root"
}
------------------------------------------------------------
[Packet #216] 2026-04-08 17:08:07
Protocol: HTTPS
Stream ID: 7_33
src: 61.139.2.1 -> dst: 61.139.2.130
JSON: {
"username": "root",
"password": "rootroot"
}
------------------------------------------------------------
[Packet #220] 2026-04-08 17:08:07
Protocol: HTTPS
Stream ID: 7_31
src: 61.139.2.130 -> dst: 61.139.2.1
JSON: {
"error": null,
"data": {
"result": false
}
}
------------------------------------------------------------
[Packet #222] 2026-04-08 17:08:07
Protocol: HTTPS
Stream ID: 7_33
src: 61.139.2.130 -> dst: 61.139.2.1
JSON: {
"error": null,
"data": "Succeeded"
}
2026-04-08 17:08:07 | 00:50:56:c0:00:08 | 61.139.2.1 | 00:0c:29:ae:45:d1 | 61.139.2.130 | root | rootroot | SUCCEED
------------------------------------------------------------
[Packet #226] 2026-04-08 17:08:07
Protocol: HTTPS
Stream ID: 7_35
src: 61.139.2.130 -> dst: 61.139.2.1
JSON: {
"error": null,
"data": {
"id": 1,
"user": {
"id": 1,
"username": "root",
"email": null,
"admin_type": "Super Admin",
"problem_permission": "All",
"create_time": "2026-04-06T15:16:43.007010Z",
"last_login": "2026-04-08T09:08:08.159888Z",
"two_factor_auth": false,
"open_api": false,
"is_disabled": false
},
"real_name": null,
"acm_problems_status": {},
"oi_problems_status": {},
"avatar": "/public/avatar/default.png",
"blog": null,
"mood": null,
"github": null,
"school": null,
"major": null,
"language": null,
"accepted_number": 0,
"total_score": 0,
"submission_number": 0
}
}

有几点注意事项:

  1. 在 HTTP/2 中,数据是被切分为不同帧独立发送的:包含 :method:path 的 HEADERS 帧和包含 JSON 数据的 DATA 帧是分开发送的。我没有去处理 HEADERS 帧,所以 HTTPS 协议的调试输出中不含请求方法和请求路径。
  2. 自测的时候发现,有些时候 TShark 未传递 HTTPS 响应,这属于玄学问题,可通过重启浏览器或新开一个无痕窗口解决。
  • 标题: 计网实验 - PCAP 侦听与分析
  • 作者: Coast23
  • 创建于 : 2026-04-03 15:08:58
  • 更新于 : 2026-04-08 17:24:14
  • 链接: https://coast23.github.io/2026/04/03/计网实验-PCAP-侦听与分析/
  • 版权声明: 本文章采用 CC BY-NC-SA 4.0 进行许可。
评论