Fixed an issue that caused FlowPusher to go into an infinite loop.

FlowPusher used to use the IOFSwitch object as a key in its data structures.
When the IOFSwitchListener inteface changed to include only the DPID in its
callback methods instead of the IOFSwitch, the FlowPusher is now not able
to get a reference to the switch once it has disconnected. This means
FlowPusher can't clean out its data structures and it if messages are still
in the queue for the switch it will try and send them forever.

The fix is to change FlowPusher so its internal data structures use the DPID
as the key. While doing this I also changed most of its APIs so that they also
reference switches by DPID rather than by IOFSwitch.

Change-Id: I255d64fdac21d8b147f991ff41f6ecace310b116
diff --git a/src/main/java/net/onrc/onos/core/flowprogrammer/FlowPusher.java b/src/main/java/net/onrc/onos/core/flowprogrammer/FlowPusher.java
index c40eac0..aa1c057 100644
--- a/src/main/java/net/onrc/onos/core/flowprogrammer/FlowPusher.java
+++ b/src/main/java/net/onrc/onos/core/flowprogrammer/FlowPusher.java
@@ -25,6 +25,7 @@
 import net.floodlightcontroller.core.module.FloodlightModuleContext;
 import net.floodlightcontroller.threadpool.IThreadPoolService;
 import net.onrc.onos.core.intent.FlowEntry;
+import net.onrc.onos.core.util.Dpid;
 
 import org.apache.commons.lang3.tuple.Pair;
 import org.projectfloodlight.openflow.protocol.OFBarrierReply;
