Merge pull request #421 from pberde/master

Single updates queue changes to solve jira:155
diff --git a/src/main/java/net/floodlightcontroller/core/IFloodlightProviderService.java b/src/main/java/net/floodlightcontroller/core/IFloodlightProviderService.java
index 1e3ec6f..22ff029 100644
--- a/src/main/java/net/floodlightcontroller/core/IFloodlightProviderService.java
+++ b/src/main/java/net/floodlightcontroller/core/IFloodlightProviderService.java
@@ -206,5 +206,11 @@
     * switch over to ACTIVE role
     */
    public void setAlwaysClearFlowsOnSwAdd(boolean value);
+   
+   /**
+    * Publish updates to Controller updates queue
+    * @param IUpdate
+    */
+   public void publishUpdate(IUpdate update);
 
 }
diff --git a/src/main/java/net/floodlightcontroller/core/IUpdate.java b/src/main/java/net/floodlightcontroller/core/IUpdate.java
new file mode 100644
index 0000000..950bc0f
--- /dev/null
+++ b/src/main/java/net/floodlightcontroller/core/IUpdate.java
@@ -0,0 +1,10 @@
+package net.floodlightcontroller.core;
+
+public interface IUpdate {
+	
+    /** 
+     * Calls the appropriate listeners
+     */
+    public void dispatch();
+
+}
diff --git a/src/main/java/net/floodlightcontroller/core/internal/Controller.java b/src/main/java/net/floodlightcontroller/core/internal/Controller.java
index ac29983..21eceb3 100644
--- a/src/main/java/net/floodlightcontroller/core/internal/Controller.java
+++ b/src/main/java/net/floodlightcontroller/core/internal/Controller.java
@@ -58,6 +58,7 @@
 import net.floodlightcontroller.core.IOFSwitch;
 import net.floodlightcontroller.core.IOFSwitchFilter;
 import net.floodlightcontroller.core.IOFSwitchListener;
+import net.floodlightcontroller.core.IUpdate;
 import net.floodlightcontroller.core.annotations.LogMessageDoc;
 import net.floodlightcontroller.core.annotations.LogMessageDocs;
 import net.floodlightcontroller.core.internal.OFChannelState.HandshakeState;
@@ -253,17 +254,8 @@
     // Perf. related configuration
     protected static final int SEND_BUFFER_SIZE = 4 * 1024 * 1024;
     protected static final int BATCH_MAX_SIZE = 100;
-    protected static final boolean ALWAYS_DECODE_ETH = true;
-
-    /**
-     *  Updates handled by the main loop 
-     */
-    protected interface IUpdate {
-        /** 
-         * Calls the appropriate listeners
-         */
-        public void dispatch();
-    }
+	protected static final boolean ALWAYS_DECODE_ETH = true;
+  
     public enum SwitchUpdateType {
         ADDED,
         REMOVED,
@@ -271,6 +263,7 @@
         PORTADDED,
         PORTREMOVED
     }
