Work toward cleaning up the Topology Manager and the Topology Publisher

ONOS-1890

* Removed explicit calls to "linkDiscovery.disableDiscoveryOnPort()"
  when adding a new switch. Those calls were added as a work-around
  to deal with publishing the link discovery before publishing
  the switch or port discovery. The current implementation can tolerate
  reordering, hence those calls are not needed.

* Renamed methods TopologyPublisher.putFoo() and removeFoo() to
  publishAddFoo() and publishRemoveFoo()

* Refactored all publishAddFoo() and publishRemoveFoo() methods
  in the TopologyPublisher:
  - Properly track the published state.
  - Update the publish semantics: publishAddFoo() is allowed to proceed
    only if the instance is the current Master for the corresponding DPID.
    On the other hand, publishRemoveFoo() is allowed if publishAddFoo()
    was previously allowed.
  - Track switch-to-port, port-to-link, and port-to-host dependencies
    and call publishRemoveFoo() to clean up as appropriate based on those
    dependencies.

Change-Id: I0725ee68508e96f462f6a134ca5d0e2c7ce91ba9
diff --git a/src/main/java/net/floodlightcontroller/core/internal/Controller.java b/src/main/java/net/floodlightcontroller/core/internal/Controller.java
index 69f706c..23d61bb 100644
--- a/src/main/java/net/floodlightcontroller/core/internal/Controller.java
+++ b/src/main/java/net/floodlightcontroller/core/internal/Controller.java
@@ -353,14 +353,7 @@
                 return false;
             activeMasterSwitches.put(dpid, sw);
         }
-        // XXX Workaround to prevent race condition where a link is detected
-        // and attempted to be written to the database before the port is in
-        // the database. We now suppress link discovery on ports until we're
-        // sure they're in the database.
-        for (OFPortDesc port : sw.getPorts()) {
-            linkDiscovery.disableDiscoveryOnPort(sw.getId(),
-                    port.getPortNo().getShortPortNumber());
-        }
+
         // update counters and events
         counters.switchActivated.updateCounterWithFlush();
         evSwitch.updateEventWithFlush(new SwitchEvent(dpid, "activeMaster"));
@@ -463,15 +456,6 @@
             return;
         }
 
-        if (changeType == PortChangeType.ADD) {
-            // XXX Workaround to prevent race condition where a link is detected
-            // and attempted to be written to the database before the port is in
-            // the database. We now suppress link discovery on ports until we're
-            // sure they're in the database.
-            linkDiscovery.disableDiscoveryOnPort(dpid, port.getPortNo()
-                    .getShortPortNumber());
-        }
-
         SwitchUpdate update = new SwitchUpdate(dpid, SwitchUpdateType.PORTCHANGED,
                 port, changeType);
         addUpdateToQueue(update);
diff --git a/src/main/java/net/onrc/onos/core/topology/TopologyPublisher.java b/src/main/java/net/onrc/onos/core/topology/TopologyPublisher.java
index 56ca8fc..2863523 100644
--- a/src/main/java/net/onrc/onos/core/topology/TopologyPublisher.java
+++ b/src/main/java/net/onrc/onos/core/topology/TopologyPublisher.java
@@ -3,10 +3,11 @@
 import java.nio.ByteBuffer;
 import java.util.ArrayList;
 import java.util.Collection;
-import java.util.HashMap;
 import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
 import java.util.concurrent.TimeUnit;
 
 import net.floodlightcontroller.core.IFloodlightProviderService;
@@ -33,18 +34,26 @@
 import net.onrc.onos.core.registry.IControllerRegistryService.ControlChangeCallback;
 import net.onrc.onos.core.registry.RegistryException;
 import net.onrc.onos.core.util.Dpid;
+import net.onrc.onos.core.util.OnosInstanceId;
 import net.onrc.onos.core.util.PortNumber;
 import net.onrc.onos.core.util.SwitchPort;
 
+import org.apache.commons.lang3.concurrent.ConcurrentUtils;
 import org.projectfloodlight.openflow.protocol.OFPortDesc;
 import org.projectfloodlight.openflow.util.HexString;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 /**
- * The TopologyPublisher subscribes to topology network events from the
- * discovery modules. These events are reformatted and relayed to the in-memory
- * topology instance.
+ * Class for publishing topology-related events.
+ *
+ * The events are received from the discovery modules, reformatted and
+ * published to the other ONOS instances.
+ *
+ * TODO: Add a synchronization mechanism when publishing the events to
+ * preserve the ordering and to avoid mismatch in the local "published" state,
+ * because each of the callers (the discovery modules) might be running
+ * on a different thread.
  */
 public class TopologyPublisher implements IOFSwitchListener,
         ILinkDiscoveryListener,
@@ -71,47 +80,44 @@
     private IEventChannel<byte[], TopologyEvent> eventChannel;
 
     //
-    // Local state for keeping track of locally discovered events so we can
-    // cleanup properly when a Switch or Port is removed.
+    // Local state for keeping track of locally published events so we can
+    // cleanup properly when an entry is removed.
     //
     // We keep all Port, (incoming) Link and Host events per Switch DPID:
     //  - If a switch goes down, we remove all corresponding Port, Link and
     //    Host events.
     //  - If a port on a switch goes down, we remove all corresponding Link
