# blob: b058efc7b1a476bf71ecbfc788866e522980ac80 [file] [log] [blame]
import oftest.base_tests as base_tests
import oftest.packet as scapy
from oftest import config
import ofp
import time
from oftest.testutils import *
# Module-level test parameters, read once at import time via test_param_get().
# These parameters can be altered from the command line using the -t or --test-params= options.
# Example: -t 'onu_port=129;olt_port=288;device_type=pmc'
#
# ONU-side port number (default 130).
onu_port = test_param_get("onu_port", 130)
# Second ONU-side port number (default 131).
onu_port2 = test_param_get("onu_port2", 131)
# OLT-side port number (default 258).
olt_port = test_param_get("olt_port", 258)
# Device flavour; selects device-specific flow variants (default "normal").
device_type = test_param_get("device_type", "normal") # options: "normal", "pmc", "cpqd"
class OltBaseTest(base_tests.SimpleDataPlane):
    """Data-plane test base class with helpers for installing OLT flow rules."""

    def _installFlow(self, default_table, cookie, match, instructions):
        """Send one flow_add and confirm the switch accepted it.

        The flow is added with priority 1000 and no buffer, then a barrier is
        issued and the controller is checked for error messages.

        default_table -- table id used unless the "table" test parameter is set
        cookie        -- cookie value stored with the flow
        match         -- ofp.match() describing the packets to select
        instructions  -- list of ofp.instruction objects to apply

        NOTE(review): every flow reads the same "table" test parameter, so
        overriding it places all flows in that one table while callers still
        hard-code goto_table(1) -- confirm this is intended.
        """
        request = ofp.message.flow_add(
            table_id=test_param_get("table", default_table),
            cookie=cookie,
            match=match,
            instructions=instructions,
            buffer_id=ofp.OFP_NO_BUFFER,
            priority=1000)
        self.controller.message_send(request)
        do_barrier(self.controller)
        verify_no_errors(self.controller)

    def _taggedMatch(self, in_port, vlan_id):
        """Build a match on in_port, a present VLAN tag with vlan_id, and pcp 0."""
        match = ofp.match()
        match.oxm_list.append(ofp.oxm.in_port(in_port))
        match.oxm_list.append(ofp.oxm.vlan_vid(ofp.OFPVID_PRESENT | vlan_id))
        match.oxm_list.append(ofp.oxm.vlan_pcp(0))
        return match

    def installDoubleTaggingRules(self, s_vlan_id, c_vlan_id, cookie=42):
        """Install the four flows that implement double VLAN tagging.

        Upstream (packets entering on onu_port):
          1. table 0: set the inner tag to c_vlan_id, then go to table 1
          2. table 1: push an outer tag with s_vlan_id and output to olt_port
        Downstream (packets entering on olt_port):
          3. table 0: pop the outer tag (matched on s_vlan_id), go to table 1
          4. table 1: pop the inner tag (matched on c_vlan_id) and output to
             onu_port

        Each flow is sent on its own and followed by a barrier and an error
        check, so a rejected flow fails the test at the point it was sent.

        s_vlan_id -- outer (service) VLAN id
        c_vlan_id -- inner (customer) VLAN id
        cookie    -- base cookie; the flows use cookie + 1 through cookie + 4,
                     in the order listed above
        """
        # 1. upstream: apply the inner vlan (c-vlan)
        match = ofp.match()
        match.oxm_list.append(ofp.oxm.in_port(onu_port))
        if device_type == "cpqd":
            # Matches vlan_vid == OFPVID_NONE and pushes a fresh 802.1Q tag.
            match.oxm_list.append(ofp.oxm.vlan_vid(value=ofp.OFPVID_NONE))
            actions = [
                ofp.action.push_vlan(ethertype=0x8100),
                ofp.action.set_field(ofp.oxm.vlan_vid(ofp.OFPVID_PRESENT | c_vlan_id)),
                ofp.action.set_field(ofp.oxm.vlan_pcp(0)),
            ]
        else:  # "pmc", "normal"
            # Matches vlan_vid == OFPVID_PRESENT (no mask) with pcp 0 and only
            # rewrites the vid of the existing tag.
            match.oxm_list.append(ofp.oxm.vlan_vid(value=ofp.OFPVID_PRESENT))
            match.oxm_list.append(ofp.oxm.vlan_pcp(value=0))
            actions = [
                ofp.action.set_field(ofp.oxm.vlan_vid(ofp.OFPVID_PRESENT | c_vlan_id)),
            ]
        self._installFlow(
            0, cookie + 1, match,
            [ofp.instruction.apply_actions(actions=actions),
             ofp.instruction.goto_table(1)])

        # 2. upstream: push the outer vlan (s-vlan) and forward to the OLT port
        self._installFlow(
            1, cookie + 2, self._taggedMatch(onu_port, c_vlan_id),
            [ofp.instruction.apply_actions(
                actions=[
                    ofp.action.push_vlan(ethertype=0x8100),
                    ofp.action.set_field(ofp.oxm.vlan_vid(ofp.OFPVID_PRESENT | s_vlan_id)),
                    ofp.action.set_field(ofp.oxm.vlan_pcp(0)),
                    ofp.action.output(port=olt_port)])])

        # 3. downstream: strip the outer vlan (s-vlan)
        self._installFlow(
            0, cookie + 3, self._taggedMatch(olt_port, s_vlan_id),
            [ofp.instruction.apply_actions(actions=[ofp.action.pop_vlan()]),
             ofp.instruction.goto_table(1)])

        # 4. downstream: pop the inner vlan (c-vlan) and forward to the ONU port
        self._installFlow(
            1, cookie + 4, self._taggedMatch(olt_port, c_vlan_id),
            [ofp.instruction.apply_actions(
                actions=[
                    ofp.action.pop_vlan(),
                    ofp.action.output(port=onu_port)])])