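Add an optional ONU port argument (`onu`) to testPacketFlow and installDoubleTaggingRules; when omitted, the default onu_port is used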
diff --git a/oltbase.py b/oltbase.py
index ba72e8e..be7f0af 100644
--- a/oltbase.py
+++ b/oltbase.py
@@ -33,11 +33,11 @@
self.controller.message_send(request)
do_barrier(self.controller)
- for of_port in config["port_map"]:
- logging.info("PacketInExact test, port %d", of_port)
- self.dataplane.send(of_port, pkt)
- verify_packet_in(self, pkt, of_port, ofp.OFPR_ACTION)
- verify_packets(self, pkt, [])
+ for of_port in config["port_map"]:
+ logging.info("PacketInExact test, port %d", of_port)
+ self.dataplane.send(of_port, pkt)
+ verify_packet_in(self, pkt, of_port, ofp.OFPR_ACTION)
+ verify_packets(self, pkt, [])
def processEapolRule(self, in_port, install = True):
match = ofp.match()
@@ -73,7 +73,7 @@
self.controller.message_send(request)
do_barrier(self.controller)
- def testPacketFlow(self, c_vlan_id, s_vlan_id):
+ def testPacketFlow(self, c_vlan_id, s_vlan_id, onu = None):
incorrectTagPkt = simple_udp_packet(pktlen=100, dl_vlan_enable=True, vlan_vid=100, vlan_pcp=1)
zeroTaggedPkt = simple_udp_packet(pktlen=100, dl_vlan_enable=True, vlan_vid=0, vlan_pcp=0)
@@ -84,36 +84,40 @@
s_vlan_vid=s_vlan_id,
c_vlan_pcp=0, s_vlan_pcp=0)
+ inport = onu_port if onu is None else onu
+
logging.info("Testing s-tag %d, c-tag %d" % (s_vlan_id, c_vlan_id))
# test upstream untagged packet got double tag at OLT
- self.dataplane.send(onu_port, str(zeroTaggedPkt))
+ self.dataplane.send(inport, str(zeroTaggedPkt))
verify_packet(self, upstreamDoubleTaggedPkt, olt_port)
# test downstream doubletagged packet got untagged at ONU
self.dataplane.send(olt_port, str(upstreamDoubleTaggedPkt))
if device_type == "pmc":
- verify_packet(self, zeroTaggedPkt, onu_port)
+ verify_packet(self, zeroTaggedPkt, inport)
else:
- verify_packet(self, untaggedPkt, onu_port)
+ verify_packet(self, untaggedPkt, inport)
# test upstream doubletagged packet got dropped
- self.dataplane.send(onu_port, str(upstreamDoubleTaggedPkt))
+ self.dataplane.send(inport, str(upstreamDoubleTaggedPkt))
verify_no_packet(self, upstreamDoubleTaggedPkt, olt_port)
# test downstream untagged packet got dropped at ONU
self.dataplane.send(olt_port, str(untaggedPkt))
- verify_no_packet(self, untaggedPkt, onu_port)
+ verify_no_packet(self, untaggedPkt, inport)
# test upstream icorrectly tagged packet; should get dropped
- self.dataplane.send(onu_port, str(incorrectTagPkt))
+ self.dataplane.send(inport, str(incorrectTagPkt))
verify_no_packet(self, upstreamDoubleTaggedPkt, olt_port)
- def installDoubleTaggingRules(self, s_vlan_id, c_vlan_id, cookie=42):
+ def installDoubleTaggingRules(self, s_vlan_id, c_vlan_id, cookie=42, onu = None):
+
+ inport = onu_port if onu is None else onu
# upstream flow rule
match = ofp.match()
- match.oxm_list.append(ofp.oxm.in_port(onu_port))
+ match.oxm_list.append(ofp.oxm.in_port(inport))
if device_type == "cpqd":
match.oxm_list.append(ofp.oxm.vlan_vid(value=ofp.OFPVID_NONE))
actions = [
@@ -146,7 +150,7 @@
# push outer vlan (s-vlan) for upstream
match = ofp.match()
- match.oxm_list.append(ofp.oxm.in_port(onu_port))
+ match.oxm_list.append(ofp.oxm.in_port(inport))
match.oxm_list.append(ofp.oxm.vlan_vid(ofp.OFPVID_PRESENT | c_vlan_id))
match.oxm_list.append(ofp.oxm.vlan_pcp(0))
cookie += 1
@@ -206,7 +210,7 @@
ofp.instruction.apply_actions(
actions=[
ofp.action.pop_vlan(),
- ofp.action.output(port=onu_port)])
+ ofp.action.output(port=inport)])
],
buffer_id=ofp.OFP_NO_BUFFER,
priority=1000)