| ''' |
| OFTests for functionality needed from the OLT. |
| ''' |
| import logging |
| from oftest import config |
| import oftest.base_tests as base_tests |
| import oftest.packet as scapy |
| |
| import ofp |
| |
| from oftest.testutils import * |
| |
| onu_port = test_param_get("onu_port", 1) |
| olt_port = test_param_get("olt_port", 129) |
| |
| def testPacketIn(self, match, parsed_pkt): |
| delete_all_flows(self.controller) |
| |
| pkt = str(parsed_pkt) |
| |
| request = ofp.message.flow_add( |
| table_id=test_param_get("table", 0), |
| cookie=42, |
| match=match, |
| instructions=[ |
| ofp.instruction.apply_actions( |
| actions=[ |
| ofp.action.output( |
| port=ofp.OFPP_CONTROLLER, |
| max_len=ofp.OFPCML_NO_BUFFER)])], |
| buffer_id=ofp.OFP_NO_BUFFER, |
| priority=1000) |
| |
| logging.info("Inserting flow sending matching packets to controller") |
| self.controller.message_send(request) |
| do_barrier(self.controller) |
| |
| for of_port in config["port_map"].keys(): |
| logging.info("PacketInExact test, port %d", of_port) |
| self.dataplane.send(of_port, pkt) |
| verify_packet_in(self, pkt, of_port, ofp.OFPR_ACTION) |
| verify_packets(self, pkt, []) |
| |
| class EapolPacketIn(base_tests.SimpleDataPlane): |
| |
| """Verify packet-ins are sent for EAPOL packets """ |
| |
| def runTest(self): |
| logging.info("Running EAPOL Packet In test") |
| |
| match = ofp.match() |
| match.oxm_list.append(ofp.oxm.eth_type(0x888e)) |
| # Use ethertype 0x888e and multicast destination MAC address |
| pkt = simple_eth_packet(pktlen=60, eth_dst='01:00:5E:7F:FF:FF', eth_type=0x888e) |
| |
| testPacketIn(self, match, pkt) |
| |
| class ARPPacketIn(base_tests.SimpleDataPlane): |
| |
| """Verify packet-ins are sent for ARP packets """ |
| |
| def runTest(self): |
| logging.info("Running ARP Packet In test") |
| |
| match = ofp.match() |
| match.oxm_list.append(ofp.oxm.eth_type(0x0806)) |
| |
| # Use ethertype 0x0806 |
| pkt = simple_eth_packet(eth_type=0x0806) |
| |
| testPacketIn(self, match, pkt) |
| |
| class RadiusPacketIn(base_tests.SimpleDataPlane): |
| |
| """Verify packet-ins are sent for Radius packets """ |
| |
| def runTest(self): |
| logging.info("Running Radius Packet In test") |
| |
| pkt = simple_udp_packet(udp_sport=1812, udp_dport=1812) |
| |
| match = ofp.match() |
| match.oxm_list.append(ofp.oxm.eth_type(0x0800)) |
| match.oxm_list.append(ofp.oxm.ip_proto(17)) |
| match.oxm_list.append(ofp.oxm.udp_src(1812)) |
| match.oxm_list.append(ofp.oxm.udp_dst(1812)) |
| |
| testPacketIn(self, match, pkt) |
| |
| # Other UDP packets should not match the rule |
| dhcpPkt = simple_udp_packet(udp_sport=68, udp_dport=67) |
| |
| self.dataplane.send(1, str(dhcpPkt)) |
| verify_no_packet_in(self, dhcpPkt, 1) |
| |
| |
| class DHCPPacketIn(base_tests.SimpleDataPlane): |
| |
| """Verify packet-ins are sent for DHCP packets """ |
| |
| def runTest(self): |
| logging.info("Running DHCP Packet In test") |
| |
| pkt = simple_udp_packet(udp_sport=68, udp_dport=67) |
| |
| match = ofp.match() |
| match.oxm_list.append(ofp.oxm.eth_type(0x0800)) |
| match.oxm_list.append(ofp.oxm.ip_proto(17)) |
| match.oxm_list.append(ofp.oxm.udp_src(68)) |
| match.oxm_list.append(ofp.oxm.udp_dst(67)) |
| |
| testPacketIn(self, match, pkt) |
| |
| # Other UDP packets should not match the rule |
| radiusPkt = simple_udp_packet(udp_sport=1812, udp_dport=1812) |
| |
| self.dataplane.send(1, str(radiusPkt)) |
| verify_no_packet_in(self, radiusPkt, 1) |
| |
| class IGMPPacketIn(base_tests.SimpleDataPlane): |
| |
| """Verify packet-ins are sent for IGMP packets """ |
| |
| def runTest(self): |
| logging.info("Running IGMP Packet In test") |
| |
| match = ofp.match() |
| match.oxm_list.append(ofp.oxm.eth_type(0x800)) |
| match.oxm_list.append(ofp.oxm.ip_proto(2)) |
| |
| pkt = scapy.Ether(dst='01:00:5E:7F:FF:FF', src='00:00:00:00:00:01')/ \ |
| scapy.IP(src='10.0.0.1', dst='10.0.0.2', ttl=60, tos=0, id=0, proto=2) |
| |
| pkt = pkt/("0" * (100 - len(pkt))) |
| |
| testPacketIn(self, match, pkt) |
| |
| class PushVlan(base_tests.SimpleDataPlane): |
| |
| """Verify the switch can push a VLAN tag and forward out a port """ |
| |
| def runTest(self): |
| logging.info("Running push VLAN test") |
| |
| vlan_id = 200 |
| |
| delete_all_flows(self.controller) |
| |
| match = ofp.match() |
| match.oxm_list.append(ofp.oxm.in_port(onu_port)) |
| match.oxm_list.append(ofp.oxm.vlan_vid(ofp.OFP_VLAN_NONE)) |
| |
| request = ofp.message.flow_add( |
| table_id=test_param_get("table", 0), |
| cookie=42, |
| match=match, |
| instructions=[ |
| ofp.instruction.apply_actions( |
| actions=[ |
| ofp.action.push_vlan(ethertype=0x8100), |
| ofp.action.set_field(ofp.oxm.vlan_vid(ofp.OFPVID_PRESENT | vlan_id)), |
| ofp.action.output(port=olt_port)])], |
| buffer_id=ofp.OFP_NO_BUFFER, |
| priority=1000) |
| |
| logging.info("Inserting flow sending matching packets to controller") |
| self.controller.message_send(request) |
| do_barrier(self.controller) |
| |
| inPkt = simple_udp_packet() |
| outPkt = simple_udp_packet(pktlen=104, dl_vlan_enable=True, |
| vlan_vid=vlan_id, vlan_pcp=0, dl_vlan_cfi=0) |
| |
| # Send untagged packet in the ONU port and expect tagged packet out the OLT port |
| self.dataplane.send(onu_port, str(inPkt)) |
| verify_packet(self, outPkt, olt_port) |
| |
| # Send untagged packet in the OLT port and expect no packets to come out |
| self.dataplane.send(olt_port, str(inPkt)) |
| verify_packets(self, outPkt, []) |
| |
| class PopVlan(base_tests.SimpleDataPlane): |
| |
| """Verify the switch can pop a VLAN tag and forward out a port """ |
| |
| def runTest(self): |
| logging.info("Running pop VLAN test") |
| |
| vlan_id = 200 |
| |
| delete_all_flows(self.controller) |
| |
| match = ofp.match() |
| match.oxm_list.append(ofp.oxm.in_port(olt_port)) |
| match.oxm_list.append(ofp.oxm.vlan_vid(ofp.OFPVID_PRESENT | vlan_id)) |
| |
| request = ofp.message.flow_add( |
| table_id=test_param_get("table", 0), |
| cookie=42, |
| match=match, |
| instructions=[ |
| ofp.instruction.apply_actions( |
| actions=[ |
| ofp.action.pop_vlan(), |
| ofp.action.output(port=1)])], |
| buffer_id=ofp.OFP_NO_BUFFER, |
| priority=1000) |
| |
| logging.info("Inserting flow sending matching packets to controller") |
| self.controller.message_send(request) |
| do_barrier(self.controller) |
| |
| inPkt = simple_udp_packet(pktlen=104, dl_vlan_enable=True, |
| vlan_vid=vlan_id, vlan_pcp=0, dl_vlan_cfi=0) |
| outPkt = simple_udp_packet() |
| |
| # Send tagged packet in the OLT port and expect untagged packet out the OLT port |
| self.dataplane.send(olt_port, str(inPkt)) |
| verify_packet(self, outPkt, onu_port) |
| |
| # Send tagged packet in the ONU port and expect no packets to come out |
| self.dataplane.send(onu_port, str(inPkt)) |
| verify_packets(self, outPkt, []) |