+    
     /**
      * Update message indicating a switch was added or removed 
      * ONOS: This message extended to indicate Port add or removed event.
@@ -461,7 +454,13 @@
         }
     }
     
-    
+    public void publishUpdate(IUpdate update) {
+    	try {
+			this.updates.put(update);
+		} catch (InterruptedException e) {
+			log.error("Failure adding update to queue", e);
+		}
+    }
     
     // **********************
     // ChannelUpstreamHandler
diff --git a/src/main/java/net/onrc/onos/ofcontroller/linkdiscovery/ILinkDiscovery.java b/src/main/java/net/onrc/onos/ofcontroller/linkdiscovery/ILinkDiscovery.java
index 6113ea8..cdb71be 100644
--- a/src/main/java/net/onrc/onos/ofcontroller/linkdiscovery/ILinkDiscovery.java
+++ b/src/main/java/net/onrc/onos/ofcontroller/linkdiscovery/ILinkDiscovery.java
@@ -1,5 +1,7 @@
 package net.onrc.onos.ofcontroller.linkdiscovery;
 
+import net.floodlightcontroller.core.IUpdate;
+
 import org.codehaus.jackson.map.annotate.JsonSerialize;
 import org.codehaus.jackson.map.ser.std.ToStringSerializer;
 import org.openflow.util.HexString;
@@ -27,7 +29,7 @@
         }
     }
 
-    public class LDUpdate {
+    public class LDUpdate implements IUpdate{
         protected long src;
         protected short srcPort;
         protected long dst;
@@ -129,6 +131,12 @@
                 return "LDUpdate: Unknown update.";
             }
         }
+
+		@Override
+		public void dispatch() {
+			// TODO Auto-generated method stub
+			
+		}
     }
 
     public enum SwitchType {
diff --git a/src/main/java/net/onrc/onos/ofcontroller/linkdiscovery/internal/LinkDiscoveryManager.java b/src/main/java/net/onrc/onos/ofcontroller/linkdiscovery/internal/LinkDiscoveryManager.java
index f4e4223..7736723 100644
--- a/src/main/java/net/onrc/onos/ofcontroller/linkdiscovery/internal/LinkDiscoveryManager.java
+++ b/src/main/java/net/onrc/onos/ofcontroller/linkdiscovery/internal/LinkDiscoveryManager.java
@@ -18,6 +18,7 @@
 package net.onrc.onos.ofcontroller.linkdiscovery.internal;
 
 import java.io.IOException;
+
 import java.net.InetAddress;
 import java.net.InetSocketAddress;
 import java.net.NetworkInterface;
@@ -126,6 +127,7 @@
 implements IOFMessageListener, IOFSwitchListener, 
 IStorageSourceListener, ILinkDiscoveryService,
 IFloodlightModule, IInfoProvider, IHAListener {
+	protected IFloodlightProviderService controller;
     protected static Logger log = LoggerFactory.getLogger(LinkDiscoveryManager.class);
 
     // Names of table/fields for links in the storage API
@@ -232,8 +234,40 @@
     /* topology aware components are called in the order they were added to the
      * the array */
     protected ArrayList<ILinkDiscoveryListener> linkDiscoveryAware;
