blob: 29a6db5c6ceed6b194dec7a7ce4b01f080e938f9 [file] [log] [blame]
Zsolt Haraszti5360d032016-03-02 23:02:09 -08001import logging
2from oftest.testutils import test_param_get, MINSIZE
alshabibcde318b2016-03-01 22:49:13 -08003import oftest.packet as scapy
4from oftest import config
5
6import ofp
7
# Test parameters. Each value below can be overridden from the command line
# with the -t or --test-params= options; the second argument passed to
# test_param_get() is the value used when no override is given.
# Example: -t 'onu_port=129;olt_port=288;device_type=pmc'
#
onu_port = test_param_get("onu_port", 130)
onu_port2 = test_param_get("onu_port2", 131)
olt_port = test_param_get("olt_port", 258)
# device_type options: "normal", "pmc", "cpqd"
device_type = test_param_get("device_type", "normal")
# When true, the monkey-patching at the bottom of this module is applied.
fake_dataplane = test_param_get("fake_dataplane", False)
alshabibcde318b2016-03-01 22:49:13 -080016
def createAllGroupAdd(group_id, ports=[]):
    """
    Build a group_add message for a group of type OFPGT_ALL.

    One bucket is created per entry in ports. Each bucket holds a single
    output action towards that port and has watch_port / watch_group set
    to OFPP_ANY / OFPG_ANY.

    @param group_id Id of the group to add
    @param ports Port numbers to output to (only iterated, never modified)
    @return The constructed ofp.message.group_add
    """
    all_buckets = [
        ofp.common.bucket(
            watch_port=ofp.OFPP_ANY,
            watch_group=ofp.OFPG_ANY,
            actions=[ofp.action.output(port=port_no)],
        )
        for port_no in ports
    ]

    return ofp.message.group_add(
        group_type=ofp.OFPGT_ALL,
        group_id=group_id,
        buckets=all_buckets,
    )
27
28
def createAllGroupMod(group_id, ports=[]):
    """
    Build a group_mod message (command OFPGC_MODIFY) for an OFPGT_ALL group.

    The message carries one bucket per entry in ports; every bucket has a
    single output action towards its port, with watch_port / watch_group
    set to OFPP_ANY / OFPG_ANY.

    @param group_id Id of the group to modify
    @param ports Port numbers to output to (only iterated, never modified)
    @return The constructed ofp.message.group_mod
    """
    def output_bucket(port_no):
        # A bucket whose only action is to output on port_no.
        forward = ofp.action.output(port=port_no)
        return ofp.common.bucket(watch_port=ofp.OFPP_ANY,
                                 watch_group=ofp.OFPG_ANY,
                                 actions=[forward])

    new_buckets = [output_bucket(port_no) for port_no in ports]

    return ofp.message.group_mod(command=ofp.OFPGC_MODIFY,
                                 group_type=ofp.OFPGT_ALL,
                                 group_id=group_id,
                                 buckets=new_buckets)
40
def double_vlan_udp_packet(pktlen=100,
                           eth_dst='00:01:02:03:04:05',
                           eth_src='00:06:07:08:09:0a',
                           dl_vlan_enable=False,
                           c_vlan_vid=0,
                           c_vlan_pcp=0,
                           s_vlan_vid=0,
                           s_vlan_pcp=0,
                           ip_src='192.168.0.1',
                           ip_dst='192.168.0.2',
                           ip_tos=0,
                           ip_ttl=64,
                           udp_sport=1234,
                           udp_dport=80,
                           ip_ihl=None,
                           ip_options=False,
                           eth_type=0x8100
                           ):
    """
    Return a dataplane UDP packet, double vlan tagged when requested

    With dl_vlan_enable set, the frame is Ether / Dot1Q (S tag) /
    Dot1Q (C tag) / IP / UDP; otherwise it is a plain Ether / IP / UDP
    frame. The result is padded with "D" characters up to pktlen.

    @param pktlen Length of packet in bytes w/o CRC (raised to MINSIZE if smaller)
    @param eth_dst Destination MAC
    @param eth_src Source MAC
    @param dl_vlan_enable True if the packet is with vlan, False otherwise
    @param c_vlan_vid CVLAN ID (inner tag)
    @param c_vlan_pcp CVLAN priority
    @param s_vlan_vid SVLAN ID (outer tag)
    @param s_vlan_pcp SVLAN priority
    @param ip_src IP source
    @param ip_dst IP destination
    @param ip_tos IP ToS
    @param ip_ttl IP TTL
    @param udp_sport UDP source port
    @param udp_dport UDP destination port
    @param ip_ihl IP header length field, passed through to the IP layer
    @param ip_options IP options; only applied when dl_vlan_enable is False
    @param eth_type Ethernet type of the outer header; only applied when
                    dl_vlan_enable is True

    Users shouldn't assume anything about this packet other than that it
    is a valid ethernet/IP/UDP frame.
    """

    # Never build a frame shorter than the minimum size.
    pktlen = max(pktlen, MINSIZE)

    # Arguments shared by the IP layer in every variant of the packet.
    ip_fields = dict(src=ip_src, dst=ip_dst, tos=ip_tos, ttl=ip_ttl,
                     ihl=ip_ihl)

    if dl_vlan_enable:
        # Outer (S) tag first, then inner (C) tag.
        # Note Dot1Q.id is really CFI
        link = scapy.Ether(dst=eth_dst, src=eth_src, type=eth_type) / \
            scapy.Dot1Q(prio=s_vlan_pcp, vlan=s_vlan_vid) / \
            scapy.Dot1Q(prio=c_vlan_pcp, vlan=c_vlan_vid)
    else:
        link = scapy.Ether(dst=eth_dst, src=eth_src)
        # IP options are honoured on the untagged path only.
        if ip_options:
            ip_fields['options'] = ip_options

    pkt = link / scapy.IP(**ip_fields) / \
        scapy.UDP(sport=udp_sport, dport=udp_dport)

    # Pad the payload up to the requested length; a non-positive count
    # yields an empty string, so longer packets are left untouched.
    padding = "D" * (pktlen - len(pkt))
    pkt = pkt / padding

    return pkt
Zsolt Haraszti5360d032016-03-02 23:02:09 -0800105
# Monkey-patch the oftest library to allow faking the data-plane.
# This mode allows oftest to glide through the OF programming steps while
# passing all packet receiving tests. This is useful when you want to
# verify that your agent's OF interface handles all OF protocol messages
# from the controller, but in the absence of actual data-plane tests.
# In order to achieve this, in this mode the following changes are made:
# - All verify_packet_in() tests pass
# - All verify_packet() tests pass
if fake_dataplane:
    logging.info('Monkey-patching for fake-dataplane operations')

    from oftest import testutils

    def patched_verify_packet_in(test, data, in_port, reason, controller=None):
        # Stand-in for testutils.verify_packet_in: performs no check and
        # hands back a packet_in message built from the expected reason
        # and data. in_port and controller are accepted but unused.
        logging.debug('Faking verify_packet_in')
        return ofp.message.packet_in(reason=reason, data=data)

    def patched_verify_packet(test, pkt, of_port):
        # Stand-in for testutils.verify_packet: performs no check.
        logging.debug('Faking verify_packet')
        return None

    # Install both stand-ins on the oftest testutils module.
    testutils.verify_packet_in = patched_verify_packet_in
    testutils.verify_packet = patched_verify_packet