adding remove rule test
diff --git a/olt.py b/olt.py
index 8c89699..de98628 100644
--- a/olt.py
+++ b/olt.py
@@ -142,6 +142,10 @@
testPacketIn(self, match, pkt)
+
+
+
+
class IGMPPacketIn(base_tests.SimpleDataPlane):
"""Verify packet-ins are sent for IGMP packets """
@@ -766,7 +770,28 @@
def runTest(self):
logging.info("Testing Rule removal")
- pass
+ processEapolRule(self, onu_port)
+ #wait for the rule to settle
+ time.sleep(3)
+
+ installDoubleTaggingRule(1, 2, self.controller)
+ #wait for the rules to settle
+ time.sleep(3)
+
+ stats = get_flow_stats(self, ofp.match())
+
+ test.assertTrue(len(stats) == 3, \
+ "Wrong number of rules reports; reported %s, expected 3\n\n %s" % (len(stats), stats))
+
+ processEapolRule(self, onu_port, install = False)
+ time.sleep(3)
+
+ stats = get_flow_stats(self, ofp.match())
+
+ test.assertTrue(len(stats) == 2, \
+ "Wrong number of rules reports; reported %s, expected 2\n\n %s" % (len(stats), stats))
+
+ logging.info(stats)
class MultipleDoubleTaggingForwarding(base_tests.SimpleDataPlane):
@@ -812,6 +837,40 @@
time.sleep(5)
testPacketFlow(self, ctag, stag)
+def processEapolRule(test, in_port, install = True):
+ match = ofp.match()
+ match.oxm_list.append(ofp.oxm.eth_type(0x888e))
+ match.oxm_list.append(ofp.oxm.in_port(in_port))
+ if install:
+ request = ofp.message.flow_add(
+ table_id=test_param_get("table", 0),
+ cookie=42,
+ match=m,
+ instructions=[
+ ofp.instruction.apply_actions(
+ actions=[
+ ofp.action.output(
+ port=ofp.OFPP_CONTROLLER,
+ max_len=ofp.OFPCML_NO_BUFFER)])],
+ buffer_id=ofp.OFP_NO_BUFFER,
+ priority=1000)
+ else:
+ request = ofp.message.flow_delete(
+ table_id=test_param_get("table", 0),
+ cookie=42,
+ match=m,
+ instructions=[
+ ofp.instruction.apply_actions(
+ actions=[
+ ofp.action.output(
+ port=ofp.OFPP_CONTROLLER,
+ max_len=ofp.OFPCML_NO_BUFFER)])],
+ buffer_id=ofp.OFP_NO_BUFFER,
+ priority=1000)
+ logging.info("%s flow sending matching packets to controller" % "Install" if install else "Remove")
+ test.controller.message_send(request)
+ do_barrier(test.controller)
+
def testPacketFlow(test, c_vlan_id, s_vlan_id):
incorrectTagPkt = simple_udp_packet(pktlen=100, dl_vlan_enable=True, vlan_vid=100, vlan_pcp=0)