Fixed an issue that caused FlowPusher to go into an infinite loop.

FlowPusher used to use the IOFSwitch object as a key in its data structures.
When the IOFSwitchListener interface changed to include only the DPID in its
callback methods instead of the IOFSwitch, FlowPusher was no longer able
to get a reference to the switch once it had disconnected. This means
FlowPusher can't clean out its data structures, and if messages are still
in the queue for the switch it will try to send them forever.

The fix is to change FlowPusher so its internal data structures use the DPID
as the key. While doing this I also changed most of its APIs so that they also
reference switches by DPID rather than by IOFSwitch.

Change-Id: I255d64fdac21d8b147f991ff41f6ecace310b116
diff --git a/src/test/java/net/onrc/onos/core/flowprogrammer/FlowPusherTest.java b/src/test/java/net/onrc/onos/core/flowprogrammer/FlowPusherTest.java
index 6ad1e5f..b770705 100644
--- a/src/test/java/net/onrc/onos/core/flowprogrammer/FlowPusherTest.java
+++ b/src/test/java/net/onrc/onos/core/flowprogrammer/FlowPusherTest.java
@@ -31,8 +31,10 @@
 import net.floodlightcontroller.threadpool.IThreadPoolService;
 import net.onrc.onos.core.intent.FlowEntry;
 import net.onrc.onos.core.intent.IntentOperation.Operator;
+import net.onrc.onos.core.util.Dpid;
 import net.onrc.onos.core.util.IntegrationTest;
 
+import org.junit.Ignore;
 import org.junit.Test;
 import org.junit.experimental.categories.Category;
 import org.projectfloodlight.openflow.protocol.OFBarrierReply;
