Featured image of post HTTP 1.* Pcap 生成方法

HTTP 1.* Pcap 生成方法

使用 Python 脚本生成 HTTP1.* Pcap

一、概述

本文将介绍一种使用 Python 脚本构造 HTTP1.* Pcap 包的方法,支持自定义 HTTP 所有参数。本文假设读者具有一定的网络协议基础,对 OSI 七层模型或 TCP/IP 模型有一定的了解。

二、为什么要自己构造 Pcap

工作中经常需要用到各种 Pcap 包用于测试安全产品,比如测试各种安全规则、构造各种测试资产、测试产品的全流程数据处理是否正常等等。但是传统的 Pcap 包的获取方法较为耗时,如自己搭建各种靶场然后抓包,且抓包获取的数据也不一定 100% 贴合需求,定制五元组信息也不是很方便。

但是如果我们能自己构造 Pcap 包,那我们就能自定义数据包中的内容,相较于抓包将更加灵活高效,测试时我们也不再需要到处找 Pcap 包。

同时如果我们封装一套工具,每次都能通过配置一键生成我们所需 Pcap 那将更加方便,相当于我们根据需求准备了一套 Pcap 库。

三、构造 Pcap 包的工具 Scapy

Scapy 是一个由 Python 编写的强大交互式数据包处理程序。它能够伪造、发送、捕获、分析和操作网络数据包。它最大的特点是赋予了用户极低的层级和极高的自由度来定制和构建网络数据包,因此被广泛用于网络测试、扫描、攻击、探测以及教学等领域。

我们知道 Pcap 中的数据其实就是数据链路层上的一个个帧,而这里的帧又是从 OSI 七层模型(或TCP/IP模型) 一层层的从上往下封包形成,要自己构造 Pcap 就得手动模拟这一层层的封包行为来模拟一层层的协议,所以构造 Pcap 包无非就是模拟还原相关协议。

Scapy 有趣的是通过 重载 / 运算符,把不同层次的协议“叠加”起来,清晰地表达了数据包的层级结构,大大简化了包构造的复杂度。

例如,一个 TCP SYN 包可以这样写:

1
2
3
from scapy.all import *

packet = Ether() / IP(dst="192.168.1.1") / TCP(dport=80, flags="S")

更多相关的用法可以参考 Scapy 官方文档。Pcap 的底层数据构造分析见往期博客《Libpcap格式 pcap包分析》。

四、HTTP 1.* 与 TCP 的关系

HTTP 协议为应用层协议,属于 OSI 七层模型的第七层或 TCP/IP 模型的第五层。TCP 为传输层协议,属于 OSI 七层模型的第四层。我们知道 HTTP 1.* 是基于 TCP 协议的,那么下面使用一个时序图绘制一次 HTTP 请求响应对应的 TCP 层活动(注意 seq/ack 号的变化):

sequenceDiagram participant Client as 客户端 participant Server as 服务器 Client->>Server: SYN (seq=x) Server->>Client: SYN-ACK (seq=y, ack=x+1) Client->>Server: ACK (seq=x+1, ack=y+1) Note right of Client: TCP 连接已建立!开始传输 HTTP 数据 Client->>Server: HTTP 请求 [PUSH, ACK] (seq=x+1, ack=y+1) Note right of Client: 请求大小假设为 150 字节 Server->>Client: 对请求的确认 [ACK] (seq=y+1, ack=x+1+150) Server->>Client: HTTP 响应 [PUSH, ACK] (seq=y+1, ack=x+1+150) Note left of Server: 响应大小假设为 500 字节 Client->>Server: 对响应的确认 [ACK] (seq=x+1+150, ack=y+1+500) Client->>Server: FIN-ACK (seq=x+1+150, ack=y+1+500) Server->>Client: ACK (seq=y+1+500, ack=x+1+150+1) Server->>Client: FIN-ACK (seq=y+1+500, ack=x+1+150+1) Client->>Server: ACK (seq=x+1+150+1, ack=y+1+500+1) Note right of Client: TCP 连接已关闭!

整体过程可以总结为:

  1. 三次握手建立 TCP 连接;
  2. 连接建立后开始传输 HTTP 数据,受限于 MTU 和 MSS,如果 HTTP 传输的数据比较多可能需要通过多个 TCP 数据包进行传输,即分包;
  3. 在 HTTP 数据传输完后,需要进行四次挥手关闭 TCP 连接;

要使用 Scapy 生成 HTTP Pcap 包,其实我们要模拟就是上述过程。

有几点值得注意:

  1. 纯 ACK 和 FIN 不占用序号,即 ACK 数据包和 FIN-ACK 数据包不占用序号;
  2. 推送数据使用 PUSH-ACK 数据包,对端在收到一个或多个此类的数据包后通常会回一个 ACK 包进行确认;

五、使用 Scapy 构造 HTTP1.* 的 Pcap 包