-    //    and Host events discovered by this instance.
+    //    and Host events attached to this port.
     //
-    // How to handle side-effect of remote events.
-    //  - Remote Port Down event -> Link Down
-    //      Not handled. (XXX Shouldn't it be removed from discovered.. Map)
-    //  - Remote Host Added -> lose ownership of Host)
-    //      Not handled. (XXX Shouldn't it be removed from discovered.. Map)
+    // TODO: What to do if the Mastership changes?
+    //  - Cleanup state from publishedFoo maps, but do not send REMOVE events?
     //
-    // XXX Domain knowledge based invariant maintenance should be moved to
-    //     driver module, since the invariant may be different on optical, etc.
-    //
-    // What happens on leadership change?
-    //  - Probably should: remove from discovered.. Maps, but not send DELETE
-    //    events
-    //    XXX Switch/Port can be rediscovered by new leader, but Link, Host?
-    //  - Current: There is no way to recognize leadership change?
-    //      ZookeeperRegistry.requestControl(long, ControlChangeCallback)
-    //      is the only way to register listener, and it allows only one
-    //      listener, which is already used by Controller class.
-    //
-    // FIXME Replace with concurrent variant.
-    //   #removeSwitchDiscoveryEvent(SwitchEvent) runs in different thread.
-    //
-    private Map<Dpid, Map<ByteBuffer, PortEvent>> discoveredAddedPortEvents =
-            new HashMap<>();
-    private Map<Dpid, Map<ByteBuffer, LinkEvent>> discoveredAddedLinkEvents =
-            new HashMap<>();
-    private Map<Dpid, Map<ByteBuffer, HostEvent>> discoveredAddedHostEvents =
-            new HashMap<>();
+    private ConcurrentMap<Dpid, MastershipEvent> publishedMastershipEvents =
+        new ConcurrentHashMap<>();
+    private ConcurrentMap<Dpid, SwitchEvent> publishedSwitchEvents =
+        new ConcurrentHashMap<>();
+    private ConcurrentMap<Dpid, ConcurrentMap<ByteBuffer, PortEvent>>
+        publishedPortEvents = new ConcurrentHashMap<>();
+    private ConcurrentMap<Dpid, ConcurrentMap<ByteBuffer, LinkEvent>>
+        publishedLinkEvents = new ConcurrentHashMap<>();
+    private ConcurrentMap<Dpid, ConcurrentMap<ByteBuffer, HostEvent>>
+        publishedHostEvents = new ConcurrentHashMap<>();
 
 
     /**
-     * Cleanup old switches from the topology. Old switches are those which have
-     * no controller in the registry.
+     * Gets the ONOS Instance ID.
+     *
+     * @return the ONOS Instance ID.
+     */
+    private OnosInstanceId getOnosInstanceId() {
+        return registryService.getOnosInstanceId();
+    }
+
+    /**
+     * Cleanup old switches from the topology. Old switches are those which
+     * have no controller in the registry.
+     *
+     * TODO: The overall switch cleanup mechanism needs refactoring/redesign.
      */
     private class SwitchCleanup implements ControlChangeCallback, Runnable {
         @Override
@@ -156,7 +162,8 @@
                     if (controller == null) {
                         log.debug("Requesting control to set switch {} INACTIVE",
                                 sw.getDpid());
-                        registryService.requestControl(sw.getDpid().value(), this);
+                        registryService.requestControl(sw.getDpid().value(),
+                                                       this);
                     }
                 } catch (RegistryException e) {
                     log.error("Caught RegistryException in cleanup thread", e);
@@ -166,9 +173,9 @@
 
         /**
          * Second half of the switch cleanup operation. If the registry grants
-         * control of a switch, we can be sure no other instance is writing this
-         * switch to the topology, so we can remove it now.
-         * <p>
+         * control of a switch, we can be sure no other instance is writing
+         * this switch to the topology, so we can remove it now.
+         *
          * @param dpid the dpid of the switch we requested control for
          * @param hasControl whether we got control or not
          */
@@ -179,7 +186,7 @@
                         HexString.toHexString(dpid));
 
                 SwitchEvent switchEvent = new SwitchEvent(new Dpid(dpid));
-                removeSwitchDiscoveryEvent(switchEvent);
+                publishRemoveSwitchEvent(switchEvent);
                 registryService.releaseControl(dpid);
             }
         }
@@ -202,14 +209,7 @@
                 AdminStatus.ACTIVE.toString());
         linkEvent.freeze();
 
-        if (!registryService.hasControl(link.getDst())) {
-            // Don't process or send a link event if we're not master for the
-            // destination switch
-            log.debug("Not the master for dst switch {}. Suppressed link add event {}.",
-                    link.getDst(), linkEvent);
-            return;
-        }
-        putLinkDiscoveryEvent(linkEvent);
+        publishAddLinkEvent(linkEvent);
     }
 
     @Override
@@ -225,15 +225,7 @@
                 TopologyElement.TYPE_PACKET_LAYER);
         linkEvent.freeze();
 