-    protected BlockingQueue<LDUpdate> updates;
-    protected Thread updatesThread;
+    
+    protected class LinkUpdate extends LDUpdate {
+
+		public LinkUpdate(LDUpdate old) {
+			super(old);
+		}
+		@LogMessageDoc(level="ERROR",
+	            message="Error in link discovery updates loop",
+	            explanation="An unknown error occured while dispatching " +
+	            		"link update notifications",
+	            recommendation=LogMessageDoc.GENERIC_ACTION)
+		@Override
+		public void dispatch() {
+			 if (linkDiscoveryAware != null) {
+	                if (log.isTraceEnabled()) {
+	                    log.trace("Dispatching link discovery update {} {} {} {} {} for {}",
+	                              new Object[]{this.getOperation(),
+	                                           HexString.toHexString(this.getSrc()), this.getSrcPort(),
+	                                           HexString.toHexString(this.getDst()), this.getDstPort(),
+	                                           linkDiscoveryAware});
+	                }
+	                try {
+	                    for (ILinkDiscoveryListener lda : linkDiscoveryAware) { // order maintained
+	                        lda.linkDiscoveryUpdate(this);
+	                    }
+	                }
+	                catch (Exception e) {
+	                    log.error("Error in link discovery updates loop", e);
+	                }
+	            }
+			
+		}
+    	
+    }
 
     /**
      * List of ports through which LLDP/BDDPs are not sent.
@@ -322,34 +356,7 @@
         return ILinkDiscovery.LinkType.INVALID_LINK;
     }
 
-    @LogMessageDoc(level="ERROR",
-            message="Error in link discovery updates loop",
-            explanation="An unknown error occured while dispatching " +
-            		"link update notifications",
-            recommendation=LogMessageDoc.GENERIC_ACTION)
-    private void doUpdatesThread() throws InterruptedException {
-        do {
-            LDUpdate update = updates.take();
-
-            if (linkDiscoveryAware != null) {
-                if (log.isTraceEnabled()) {
-                    log.trace("Dispatching link discovery update {} {} {} {} {} for {}",
-                              new Object[]{update.getOperation(),
-                                           HexString.toHexString(update.getSrc()), update.getSrcPort(),
-                                           HexString.toHexString(update.getDst()), update.getDstPort(),
-                                           linkDiscoveryAware});
-                }
-                try {
-                    for (ILinkDiscoveryListener lda : linkDiscoveryAware) { // order maintained
-                        lda.linkDiscoveryUpdate(update);
-                    }
-                }
-                catch (Exception e) {
-                    log.error("Error in link discovery updates loop", e);
-                }
-            }
-        } while (updates.peek() != null);
-    }
+    
     private boolean isLinkDiscoverySuppressed(long sw, short portNumber) {
         return this.suppressLinkDiscovery.contains(new NodePortTuple(sw, portNumber));
     }
@@ -490,7 +497,10 @@
         if (portUp) operation = UpdateOperation.PORT_UP;
         else operation = UpdateOperation.PORT_DOWN;
 
-        updates.add(new LDUpdate(sw, port, operation));
+        LinkUpdate update = new LinkUpdate(new LDUpdate(sw, port, operation));
+        
+        
+        controller.publishUpdate(update);
     }
 
     /** 
@@ -1134,10 +1144,11 @@
 
             if (linkChanged) {
                 // find out if the link was added or removed here.
-                updates.add(new LDUpdate(lt.getSrc(), lt.getSrcPort(),
+                LinkUpdate update = new LinkUpdate(new LDUpdate(lt.getSrc(), lt.getSrcPort(),
                                          lt.getDst(), lt.getDstPort(),
                                          getLinkType(lt, newInfo),
                                          updateOperation));
+                controller.publishUpdate(update);
             }
         } finally {
             lock.writeLock().unlock();
@@ -1184,10 +1195,11 @@
                 }
 
                 LinkInfo info = this.links.remove(lt);
-                updates.add(new LDUpdate(lt.getSrc(), lt.getSrcPort(),
+                LinkUpdate update = new LinkUpdate(new LDUpdate(lt.getSrc(), lt.getSrcPort(),
                                          lt.getDst(), lt.getDstPort(),
                                          getLinkType(lt, info),
                                          UpdateOperation.LINK_REMOVED));
+                controller.publishUpdate(update);
 
                 // Update Event History
                 evHistTopoLink(lt.getSrc(),
@@ -1251,8 +1263,8 @@
                     ((byte)OFPortReason.OFPPR_MODIFY.ordinal() ==
                     ps.getReason() && !portEnabled(ps.getDesc()))) {
                 deleteLinksOnPort(npt, "Port Status Changed");
-                LDUpdate update = new LDUpdate(sw, port, UpdateOperation.PORT_DOWN);
-                updates.add(update);
+                LinkUpdate update = new LinkUpdate(new LDUpdate(sw, port, UpdateOperation.PORT_DOWN));
+                controller.publishUpdate(update);
                 linkDeleted = true;
                 } 
             else if (ps.getReason() ==
@@ -1284,14 +1296,15 @@
                                 (updatedDstPortState != null)) {
                             // The link is already known to link discovery
                             // manager and the status has changed, therefore
-                            // send an LDUpdate.
+                            // send an LinkUpdate.
                             UpdateOperation operation =
                                     getUpdateOperation(linkInfo.getSrcPortState(),
                                                        linkInfo.getDstPortState());
-                            updates.add(new LDUpdate(lt.getSrc(), lt.getSrcPort(),
+                            LinkUpdate update = new LinkUpdate(new LDUpdate(lt.getSrc(), lt.getSrcPort(),
                                                      lt.getDst(), lt.getDstPort(),
                                                      getLinkType(lt, linkInfo),
                                                      operation));
+                            controller.publishUpdate(update);
                             writeLinkToStorage(lt, linkInfo);
                             linkInfoChanged = true;
                         }
@@ -1300,7 +1313,8 @@
 
                 UpdateOperation operation =
                         getUpdateOperation(ps.getDesc().getState());
-                updates.add(new LDUpdate(sw, port, operation));
+                LinkUpdate update = new LinkUpdate(new LDUpdate(sw, port, operation));
+                controller.publishUpdate(update);
             }
 
             if (!linkDeleted && !linkInfoChanged){
@@ -1372,9 +1386,9 @@
         }
         // Update event history
         evHistTopoSwitch(sw, EvAction.SWITCH_CONNECTED, "None");
-        LDUpdate update = new LDUpdate(sw.getId(), null,
-                                       UpdateOperation.SWITCH_UPDATED);
-        updates.add(update);
+        LinkUpdate update = new LinkUpdate(new LDUpdate(sw.getId(), null,
+                                       UpdateOperation.SWITCH_UPDATED));
+        controller.publishUpdate(update);
     }
 
     /**
@@ -1399,8 +1413,8 @@
                 deleteLinks(eraseList, "Switch Removed");
 
                 // Send a switch removed update
-                LDUpdate update = new LDUpdate(sw, null, UpdateOperation.SWITCH_REMOVED);
-                updates.add(update);
+                LinkUpdate update = new LinkUpdate(new LDUpdate(sw, null, UpdateOperation.SWITCH_REMOVED));
+                controller.publishUpdate(update);
             }
         } finally {
             lock.writeLock().unlock();
@@ -1486,10 +1500,11 @@
                     UpdateOperation operation;
                     operation = getUpdateOperation(info.getSrcPortState(),
                                                    info.getDstPortState());
-                    updates.add(new LDUpdate(lt.getSrc(), lt.getSrcPort(),
+                    LinkUpdate update = new LinkUpdate(new LDUpdate(lt.getSrc(), lt.getSrcPort(),
                                              lt.getDst(), lt.getDstPort(),
                                              getLinkType(lt, info),
                                              operation));
+                    controller.publishUpdate(update);
                 }
             }
 
@@ -1776,16 +1791,18 @@
                 if (log.isTraceEnabled()) {
                     log.trace("SWITCH_IS_CORE_SWITCH set to False for {}", sw);
                 }
-                updates.add(new LDUpdate(sw.getId(), SwitchType.BASIC_SWITCH,
+                LinkUpdate update = new LinkUpdate(new LDUpdate(sw.getId(), SwitchType.BASIC_SWITCH,
                                          UpdateOperation.SWITCH_UPDATED));
+                controller.publishUpdate(update);
             }
             else {
                 sw.setAttribute(IOFSwitch.SWITCH_IS_CORE_SWITCH, new Boolean(true));
                 if (log.isTraceEnabled()) {
                     log.trace("SWITCH_IS_CORE_SWITCH set to True for {}", sw);
                 }
-                updates.add(new LDUpdate(sw.getId(), SwitchType.CORE_SWITCH,
+                LinkUpdate update = new LinkUpdate(new LDUpdate(sw.getId(), SwitchType.CORE_SWITCH,
                                          UpdateOperation.SWITCH_UPDATED));
+                controller.publishUpdate(update);
             }
         }
     }
@@ -1847,7 +1864,6 @@
         // We create this here because there is no ordering guarantee
         this.linkDiscoveryAware = new ArrayList<ILinkDiscoveryListener>();
         this.lock = new ReentrantReadWriteLock();
-        this.updates = new LinkedBlockingQueue<LDUpdate>();
         this.links = new HashMap<Link, LinkInfo>();
         this.portLinks = new HashMap<NodePortTuple, Set<Link>>();
         this.suppressLinkDiscovery =
@@ -1910,6 +1926,8 @@
         }
 
         ScheduledExecutorService ses = threadPool.getScheduledExecutor();
+        controller =
+                context.getServiceImpl(IFloodlightProviderService.class);
 
         // To be started by the first switch connection
         discoveryTask = new SingletonTask(ses, new Runnable() {
@@ -1953,20 +1971,6 @@
         bddpTask = new SingletonTask(ses, new QuarantineWorker());
         bddpTask.reschedule(BDDP_TASK_INTERVAL, TimeUnit.MILLISECONDS);
 
-        updatesThread = new Thread(new Runnable () {
-            @Override
-            public void run() {
-                while (true) {
-                    try {
-                        doUpdatesThread();
-                    } catch (InterruptedException e) {
-                        return;
-                    }
-                }
-            }}, "Topology Updates");
-        updatesThread.start();
-
-
 
         // Register for the OpenFlow messages we want to receive
         floodlightProvider.addOFMessageListener(OFType.PACKET_IN, this);