| import oftest.base_tests as base_tests |
| import oftest.packet as scapy |
| from oftest import config |
| |
| import ofp |
| import time |
| |
| from oftest.testutils import * |
| |
| # These parameters can be altered from the command line using the -t or --test-params= options. |
| # Example: -t 'onu_port=129;olt_port=288;device_type=pmc' |
| # |
# Data-plane ports used by the tests (two ONU-side ports and the OLT uplink).
onu_port = test_param_get("onu_port", 130)
onu_port2 = test_param_get("onu_port2", 131)
olt_port = test_param_get("olt_port", 258)

# Selects device-specific flow rules; options: "normal", "pmc", "cpqd".
device_type = test_param_get("device_type", "normal")
class OltBaseTest(base_tests.SimpleDataPlane):
    """Shared flow-installation helpers for OLT tests.

    The rules installed here use the module-level ``onu_port``, ``olt_port``
    and ``device_type`` test parameters.
    """

    def _add_flow(self, table_id, cookie, match, instructions):
        """Send one unbuffered, priority-1000 flow_add and check for errors.

        After the request is sent a barrier is issued on the controller and
        verify_no_errors is called, so each rule is checked before the next
        one is installed.

        :param table_id: table the rule is added to
        :param cookie: cookie stored with the rule
        :param match: ofp.match describing the packets the rule applies to
        :param instructions: list of ofp.instruction objects for the rule
        """
        request = ofp.message.flow_add(
            table_id=table_id,
            cookie=cookie,
            match=match,
            instructions=instructions,
            buffer_id=ofp.OFP_NO_BUFFER,
            priority=1000)

        self.controller.message_send(request)
        do_barrier(self.controller)
        verify_no_errors(self.controller)

    def installDoubleTaggingRules(self, s_vlan_id, c_vlan_id, cookie=42):
        """Install the four flow rules that implement s-vlan/c-vlan tagging.

        Upstream (onu_port -> olt_port):
          1. first table:  tag the frame with c_vlan_id, go to second table
          2. second table: push the s-vlan tag, output on olt_port
        Downstream (olt_port -> onu_port):
          3. first table:  pop the s-vlan tag, go to second table
          4. second table: pop the c-vlan tag, output on onu_port

        The first table is taken from the "table" test parameter (default 0)
        and the second table is the one directly after it.

        :param s_vlan_id: outer (service) vlan id
        :param c_vlan_id: inner (customer) vlan id
        :param cookie: base cookie; the rules get cookie+1 .. cookie+4 in the
                       order listed above
        """
        # Both pipeline stages are derived from a single "table" parameter.
        # Reading "table" separately for each stage would collapse them into
        # one table as soon as the parameter is given, leaving the goto-table
        # instruction pointing at a table that holds none of the rules.
        first_table = test_param_get("table", 0)
        second_table = first_table + 1

        c_vid = ofp.OFPVID_PRESENT | c_vlan_id
        s_vid = ofp.OFPVID_PRESENT | s_vlan_id

        # 1. upstream: tag with the inner vlan (c-vlan)
        match = ofp.match()
        match.oxm_list.append(ofp.oxm.in_port(onu_port))
        if device_type == "cpqd":
            # match frames without a vlan tag and push a new one
            match.oxm_list.append(ofp.oxm.vlan_vid(value=ofp.OFPVID_NONE))
            actions = [
                ofp.action.push_vlan(ethertype=0x8100),
                ofp.action.set_field(ofp.oxm.vlan_vid(c_vid)),
                ofp.action.set_field(ofp.oxm.vlan_pcp(0)),
            ]
        else:  # "pmc", "normal"
            # match vlan_vid == OFPVID_PRESENT with pcp 0 and rewrite the vid
            match.oxm_list.append(ofp.oxm.vlan_vid(value=ofp.OFPVID_PRESENT))
            match.oxm_list.append(ofp.oxm.vlan_pcp(value=0))
            actions = [
                ofp.action.set_field(ofp.oxm.vlan_vid(c_vid)),
            ]

        self._add_flow(
            first_table,
            cookie + 1,
            match,
            [
                ofp.instruction.apply_actions(actions=actions),
                ofp.instruction.goto_table(second_table),
            ])

        # 2. upstream: push the outer vlan (s-vlan) and send to the OLT port
        match = ofp.match()
        match.oxm_list.append(ofp.oxm.in_port(onu_port))
        match.oxm_list.append(ofp.oxm.vlan_vid(c_vid))
        match.oxm_list.append(ofp.oxm.vlan_pcp(0))

        self._add_flow(
            second_table,
            cookie + 2,
            match,
            [
                ofp.instruction.apply_actions(
                    actions=[
                        ofp.action.push_vlan(ethertype=0x8100),
                        ofp.action.set_field(ofp.oxm.vlan_vid(s_vid)),
                        ofp.action.set_field(ofp.oxm.vlan_pcp(0)),
                        ofp.action.output(port=olt_port),
                    ]),
            ])

        # 3. downstream: strip the outer vlan (s-vlan)
        match = ofp.match()
        match.oxm_list.append(ofp.oxm.in_port(olt_port))
        match.oxm_list.append(ofp.oxm.vlan_vid(s_vid))
        match.oxm_list.append(ofp.oxm.vlan_pcp(0))

        self._add_flow(
            first_table,
            cookie + 3,
            match,
            [
                ofp.instruction.apply_actions(
                    actions=[ofp.action.pop_vlan()]),
                ofp.instruction.goto_table(second_table),
            ])

        # 4. downstream: pop the inner vlan (c-vlan) and send to the ONU port
        # NOTE(review): the tag is popped for every device type, while the
        # non-cpqd upstream rule matches an already-tagged frame -- confirm
        # that this asymmetry is intended.
        match = ofp.match()
        match.oxm_list.append(ofp.oxm.in_port(olt_port))
        match.oxm_list.append(ofp.oxm.vlan_vid(c_vid))
        match.oxm_list.append(ofp.oxm.vlan_pcp(0))

        self._add_flow(
            second_table,
            cookie + 4,
            match,
            [
                ofp.instruction.apply_actions(
                    actions=[
                        ofp.action.pop_vlan(),
                        ofp.action.output(port=onu_port),
                    ]),
            ])