@@ -68,11 +70,13 @@
     public void testAddMessage() {
         beginInitMock();
 
+        Dpid dpid = new Dpid(1L);
+
         OFMessage msg = createMock(OFMessage.class);
         expect(msg.getXid()).andReturn((long) 1).anyTimes();
         replay(msg);
 
-        IOFSwitch sw = createConnectedSwitchMock(1);
+        IOFSwitch sw = createConnectedSwitchMock(dpid.value());
 
         try {
             sw.write(eq(msg), eq((FloodlightContext) null));
@@ -85,7 +89,7 @@
         endInitMock();
         initPusher(1);
 
-        boolean addResult = pusher.add(sw, msg);
+        boolean addResult = pusher.add(dpid, msg);
         assertTrue(addResult);
 
         try {
@@ -111,8 +115,9 @@
 
         beginInitMock();
 
-        IOFSwitch sw = createConnectedSwitchMock(1);
+        Dpid dpid = new Dpid(1L);
 
+        IOFSwitch sw = createConnectedSwitchMock(dpid.value());
 
         List<OFMessage> messages = new ArrayList<OFMessage>();
 
@@ -134,7 +139,7 @@
         initPusher(1);
 
         for (OFMessage msg : messages) {
-            boolean addResult = pusher.add(sw, msg);
+            boolean addResult = pusher.add(dpid, msg);
             assertTrue(addResult);
         }
 
@@ -164,9 +169,11 @@
 
         beginInitMock();
 
-        Map<IOFSwitch, List<OFMessage>> swMap = new HashMap<IOFSwitch, List<OFMessage>>();
+        Map<IOFSwitch, List<OFMessage>> swMap = new HashMap<>();
         for (int i = 0; i < numSwitch; ++i) {
-            IOFSwitch sw = createConnectedSwitchMock(i);
+            Dpid dpid = new Dpid(i);
+
+            IOFSwitch sw = createConnectedSwitchMock(dpid.value());
 
             List<OFMessage> messages = new ArrayList<OFMessage>();
 
@@ -192,7 +199,7 @@
 
         for (IOFSwitch sw : swMap.keySet()) {
             for (OFMessage msg : swMap.get(sw)) {
-                boolean addResult = pusher.add(sw, msg);
+                boolean addResult = pusher.add(new Dpid(sw.getId()), msg);
                 assertTrue(addResult);
             }
         }
@@ -226,7 +233,7 @@
 
         beginInitMock();
 
-        Map<IOFSwitch, List<OFMessage>> swMap = new HashMap<IOFSwitch, List<OFMessage>>();
+        Map<IOFSwitch, List<OFMessage>> swMap = new HashMap<>();
         for (int i = 0; i < numThreads; ++i) {
             IOFSwitch sw = createConnectedSwitchMock(i);
             //EasyMock.replay(sw);
@@ -255,7 +262,7 @@
         initPusher(numThreads);
         for (IOFSwitch sw : swMap.keySet()) {
             for (OFMessage msg : swMap.get(sw)) {
-                boolean addResult = pusher.add(sw, msg);
+                boolean addResult = pusher.add(new Dpid(sw.getId()), msg);
                 assertTrue(addResult);
             }
         }
@@ -284,6 +291,20 @@
     /**
      * Test rate limitation of messages works correctly.
      */
+    // XXX Disabling this test for now.
+    // This test doesn't seem to be correctly testing the rate limiting feature.
+    // It tries to measure the time taken to push a set of messages, however the
+    // 'end' timestamp is actually taken when the expectation is set up, before
+    // any messages have been pushed. This means generally when measuredRate is
+    // calculated it is a negative value, so is always less than the allowedRate
+    // and the test passes.
+    // However I was seeing some random failures caused by a divide-by-zero
+    // error, which occurs when the 'end' and 'start' timestamps are taken in
+    // the same millisecond.
+    // Furthermore, rate limits are set in bytes/ms and the rate limiter relies
+    // on being able to get the size of the packets, which is not possible
+    // with Loxi messages. Therefore the rate limiter currently does not work.
+    @Ignore
     @Test
     public void testRateLimitedAddMessage() {
         final long limitRate = 100; // [bytes/ms]
@@ -297,7 +318,9 @@
 
         beginInitMock();
 
-        IOFSwitch sw = createConnectedSwitchMock(1);
+        Dpid dpid = new Dpid(1L);
+
+        IOFSwitch sw = createConnectedSwitchMock(dpid.value());
 
         List<OFMessage> messages = new ArrayList<OFMessage>();
 
@@ -328,16 +351,16 @@
         endInitMock();
         initPusher(1);
 
-        pusher.createQueue(sw);
-        pusher.setRate(sw, limitRate);
+        pusher.createQueue(dpid);
+        pusher.setRate(dpid, limitRate);
 
         long beginTime = System.currentTimeMillis();
         for (OFMessage msg : messages) {
-            boolean addResult = pusher.add(sw, msg);
+            boolean addResult = pusher.add(dpid, msg);
             assertTrue(addResult);
         }
 
-        pusher.barrierAsync(sw);
+        pusher.barrierAsync(dpid);
 
         try {
             do {
@@ -366,7 +389,9 @@
     public void testBarrierMessage() {
         beginInitMock();
 
-        IOFSwitch sw = createConnectedSwitchMock(1);
+        Dpid dpid = new Dpid(1L);
+
+        IOFSwitch sw = createConnectedSwitchMock(dpid.value());
         expect(sw.getOFVersion()).andReturn(OFVersion.OF_10).once();
 
         try {
@@ -379,7 +404,7 @@
         endInitMock();
         initPusher(1);
 
-        OFMessageFuture<OFBarrierReply> future = pusher.barrierAsync(sw);
+        OFMessageFuture<OFBarrierReply> future = pusher.barrierAsync(dpid);
 
         assertNotNull(future);
 
@@ -403,19 +428,12 @@
     @SuppressWarnings("unchecked")
     @Test
     public void testAddFlow() {
+        Dpid dpid = new Dpid(DPID_TO_VERIFY);
+
         // instantiate required objects
-        FlowEntry flowEntry1 = new FlowEntry(DPID_TO_VERIFY, 1, 11, null, null, 0, 0, Operator.ADD);
-        /*
-        flowEntry1.setDpid(new Dpid(DPID_TO_VERIFY));
-        flowEntry1.setFlowId(new FlowId(1));
-        flowEntry1.setInPort(PortNumber.uint16((short) 1));
-        flowEntry1.setOutPort(PortNumber.uint16((short) 11));
-        flowEntry1.setFlowEntryId(new FlowEntryId(1));
-        flowEntry1.setFlowEntryMatch(new FlowEntryMatch());
-        flowEntry1.setFlowEntryActions(new FlowEntryActions());
-        flowEntry1.setFlowEntryErrorState(new FlowEntryErrorState());
-        flowEntry1.setFlowEntryUserState(FlowEntryUserState.FE_USER_ADD);
-        */
+        FlowEntry flowEntry1 =
+                new FlowEntry(DPID_TO_VERIFY, 1, 11, null,
+                        null, 0, 0, Operator.ADD);
 
         beginInitMock();
 
@@ -451,7 +469,7 @@
         endInitMock();
         initPusher(1);
 
-        pusher.pushFlowEntry(sw, flowEntry1);
+        pusher.pushFlowEntry(dpid, flowEntry1);
 
         try {
             Thread.sleep(1000);
@@ -523,6 +541,9 @@
         sw.flush();
         expectLastCall().anyTimes();
 
+        expect(flProviderService.getMasterSwitch(dpid)).andReturn(sw)
+        .anyTimes();
+
         return sw;
     }