# blob: 642d5c2da4b8b7f21a6fc399f7df9374f453dcc0
import oftest.base_tests as base_tests
import oftest.packet as scapy
import ofp
import time
class OltBaseTest(base_tests.SimpleDataPlane):
    """Base test case offering helpers that install OLT VLAN flow rules."""

    # NOTE(review): onu_port, olt_port, device_type, test_param_get,
    # do_barrier and verify_no_errors are used below but are not defined or
    # imported in the visible part of this file -- confirm they are provided
    # at module level (e.g. by a star-import elsewhere in the file).

    def _addFlow(self, table_id, cookie, match, instructions):
        """Send a single flow_add and check that the switch raised no error.

        The flow is always installed with priority 1000 and no buffer.
        After sending, do_barrier and verify_no_errors are called on the
        controller, in that order.

        table_id     -- table the flow is added to
        cookie       -- cookie value stored with the flow
        match        -- ofp.match describing the packets to hit
        instructions -- list of ofp.instruction objects to apply
        """
        request = ofp.message.flow_add(
            table_id=table_id,
            cookie=cookie,
            match=match,
            instructions=instructions,
            buffer_id=ofp.OFP_NO_BUFFER,
            priority=1000)
        self.controller.message_send(request)
        do_barrier(self.controller)
        verify_no_errors(self.controller)

    def installDoubleTaggingRules(self, s_vlan_id, c_vlan_id, cookie=42):
        """Install the four flows that implement double (s-vlan/c-vlan) tagging.

        Upstream (in_port == onu_port):
          1. set/push the inner c-vlan, then goto table 1
          2. push the outer s-vlan and output to olt_port
        Downstream (in_port == olt_port):
          3. pop the outer s-vlan, then goto table 1
          4. pop the inner c-vlan and output to onu_port

        s_vlan_id -- outer (service) VLAN id
        c_vlan_id -- inner (customer) VLAN id
        cookie    -- base cookie; each flow uses the next value, so the four
                     flows get cookie+1 .. cookie+4
        """
        c_vid = ofp.OFPVID_PRESENT | c_vlan_id
        s_vid = ofp.OFPVID_PRESENT | s_vlan_id

        # --- upstream flow rule: tag with the inner vlan (c-vlan) ---
        match = ofp.match()
        match.oxm_list.append(ofp.oxm.in_port(onu_port))
        if device_type == "cpqd":
            # This device type matches on OFPVID_NONE and needs an explicit
            # push_vlan before the c-vlan fields can be set.
            match.oxm_list.append(ofp.oxm.vlan_vid(value=ofp.OFPVID_NONE))
            actions = [
                ofp.action.push_vlan(ethertype=0x8100),
                ofp.action.set_field(ofp.oxm.vlan_vid(c_vid)),
                ofp.action.set_field(ofp.oxm.vlan_pcp(0)),
            ]
        else:  # "pmc", "normal"
            # Matches vlan_vid == OFPVID_PRESENT with pcp 0 and only rewrites
            # the vid (presumably frames arrive priority-tagged -- confirm).
            match.oxm_list.append(ofp.oxm.vlan_vid(value=ofp.OFPVID_PRESENT))
            match.oxm_list.append(ofp.oxm.vlan_pcp(value=0))
            actions = [
                ofp.action.set_field(ofp.oxm.vlan_vid(c_vid)),
            ]

        cookie += 1
        self._addFlow(
            table_id=test_param_get("table", 0),
            cookie=cookie,
            match=match,
            instructions=[
                ofp.instruction.apply_actions(actions=actions),
                ofp.instruction.goto_table(1)])

        # --- upstream: push outer vlan (s-vlan) and forward to the OLT port ---
        match = ofp.match()
        match.oxm_list.append(ofp.oxm.in_port(onu_port))
        match.oxm_list.append(ofp.oxm.vlan_vid(c_vid))
        match.oxm_list.append(ofp.oxm.vlan_pcp(0))

        cookie += 1
        self._addFlow(
            table_id=test_param_get("table", 1),
            cookie=cookie,
            match=match,
            instructions=[
                ofp.instruction.apply_actions(
                    actions=[
                        ofp.action.push_vlan(ethertype=0x8100),
                        ofp.action.set_field(ofp.oxm.vlan_vid(s_vid)),
                        ofp.action.set_field(ofp.oxm.vlan_pcp(0)),
                        ofp.action.output(port=olt_port)]),
            ])

        # --- downstream: strip outer vlan (s-vlan) ---
        match = ofp.match()
        match.oxm_list.append(ofp.oxm.in_port(olt_port))
        match.oxm_list.append(ofp.oxm.vlan_vid(s_vid))
        match.oxm_list.append(ofp.oxm.vlan_pcp(0))

        cookie += 1
        self._addFlow(
            table_id=test_param_get("table", 0),
            cookie=cookie,
            match=match,
            instructions=[
                ofp.instruction.apply_actions(
                    actions=[ofp.action.pop_vlan()]),
                ofp.instruction.goto_table(1)])

        # --- downstream: strip inner vlan (c-vlan) and forward to the ONU port ---
        match = ofp.match()
        match.oxm_list.append(ofp.oxm.in_port(olt_port))
        match.oxm_list.append(ofp.oxm.vlan_vid(c_vid))
        match.oxm_list.append(ofp.oxm.vlan_pcp(0))

        cookie += 1
        self._addFlow(
            table_id=test_param_get("table", 1),
            cookie=cookie,
            match=match,
            instructions=[
                ofp.instruction.apply_actions(
                    actions=[
                        ofp.action.pop_vlan(),
                        ofp.action.output(port=onu_port)]),
            ])