-        if (!registryService.hasControl(link.getDst())) {
-            // Don't process or send a link event if we're not master for the
-            // destination switch
-            log.debug(
-                    "Not the master for dst switch {}. Suppressed link remove event {}.",
-                    link.getDst(), linkEvent);
-            return;
-        }
-        removeLinkDiscoveryEvent(linkEvent);
+        publishRemoveLinkEvent(linkEvent);
     }
 
     /* *****************
@@ -264,12 +256,7 @@
         switchEvent.createStringAttribute(TopologyElement.ELEMENT_ADMIN_STATUS,
                 AdminStatus.ACTIVE.toString());
         switchEvent.freeze();
-        // TODO Not very robust
-        if (!registryService.hasControl(swId)) {
-            log.debug("Not the master for switch {}. Suppressed switch add event {}.",
-                    dpid, switchEvent);
-            return;
-        }
+        // The Port events
         List<PortEvent> portEvents = new ArrayList<PortEvent>();
         for (OFPortDesc port : sw.getPorts()) {
             PortEvent portEvent = new PortEvent(dpid,
@@ -288,14 +275,7 @@
             portEvent.freeze();
             portEvents.add(portEvent);
         }
-        putSwitchDiscoveryEvent(switchEvent, portEvents);
-
-        for (OFPortDesc port : sw.getPorts()) {
-            // Allow links to be discovered on this port now that it's
-            // in the database
-            linkDiscovery.enableDiscoveryOnPort(sw.getId(),
-                    port.getPortNo().getShortPortNumber());
-        }
+        publishAddSwitchEvent(switchEvent, portEvents);
     }
 
     @Override
@@ -325,15 +305,14 @@
         Role role = Role.SLAVE; // TODO: Should be Role.UNKNOWN
 
         MastershipEvent mastershipEvent =
-                new MastershipEvent(dpid, registryService.getOnosInstanceId(),
-                        role);
+                new MastershipEvent(dpid, getOnosInstanceId(), role);
         // FIXME should be merging, with existing attrs, etc..
         // TODO define attr name as constant somewhere.
         // TODO populate appropriate attributes.
         mastershipEvent.createStringAttribute(TopologyElement.TYPE,
                 TopologyElement.TYPE_ALL_LAYERS);
         mastershipEvent.freeze();
-        removeSwitchMastershipEvent(mastershipEvent);
+        publishRemoveSwitchMastershipEvent(mastershipEvent);
     }
 
     @Override
@@ -362,6 +341,12 @@
         }
     }
 
+    /**
+     * Prepares an event for adding a port on a switch.
+     *
+     * @param switchId the switch ID (DPID)
+     * @param port the port to add
+     */
     private void switchPortAdded(long switchId, OFPortDesc port) {
         final Dpid dpid = new Dpid(switchId);
         PortEvent portEvent = new PortEvent(dpid,
@@ -372,19 +357,17 @@
         portEvent.createStringAttribute(TopologyElement.TYPE,
                 TopologyElement.TYPE_PACKET_LAYER);
         portEvent.createStringAttribute("name", port.getName());
-
         portEvent.freeze();
 
-        if (registryService.hasControl(switchId)) {
-            putPortDiscoveryEvent(portEvent);
-            linkDiscovery.enableDiscoveryOnPort(switchId,
-                    port.getPortNo().getShortPortNumber());
-        } else {
-            log.debug("Not the master for switch {}. Suppressed port add event {}.",
-                    new Dpid(switchId), portEvent);
-        }
+        publishAddPortEvent(portEvent);
     }
 
+    /**
+     * Prepares an event for removing a port on a switch.
+     *
+     * @param switchId the switch ID (DPID)
+     * @param port the port to remove
+     */
     private void switchPortRemoved(long switchId, OFPortDesc port) {
         final Dpid dpid = new Dpid(switchId);
 
@@ -396,15 +379,9 @@
         portEvent.createStringAttribute(TopologyElement.TYPE,
                 TopologyElement.TYPE_PACKET_LAYER);
         portEvent.createStringAttribute("name", port.getName());
-
         portEvent.freeze();
 
-        if (registryService.hasControl(switchId)) {
-            removePortDiscoveryEvent(portEvent);
-        } else {
-            log.debug("Not the master for switch {}. Suppressed port del event {}.",
-                    dpid, portEvent);
-        }
+        publishRemovePortEvent(portEvent);
     }
 
     @Override
@@ -450,7 +427,6 @@
         registryService = context.getServiceImpl(IControllerRegistryService.class);
         datagridService = context.getServiceImpl(IDatagridService.class);
         hostService = context.getServiceImpl(IHostService.class);
-
         topologyService = context.getServiceImpl(ITopologyService.class);
     }
 
