blob: f9e049e86313d1e9f6001ad44734b1d9441f9a54 [file] [log] [blame]
Daniele Moro80889562021-09-08 10:09:26 +02001from scapy.contrib.gtp import GTP_U_Header, GTPPDUSessionContainer
2from scapy.layers.inet import IP, UDP
3from scapy.layers.l2 import Ether
4import codecs
5
# UDP ports used for the outer (tunnel) UDP header built by pkt_add_gtp.
UDP_GTP_PORT = 2152
DEFAULT_GTP_TUNNEL_SPORT = 1234  # arbitrary, but different from 2152

# Header sizes in bytes. simple_gtp_udp_packet subtracts these from the
# requested packet length to size the inner (encapsulated) packet.
IP_HDR_BYTES = 20
UDP_HDR_BYTES = 8
GTPU_HDR_BYTES = 8
# The next two are only subtracted when a PDU Session Container extension
# header is requested (ext_psc_type is not None).
GTPU_OPTIONS_HDR_BYTES = 4
GTPU_EXT_PSC_BYTES = 4
14
15
def simple_gtp_udp_packet(
    eth_dst=None,
    eth_src=None,
    ip_src="192.168.0.1",
    ip_dst="192.168.0.2",
    udp_sport=1234,
    udp_dport=80,
    s1u_addr="100.0.0.1",
    enb_addr="192.168.101.1",
    ip_ttl=64,
    gtp_teid=0xFF,  # dummy teid
    pktlen=136,
    ext_psc_type=None,
    ext_psc_qfi=0,
):
    """Build a UDP packet encapsulated in a GTP-U tunnel.

    An inner Ether/IP/UDP packet is created with simple_udp_packet and then
    wrapped by pkt_add_gtp in an outer IP/UDP/GTP-U header.

    Args:
        eth_dst: Ethernet destination address of the returned packet.
        eth_src: Ethernet source address of the returned packet.
        ip_src: Source address of the inner IP header.
        ip_dst: Destination address of the inner IP header.
        udp_sport: Source port of the inner UDP header.
        udp_dport: Destination port of the inner UDP header.
        s1u_addr: Destination address of the outer (tunnel) IP header.
        enb_addr: Source address of the outer (tunnel) IP header.
        ip_ttl: TTL written to the first IP layer of the result, i.e. the
            outer (tunnel) header.
        gtp_teid: Tunnel endpoint identifier placed in the GTP-U header.
        pktlen: Requested length of the encapsulated packet. The inner
            packet is sized to this value minus the outer IP, UDP and GTP-U
            header sizes (and minus the GTP-U options and PDU Session
            Container sizes when ext_psc_type is given).
        ext_psc_type: PDU type for a PDU Session Container extension header;
            None (the default) means no extension header is added.
        ext_psc_qfi: QFI carried in the extension header, if one is added.

    Returns:
        The GTP-U encapsulated Scapy packet.
    """
    # Reserve room for the tunnel headers so that the final packet, not the
    # inner one, ends up at the requested length.
    inner_len = pktlen - IP_HDR_BYTES - UDP_HDR_BYTES - GTPU_HDR_BYTES
    if ext_psc_type is not None:
        inner_len = inner_len - GTPU_OPTIONS_HDR_BYTES - GTPU_EXT_PSC_BYTES

    inner_pkt = simple_udp_packet(
        eth_src=eth_src,
        eth_dst=eth_dst,
        ip_src=ip_src,
        ip_dst=ip_dst,
        udp_sport=udp_sport,
        udp_dport=udp_dport,
        pktlen=inner_len,
    )
    gtp_pkt = pkt_add_gtp(
        inner_pkt,
        out_ipv4_src=enb_addr,
        out_ipv4_dst=s1u_addr,
        teid=gtp_teid,
        ext_psc_type=ext_psc_type,
        ext_psc_qfi=ext_psc_qfi,
    )
    # Re-apply the caller's Ethernet addresses (possibly None) and set the
    # TTL on the outer IP header.
    gtp_pkt[Ether].src = eth_src
    gtp_pkt[Ether].dst = eth_dst
    gtp_pkt[IP].ttl = ip_ttl
    return gtp_pkt
