Next iteration of the fix for jira:155: single update queue
diff --git a/src/main/java/net/onrc/onos/ofcontroller/linkdiscovery/internal/LinkDiscoveryManager.java b/src/main/java/net/onrc/onos/ofcontroller/linkdiscovery/internal/LinkDiscoveryManager.java
index f4e4223..7736723 100644
--- a/src/main/java/net/onrc/onos/ofcontroller/linkdiscovery/internal/LinkDiscoveryManager.java
+++ b/src/main/java/net/onrc/onos/ofcontroller/linkdiscovery/internal/LinkDiscoveryManager.java
@@ -18,6 +18,7 @@
 package net.onrc.onos.ofcontroller.linkdiscovery.internal;
 
 import java.io.IOException;
+
 import java.net.InetAddress;
 import java.net.InetSocketAddress;
 import java.net.NetworkInterface;
@@ -126,6 +127,7 @@
 implements IOFMessageListener, IOFSwitchListener, 
 IStorageSourceListener, ILinkDiscoveryService,
 IFloodlightModule, IInfoProvider, IHAListener {
+	protected IFloodlightProviderService controller;
     protected static Logger log = LoggerFactory.getLogger(LinkDiscoveryManager.class);
 
     // Names of table/fields for links in the storage API
@@ -232,8 +234,40 @@
     /* topology aware components are called in the order they were added to the
      * the array */
     protected ArrayList<ILinkDiscoveryListener> linkDiscoveryAware;
-    protected BlockingQueue<LDUpdate> updates;
-    protected Thread updatesThread;
+    
+    /** An {@code LDUpdate} whose {@link #dispatch()} hands it to every registered link discovery listener. */
+    protected class LinkUpdate extends LDUpdate {
+        /** Builds the update from {@code old} by delegating to the {@code LDUpdate(LDUpdate)} constructor. */
+        public LinkUpdate(LDUpdate old) {
+            super(old);
+        }
+        /** Notifies each listener in {@code linkDiscoveryAware}, in list order; does nothing if the list is null. */
+        @LogMessageDoc(level="ERROR",
+                message="Error in link discovery updates loop",
+                explanation="An unknown error occured while dispatching " +
+                        "link update notifications",
+                recommendation=LogMessageDoc.GENERIC_ACTION)
+        @Override
+        public void dispatch() {
+            if (linkDiscoveryAware == null) return;
+            if (log.isTraceEnabled()) {
+                log.trace("Dispatching link discovery update {} {} {} {} {} for {}",
+                          new Object[]{this.getOperation(),
+                                       HexString.toHexString(this.getSrc()), this.getSrcPort(),
+                                       HexString.toHexString(this.getDst()), this.getDstPort(),
+                                       linkDiscoveryAware});
+            }
+            for (ILinkDiscoveryListener lda : linkDiscoveryAware) { // order maintained
+                try {
+                    lda.linkDiscoveryUpdate(this);
+                } catch (Exception e) {
+                    // Catch per listener: one failing listener is logged and
+                    // the remaining listeners still receive this update.
+                    log.error("Error in link discovery updates loop", e);
+                }
+            }
+        }
+    }
 
     /**
      * List of ports through which LLDP/BDDPs are not sent.
@@ -322,34 +356,7 @@
         return ILinkDiscovery.LinkType.INVALID_LINK;
     }
 
-    @LogMessageDoc(level="ERROR",
-            message="Error in link discovery updates loop",
-            explanation="An unknown error occured while dispatching " +
-            		"link update notifications",
-            recommendation=LogMessageDoc.GENERIC_ACTION)
-    private void doUpdatesThread() throws InterruptedException {
-        do {
-            LDUpdate update = updates.take();
-
-            if (linkDiscoveryAware != null) {
-                if (log.isTraceEnabled()) {
-                    log.trace("Dispatching link discovery update {} {} {} {} {} for {}",
-                              new Object[]{update.getOperation(),
-                                           HexString.toHexString(update.getSrc()), update.getSrcPort(),
-                                           HexString.toHexString(update.getDst()), update.getDstPort(),
-                                           linkDiscoveryAware});
-                }
-                try {
-                    for (ILinkDiscoveryListener lda : linkDiscoveryAware) { // order maintained
-                        lda.linkDiscoveryUpdate(update);
-                    }
-                }
-                catch (Exception e) {
-                    log.error("Error in link discovery updates loop", e);
-                }
-            }
-        } while (updates.peek() != null);
-    }
+    
     private boolean isLinkDiscoverySuppressed(long sw, short portNumber) {
         return this.suppressLinkDiscovery.contains(new NodePortTuple(sw, portNumber));
     }
@@ -490,7 +497,10 @@
         if (portUp) operation = UpdateOperation.PORT_UP;
         else operation = UpdateOperation.PORT_DOWN;
 
-        updates.add(new LDUpdate(sw, port, operation));
+        LinkUpdate update = new LinkUpdate(new LDUpdate(sw, port, operation));
+        
+        
+        controller.publishUpdate(update);
     }
 
     /** 
@@ -1134,10 +1144,11 @@
 
             if (linkChanged) {
                 // find out if the link was added or removed here.
-                updates.add(new LDUpdate(lt.getSrc(), lt.getSrcPort(),
+                LinkUpdate update = new LinkUpdate(new LDUpdate(lt.getSrc(), lt.getSrcPort(),
                                          lt.getDst(), lt.getDstPort(),
                                          getLinkType(lt, newInfo),
                                          updateOperation));
+                controller.publishUpdate(update);
             }
         } finally {
             lock.writeLock().unlock();
@@ -1184,10 +1195,11 @@
                 }
 
                 LinkInfo info = this.links.remove(lt);
-                updates.add(new LDUpdate(lt.getSrc(), lt.getSrcPort(),
+                LinkUpdate update = new LinkUpdate(new LDUpdate(lt.getSrc(), lt.getSrcPort(),
                                          lt.getDst(), lt.getDstPort(),
                                          getLinkType(lt, info),
                                          UpdateOperation.LINK_REMOVED));
+                controller.publishUpdate(update);
 
                 // Update Event History
                 evHistTopoLink(lt.getSrc(),
@@ -1251,8 +1263,8 @@
                     ((byte)OFPortReason.OFPPR_MODIFY.ordinal() ==
                     ps.getReason() && !portEnabled(ps.getDesc()))) {
                 deleteLinksOnPort(npt, "Port Status Changed");
-                LDUpdate update = new LDUpdate(sw, port, UpdateOperation.PORT_DOWN);
-                updates.add(update);
+                LinkUpdate update = new LinkUpdate(new LDUpdate(sw, port, UpdateOperation.PORT_DOWN));
+                controller.publishUpdate(update);
                 linkDeleted = true;
                 } 
             else if (ps.getReason() ==
@@ -1284,14 +1296,15 @@
                                 (updatedDstPortState != null)) {
                             // The link is already known to link discovery
                             // manager and the status has changed, therefore
-                            // send an LDUpdate.
+                            // send a LinkUpdate.
                             UpdateOperation operation =
                                     getUpdateOperation(linkInfo.getSrcPortState(),
                                                        linkInfo.getDstPortState());
-                            updates.add(new LDUpdate(lt.getSrc(), lt.getSrcPort(),
+                            LinkUpdate update = new LinkUpdate(new LDUpdate(lt.getSrc(), lt.getSrcPort(),
                                                      lt.getDst(), lt.getDstPort(),
                                                      getLinkType(lt, linkInfo),
                                                      operation));
+                            controller.publishUpdate(update);
                             writeLinkToStorage(lt, linkInfo);
                             linkInfoChanged = true;
                         }
@@ -1300,7 +1313,8 @@
 
                 UpdateOperation operation =
                         getUpdateOperation(ps.getDesc().getState());
-                updates.add(new LDUpdate(sw, port, operation));
+                LinkUpdate update = new LinkUpdate(new LDUpdate(sw, port, operation));
+                controller.publishUpdate(update);
             }
 
             if (!linkDeleted && !linkInfoChanged){
@@ -1372,9 +1386,9 @@
         }
         // Update event history
         evHistTopoSwitch(sw, EvAction.SWITCH_CONNECTED, "None");
-        LDUpdate update = new LDUpdate(sw.getId(), null,
-                                       UpdateOperation.SWITCH_UPDATED);
-        updates.add(update);
+        LinkUpdate update = new LinkUpdate(new LDUpdate(sw.getId(), null,
+                                       UpdateOperation.SWITCH_UPDATED));
+        controller.publishUpdate(update);
     }
 
     /**
@@ -1399,8 +1413,8 @@
                 deleteLinks(eraseList, "Switch Removed");
 
                 // Send a switch removed update
-                LDUpdate update = new LDUpdate(sw, null, UpdateOperation.SWITCH_REMOVED);
-                updates.add(update);
+                LinkUpdate update = new LinkUpdate(new LDUpdate(sw, null, UpdateOperation.SWITCH_REMOVED));
+                controller.publishUpdate(update);
             }
         } finally {
             lock.writeLock().unlock();
@@ -1486,10 +1500,11 @@
                     UpdateOperation operation;
                     operation = getUpdateOperation(info.getSrcPortState(),
                                                    info.getDstPortState());
-                    updates.add(new LDUpdate(lt.getSrc(), lt.getSrcPort(),
+                    LinkUpdate update = new LinkUpdate(new LDUpdate(lt.getSrc(), lt.getSrcPort(),
                                              lt.getDst(), lt.getDstPort(),
                                              getLinkType(lt, info),
                                              operation));
+                    controller.publishUpdate(update);
                 }
             }
 
@@ -1776,16 +1791,18 @@
                 if (log.isTraceEnabled()) {
                     log.trace("SWITCH_IS_CORE_SWITCH set to False for {}", sw);
                 }
-                updates.add(new LDUpdate(sw.getId(), SwitchType.BASIC_SWITCH,
+                LinkUpdate update = new LinkUpdate(new LDUpdate(sw.getId(), SwitchType.BASIC_SWITCH,
                                          UpdateOperation.SWITCH_UPDATED));
+                controller.publishUpdate(update);
             }
             else {
                 sw.setAttribute(IOFSwitch.SWITCH_IS_CORE_SWITCH, new Boolean(true));
                 if (log.isTraceEnabled()) {
                     log.trace("SWITCH_IS_CORE_SWITCH set to True for {}", sw);
                 }
-                updates.add(new LDUpdate(sw.getId(), SwitchType.CORE_SWITCH,
+                LinkUpdate update = new LinkUpdate(new LDUpdate(sw.getId(), SwitchType.CORE_SWITCH,
                                          UpdateOperation.SWITCH_UPDATED));
+                controller.publishUpdate(update);
             }
         }
     }
@@ -1847,7 +1864,6 @@
         // We create this here because there is no ordering guarantee
         this.linkDiscoveryAware = new ArrayList<ILinkDiscoveryListener>();
         this.lock = new ReentrantReadWriteLock();
-        this.updates = new LinkedBlockingQueue<LDUpdate>();
         this.links = new HashMap<Link, LinkInfo>();
         this.portLinks = new HashMap<NodePortTuple, Set<Link>>();
         this.suppressLinkDiscovery =
@@ -1910,6 +1926,8 @@
         }
 
         ScheduledExecutorService ses = threadPool.getScheduledExecutor();
+        controller =
+                context.getServiceImpl(IFloodlightProviderService.class);
 
         // To be started by the first switch connection
         discoveryTask = new SingletonTask(ses, new Runnable() {
@@ -1953,20 +1971,6 @@
         bddpTask = new SingletonTask(ses, new QuarantineWorker());
         bddpTask.reschedule(BDDP_TASK_INTERVAL, TimeUnit.MILLISECONDS);
 
-        updatesThread = new Thread(new Runnable () {
-            @Override
-            public void run() {
-                while (true) {
-                    try {
-                        doUpdatesThread();
-                    } catch (InterruptedException e) {
-                        return;
-                    }
-                }
-            }}, "Topology Updates");
-        updatesThread.start();
-
-
 
         // Register for the OpenFlow messages we want to receive
         floodlightProvider.addOFMessageListener(OFType.PACKET_IN, this);