@@ -489,7 +465,7 @@
 
     @Override
     public void hostAdded(Host host) {
-        log.debug("Called onosDeviceAdded mac {}", host.getMacAddress());
+        log.debug("Host added with MAC {}", host.getMacAddress());
 
         SwitchPort sp = new SwitchPort(host.getSwitchDPID(), host.getSwitchPort());
         List<SwitchPort> spLists = new ArrayList<SwitchPort>();
@@ -500,354 +476,358 @@
         // Does not use vlan info now.
         event.freeze();
 
-        putHostDiscoveryEvent(event);
+        publishAddHostEvent(event);
     }
 
     @Override
     public void hostRemoved(Host host) {
-        log.debug("Called onosDeviceRemoved");
-        HostEvent event = new HostEvent(host.getMacAddress());
-        // XXX shouldn't we be setting attachment points?
-        event.freeze();
-        removeHostDiscoveryEvent(event);
+        log.debug("Host removed with MAC {}", host.getMacAddress());
+
+        //
+        // Remove all previously added HostEvent for this MAC address
+        //
+        // TODO: Currently, the caller of hostRemoved() might not include
+        // the correct set of Attachment Points in the HostEvent entry itself.
+        // Also, we might have multiple HostEvent entries for the same
+        // host (MAC address), each containing a single (different) Attachment
+        // Point.
+        // Hence, here we have to cleanup all HostEvent entries for this
+        // particular host, based on its MAC address.
+        //
+        List<HostEvent> removeHostEvents = new LinkedList<>();
+        for (ConcurrentMap<ByteBuffer, HostEvent> cm : publishedHostEvents.values()) {
+            for (HostEvent hostEvent : cm.values()) {
+                if (hostEvent.getMac().equals(host.getMacAddress())) {
+                    removeHostEvents.add(hostEvent);
+                }
+            }
+        }
+        for (HostEvent event : removeHostEvents) {
+            publishRemoveHostEvent(event);
+        }
     }
 
+    /**
+     * Prepares the Controller role changed event for a switch.
+     *
+     * @param dpid the switch DPID
+     * @param role the new role of the controller
+     */
     private void controllerRoleChanged(Dpid dpid, Role role) {
         log.debug("Local switch controller mastership role changed: dpid = {} role = {}",
                 dpid, role);
         MastershipEvent mastershipEvent =
-                new MastershipEvent(dpid, registryService.getOnosInstanceId(),
-                        role);
+                new MastershipEvent(dpid, getOnosInstanceId(), role);
         // FIXME should be merging, with existing attrs, etc..
         // TODO define attr name as constant somewhere.
         // TODO populate appropriate attributes.
         mastershipEvent.createStringAttribute(TopologyElement.TYPE,
                 TopologyElement.TYPE_ALL_LAYERS);
         mastershipEvent.freeze();
-        putSwitchMastershipEvent(mastershipEvent);
+        publishAddSwitchMastershipEvent(mastershipEvent);
     }
 
     /**
-     * Mastership updated event.
+     * Publishes ADD Mastership Event.
      *
-     * @param mastershipEvent the mastership event.
+     * @param mastershipEvent the mastership event to publish
      */