49
50
def pkt_add_gtp(
    pkt,
    out_ipv4_src,
    out_ipv4_dst,
    teid,
    sport=DEFAULT_GTP_TUNNEL_SPORT,
    dport=UDP_GTP_PORT,
    ext_psc_type=None,
    ext_psc_qfi=None,
):
    """Encapsulate the Ethernet payload of ``pkt`` in a GTP-U tunnel.

    The result reuses the Ethernet source/destination of ``pkt`` and carries
    ``pkt``'s Ethernet payload behind a new IP/UDP/GTP-U header stack.

    Args:
        pkt: Scapy packet with an Ether layer; its payload is tunnelled.
        out_ipv4_src: Source address of the outer IP header.
        out_ipv4_dst: Destination address of the outer IP header.
        teid: Tunnel endpoint identifier for the GTP-U header.
        sport: Source port of the outer UDP header.
        dport: Destination port of the outer UDP header.
        ext_psc_type: PDU type for a PDU Session Container extension header;
            None (the default) means no extension header is added.
        ext_psc_qfi: QFI for the extension header; only used when
            ext_psc_type is not None.

    Returns:
        The encapsulated Scapy packet.
    """
    ether = Ether(src=pkt[Ether].src, dst=pkt[Ether].dst)
    # Outer IP fields are pinned to fixed values and the outer UDP checksum
    # is forced to zero.
    outer_ip = IP(
        src=out_ipv4_src,
        dst=out_ipv4_dst,
        tos=0,
        id=0x1513,
        flags=0,
        frag=0,
    )
    outer_udp = UDP(sport=sport, dport=dport, chksum=0)
    gtp_pkt = ether / outer_ip / outer_udp / GTP_U_Header(
        gtp_type=255, teid=teid)

    if ext_psc_type is not None:
        # Add QoS Flow Identifier (QFI) as an extension header (required for
        # 5G RAN).
        # NOTE(review): no extension flag is set explicitly on GTP_U_Header;
        # this presumably relies on Scapy's layer bindings - confirm.
        gtp_pkt = gtp_pkt / GTPPDUSessionContainer(
            type=ext_psc_type, QFI=ext_psc_qfi)

    return gtp_pkt / pkt[Ether].payload
73
74
# Simplified version of simple_udp_packet from https://github.com/p4lang/ptf/blob/master/src/ptf/testutils.py
def simple_udp_packet(
    pktlen=100,
    eth_dst="00:01:02:03:04:05",
    eth_src="00:06:07:08:09:0a",
    ip_src="192.168.0.1",
    ip_dst="192.168.0.2",
    udp_sport=1234,
    udp_dport=80,
    udp_payload=None,
):
    """Build an Ether/IP/UDP packet padded up to ``pktlen`` bytes.

    Args:
        pktlen: Requested total packet length. If the headers (plus
            ``udp_payload``) already reach or exceed it, no padding is added
            and the packet is not truncated.
        eth_dst: Ethernet destination address.
        eth_src: Ethernet source address.
        ip_src: IP source address.
        ip_dst: IP destination address.
        udp_sport: UDP source port.
        udp_dport: UDP destination port.
        udp_payload: Optional payload placed right after the UDP header,
            before the padding; skipped when falsy.

    Returns:
        The Scapy packet followed by padding bytes 0x00, 0x01, ... (wrapping
        at 256).
    """
    pkt = (
        Ether(src=eth_src, dst=eth_dst)
        / IP(src=ip_src, dst=ip_dst)
        / UDP(sport=udp_sport, dport=udp_dport)
    )
    if udp_payload:
        pkt = pkt / udp_payload

    # A non-positive pad length yields an empty range, hence empty padding.
    pad_len = pktlen - len(pkt)
    # Build the incrementing byte pattern directly instead of formatting a
    # hex string and decoding it; going through bytearray gives the same
    # result on Python 2 and Python 3.
    padding = bytes(bytearray(i % 256 for i in range(pad_len)))
    return pkt / padding