| import oftest.base_tests as base_tests |
| import oftest.packet as scapy |
| |
| import ofp |
| import time |
| |
| |
class OltBaseTest(base_tests.SimpleDataPlane):
    """Base class for OLT data-plane tests; provides flow-rule helpers."""

    def _send_flow_add(self, table_id, cookie, match, instructions):
        """Send one flow_add, then run the barrier and error checks.

        Every rule installed through this helper uses buffer_id
        OFP_NO_BUFFER and priority 1000, so only the fields that vary
        between rules are parameters.
        """
        request = ofp.message.flow_add(
            table_id=table_id,
            cookie=cookie,
            match=match,
            instructions=instructions,
            buffer_id=ofp.OFP_NO_BUFFER,
            priority=1000)

        self.controller.message_send(request)
        do_barrier(self.controller)
        verify_no_errors(self.controller)

    def installDoubleTaggingRules(self, s_vlan_id, c_vlan_id, cookie=42):
        """Install the four flow rules used for double (s-vlan/c-vlan) tagging.

        Upstream (in_port == onu_port):
          1. set (or, for "cpqd", push and set) the c-vlan, goto table 1;
          2. match the c-vlan, push the s-vlan, output to olt_port.
        Downstream (in_port == olt_port):
          3. match the s-vlan, pop it, goto table 1;
          4. match the c-vlan, pop it, output to onu_port.

        Args:
            s_vlan_id: outer VLAN id; OR-ed with OFPVID_PRESENT before use.
            c_vlan_id: inner VLAN id; OR-ed with OFPVID_PRESENT before use.
            cookie: base cookie value. The four rules are sent with
                cookie + 1 .. cookie + 4, in the order listed above.

        The names onu_port, olt_port, device_type, test_param_get,
        do_barrier and verify_no_errors are not defined in the visible part
        of this file; they are presumably provided at module scope.

        NOTE(review): both stages read the same "table" test parameter
        (with defaults 0 and 1) while goto_table always targets the
        literal 1 -- presumably the parameter is normally left unset;
        confirm before relying on it.
        """
        c_vid = ofp.OFPVID_PRESENT | c_vlan_id
        s_vid = ofp.OFPVID_PRESENT | s_vlan_id

        # 1) upstream: put the inner vlan (c-vlan) on traffic from the ONU
        match = ofp.match()
        match.oxm_list.append(ofp.oxm.in_port(onu_port))
        if device_type == "cpqd":
            # match on OFPVID_NONE, so a new tag is pushed and filled in
            match.oxm_list.append(ofp.oxm.vlan_vid(value=ofp.OFPVID_NONE))
            actions = [
                ofp.action.push_vlan(ethertype=0x8100),
                ofp.action.set_field(ofp.oxm.vlan_vid(c_vid)),
                ofp.action.set_field(ofp.oxm.vlan_pcp(0)),
            ]
        else:  # "pmc", "normal"
            # match value is OFPVID_PRESENT alone with pcp 0; only the
            # vlan id of the existing tag is rewritten
            match.oxm_list.append(ofp.oxm.vlan_vid(value=ofp.OFPVID_PRESENT))
            match.oxm_list.append(ofp.oxm.vlan_pcp(value=0))
            actions = [
                ofp.action.set_field(ofp.oxm.vlan_vid(c_vid)),
            ]

        self._send_flow_add(
            table_id=test_param_get("table", 0),
            cookie=cookie + 1,
            match=match,
            instructions=[
                ofp.instruction.apply_actions(actions=actions),
                ofp.instruction.goto_table(1),
            ])

        # 2) upstream: push the outer vlan (s-vlan) and forward to the OLT
        match = ofp.match()
        match.oxm_list.append(ofp.oxm.in_port(onu_port))
        match.oxm_list.append(ofp.oxm.vlan_vid(c_vid))
        match.oxm_list.append(ofp.oxm.vlan_pcp(0))

        self._send_flow_add(
            table_id=test_param_get("table", 1),
            cookie=cookie + 2,
            match=match,
            instructions=[
                ofp.instruction.apply_actions(
                    actions=[
                        ofp.action.push_vlan(ethertype=0x8100),
                        ofp.action.set_field(ofp.oxm.vlan_vid(s_vid)),
                        ofp.action.set_field(ofp.oxm.vlan_pcp(0)),
                        ofp.action.output(port=olt_port),
                    ]),
            ])

        # 3) downstream: strip the outer vlan (s-vlan) from OLT traffic
        match = ofp.match()
        match.oxm_list.append(ofp.oxm.in_port(olt_port))
        match.oxm_list.append(ofp.oxm.vlan_vid(s_vid))
        match.oxm_list.append(ofp.oxm.vlan_pcp(0))

        self._send_flow_add(
            table_id=test_param_get("table", 0),
            cookie=cookie + 3,
            match=match,
            instructions=[
                ofp.instruction.apply_actions(
                    actions=[ofp.action.pop_vlan()]),
                ofp.instruction.goto_table(1),
            ])

        # 4) downstream: pop the inner vlan (c-vlan) and forward to the ONU
        match = ofp.match()
        match.oxm_list.append(ofp.oxm.in_port(olt_port))
        match.oxm_list.append(ofp.oxm.vlan_vid(c_vid))
        match.oxm_list.append(ofp.oxm.vlan_pcp(0))

        self._send_flow_add(
            table_id=test_param_get("table", 1),
            cookie=cookie + 4,
            match=match,
            instructions=[
                ofp.instruction.apply_actions(
                    actions=[
                        ofp.action.pop_vlan(),
                        ofp.action.output(port=onu_port),
                    ]),
            ])