本文主要讲解构造 HTTP1.* 的 Pcap 包,前面章节提到使用 Scapy 我们可以手动构造网络中每一层的协议,要构造 HTTP 这一应用层协议,我们就要手动处理应用层及往下的每一层协议:应用层(HTTP) / 传输层(TCP) / 网络层(IP) / 数据链路层(以太网)

  • HTTP (应用层):自定义 HTTP 协议相关内容,如请求头、请求体、响应头、响应体。
  • TCP (传输层):自定义 TCP 协议相关内容,如源端口和目的端口。
  • IP (网络层):自定义 IP 协议相关内容,如源 IP 和目的 IP。
  • Ether (数据链路层):自定义以太网协议相关内容,如源 MAC 和目的 MAC。

整体用 Scapy 的代码表示就是:packet = Ether() / IP() / TCP() / Raw()

需要注意的是我们不单要定制每一层的数据,还要维护每一层之间的关系。比如:在构造 HTTP 数据时,如果 HTTP 数据量较大时,可能需要我们手动进行“分片”,因为 HTTP 数据使用 TCP 协议发送,所以会受到 MSS 和 MTU 影响,当单个 HTTP 请求或者响应超过 MSS 时需要我们在 TCP 层分包。同时在构造 HTTP 层的数据时,还要注意 HTTP1 协议的一些细节,比如 Content-Length 的计算,需要计算 请求体/响应体 字节串的长度。以及 HTTP 请求头中的 Host 字段中的端口需要和 IP 层中指定数据包的目的端口一致,这样才不至于被认为是错误的数据包。

HTTP1 协议使用 TCP 协议传输数据,所以本质上我们需要构造 TCP 协议的数据包来模拟 HTTP1 协议。因为我们需要自定义 HTTP 的请求和响应,并不是使用 Scapy 进行真实的网络请求,所以每个 TCP 包之间的关系也需要我们自己维护,其实就是要模拟真实的 TCP 连接,即维护 TCP 包的头部信息,如标志位、seq/ack 号。

总结一下我们需要维护的几点:

  1. 维护 HTTP 协议本身的格式,需要注意一些需要计算的字段,如:Content-Type。
  2. 根据 MSS / MTU 对 HTTP 数据包切割后再作为 TCP 协议的 Body。
  3. HTTP 请求中 Host 字段中的端口信息与 IP 包中的端口信息保持一致。
  4. 维护 TCP 包中的标志位、seq/ack 号来维护模拟 TCP 连接(如果这里的数据有差错将生成错误的包,打开 Wireshark 查看将看到被标记为黑色)。

使用 Scapy 构造上一章节中举例的 HTTP 连接:

  1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
from scapy.utils import wrpcap
from scapy.layers.l2 import Ether
from scapy.layers.inet import IP, TCP
from scapy.layers.http import Raw

def gen_http_pcap():
    """生成 HTTP Pcap"""
    sip = "10.67.0.63"
    dip = "10.67.0.1"
    sport = 8080
    dport = 80
    smac = "fe:2d:90:f2:11:fe"
    dmac = "12:dd:2c:21:a2:bf"
    request = "GET / HTTP/1.1\r\n" "Host: example.com\r\n" "\r\n"
    response = "HTTP/1.1 200 OK\r\n" "Content-Length: 5\r\n" "Content-Type: text/plain\r\n" "\r\n" "Hello"
    seq_x = 0
    seq_y = 0

    # 通用的 IP 层包
    sip_packet = Ether(src=smac, dst=dmac) / IP(src=sip, dst=dip)
    dip_packet = Ether(src=dmac, dst=smac) / IP(src=dip, dst=sip)

    client_port_pair = {"sport": sport, "dport": dport}
    server_port_pair = {"sport": dport, "dport": sport}

    # 三次握手: SYN, SYN-ACK, ACK
    syn_packet = sip_packet / TCP(**client_port_pair, flags="S", seq=seq_x)
    syn_ack_packet = dip_packet / TCP(**server_port_pair, flags="SA", seq=seq_y, ack=syn_packet[TCP].seq + 1)
    ack_packet = sip_packet / TCP(
        **client_port_pair, flags="A", seq=syn_ack_packet[TCP].ack, ack=syn_ack_packet[TCP].seq + 1
    )

    # HTTP 请求
    request_packet = (
        sip_packet
        / TCP(
            **client_port_pair,
            flags="PA",
            seq=syn_ack_packet[TCP].ack,
            ack=syn_ack_packet[TCP].seq + 1,
        )
        / Raw(request.encode())
    )
    dst_ack_packet = dip_packet / TCP(
        **server_port_pair,
        flags="A",
        seq=request_packet[TCP].ack,
        ack=request_packet[TCP].seq + len(request_packet[Raw].load),
    )
    # HTTP 响应
    response_packet = (
        dip_packet
        / TCP(
            **server_port_pair,
            flags="PA",
            seq=request_packet[TCP].ack,
            ack=request_packet[TCP].seq + len(request_packet[Raw].load),
        )
        / Raw(response.encode())
    )
    src_ack_packet = sip_packet / TCP(
        **client_port_pair,
        flags="A",
        seq=response_packet[TCP].ack,
        ack=response_packet[TCP].seq + len(response_packet[Raw].load),
    )

    # 四次挥手
    fin_packet = sip_packet / TCP(
        **client_port_pair, flags="FA", seq=src_ack_packet[TCP].seq, ack=src_ack_packet[TCP].ack
    )
    ack_packet_close = dip_packet / TCP(
        **server_port_pair, flags="A", seq=fin_packet[TCP].ack, ack=fin_packet[TCP].seq + 1
    )
    ack_packet_close2 = dip_packet / TCP(
        **server_port_pair, flags="FA", seq=fin_packet[TCP].ack, ack=fin_packet[TCP].seq + 1
    )
    fin_packet_ack = sip_packet / TCP(
        **client_port_pair, flags="A", seq=ack_packet_close2[TCP].ack, ack=ack_packet_close2[TCP].seq + 1
    )

    # 按顺序组合数据包
    http_traffic = [
        # 三次握手
        syn_packet,
        syn_ack_packet,
        ack_packet,
        # 数据交换
        request_packet,
        dst_ack_packet,
        response_packet,
        src_ack_packet,
        # 四次挥手
        fin_packet,
        ack_packet_close,
        ack_packet_close2,
        fin_packet_ack,
    ]

    wrpcap(f"http.pcap", http_traffic)

