# blob: 80546f400364652b7fb6cde3e7b3897b84a1afcd [file] [log] [blame]
from scapy.contrib.gtp import GTP_U_Header, GTPPDUSessionContainer
from scapy.layers.inet import IP, UDP
from scapy.layers.l2 import Ether
import codecs
# UDP destination port used by default for the GTP tunnel (see pkt_add_gtp).
UDP_GTP_PORT = 2152
# UDP source port used by default for the GTP tunnel (see pkt_add_gtp).
DEFAULT_GTP_TUNNEL_SPORT = 1234 # arbitrary, but different from 2152
# Header sizes in bytes. simple_gtp_udp_packet subtracts these from the
# requested packet length to size the inner (encapsulated) packet.
IP_HDR_BYTES = 20
UDP_HDR_BYTES = 8
GTPU_HDR_BYTES = 8
# The two sizes below are only subtracted when a PDU Session Container
# extension is requested (ext_psc_type is not None).
# NOTE(review): presumably the GTP-U optional-fields word and the PSC
# extension header respectively -- confirm against the GTP-U spec.
GTPU_OPTIONS_HDR_BYTES = 4
GTPU_EXT_PSC_BYTES = 4
def simple_gtp_udp_packet(
    eth_dst=None,
    eth_src=None,
    ip_src="192.168.0.1",
    ip_dst="192.168.0.2",
    s1u_addr="100.0.0.1",
    enb_addr="192.168.101.1",
    ip_ttl=64,
    gtp_teid=0xFF,  # dummy teid
    pktlen=136,
    ext_psc_type=None,
    ext_psc_qfi=0,
):
    """Build a UDP packet wrapped in a GTP-U tunnel.

    An inner Ether/IP/UDP packet is created with ``simple_udp_packet`` and
    then encapsulated with ``pkt_add_gtp``.

    Args:
        eth_dst: Ethernet destination of the returned frame.
        eth_src: Ethernet source of the returned frame.
        ip_src: Source address of the inner IP header.
        ip_dst: Destination address of the inner IP header.
        s1u_addr: Destination address of the outer (tunnel) IP header.
        enb_addr: Source address of the outer (tunnel) IP header.
        ip_ttl: TTL written into the outer IP header.
        gtp_teid: Tunnel endpoint identifier for the GTP-U header.
        pktlen: Requested length; the tunnel header sizes are subtracted
            from it before the inner packet is generated.
        ext_psc_type: When not None, a PDU Session Container extension of
            this type is added and its size is also subtracted from pktlen.
        ext_psc_qfi: QFI value carried in the PDU Session Container.

    Returns:
        The encapsulated scapy packet.
    """
    # Total bytes the tunnel adds in front of the inner IP packet.
    encap_overhead = IP_HDR_BYTES + UDP_HDR_BYTES + GTPU_HDR_BYTES
    if ext_psc_type is not None:
        encap_overhead += GTPU_OPTIONS_HDR_BYTES + GTPU_EXT_PSC_BYTES

    inner = simple_udp_packet(
        pktlen=pktlen - encap_overhead,
        eth_dst=eth_dst,
        eth_src=eth_src,
        ip_src=ip_src,
        ip_dst=ip_dst,
    )
    tunneled = pkt_add_gtp(
        inner,
        out_ipv4_src=enb_addr,
        out_ipv4_dst=s1u_addr,
        teid=gtp_teid,
        ext_psc_type=ext_psc_type,
        ext_psc_qfi=ext_psc_qfi,
    )

    # Re-apply the caller's MAC addresses (possibly None) on the outer frame,
    # overriding whatever pkt_add_gtp copied from the inner packet.
    outer_eth = tunneled[Ether]
    outer_eth.src = eth_src
    outer_eth.dst = eth_dst
    # [IP] resolves to the first IP layer, i.e. the outer tunnel header.
    tunneled[IP].ttl = ip_ttl
    return tunneled
def pkt_add_gtp(
    pkt,
    out_ipv4_src,
    out_ipv4_dst,
    teid,
    sport=DEFAULT_GTP_TUNNEL_SPORT,
    dport=UDP_GTP_PORT,
    ext_psc_type=None,
    ext_psc_qfi=None,
):
    """Encapsulate the Ethernet payload of ``pkt`` in a GTP-U tunnel.

    The result is Ether / IP / UDP / GTP_U_Header [/ GTPPDUSessionContainer]
    followed by everything that sat above the Ethernet layer in ``pkt``.

    Args:
        pkt: scapy packet with an Ether layer; its MAC addresses are reused
            for the new outer Ethernet header.
        out_ipv4_src: Source address of the outer IP header.
        out_ipv4_dst: Destination address of the outer IP header.
        teid: Tunnel endpoint identifier for the GTP-U header.
        sport: Source port of the outer UDP header.
        dport: Destination port of the outer UDP header.
        ext_psc_type: When not None, a PDU Session Container extension of
            this type is inserted after the GTP-U header.
        ext_psc_qfi: QFI value for the PDU Session Container.

    Returns:
        A new scapy packet carrying the original payload inside the tunnel.
    """
    inner_eth = pkt[Ether]

    encap = Ether(src=inner_eth.src, dst=inner_eth.dst)
    # Outer IP header with fixed tos/id/flags/frag values.
    encap = encap / IP(
        src=out_ipv4_src,
        dst=out_ipv4_dst,
        tos=0,
        id=0x1513,
        flags=0,
        frag=0,
    )
    # The UDP checksum is pinned to 0 rather than left for scapy to compute.
    encap = encap / UDP(sport=sport, dport=dport, chksum=0)
    encap = encap / GTP_U_Header(gtp_type=255, teid=teid)

    if ext_psc_type is not None:
        # Add QoS Flow Identifier (QFI) as an extension header (required for 5G RAN)
        encap = encap / GTPPDUSessionContainer(type=ext_psc_type,
                                               QFI=ext_psc_qfi)

    return encap / inner_eth.payload
# Simplified version of simple_udp_packet from https://github.com/p4lang/ptf/blob/master/src/ptf/testutils.py
def simple_udp_packet(
    pktlen=100,
    eth_dst="00:01:02:03:04:05",
    eth_src="00:06:07:08:09:0a",
    ip_src="192.168.0.1",
    ip_dst="192.168.0.2",
    udp_sport=1234,
    udp_dport=80,
    udp_payload=None,
):
    """Build an Ether / IP / UDP packet padded up to ``pktlen`` bytes.

    Args:
        pktlen: Target length of the whole frame, Ethernet header included.
        eth_dst: Ethernet destination address.
        eth_src: Ethernet source address.
        ip_src: IP source address.
        ip_dst: IP destination address.
        udp_sport: UDP source port.
        udp_dport: UDP destination port.
        udp_payload: Optional payload placed right after the UDP header;
            ignored when falsy.

    Returns:
        The scapy packet followed by filler bytes 0x00, 0x01, ... (wrapping
        after 0xff). No filler is added when the packet is already at least
        ``pktlen`` bytes long.
    """
    frame = (
        Ether(src=eth_src, dst=eth_dst)
        / IP(src=ip_src, dst=ip_dst)
        / UDP(sport=udp_sport, dport=udp_dport)
    )
    if udp_payload:
        frame = frame / udp_payload

    # A non-positive count yields an empty range, hence empty filler.
    pad_len = pktlen - len(frame)
    filler_hex = "".join("{:02x}".format(i % 256) for i in range(pad_len))
    return frame / codecs.decode(filler_hex, "hex")