blob: 80546f400364652b7fb6cde3e7b3897b84a1afcd [file] [log] [blame]
Daniele Moro80889562021-09-08 10:09:26 +02001from scapy.contrib.gtp import GTP_U_Header, GTPPDUSessionContainer
2from scapy.layers.inet import IP, UDP
3from scapy.layers.l2 import Ether
4import codecs
5
6UDP_GTP_PORT = 2152
7DEFAULT_GTP_TUNNEL_SPORT = 1234 # arbitrary, but different from 2152
8
9IP_HDR_BYTES = 20
10UDP_HDR_BYTES = 8
11GTPU_HDR_BYTES = 8
12GTPU_OPTIONS_HDR_BYTES = 4
13GTPU_EXT_PSC_BYTES = 4
14
15
def simple_gtp_udp_packet(
    eth_dst=None,
    eth_src=None,
    ip_src="192.168.0.1",
    ip_dst="192.168.0.2",
    s1u_addr="100.0.0.1",
    enb_addr="192.168.101.1",
    ip_ttl=64,
    gtp_teid=0xFF,  # dummy teid
    pktlen=136,
    ext_psc_type=None,
    ext_psc_qfi=0,
):
    """Build a UDP packet and wrap it in a GTP-U tunnel.

    An inner packet is created with simple_udp_packet() and then
    encapsulated with pkt_add_gtp(), using enb_addr as the outer IPv4
    source and s1u_addr as the outer IPv4 destination.

    Args:
        eth_dst: Ethernet destination, used for the inner packet and then
            assigned again on the returned packet.
        eth_src: Ethernet source, handled like eth_dst.
        ip_src: Source IPv4 address of the inner packet.
        ip_dst: Destination IPv4 address of the inner packet.
        s1u_addr: Outer IPv4 destination address.
        enb_addr: Outer IPv4 source address.
        ip_ttl: TTL written to the first IP layer of the returned packet
            (the one added by the encapsulation).
        gtp_teid: Tunnel endpoint identifier for the GTP-U header.
        pktlen: Length budget; the tunnel header sizes are subtracted from
            it before the inner packet is built (presumably so that the
            encapsulated frame ends up pktlen bytes long -- confirm).
        ext_psc_type: If not None, a PDU Session Container extension of
            this type is added and its size is also subtracted from pktlen.
        ext_psc_qfi: QFI value for the PDU Session Container.

    Returns:
        The encapsulated scapy packet.
    """
    # Bytes taken by the headers that the tunnel puts in front of the
    # inner IP packet.
    tunnel_overhead = IP_HDR_BYTES + UDP_HDR_BYTES + GTPU_HDR_BYTES
    if ext_psc_type is not None:
        tunnel_overhead += GTPU_OPTIONS_HDR_BYTES + GTPU_EXT_PSC_BYTES

    inner = simple_udp_packet(
        pktlen=pktlen - tunnel_overhead,
        eth_dst=eth_dst,
        eth_src=eth_src,
        ip_src=ip_src,
        ip_dst=ip_dst,
    )
    encapsulated = pkt_add_gtp(
        inner,
        out_ipv4_src=enb_addr,
        out_ipv4_dst=s1u_addr,
        teid=gtp_teid,
        ext_psc_type=ext_psc_type,
        ext_psc_qfi=ext_psc_qfi,
    )

    # Re-apply the caller-supplied addresses (possibly None) on the outer
    # Ethernet header rather than keeping the values copied from the inner
    # packet during encapsulation.
    outer_eth = encapsulated[Ether]
    outer_eth.src = eth_src
    outer_eth.dst = eth_dst
    encapsulated[IP].ttl = ip_ttl
    return encapsulated
46
47
def pkt_add_gtp(
    pkt,
    out_ipv4_src,
    out_ipv4_dst,
    teid,
    sport=DEFAULT_GTP_TUNNEL_SPORT,
    dport=UDP_GTP_PORT,
    ext_psc_type=None,
    ext_psc_qfi=None,
):
    """Encapsulate the payload of an Ethernet packet in a GTP-U tunnel.

    The result is Ether / IP / UDP / GTP-U [/ PDU Session Container]
    followed by everything that sat above the Ethernet layer of pkt.

    Args:
        pkt: scapy packet containing an Ether layer; its Ethernet
            addresses are copied to the new Ethernet header.
        out_ipv4_src: Source address of the outer IPv4 header.
        out_ipv4_dst: Destination address of the outer IPv4 header.
        teid: Tunnel endpoint identifier for the GTP-U header.
        sport: Outer UDP source port.
        dport: Outer UDP destination port.
        ext_psc_type: If not None, a GTPPDUSessionContainer with this type
            is inserted after the GTP-U header.
        ext_psc_qfi: QFI value for the GTPPDUSessionContainer.

    Returns:
        The new, encapsulated scapy packet.
    """
    inner_eth = pkt[Ether]
    layers = [
        Ether(src=inner_eth.src, dst=inner_eth.dst),
        IP(src=out_ipv4_src, dst=out_ipv4_dst, tos=0, id=0x1513, flags=0,
           frag=0),
        UDP(sport=sport, dport=dport, chksum=0),
        GTP_U_Header(gtp_type=255, teid=teid),
    ]
    if ext_psc_type is not None:
        # Add QoS Flow Identifier (QFI) as an extension header (required
        # for 5G RAN)
        layers.append(
            GTPPDUSessionContainer(type=ext_psc_type, QFI=ext_psc_qfi))
    # Whatever the original frame carried above Ethernet becomes the
    # tunnelled payload.
    layers.append(inner_eth.payload)

    # Stack the layers left to right, outermost first.
    tunnel = layers[0]
    for layer in layers[1:]:
        tunnel = tunnel / layer
    return tunnel
70
71
72# Simplified version of simple_udp_packet from https://github.com/p4lang/ptf/blob/master/src/ptf/testutils.py
def simple_udp_packet(
    pktlen=100,
    eth_dst="00:01:02:03:04:05",
    eth_src="00:06:07:08:09:0a",
    ip_src="192.168.0.1",
    ip_dst="192.168.0.2",
    udp_sport=1234,
    udp_dport=80,
    udp_payload=None,
):
    """Build an Ether / IP / UDP packet padded up to pktlen bytes.

    Args:
        pktlen: Target length of the packet as measured by len(); if the
            headers (plus udp_payload) already reach it, the padding is
            empty.
        eth_dst: Ethernet destination address.
        eth_src: Ethernet source address.
        ip_src: IPv4 source address.
        ip_dst: IPv4 destination address.
        udp_sport: UDP source port.
        udp_dport: UDP destination port.
        udp_payload: Optional payload placed right after the UDP header;
            skipped when falsy.

    Returns:
        The scapy packet with the padding appended.
    """
    frame = Ether(src=eth_src, dst=eth_dst)
    frame = frame / IP(src=ip_src, dst=ip_dst)
    frame = frame / UDP(sport=udp_sport, dport=udp_dport)
    if udp_payload:
        frame = frame / udp_payload

    # Padding is the byte sequence 0x00, 0x01, ..., wrapping after 0xff.
    pad_len = pktlen - len(frame)
    hex_digits = "".join("%02x" % (i % 256) for i in range(pad_len))
    return frame / codecs.decode(hex_digits, "hex")