-    private void putSwitchMastershipEvent(MastershipEvent mastershipEvent) {
-        // Send out notification
+    private void publishAddSwitchMastershipEvent(
+                        MastershipEvent mastershipEvent) {
+        // Publish the information
         TopologyEvent topologyEvent =
-            new TopologyEvent(mastershipEvent,
-                              registryService.getOnosInstanceId());
+            new TopologyEvent(mastershipEvent, getOnosInstanceId());
+        log.debug("Publishing add mastership: {}", topologyEvent);
         eventChannel.addEntry(topologyEvent.getID(), topologyEvent);
+        publishedMastershipEvents.put(mastershipEvent.getDpid(),
+                                      mastershipEvent);
     }
 
     /**
-     * Mastership removed event.
+     * Publishes REMOVE Mastership Event.
      *
-     * @param mastershipEvent the mastership event.
+     * @param mastershipEvent the mastership event to publish
      */
-    private void removeSwitchMastershipEvent(MastershipEvent mastershipEvent) {
-        // Send out notification
+    private void publishRemoveSwitchMastershipEvent(
+                        MastershipEvent mastershipEvent) {
+        if (publishedMastershipEvents.get(mastershipEvent.getDpid()) == null) {
+            return;     // Nothing to do
+        }
+
+        // Publish the information
         TopologyEvent topologyEvent =
-            new TopologyEvent(mastershipEvent,
-                              registryService.getOnosInstanceId());
+            new TopologyEvent(mastershipEvent, getOnosInstanceId());
+        log.debug("Publishing remove mastership: {}", topologyEvent);
         eventChannel.removeEntry(topologyEvent.getID());
+        publishedMastershipEvents.remove(mastershipEvent.getDpid());
     }
 
     /**
-     * Switch discovered event.
+     * Publishes ADD Switch Event.
      *
-     * @param switchEvent the switch event.
-     * @param portEvents  the corresponding port events for the switch.
+     * @param switchEvent the switch event to publish
+     * @param portEvents the corresponding port events for the switch to
+     * publish
      */
-    private void putSwitchDiscoveryEvent(SwitchEvent switchEvent,
-                                         Collection<PortEvent> portEvents) {
-        log.debug("Sending add switch: {}", switchEvent);
-        // Send out notification
-        TopologyEvent topologyEvent =
-            new TopologyEvent(switchEvent,
-                              registryService.getOnosInstanceId());
-        eventChannel.addEntry(topologyEvent.getID(), topologyEvent);
-
-        // Send out notification for each port
-        for (PortEvent portEvent : portEvents) {
-            log.debug("Sending add port: {}", portEvent);
-            topologyEvent =
-                new TopologyEvent(portEvent,
-                                  registryService.getOnosInstanceId());
-            eventChannel.addEntry(topologyEvent.getID(), topologyEvent);
+    private void publishAddSwitchEvent(SwitchEvent switchEvent,
+                                       Collection<PortEvent> portEvents) {
+        if (!registryService.hasControl(switchEvent.getOriginDpid().value())) {
+            log.debug("Not the master for switch {}. Suppressed switch add event {}.",
+                      switchEvent.getOriginDpid(), switchEvent);
+            return;
         }
 
-        //
-        // Keep track of the added ports
-        //
-        // Get the old Port Events
-        Map<ByteBuffer, PortEvent> oldPortEvents =
-            discoveredAddedPortEvents.get(switchEvent.getDpid());
+        // Keep track of the old Port Events that should be removed
+        ConcurrentMap<ByteBuffer, PortEvent> oldPortEvents =
+            publishedPortEvents.get(switchEvent.getDpid());
         if (oldPortEvents == null) {
-            oldPortEvents = new HashMap<>();
+            oldPortEvents = new ConcurrentHashMap<>();
         }
 
-        // Store the new Port Events in the local cache
-        Map<ByteBuffer, PortEvent> newPortEvents = new HashMap<>();
+        // Publish the information for the switch
+        TopologyEvent topologyEvent =
+            new TopologyEvent(switchEvent, getOnosInstanceId());
+        log.debug("Publishing add switch: {}", topologyEvent);
+        eventChannel.addEntry(topologyEvent.getID(), topologyEvent);
+        publishedSwitchEvents.put(switchEvent.getDpid(), switchEvent);
+
+        // Publish the information for each port
+        ConcurrentMap<ByteBuffer, PortEvent> newPortEvents =
+            new ConcurrentHashMap<>();
         for (PortEvent portEvent : portEvents) {
+            topologyEvent =
+                new TopologyEvent(portEvent, getOnosInstanceId());
+            log.debug("Publishing add port: {}", topologyEvent);
+            eventChannel.addEntry(topologyEvent.getID(), topologyEvent);
+
             ByteBuffer id = portEvent.getIDasByteBuffer();
             newPortEvents.put(id, portEvent);
+            oldPortEvents.remove(id);
         }
-        discoveredAddedPortEvents.put(switchEvent.getDpid(), newPortEvents);
+        publishedPortEvents.put(switchEvent.getDpid(), newPortEvents);
 
-        //
-        // Extract the removed ports
-        //
-        List<PortEvent> removedPortEvents = new LinkedList<>();
-        for (Map.Entry<ByteBuffer, PortEvent> entry : oldPortEvents.entrySet()) {
-            ByteBuffer key = entry.getKey();
-            PortEvent portEvent = entry.getValue();
-            if (!newPortEvents.containsKey(key)) {
-                removedPortEvents.add(portEvent);
-            }
-        }
-
-        // Cleanup old removed ports
-        for (PortEvent portEvent : removedPortEvents) {
-            removePortDiscoveryEvent(portEvent);
-        }
-    }
-
-    /**
-     * Switch removed event.
-     *
-     * @param switchEvent the switch event.
-     */
-    private void removeSwitchDiscoveryEvent(SwitchEvent switchEvent) {
-        TopologyEvent topologyEvent;
-
-        // Get the old Port Events
-        Map<ByteBuffer, PortEvent> oldPortEvents =
-                discoveredAddedPortEvents.get(switchEvent.getDpid());
-        if (oldPortEvents == null) {
-            oldPortEvents = new HashMap<>();
-        }
-
-        log.debug("Sending remove switch: {}", switchEvent);
-        // Send out notification
-        topologyEvent =
-            new TopologyEvent(switchEvent,
-                              registryService.getOnosInstanceId());
-        eventChannel.removeEntry(topologyEvent.getID());
-
-        //
-        // Send out notification for each port.
-        //
-        // NOTE: We don't use removePortDiscoveryEvent() for the cleanup,
-        // because it will attempt to remove the port from the database,
-        // and the deactiveSwitch() call above already removed all ports.
-        //
+        // Cleanup for each of the old removed ports
         for (PortEvent portEvent : oldPortEvents.values()) {
-            log.debug("Sending remove port:", portEvent);
-            topologyEvent =
-                new TopologyEvent(portEvent,
-                                  registryService.getOnosInstanceId());
-            eventChannel.removeEntry(topologyEvent.getID());
-        }
-        discoveredAddedPortEvents.remove(switchEvent.getDpid());
-
-        // Cleanup for each link
-        Map<ByteBuffer, LinkEvent> oldLinkEvents =
-            discoveredAddedLinkEvents.get(switchEvent.getDpid());
-        if (oldLinkEvents != null) {
-            for (LinkEvent linkEvent : new ArrayList<>(oldLinkEvents.values())) {
-                removeLinkDiscoveryEvent(linkEvent);
-            }
-            discoveredAddedLinkEvents.remove(switchEvent.getDpid());
-        }
-
-        // Cleanup for each host
-        Map<ByteBuffer, HostEvent> oldHostEvents =
-            discoveredAddedHostEvents.get(switchEvent.getDpid());
-        if (oldHostEvents != null) {
-            for (HostEvent hostEvent : new ArrayList<>(oldHostEvents.values())) {
-                removeHostDiscoveryEvent(hostEvent);
-            }
-            discoveredAddedHostEvents.remove(switchEvent.getDpid());
+            publishRemovePortEvent(portEvent);
         }
     }
 
     /**
-     * Port discovered event.
+     * Publishes REMOVE Switch Event.
      *
-     * @param portEvent the port event.
+     * @param switchEvent the switch event to publish
      */
-    private void putPortDiscoveryEvent(PortEvent portEvent) {
-        log.debug("Sending add port: {}", portEvent);
-        // Send out notification
+    private void publishRemoveSwitchEvent(SwitchEvent switchEvent) {
+        //
+        // TODO: Removed the check for now, because currently this method is
+        // also called by the SwitchCleanup thread, and in that case
+        // the Switch Event was published by some other ONOS instance.
+        //
+        /*
+        if (publishedSwitchEvents.get(switchEvent.getDpid()) == null) {
+            return;     // Nothing to do
+        }
+        */
+
+        // Publish the information
         TopologyEvent topologyEvent =
-            new TopologyEvent(portEvent,
-                              registryService.getOnosInstanceId());
+            new TopologyEvent(switchEvent, getOnosInstanceId());
+        log.debug("Publishing remove switch: {}", topologyEvent);
+        eventChannel.removeEntry(topologyEvent.getID());
+        publishedSwitchEvents.remove(switchEvent.getDpid());
+
+        // Cleanup for each port
+        ConcurrentMap<ByteBuffer, PortEvent> portEvents =
+            publishedPortEvents.get(switchEvent.getDpid());
+        if (portEvents != null) {
+            for (PortEvent portEvent : portEvents.values()) {
+                publishRemovePortEvent(portEvent);
+            }
+        }
+
+        publishedPortEvents.remove(switchEvent.getDpid());
+        publishedLinkEvents.remove(switchEvent.getDpid());
+        publishedHostEvents.remove(switchEvent.getDpid());
+    }
+
+    /**
+     * Publishes ADD Port Event.
+     *
+     * @param portEvent the port event to publish
+     */
+    private void publishAddPortEvent(PortEvent portEvent) {
+        if (!registryService.hasControl(portEvent.getOriginDpid().value())) {
+            log.debug("Not the master for switch {}. Suppressed port add event {}.",
+                      portEvent.getOriginDpid(), portEvent);
+            return;
+        }
+
+        // Publish the information
+        TopologyEvent topologyEvent =
+            new TopologyEvent(portEvent, getOnosInstanceId());
+        log.debug("Publishing add port: {}", topologyEvent);
         eventChannel.addEntry(topologyEvent.getID(), topologyEvent);
 
         // Store the new Port Event in the local cache
-        Map<ByteBuffer, PortEvent> oldPortEvents =
-            discoveredAddedPortEvents.get(portEvent.getDpid());
-        if (oldPortEvents == null) {
-            oldPortEvents = new HashMap<>();
-            discoveredAddedPortEvents.put(portEvent.getDpid(), oldPortEvents);
-        }
-        ByteBuffer id = portEvent.getIDasByteBuffer();
-        oldPortEvents.put(id, portEvent);
+        ConcurrentMap<ByteBuffer, PortEvent> portEvents =
+            ConcurrentUtils.putIfAbsent(publishedPortEvents,
+                        portEvent.getDpid(),
+                        new ConcurrentHashMap<ByteBuffer, PortEvent>());
+        portEvents.put(portEvent.getIDasByteBuffer(), portEvent);
     }
 
     /**
-     * Port removed event.
+     * Publishes REMOVE Port Event, cascading to the port's incoming links and hosts.
      *
-     * @param portEvent the port event.
+     * @param portEvent the port event to publish (ignored if not in publishedPortEvents)
      */
-    private void removePortDiscoveryEvent(PortEvent portEvent) {
-        log.debug("Sending remove port: {}", portEvent);
-        // Send out notification
-        TopologyEvent topologyEvent =
-            new TopologyEvent(portEvent,
-                              registryService.getOnosInstanceId());
-        eventChannel.removeEntry(topologyEvent.getID());
-
-        // Cleanup the Port Event from the local cache
-        Map<ByteBuffer, PortEvent> oldPortEvents =
-            discoveredAddedPortEvents.get(portEvent.getDpid());
-        if (oldPortEvents != null) {
-            ByteBuffer id = portEvent.getIDasByteBuffer();
-            oldPortEvents.remove(id);
+    private void publishRemovePortEvent(PortEvent portEvent) {
+        ConcurrentMap<ByteBuffer, PortEvent> portEvents =
+            publishedPortEvents.get(portEvent.getDpid());
+        if (portEvents == null) {
+            return;     // Nothing to do: no cached Port Events for this DPID
+        }
+        if (portEvents.get(portEvent.getIDasByteBuffer()) == null) {
+            return;     // Nothing to do: this Port Event is not in the cache
         }
 
-        // Cleanup for the incoming link
-        Map<ByteBuffer, LinkEvent> oldLinkEvents =
-            discoveredAddedLinkEvents.get(portEvent.getDpid());
-        if (oldLinkEvents != null) {
-            for (LinkEvent linkEvent : new ArrayList<>(oldLinkEvents.values())) {
+        // Publish the information: remove the entry from the event channel
+        TopologyEvent topologyEvent =
+            new TopologyEvent(portEvent, getOnosInstanceId());
+        log.debug("Publishing remove port: {}", topologyEvent);
+        eventChannel.removeEntry(topologyEvent.getID());
+
+        // Cleanup for the incoming link(s): cached links whose dst is this port
+        ConcurrentMap<ByteBuffer, LinkEvent> linkEvents =
+            publishedLinkEvents.get(portEvent.getDpid());
+        if (linkEvents != null) {
+            for (LinkEvent linkEvent : linkEvents.values()) {
                 if (linkEvent.getDst().equals(portEvent.getSwitchPort())) {
-                    removeLinkDiscoveryEvent(linkEvent);
-                    // XXX If we change our model to allow multiple Link on
-                    // a Port, this loop must be fixed to allow continuing.
-                    break;
+                    publishRemoveLinkEvent(linkEvent);
                 }
             }
         }
 
         // Cleanup for the connected hosts
-        // TODO: The implementation below is probably wrong
-        List<HostEvent> removedHostEvents = new LinkedList<>();
-        Map<ByteBuffer, HostEvent> oldHostEvents =
-            discoveredAddedHostEvents.get(portEvent.getDpid());
-        if (oldHostEvents != null) {
-            for (HostEvent hostEvent : new ArrayList<>(oldHostEvents.values())) {
+        ConcurrentMap<ByteBuffer, HostEvent> hostEvents =
+            publishedHostEvents.get(portEvent.getDpid());
+        if (hostEvents != null) {
+            for (HostEvent hostEvent : hostEvents.values()) {
                 for (SwitchPort swp : hostEvent.getAttachmentPoints()) {
                     if (swp.equals(portEvent.getSwitchPort())) {
-                        removedHostEvents.add(hostEvent);
+                        publishRemoveHostEvent(hostEvent);
                     }
                 }
             }
-            for (HostEvent hostEvent : removedHostEvents) {
-                removeHostDiscoveryEvent(hostEvent);
-            }
         }
+
+        portEvents.remove(portEvent.getIDasByteBuffer());  // Update the local cache
     }
 
     /**
-     * Link discovered event.
+     * Publishes ADD Link Event, unless this instance lacks control of the origin DPID.
      *
-     * @param linkEvent the link event.
+     * @param linkEvent the link event to publish
      */
-    private void putLinkDiscoveryEvent(LinkEvent linkEvent) {
-        log.debug("Sending add link: {}", linkEvent);
-        // Send out notification
+    private void publishAddLinkEvent(LinkEvent linkEvent) {
+        if (!registryService.hasControl(linkEvent.getOriginDpid().value())) {
+            log.debug("Not the master for dst switch {}. Suppressed link add event {}.",
+                      linkEvent.getOriginDpid(), linkEvent);
+            return;
+        }
+
+        // Publish the information: add the entry to the event channel
         TopologyEvent topologyEvent =
-            new TopologyEvent(linkEvent,
-                              registryService.getOnosInstanceId());
+            new TopologyEvent(linkEvent, getOnosInstanceId());
+        log.debug("Publishing add link: {}", topologyEvent);
         eventChannel.addEntry(topologyEvent.getID(), topologyEvent);
 
         // Store the new Link Event in the local cache
-        Map<ByteBuffer, LinkEvent> oldLinkEvents =
-            discoveredAddedLinkEvents.get(linkEvent.getDst().getDpid());
-        if (oldLinkEvents == null) {
-            oldLinkEvents = new HashMap<>();
-            discoveredAddedLinkEvents.put(linkEvent.getDst().getDpid(),
-                                          oldLinkEvents);
-        }
-        ByteBuffer id = linkEvent.getIDasByteBuffer();
-        oldLinkEvents.put(id, linkEvent);
+        ConcurrentMap<ByteBuffer, LinkEvent> linkEvents =
+            ConcurrentUtils.putIfAbsent(publishedLinkEvents,
+                        linkEvent.getDst().getDpid(),   // Keyed by the dst DPID
+                        new ConcurrentHashMap<ByteBuffer, LinkEvent>());
+        linkEvents.put(linkEvent.getIDasByteBuffer(), linkEvent);
     }
 
     /**
-     * Link removed event.
+     * Publishes REMOVE Link Event, if the Link Event is in publishedLinkEvents.
      *
-     * @param linkEvent the link event.
+     * @param linkEvent the link event to publish
      */
-    private void removeLinkDiscoveryEvent(LinkEvent linkEvent) {
-        log.debug("Sending remove link: {}", linkEvent);
-        // Send out notification
+    private void publishRemoveLinkEvent(LinkEvent linkEvent) {
+        ConcurrentMap<ByteBuffer, LinkEvent> linkEvents =
+            publishedLinkEvents.get(linkEvent.getDst().getDpid());
+        if (linkEvents == null) {
+            return;     // Nothing to do: no cached Link Events for the dst DPID
+        }
+        if (linkEvents.get(linkEvent.getIDasByteBuffer()) == null) {
+            return;     // Nothing to do: this Link Event is not in the cache
+        }
+
+        // Publish the information: remove the entry from the event channel
         TopologyEvent topologyEvent =
-            new TopologyEvent(linkEvent,
-                              registryService.getOnosInstanceId());
+            new TopologyEvent(linkEvent, getOnosInstanceId());
+        log.debug("Publishing remove link: {}", topologyEvent);
         eventChannel.removeEntry(topologyEvent.getID());
 
-        // Cleanup the Link Event from the local cache
-        Map<ByteBuffer, LinkEvent> oldLinkEvents =
-            discoveredAddedLinkEvents.get(linkEvent.getDst().getDpid());
-        if (oldLinkEvents != null) {
-            ByteBuffer id = linkEvent.getIDasByteBuffer();
-            oldLinkEvents.remove(id);
-        }
+        linkEvents.remove(linkEvent.getIDasByteBuffer());  // Update the local cache
     }
 
     /**
-     * Host discovered event.
+     * Publishes ADD Host Event, unless this instance lacks control of the origin DPID.
      *
-     * @param hostEvent the host event.
+     * @param hostEvent the host event to publish
      */
-    private void putHostDiscoveryEvent(HostEvent hostEvent) {
-        // Send out notification
+    private void publishAddHostEvent(HostEvent hostEvent) {
+        //
+        // NOTE: The implementation below assumes that there is just one
+        // attachment point stored in hostEvent. Currently, this assumption
+        // is true based on the existing implementation of the caller
+        // hostAdded().
+        //
+
+        if (!registryService.hasControl(hostEvent.getOriginDpid().value())) {
+            log.debug("Not the master for attachment switch {}. Suppressed host add event {}.",
+                      hostEvent.getOriginDpid(), hostEvent);
+            return;
+        }
+
+        // Publish the information: add the entry to the event channel
         TopologyEvent topologyEvent =
-            new TopologyEvent(hostEvent,
-                              registryService.getOnosInstanceId());
+            new TopologyEvent(hostEvent, getOnosInstanceId());
+        log.debug("Publishing add host: {}", topologyEvent);
         eventChannel.addEntry(topologyEvent.getID(), topologyEvent);
-        log.debug("Put the host info into the cache of the topology. mac {}",
-                  hostEvent.getMac());
 
         // Store the new Host Event in the local cache
-        // TODO: The implementation below is probably wrong
-        for (SwitchPort swp : hostEvent.getAttachmentPoints()) {
-            Map<ByteBuffer, HostEvent> oldHostEvents =
-                discoveredAddedHostEvents.get(swp.getDpid());
-            if (oldHostEvents == null) {
-                oldHostEvents = new HashMap<>();
-                discoveredAddedHostEvents.put(swp.getDpid(), oldHostEvents);
-            }
-            ByteBuffer id = hostEvent.getIDasByteBuffer();
-            oldHostEvents.put(id, hostEvent);
-        }
+        ConcurrentMap<ByteBuffer, HostEvent> hostEvents =
+            ConcurrentUtils.putIfAbsent(publishedHostEvents,
+                hostEvent.getOriginDpid(),      // Keyed by the origin DPID
+                new ConcurrentHashMap<ByteBuffer, HostEvent>());
+        hostEvents.put(hostEvent.getIDasByteBuffer(), hostEvent);
     }
 
     /**
-     * Host removed event.
+     * Publishes REMOVE Host Event, if the Host Event is in publishedHostEvents.
      *
-     * @param hostEvent the host event.
+     * @param hostEvent the host event to publish
      */
-    private void removeHostDiscoveryEvent(HostEvent hostEvent) {
-        // Send out notification
-        TopologyEvent topologyEvent =
-            new TopologyEvent(hostEvent,
-                              registryService.getOnosInstanceId());
-        eventChannel.removeEntry(topologyEvent.getID());
-        log.debug("Remove the host info into the cache of the topology. mac {}",
-                  hostEvent.getMac());
-
-        // Cleanup the Host Event from the local cache
-        // TODO: The implementation below is probably wrong
-        ByteBuffer id = hostEvent.getIDasByteBuffer();
-        for (SwitchPort swp : hostEvent.getAttachmentPoints()) {
-            Map<ByteBuffer, HostEvent> oldHostEvents =
-                discoveredAddedHostEvents.get(swp.getDpid());
-            if (oldHostEvents != null) {
-                oldHostEvents.remove(id);
-            }
+    private void publishRemoveHostEvent(HostEvent hostEvent) {
+        ConcurrentMap<ByteBuffer, HostEvent> hostEvents =
+            publishedHostEvents.get(hostEvent.getOriginDpid());
+        if (hostEvents == null) {
+            return;     // Nothing to do: no cached Host Events for the origin DPID
         }
+        if (hostEvents.get(hostEvent.getIDasByteBuffer()) == null) {
+            return;     // Nothing to do: this Host Event is not in the cache
+        }
+
+        // Publish the information: remove the entry from the event channel
+        TopologyEvent topologyEvent =
+            new TopologyEvent(hostEvent, getOnosInstanceId());
+        log.debug("Publishing remove host: {}", topologyEvent);
+        eventChannel.removeEntry(topologyEvent.getID());
+
+        hostEvents.remove(hostEvent.getIDasByteBuffer());  // Update the local cache
     }
 }