@@ -36,8 +37,6 @@
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import com.google.common.cache.CacheBuilder;
-
 import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
 
 /**
@@ -51,11 +50,23 @@
  */
 public final class FlowPusher implements IFlowPusherService, IOFMessageListener {
     private static final Logger log = LoggerFactory.getLogger(FlowPusher.class);
-    protected static final int DEFAULT_NUMBER_THREAD = 1;
+    private static final int DEFAULT_NUMBER_THREAD = 1;
 
     // Number of messages sent to switch at once
     protected static final int MAX_MESSAGE_SEND = 100;
 
+    private FloodlightModuleContext context = null;
+    private IThreadPoolService threadPool = null;
+    private IFloodlightProviderService floodlightProvider = null;
+
+    // Map of threads versus dpid
+    private Map<Long, FlowPusherThread> threadMap = null;
+    // Map from (DPID and transaction ID) to Future objects.
+    private Map<BarrierInfo, OFBarrierReplyFuture> barrierFutures =
+            new ConcurrentHashMap<BarrierInfo, OFBarrierReplyFuture>();
+
+    private int numberThread;
+
     private static class SwitchQueueEntry {
         OFMessage msg;
 
@@ -199,12 +210,12 @@
         final long dpid;
         final long xid;
 
-        static BarrierInfo create(IOFSwitch sw, OFBarrierRequest req) {
-            return new BarrierInfo(sw.getId(), req.getXid());
+        static BarrierInfo create(long dpid, OFBarrierRequest req) {
+            return new BarrierInfo(dpid, req.getXid());
         }
 
-        static BarrierInfo create(IOFSwitch sw, OFBarrierReply rpy) {
-            return new BarrierInfo(sw.getId(), rpy.getXid());
+        static BarrierInfo create(long dpid, OFBarrierReply rpy) {
+            return new BarrierInfo(dpid, rpy.getXid());
         }
 
         private BarrierInfo(long dpid, long xid) {
@@ -240,26 +251,11 @@
 
     }
 
-    private FloodlightModuleContext context = null;
-    private IThreadPoolService threadPool = null;
-    private IFloodlightProviderService floodlightProvider = null;
-
-    // Map of threads versus dpid
-    private Map<Long, FlowPusherThread> threadMap = null;
-    // Map from (DPID and transaction ID) to Future objects.
-    private Map<BarrierInfo, OFBarrierReplyFuture> barrierFutures =
-            new ConcurrentHashMap<BarrierInfo, OFBarrierReplyFuture>();
-
-    private int numberThread;
-
     /**
      * Main thread that reads messages from queues and sends them to switches.
      */
-    private static class FlowPusherThread extends Thread {
-        // Weak ConcurrentHashMap
-        private Map<IOFSwitch, SwitchQueue> assignedQueues = CacheBuilder.newBuilder()
-                .weakKeys()
-                .<IOFSwitch, SwitchQueue>build().asMap();
+    private class FlowPusherThread extends Thread {
+        private Map<Dpid, SwitchQueue> assignedQueues = new ConcurrentHashMap<>();
 
         final Lock queuingLock = new ReentrantLock();
         final Condition messagePushed = queuingLock.newCondition();
@@ -283,10 +279,9 @@
                     }
                 }
 
-                for (Iterator<Entry<IOFSwitch, SwitchQueue>> it = assignedQueues
+                for (Iterator<Entry<Dpid, SwitchQueue>> it = assignedQueues
                         .entrySet().iterator(); it.hasNext();) {
-                    Entry<IOFSwitch, SwitchQueue> entry = it.next();
-                    IOFSwitch sw = entry.getKey();
+                    Entry<Dpid, SwitchQueue> entry = it.next();
                     SwitchQueue queue = entry.getValue();
 
                     if (queue == null) {
@@ -294,7 +289,7 @@
                     }
 
                     synchronized (queue) {
-                        processQueue(sw, queue, MAX_MESSAGE_SEND);
+                        processQueue(entry.getKey(), queue, MAX_MESSAGE_SEND);
                         if (queue.toBeDeleted && !queue.hasMessageToSend()) {
                             // remove queue if flagged to be.
                             it.remove();
@@ -308,16 +303,24 @@
          * Read messages from queue and send them to the switch. If number of
          * messages excess the limit, stop sending messages.
          * <p>
-         * @param sw Switch to which messages will be sent.
+         * @param dpid DPID of the switch to which messages will be sent.
          * @param queue Queue of messages.
          * @param maxMsg Limitation of number of messages to be sent. If set to
          *        0, all messages in queue will be sent.
          */
-        private void processQueue(IOFSwitch sw, SwitchQueue queue, int maxMsg) {
+        private void processQueue(Dpid dpid, SwitchQueue queue, int maxMsg) {
             // check sending rate and determine it to be sent or not
             long currentTime = System.currentTimeMillis();
             long size = 0;
 
+            IOFSwitch sw = floodlightProvider.getMasterSwitch(dpid.value());
+            if (sw == null) {
+                // FlowPusher state for this switch will get cleaned up soon
+                // due to the switchDisconnected event
+                log.debug("Switch {} not found when processing queue", dpid);
+                return;
+            }
+
             if (sw.isConnected() && queue.isSendable(currentTime)) {
                 int i = 0;
                 while (queue.hasMessageToSend()) {
@@ -419,13 +422,13 @@
     }
 
     @Override
-    public boolean suspend(IOFSwitch sw) {
-        SwitchQueue queue = getQueue(sw);
+    public boolean suspend(Dpid dpid) {
+        SwitchQueue queue = getQueue(dpid);
 
         if (queue == null) {
             // create queue in case suspend is called before first message
             // addition
-            queue = createQueueImpl(sw);
+            queue = createQueueImpl(dpid);
         }
 
         synchronized (queue) {
@@ -438,11 +441,11 @@
     }
 
     @Override
-    public boolean resume(IOFSwitch sw) {
-        SwitchQueue queue = getQueue(sw);
+    public boolean resume(Dpid dpid) {
+        SwitchQueue queue = getQueue(dpid);
 
         if (queue == null) {
-            log.error("No queue is attached to DPID: {}", sw.getStringId());
+            log.error("No queue is attached to DPID: {}", dpid);
             return false;
         }
 
@@ -451,7 +454,7 @@
                 queue.state = QueueState.READY;
 
                 // Free the latch if queue has any messages
-                FlowPusherThread thread = getProcessingThread(sw);
+                FlowPusherThread thread = getProcessingThread(dpid);
                 if (queue.hasMessageToSend()) {
                     thread.notifyMessagePushed();
                 }
@@ -462,8 +465,8 @@
     }
 
     @Override
-    public QueueState getState(IOFSwitch sw) {
-        SwitchQueue queue = getQueue(sw);
+    public QueueState getState(Dpid dpid) {
+        SwitchQueue queue = getQueue(dpid);
 
         if (queue == null) {
             return QueueState.UNKNOWN;
@@ -486,14 +489,14 @@
     }
 
     @Override
-    public void setRate(IOFSwitch sw, long rate) {
-        SwitchQueue queue = getQueue(sw);
+    public void setRate(Dpid dpid, long rate) {
+        SwitchQueue queue = getQueue(dpid);
         if (queue == null) {
-            queue = createQueueImpl(sw);
+            queue = createQueueImpl(dpid);
         }
 
         if (rate > 0) {
-            log.debug("rate for {} is set to {}", sw.getStringId(), rate);
+            log.debug("rate for {} is set to {}", dpid, rate);
             synchronized (queue) {
                 queue.maxRate = rate;
             }
@@ -503,43 +506,43 @@
     @Override
     @SuppressFBWarnings(value = "RCN_REDUNDANT_NULLCHECK_OF_NONNULL_VALUE",
             justification = "Future versions of createQueueImpl() might return null")
-    public boolean createQueue(IOFSwitch sw) {
-        SwitchQueue queue = createQueueImpl(sw);
+    public boolean createQueue(Dpid dpid) {
+        SwitchQueue queue = createQueueImpl(dpid);
 
         return (queue != null);
     }
 
-    protected SwitchQueue createQueueImpl(IOFSwitch sw) {
-        SwitchQueue queue = getQueue(sw);
+    protected SwitchQueue createQueueImpl(Dpid dpid) {
+        SwitchQueue queue = getQueue(dpid);
         if (queue != null) {
             return queue;
         }
 
-        FlowPusherThread proc = getProcessingThread(sw);
+        FlowPusherThread proc = getProcessingThread(dpid);
         queue = new SwitchQueue();
         queue.state = QueueState.READY;
-        proc.assignedQueues.put(sw, queue);
+        proc.assignedQueues.put(dpid, queue);
 
         return queue;
     }
 
     @Override
-    public boolean deleteQueue(IOFSwitch sw) {
-        return deleteQueue(sw, false);
+    public boolean deleteQueue(Dpid dpid) {
+        return deleteQueue(dpid, false);
     }
 
     @Override
-    public boolean deleteQueue(IOFSwitch sw, boolean forceStop) {
-        FlowPusherThread proc = getProcessingThread(sw);
+    public boolean deleteQueue(Dpid dpid, boolean forceStop) {
+        FlowPusherThread proc = getProcessingThread(dpid);
 
         if (forceStop) {
-            SwitchQueue queue = proc.assignedQueues.remove(sw);
+            SwitchQueue queue = proc.assignedQueues.remove(dpid);
             if (queue == null) {
                 return false;
             }
             return true;
         } else {
-            SwitchQueue queue = getQueue(sw);
+            SwitchQueue queue = getQueue(dpid);
             if (queue == null) {
                 return false;
             }
@@ -551,75 +554,81 @@
     }
 
     @Override
-    public boolean add(IOFSwitch sw, OFMessage msg) {
-        return add(sw, msg, MsgPriority.NORMAL);
+    public boolean add(Dpid dpid, OFMessage msg) {
+        return add(dpid, msg, MsgPriority.NORMAL);
     }
 
     @Override
-    public boolean add(IOFSwitch sw, OFMessage msg, MsgPriority priority) {
-        return addMessageImpl(sw, msg, priority);
+    public boolean add(Dpid dpid, OFMessage msg, MsgPriority priority) {
+        return addMessageImpl(dpid, msg, priority);
     }
 
     @Override
     public void pushFlowEntries(
-            Collection<Pair<IOFSwitch, FlowEntry>> entries) {
+            Collection<Pair<Dpid, FlowEntry>> entries) {
         pushFlowEntries(entries, MsgPriority.NORMAL);
     }
 
     @Override
     public void pushFlowEntries(
-            Collection<Pair<IOFSwitch, FlowEntry>> entries, MsgPriority priority) {
+            Collection<Pair<Dpid, FlowEntry>> entries, MsgPriority priority) {
 
-        for (Pair<IOFSwitch, FlowEntry> entry : entries) {
+        for (Pair<Dpid, FlowEntry> entry : entries) {
             add(entry.getLeft(), entry.getRight(), priority);
         }
     }
 
     @Override
-    public void pushFlowEntry(IOFSwitch sw, FlowEntry flowEntry) {
-        pushFlowEntry(sw, flowEntry, MsgPriority.NORMAL);
+    public void pushFlowEntry(Dpid dpid, FlowEntry flowEntry) {
+        pushFlowEntry(dpid, flowEntry, MsgPriority.NORMAL);
     }
 
     @Override
-    public void pushFlowEntry(IOFSwitch sw, FlowEntry flowEntry, MsgPriority priority) {
-        Collection<Pair<IOFSwitch, FlowEntry>> entries = new LinkedList<>();
+    public void pushFlowEntry(Dpid dpid, FlowEntry flowEntry, MsgPriority priority) {
+        Collection<Pair<Dpid, FlowEntry>> entries = new LinkedList<>();
 
-        entries.add(Pair.of(sw, flowEntry));
+        entries.add(Pair.of(dpid, flowEntry));
         pushFlowEntries(entries, priority);
     }
 
     /**
      * Create a message from FlowEntry and add it to the queue of the switch.
      * <p>
-     * @param sw Switch to which message is pushed.
+     * @param dpid DPID of the switch to which the message is pushed.
      * @param flowEntry FlowEntry object used for creating message.
      * @return true if message is successfully added to a queue.
      */
-    private boolean add(IOFSwitch sw, FlowEntry flowEntry, MsgPriority priority) {
+    private boolean add(Dpid dpid, FlowEntry flowEntry, MsgPriority priority) {
+        IOFSwitch sw = floodlightProvider.getMasterSwitch(dpid.value());
+        if (sw == null) {
+            log.warn("Couldn't find switch {} when pushing message", dpid);
+            return false;
+        }
+
         //
         // Create the OpenFlow Flow Modification Entry to push
         //
         OFFlowMod fm = flowEntry.buildFlowMod(sw.getFactory());
         // log.trace("Pushing flow mod {}", fm);
-        return addMessageImpl(sw, fm, priority);
+        return addMessageImpl(dpid, fm, priority);
     }
 
     /**
      * Add message to queue.
      * <p>
-     * @param sw
-     * @param msg
-     * @param priority
+     * @param dpid DPID of the switch to which the message is sent
+     * @param msg message to send to the switch
+     * @param priority priority of the message
      * @return true if the message was added successfully, otherwise false
      */
-    protected boolean addMessageImpl(IOFSwitch sw, OFMessage msg, MsgPriority priority) {
-        FlowPusherThread thread = getProcessingThread(sw);
+    protected boolean addMessageImpl(Dpid dpid, OFMessage msg, MsgPriority priority) {
+        FlowPusherThread thread = getProcessingThread(dpid);
 
-        SwitchQueue queue = getQueue(sw);
+        SwitchQueue queue = getQueue(dpid);
 
         // create queue at first addition of message
         if (queue == null) {
-            queue = createQueueImpl(sw);
+            queue = createQueueImpl(dpid);
         }
 
         SwitchQueueEntry entry = new SwitchQueueEntry(msg);
@@ -627,7 +636,8 @@
         synchronized (queue) {
             queue.add(entry, priority);
             if (log.isTraceEnabled()) {
-                log.trace("Message is pushed to switch {}: {}", sw.getStringId(), entry.getOFMessage());
+                log.trace("Message is pushed to switch {}: {}",
+                        dpid, entry.getOFMessage());
             }
         }
 
@@ -637,8 +647,8 @@
     }
 
     @Override
-    public OFBarrierReply barrier(IOFSwitch sw) {
-        OFMessageFuture<OFBarrierReply> future = barrierAsync(sw);
+    public OFBarrierReply barrier(Dpid dpid) {
+        OFMessageFuture<OFBarrierReply> future = barrierAsync(dpid);
         if (future == null) {
             return null;
         }
@@ -653,9 +663,9 @@
     }
 
     @Override
-    public OFMessageFuture<OFBarrierReply> barrierAsync(IOFSwitch sw) {
+    public OFMessageFuture<OFBarrierReply> barrierAsync(Dpid dpid) {
         // TODO creation of message and future should be moved to OFSwitchImpl
-
+        IOFSwitch sw = floodlightProvider.getMasterSwitch(dpid.value());
         if (sw == null) {
             return null;
         }
@@ -663,8 +673,8 @@
         OFBarrierRequest msg = createBarrierRequest(sw);
         OFBarrierReplyFuture future = new OFBarrierReplyFuture(threadPool, sw,
                 (int) msg.getXid());
-        barrierFutures.put(BarrierInfo.create(sw, msg), future);
-        addMessageImpl(sw, msg, MsgPriority.NORMAL);
+        barrierFutures.put(BarrierInfo.create(dpid.value(), msg), future);
+        addMessageImpl(dpid, msg, MsgPriority.NORMAL);
         return future;
     }
 
@@ -683,42 +693,42 @@
     /**
      * Get a queue attached to a switch.
      * <p>
-     * @param sw Switch object
+     * @param dpid DPID of the switch
      * @return Queue object
      */
-    protected SwitchQueue getQueue(IOFSwitch sw) {
-        if (sw == null) {
+    protected SwitchQueue getQueue(Dpid dpid) {
+        if (dpid == null) {
             return null;
         }
 
-        FlowPusherThread th = getProcessingThread(sw);
+        FlowPusherThread th = getProcessingThread(dpid);
         if (th == null) {
             return null;
         }
 
-        return th.assignedQueues.get(sw);
+        return th.assignedQueues.get(dpid);
     }
 
     /**
      * Get a hash value correspondent to a switch.
      * <p>
-     * @param sw Switch object
+     * @param dpid DPID of the switch
      * @return Hash value
      */
-    protected long getHash(IOFSwitch sw) {
+    protected long getHash(long dpid) {
         // This code assumes DPID is sequentially assigned.
         // TODO consider equalization algorithm
-        return sw.getId() % numberThread;
+        return dpid % numberThread;
     }
 
     /**
      * Get a Thread object which processes the queue attached to a switch.
      * <p>
-     * @param sw Switch object
+     * @param dpid DPID of the switch
      * @return Thread object
      */
-    protected FlowPusherThread getProcessingThread(IOFSwitch sw) {
-        long hash = getHash(sw);
+    protected FlowPusherThread getProcessingThread(Dpid dpid) {
+        long hash = getHash(dpid.value());
 
         return threadMap.get(hash);
     }
@@ -751,7 +761,7 @@
         }
 
         OFBarrierReply reply = (OFBarrierReply) msg;
-        BarrierInfo info = BarrierInfo.create(sw, reply);
+        BarrierInfo info = BarrierInfo.create(sw.getId(), reply);
         // Deliver future if exists
         OFBarrierReplyFuture future = barrierFutures.get(info);
         if (future != null) {