if __name__ == "__main__":
    gen_http_pcap()

使用 Wireshark 查看生成的数据包

HTTP1.* 样例 Pcap

六、工具的封装和使用

上述的代码封装为生成 HTTP1.* Pcap 的工具脚本,整个过程其实可以抽象为两个部分

  1. 封装生成 HTTP1.* 请求和响应的逻辑
  2. 封装模拟 TCP 传输数据生成相关网络包的逻辑

这一部分代码就不再展开介绍了,完整工具代码见 Github Gist,其实就是对上一章节中的样例代码进一步封装抽象。

下面简单介绍一下上述提供的工具脚本的用法:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
import random

from scapy.utils import wrpcap

from gen_http_pcap import gen_http_request, gen_http_response, gen_http_pcap

dip, dport = "10.67.2.42", 80
sip, sport = "10.67.0.63", 9000


def gen_vul_800001():
    """
    desc: 测试弱密码
    event: 800001
    """
    # 模拟登录
    url = "/auth/login/"
    req_body = {"username": "admin", "password": "qaz123456"}
    res_header = {"Set-Cookie": "sessionid=94hi2otc9ykaoufja1ailul871tf4ha0; "
                                "expires=Fri, 19 Sep 2025 04:40:28 GMT; HttpOnly; Max-Age=3600; Path=/; SameSite=Lax"}
    res_body = {"status": "200", "message": "登陆成功", "data": None}
    request = gen_http_request(dip, dport, url, body=req_body, method="POST")
    response = gen_http_response(header=res_header, body=res_body)

    # 模拟获取数据
    url1 = "/user/admin/"
    req_head1 = {"Cookie": "sessionid=94hi2otc9ykaoufja1ailul871tf4ha0"}
    res_body1 = {"status": "200", "message": "获取用户信息成功", "data": {"username": "admin", "role": "admin"}}
    request1 = gen_http_request(dip, dport, url1, header=req_head1)
    response1 = gen_http_response(body=res_body1)

    return gen_http_pcap(src_ip=sip,
                         src_port=random.randint(20000, 50000),
                         dst_ip=dip,
                         dst_port=dport,
                         request=[request, request1],
                         response=[response, response1])


if __name__ == '__main__':
    packets = gen_vul_800001()
    wrpcap(f"800001.pcap", packets)

使用 Wireshark 查看生成的数据包

弱密码登录 Pcap

使用 Wireshark 追踪 HTTP 流

七、更多

  1. 需要注意的是当前自己生成 Pcap 包并不能完全替代搭建靶场抓包的方式,因为构造的 Pcap 包中的数据毕竟是我们自己输入的,相较于真实的环境可能真实性并没有实际抓包的好。当然如果我们输入的数据与真实的数据一致,那也便没什么区别了。
  2. 只要知道一个协议的底层细节,我们便能通过 Scapy 进行模拟。故无论是 HTTP2.0、HTTP3.0 还是 TLS 都能通过 Scapy 构造,不同于 HTTP1.0 上述协议本身更加复杂,故使用 Scapy 模拟时将更为复杂。感兴趣的读者也可以自己尝试一下。不过话又说回来了,如果从底层掌握了网络细节,就算一个个字节拼接也能还原协议哈哈哈。
Licensed under CC BY-NC-SA 4.0
comments powered by Disqus
Built with Hugo
Theme Stack designed by Jimmy