Work toward cleaning up the Topology Manager and the Topology Publisher

ONOS-1890

* Removed explicit calls "linkDiscovery.disableDiscoveryOnPort()"
  when adding a new switch. Those calls were added as a work-around
  to deal with publishing the link discovery before publishing
  the switch or port discovery. The current implementation can tolerate
  reordering, hence those calls are not needed.

* Renamed methods TopologyPublisher.putFoo() and removeFoo() to
  publishAddFoo() and publishRemoveFoo()

* Refactored all publishAddFoo() and publishRemoveFoo() methods
  in the TopologyPublisher:
  - Track properly the published state.
  - Update the publish semantics: publishAddFoo() is allowed to proceed
    only if the instance is the current Master for the corresponding DPID.
    On the other hand, publishRemoveFoo() is allowed if we publishAddFoo()
    was previously allowed.
  - Track switch-to-port, port-to-link, and port-to-host dependencies
    and call publishRemoveFoo() to cleanup as appropriate based on that
    dependency.

Change-Id: I0725ee68508e96f462f6a134ca5d0e2c7ce91ba9
diff --git a/src/main/java/net/floodlightcontroller/core/internal/Controller.java b/src/main/java/net/floodlightcontroller/core/internal/Controller.java
index 69f706c..23d61bb 100644
--- a/src/main/java/net/floodlightcontroller/core/internal/Controller.java
+++ b/src/main/java/net/floodlightcontroller/core/internal/Controller.java
@@ -353,14 +353,7 @@
                 return false;
             activeMasterSwitches.put(dpid, sw);
         }
-        // XXX Workaround to prevent race condition where a link is detected
-        // and attempted to be written to the database before the port is in
-        // the database. We now suppress link discovery on ports until we're
-        // sure they're in the database.
-        for (OFPortDesc port : sw.getPorts()) {
-            linkDiscovery.disableDiscoveryOnPort(sw.getId(),
-                    port.getPortNo().getShortPortNumber());
-        }
+
         // update counters and events
         counters.switchActivated.updateCounterWithFlush();
         evSwitch.updateEventWithFlush(new SwitchEvent(dpid, "activeMaster"));
@@ -463,15 +456,6 @@
             return;
         }
 
-        if (changeType == PortChangeType.ADD) {
-            // XXX Workaround to prevent race condition where a link is detected
-            // and attempted to be written to the database before the port is in
-            // the database. We now suppress link discovery on ports until we're
-            // sure they're in the database.
-            linkDiscovery.disableDiscoveryOnPort(dpid, port.getPortNo()
-                    .getShortPortNumber());
-        }
-
         SwitchUpdate update = new SwitchUpdate(dpid, SwitchUpdateType.PORTCHANGED,
                 port, changeType);
         addUpdateToQueue(update);
diff --git a/src/main/java/net/onrc/onos/core/topology/TopologyPublisher.java b/src/main/java/net/onrc/onos/core/topology/TopologyPublisher.java
index 56ca8fc..2863523 100644
--- a/src/main/java/net/onrc/onos/core/topology/TopologyPublisher.java
+++ b/src/main/java/net/onrc/onos/core/topology/TopologyPublisher.java
@@ -3,10 +3,11 @@
 import java.nio.ByteBuffer;
 import java.util.ArrayList;
 import java.util.Collection;
-import java.util.HashMap;
 import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
 import java.util.concurrent.TimeUnit;
 
 import net.floodlightcontroller.core.IFloodlightProviderService;
@@ -33,18 +34,26 @@
 import net.onrc.onos.core.registry.IControllerRegistryService.ControlChangeCallback;
 import net.onrc.onos.core.registry.RegistryException;
 import net.onrc.onos.core.util.Dpid;
+import net.onrc.onos.core.util.OnosInstanceId;
 import net.onrc.onos.core.util.PortNumber;
 import net.onrc.onos.core.util.SwitchPort;
 
+import org.apache.commons.lang3.concurrent.ConcurrentUtils;
 import org.projectfloodlight.openflow.protocol.OFPortDesc;
 import org.projectfloodlight.openflow.util.HexString;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 /**
- * The TopologyPublisher subscribes to topology network events from the
- * discovery modules. These events are reformatted and relayed to the in-memory
- * topology instance.
+ * Class for publishing topology-related events.
+ *
+ * The events are received from the discovery modules, reformatted and
+ * published to the other ONOS instances.
+ *
+ * TODO: Add a synchronization mechanism when publishing the events to
+ * preserve the ordering and to avoid mismatch in the local "published" state,
+ * because each of the caller (the discovery modules) might be running
+ * on a different thread.
  */
 public class TopologyPublisher implements IOFSwitchListener,
         ILinkDiscoveryListener,
@@ -71,47 +80,44 @@
     private IEventChannel<byte[], TopologyEvent> eventChannel;
 
     //
-    // Local state for keeping track of locally discovered events so we can
-    // cleanup properly when a Switch or Port is removed.
+    // Local state for keeping track of locally published events so we can
+    // cleanup properly when an entry is removed.
     //
     // We keep all Port, (incoming) Link and Host events per Switch DPID:
     //  - If a switch goes down, we remove all corresponding Port, Link and
     //    Host events.
     //  - If a port on a switch goes down, we remove all corresponding Link
-    //    and Host events discovered by this instance.
+    //    and Host events attached to this port.
     //
-    // How to handle side-effect of remote events.
-    //  - Remote Port Down event -> Link Down
-    //      Not handled. (XXX Shouldn't it be removed from discovered.. Map)
-    //  - Remote Host Added -> lose ownership of Host)
-    //      Not handled. (XXX Shouldn't it be removed from discovered.. Map)
+    // TODO: What to do if the Mastership changes?
+    //  - Cleanup state from publishedFoo maps, but do not send REMOVE events?
     //
-    // XXX Domain knowledge based invariant maintenance should be moved to
-    //     driver module, since the invariant may be different on optical, etc.
-    //
-    // What happens on leadership change?
-    //  - Probably should: remove from discovered.. Maps, but not send DELETE
-    //    events
-    //    XXX Switch/Port can be rediscovered by new leader, but Link, Host?
-    //  - Current: There is no way to recognize leadership change?
-    //      ZookeeperRegistry.requestControl(long, ControlChangeCallback)
-    //      is the only way to register listener, and it allows only one
-    //      listener, which is already used by Controller class.
-    //
-    // FIXME Replace with concurrent variant.
-    //   #removeSwitchDiscoveryEvent(SwitchEvent) runs in different thread.
-    //
-    private Map<Dpid, Map<ByteBuffer, PortEvent>> discoveredAddedPortEvents =
-            new HashMap<>();
-    private Map<Dpid, Map<ByteBuffer, LinkEvent>> discoveredAddedLinkEvents =
-            new HashMap<>();
-    private Map<Dpid, Map<ByteBuffer, HostEvent>> discoveredAddedHostEvents =
-            new HashMap<>();
+    private ConcurrentMap<Dpid, MastershipEvent> publishedMastershipEvents =
+        new ConcurrentHashMap<>();
+    private ConcurrentMap<Dpid, SwitchEvent> publishedSwitchEvents =
+        new ConcurrentHashMap<>();
+    private ConcurrentMap<Dpid, ConcurrentMap<ByteBuffer, PortEvent>>
+        publishedPortEvents = new ConcurrentHashMap<>();
+    private ConcurrentMap<Dpid, ConcurrentMap<ByteBuffer, LinkEvent>>
+        publishedLinkEvents = new ConcurrentHashMap<>();
+    private ConcurrentMap<Dpid, ConcurrentMap<ByteBuffer, HostEvent>>
+        publishedHostEvents = new ConcurrentHashMap<>();
 
 
     /**
-     * Cleanup old switches from the topology. Old switches are those which have
-     * no controller in the registry.
+     * Gets the ONOS Instance ID.
+     *
+     * @return the ONOS Instance ID.
+     */
+    private OnosInstanceId getOnosInstanceId() {
+        return registryService.getOnosInstanceId();
+    }
+
+    /**
+     * Cleanup old switches from the topology. Old switches are those which
+     * have no controller in the registry.
+     *
+     * TODO: The overall switch cleanup mechanism needs refactoring/redesign.
      */
     private class SwitchCleanup implements ControlChangeCallback, Runnable {
         @Override
@@ -156,7 +162,8 @@
                     if (controller == null) {
                         log.debug("Requesting control to set switch {} INACTIVE",
                                 sw.getDpid());
-                        registryService.requestControl(sw.getDpid().value(), this);
+                        registryService.requestControl(sw.getDpid().value(),
+                                                       this);
                     }
                 } catch (RegistryException e) {
                     log.error("Caught RegistryException in cleanup thread", e);
@@ -166,9 +173,9 @@
 
         /**
          * Second half of the switch cleanup operation. If the registry grants
-         * control of a switch, we can be sure no other instance is writing this
-         * switch to the topology, so we can remove it now.
-         * <p>
+         * control of a switch, we can be sure no other instance is writing
+         * this switch to the topology, so we can remove it now.
+         *
          * @param dpid the dpid of the switch we requested control for
          * @param hasControl whether we got control or not
          */
@@ -179,7 +186,7 @@
                         HexString.toHexString(dpid));
 
                 SwitchEvent switchEvent = new SwitchEvent(new Dpid(dpid));
-                removeSwitchDiscoveryEvent(switchEvent);
+                publishRemoveSwitchEvent(switchEvent);
                 registryService.releaseControl(dpid);
             }
         }
@@ -202,14 +209,7 @@
                 AdminStatus.ACTIVE.toString());
         linkEvent.freeze();
 
-        if (!registryService.hasControl(link.getDst())) {
-            // Don't process or send a link event if we're not master for the
-            // destination switch
-            log.debug("Not the master for dst switch {}. Suppressed link add event {}.",
-                    link.getDst(), linkEvent);
-            return;
-        }
-        putLinkDiscoveryEvent(linkEvent);
+        publishAddLinkEvent(linkEvent);
     }
 
     @Override
@@ -225,15 +225,7 @@
                 TopologyElement.TYPE_PACKET_LAYER);
         linkEvent.freeze();
 
-        if (!registryService.hasControl(link.getDst())) {
-            // Don't process or send a link event if we're not master for the
-            // destination switch
-            log.debug(
-                    "Not the master for dst switch {}. Suppressed link remove event {}.",
-                    link.getDst(), linkEvent);
-            return;
-        }
-        removeLinkDiscoveryEvent(linkEvent);
+        publishRemoveLinkEvent(linkEvent);
     }
 
     /* *****************
@@ -264,12 +256,7 @@
         switchEvent.createStringAttribute(TopologyElement.ELEMENT_ADMIN_STATUS,
                 AdminStatus.ACTIVE.toString());
         switchEvent.freeze();
-        // TODO Not very robust
-        if (!registryService.hasControl(swId)) {
-            log.debug("Not the master for switch {}. Suppressed switch add event {}.",
-                    dpid, switchEvent);
-            return;
-        }
+        // The Port events
         List<PortEvent> portEvents = new ArrayList<PortEvent>();
         for (OFPortDesc port : sw.getPorts()) {
             PortEvent portEvent = new PortEvent(dpid,
@@ -288,14 +275,7 @@
             portEvent.freeze();
             portEvents.add(portEvent);
         }
-        putSwitchDiscoveryEvent(switchEvent, portEvents);
-
-        for (OFPortDesc port : sw.getPorts()) {
-            // Allow links to be discovered on this port now that it's
-            // in the database
-            linkDiscovery.enableDiscoveryOnPort(sw.getId(),
-                    port.getPortNo().getShortPortNumber());
-        }
+        publishAddSwitchEvent(switchEvent, portEvents);
     }
 
     @Override
@@ -325,15 +305,14 @@
         Role role = Role.SLAVE; // TODO: Should be Role.UNKNOWN
 
         MastershipEvent mastershipEvent =
-                new MastershipEvent(dpid, registryService.getOnosInstanceId(),
-                        role);
+                new MastershipEvent(dpid, getOnosInstanceId(), role);
         // FIXME should be merging, with existing attrs, etc..
         // TODO define attr name as constant somewhere.
         // TODO populate appropriate attributes.
         mastershipEvent.createStringAttribute(TopologyElement.TYPE,
                 TopologyElement.TYPE_ALL_LAYERS);
         mastershipEvent.freeze();
-        removeSwitchMastershipEvent(mastershipEvent);
+        publishRemoveSwitchMastershipEvent(mastershipEvent);
     }
 
     @Override
@@ -362,6 +341,12 @@
         }
     }
 
+    /**
+     * Prepares an event for adding a port on a switch.
+     *
+     * @param switchId the switch ID (DPID)
+     * @param port the port to add
+     */
     private void switchPortAdded(long switchId, OFPortDesc port) {
         final Dpid dpid = new Dpid(switchId);
         PortEvent portEvent = new PortEvent(dpid,
@@ -372,19 +357,17 @@
         portEvent.createStringAttribute(TopologyElement.TYPE,
                 TopologyElement.TYPE_PACKET_LAYER);
         portEvent.createStringAttribute("name", port.getName());
-
         portEvent.freeze();
 
-        if (registryService.hasControl(switchId)) {
-            putPortDiscoveryEvent(portEvent);
-            linkDiscovery.enableDiscoveryOnPort(switchId,
-                    port.getPortNo().getShortPortNumber());
-        } else {
-            log.debug("Not the master for switch {}. Suppressed port add event {}.",
-                    new Dpid(switchId), portEvent);
-        }
+        publishAddPortEvent(portEvent);
     }
 
+    /**
+     * Prepares an event for removing a port on a switch.
+     *
+     * @param switchId the switch ID (DPID)
+     * @param port the port to remove
+     */
     private void switchPortRemoved(long switchId, OFPortDesc port) {
         final Dpid dpid = new Dpid(switchId);
 
@@ -396,15 +379,9 @@
         portEvent.createStringAttribute(TopologyElement.TYPE,
                 TopologyElement.TYPE_PACKET_LAYER);
         portEvent.createStringAttribute("name", port.getName());
-
         portEvent.freeze();
 
-        if (registryService.hasControl(switchId)) {
-            removePortDiscoveryEvent(portEvent);
-        } else {
-            log.debug("Not the master for switch {}. Suppressed port del event {}.",
-                    dpid, portEvent);
-        }
+        publishRemovePortEvent(portEvent);
     }
 
     @Override
@@ -450,7 +427,6 @@
         registryService = context.getServiceImpl(IControllerRegistryService.class);
         datagridService = context.getServiceImpl(IDatagridService.class);
         hostService = context.getServiceImpl(IHostService.class);
-
         topologyService = context.getServiceImpl(ITopologyService.class);
     }
 
@@ -489,7 +465,7 @@
 
     @Override
     public void hostAdded(Host host) {
-        log.debug("Called onosDeviceAdded mac {}", host.getMacAddress());
+        log.debug("Host added with MAC {}", host.getMacAddress());
 
         SwitchPort sp = new SwitchPort(host.getSwitchDPID(), host.getSwitchPort());
         List<SwitchPort> spLists = new ArrayList<SwitchPort>();
@@ -500,354 +476,358 @@
         // Does not use vlan info now.
         event.freeze();
 
-        putHostDiscoveryEvent(event);
+        publishAddHostEvent(event);
     }
 
     @Override
     public void hostRemoved(Host host) {
-        log.debug("Called onosDeviceRemoved");
-        HostEvent event = new HostEvent(host.getMacAddress());
-        // XXX shouldn't we be setting attachment points?
-        event.freeze();
-        removeHostDiscoveryEvent(event);
+        log.debug("Host removed with MAC {}", host.getMacAddress());
+
+        //
+        // Remove all previously added HostEvent for this MAC address
+        //
+        // TODO: Currently, the caller of hostRemoved() might not include
+        // the correct set of Attachment Points in the HostEvent entry itself.
+        // Also, we might have multiple HostEvent entries for the same
+        // host (MAC address), each containing a single (different) Attachment
+        // Point.
+        // Hence, here we have to cleanup all HostEvent entries for this
+        // particular host, based on its MAC address.
+        //
+        List<HostEvent> removeHostEvents = new LinkedList<>();
+        for (ConcurrentMap<ByteBuffer, HostEvent> cm : publishedHostEvents.values()) {
+            for (HostEvent hostEvent : cm.values()) {
+                if (hostEvent.getMac().equals(host.getMacAddress())) {
+                    removeHostEvents.add(hostEvent);
+                }
+            }
+        }
+        for (HostEvent event : removeHostEvents) {
+            publishRemoveHostEvent(event);
+        }
     }
 
+    /**
+     * Prepares the Controller role changed event for a switch.
+     *
+     * @param dpid the switch DPID
+     * @param role the new role of the controller
+     */
     private void controllerRoleChanged(Dpid dpid, Role role) {
         log.debug("Local switch controller mastership role changed: dpid = {} role = {}",
                 dpid, role);
         MastershipEvent mastershipEvent =
-                new MastershipEvent(dpid, registryService.getOnosInstanceId(),
-                        role);
+                new MastershipEvent(dpid, getOnosInstanceId(), role);
         // FIXME should be merging, with existing attrs, etc..
         // TODO define attr name as constant somewhere.
         // TODO populate appropriate attributes.
         mastershipEvent.createStringAttribute(TopologyElement.TYPE,
                 TopologyElement.TYPE_ALL_LAYERS);
         mastershipEvent.freeze();
-        putSwitchMastershipEvent(mastershipEvent);
+        publishAddSwitchMastershipEvent(mastershipEvent);
     }
 
     /**
-     * Mastership updated event.
+     * Publishes ADD Mastership Event.
      *
-     * @param mastershipEvent the mastership event.
+     * @param mastershipEvent the mastership event to publish
      */
-    private void putSwitchMastershipEvent(MastershipEvent mastershipEvent) {
-        // Send out notification
+    private void publishAddSwitchMastershipEvent(
+                        MastershipEvent mastershipEvent) {
+        // Publish the information
         TopologyEvent topologyEvent =
-            new TopologyEvent(mastershipEvent,
-                              registryService.getOnosInstanceId());
+            new TopologyEvent(mastershipEvent, getOnosInstanceId());
+        log.debug("Publishing add mastership: {}", topologyEvent);
         eventChannel.addEntry(topologyEvent.getID(), topologyEvent);
+        publishedMastershipEvents.put(mastershipEvent.getDpid(),
+                                      mastershipEvent);
     }
 
     /**
-     * Mastership removed event.
+     * Publishes REMOVE Mastership Event.
      *
-     * @param mastershipEvent the mastership event.
+     * @param mastershipEvent the mastership event to publish
      */
-    private void removeSwitchMastershipEvent(MastershipEvent mastershipEvent) {
-        // Send out notification
+    private void publishRemoveSwitchMastershipEvent(
+                        MastershipEvent mastershipEvent) {
+        if (publishedMastershipEvents.get(mastershipEvent.getDpid()) == null) {
+            return;     // Nothing to do
+        }
+
+        // Publish the information
         TopologyEvent topologyEvent =
-            new TopologyEvent(mastershipEvent,
-                              registryService.getOnosInstanceId());
+            new TopologyEvent(mastershipEvent, getOnosInstanceId());
+        log.debug("Publishing remove mastership: {}", topologyEvent);
         eventChannel.removeEntry(topologyEvent.getID());
+        publishedMastershipEvents.remove(mastershipEvent.getDpid());
     }
 
     /**
-     * Switch discovered event.
+     * Publishes ADD Switch Event.
      *
-     * @param switchEvent the switch event.
-     * @param portEvents  the corresponding port events for the switch.
+     * @param switchEvent the switch event to publish
+     * @param portEvents the corresponding port events for the switch to
+     * publish
      */
-    private void putSwitchDiscoveryEvent(SwitchEvent switchEvent,
-                                         Collection<PortEvent> portEvents) {
-        log.debug("Sending add switch: {}", switchEvent);
-        // Send out notification
-        TopologyEvent topologyEvent =
-            new TopologyEvent(switchEvent,
-                              registryService.getOnosInstanceId());
-        eventChannel.addEntry(topologyEvent.getID(), topologyEvent);
-
-        // Send out notification for each port
-        for (PortEvent portEvent : portEvents) {
-            log.debug("Sending add port: {}", portEvent);
-            topologyEvent =
-                new TopologyEvent(portEvent,
-                                  registryService.getOnosInstanceId());
-            eventChannel.addEntry(topologyEvent.getID(), topologyEvent);
+    private void publishAddSwitchEvent(SwitchEvent switchEvent,
+                                       Collection<PortEvent> portEvents) {
+        if (!registryService.hasControl(switchEvent.getOriginDpid().value())) {
+            log.debug("Not the master for switch {}. Suppressed switch add event {}.",
+                      switchEvent.getOriginDpid(), switchEvent);
+            return;
         }
 
-        //
-        // Keep track of the added ports
-        //
-        // Get the old Port Events
-        Map<ByteBuffer, PortEvent> oldPortEvents =
-            discoveredAddedPortEvents.get(switchEvent.getDpid());
+        // Keep track of the old Port Events that should be removed
+        ConcurrentMap<ByteBuffer, PortEvent> oldPortEvents =
+            publishedPortEvents.get(switchEvent.getDpid());
         if (oldPortEvents == null) {
-            oldPortEvents = new HashMap<>();
+            oldPortEvents = new ConcurrentHashMap<>();
         }
 
-        // Store the new Port Events in the local cache
-        Map<ByteBuffer, PortEvent> newPortEvents = new HashMap<>();
+        // Publish the information for the switch
+        TopologyEvent topologyEvent =
+            new TopologyEvent(switchEvent, getOnosInstanceId());
+        log.debug("Publishing add switch: {}", topologyEvent);
+        eventChannel.addEntry(topologyEvent.getID(), topologyEvent);
+        publishedSwitchEvents.put(switchEvent.getDpid(), switchEvent);
+
+        // Publish the information for each port
+        ConcurrentMap<ByteBuffer, PortEvent> newPortEvents =
+            new ConcurrentHashMap<>();
         for (PortEvent portEvent : portEvents) {
+            topologyEvent =
+                new TopologyEvent(portEvent, getOnosInstanceId());
+            log.debug("Publishing add port: {}", topologyEvent);
+            eventChannel.addEntry(topologyEvent.getID(), topologyEvent);
+
             ByteBuffer id = portEvent.getIDasByteBuffer();
             newPortEvents.put(id, portEvent);
+            oldPortEvents.remove(id);
         }
-        discoveredAddedPortEvents.put(switchEvent.getDpid(), newPortEvents);
+        publishedPortEvents.put(switchEvent.getDpid(), newPortEvents);
 
-        //
-        // Extract the removed ports
-        //
-        List<PortEvent> removedPortEvents = new LinkedList<>();
-        for (Map.Entry<ByteBuffer, PortEvent> entry : oldPortEvents.entrySet()) {
-            ByteBuffer key = entry.getKey();
-            PortEvent portEvent = entry.getValue();
-            if (!newPortEvents.containsKey(key)) {
-                removedPortEvents.add(portEvent);
-            }
-        }
-
-        // Cleanup old removed ports
-        for (PortEvent portEvent : removedPortEvents) {
-            removePortDiscoveryEvent(portEvent);
-        }
-    }
-
-    /**
-     * Switch removed event.
-     *
-     * @param switchEvent the switch event.
-     */
-    private void removeSwitchDiscoveryEvent(SwitchEvent switchEvent) {
-        TopologyEvent topologyEvent;
-
-        // Get the old Port Events
-        Map<ByteBuffer, PortEvent> oldPortEvents =
-                discoveredAddedPortEvents.get(switchEvent.getDpid());
-        if (oldPortEvents == null) {
-            oldPortEvents = new HashMap<>();
-        }
-
-        log.debug("Sending remove switch: {}", switchEvent);
-        // Send out notification
-        topologyEvent =
-            new TopologyEvent(switchEvent,
-                              registryService.getOnosInstanceId());
-        eventChannel.removeEntry(topologyEvent.getID());
-
-        //
-        // Send out notification for each port.
-        //
-        // NOTE: We don't use removePortDiscoveryEvent() for the cleanup,
-        // because it will attempt to remove the port from the database,
-        // and the deactiveSwitch() call above already removed all ports.
-        //
+        // Cleanup for each of the old removed port
         for (PortEvent portEvent : oldPortEvents.values()) {
-            log.debug("Sending remove port:", portEvent);
-            topologyEvent =
-                new TopologyEvent(portEvent,
-                                  registryService.getOnosInstanceId());
-            eventChannel.removeEntry(topologyEvent.getID());
-        }
-        discoveredAddedPortEvents.remove(switchEvent.getDpid());
-
-        // Cleanup for each link
-        Map<ByteBuffer, LinkEvent> oldLinkEvents =
-            discoveredAddedLinkEvents.get(switchEvent.getDpid());
-        if (oldLinkEvents != null) {
-            for (LinkEvent linkEvent : new ArrayList<>(oldLinkEvents.values())) {
-                removeLinkDiscoveryEvent(linkEvent);
-            }
-            discoveredAddedLinkEvents.remove(switchEvent.getDpid());
-        }
-
-        // Cleanup for each host
-        Map<ByteBuffer, HostEvent> oldHostEvents =
-            discoveredAddedHostEvents.get(switchEvent.getDpid());
-        if (oldHostEvents != null) {
-            for (HostEvent hostEvent : new ArrayList<>(oldHostEvents.values())) {
-                removeHostDiscoveryEvent(hostEvent);
-            }
-            discoveredAddedHostEvents.remove(switchEvent.getDpid());
+            publishRemovePortEvent(portEvent);
         }
     }
 
     /**
-     * Port discovered event.
+     * Publishes REMOVE Switch Event.
      *
-     * @param portEvent the port event.
+     * @param switchEvent the switch event to publish
      */
-    private void putPortDiscoveryEvent(PortEvent portEvent) {
-        log.debug("Sending add port: {}", portEvent);
-        // Send out notification
+    private void publishRemoveSwitchEvent(SwitchEvent switchEvent) {
+        //
+        // TODO: Removed the check for now, because currently this method is
+        // also called by the SwitchCleanup thread, and in that case
+        // the Switch Event was published by some other ONOS instance.
+        //
+        /*
+        if (publishedSwitchEvents.get(switchEvent.getDpid()) == null) {
+            return;     // Nothing to do
+        }
+        */
+
+        // Publish the information
         TopologyEvent topologyEvent =
-            new TopologyEvent(portEvent,
-                              registryService.getOnosInstanceId());
+            new TopologyEvent(switchEvent, getOnosInstanceId());
+        log.debug("Publishing remove switch: {}", topologyEvent);
+        eventChannel.removeEntry(topologyEvent.getID());
+        publishedSwitchEvents.remove(switchEvent.getDpid());
+
+        // Cleanup for each port
+        ConcurrentMap<ByteBuffer, PortEvent> portEvents =
+            publishedPortEvents.get(switchEvent.getDpid());
+        if (portEvents != null) {
+            for (PortEvent portEvent : portEvents.values()) {
+                publishRemovePortEvent(portEvent);
+            }
+        }
+
+        publishedPortEvents.remove(switchEvent.getDpid());
+        publishedLinkEvents.remove(switchEvent.getDpid());
+        publishedHostEvents.remove(switchEvent.getDpid());
+    }
+
+    /**
+     * Publishes ADD Port Event.
+     *
+     * @param portEvent the port event to publish
+     */
+    private void publishAddPortEvent(PortEvent portEvent) {
+        if (!registryService.hasControl(portEvent.getOriginDpid().value())) {
+            log.debug("Not the master for switch {}. Suppressed port add event {}.",
+                      portEvent.getOriginDpid(), portEvent);
+            return;
+        }
+
+        // Publish the information
+        TopologyEvent topologyEvent =
+            new TopologyEvent(portEvent, getOnosInstanceId());
+        log.debug("Publishing add port: {}", topologyEvent);
         eventChannel.addEntry(topologyEvent.getID(), topologyEvent);
 
         // Store the new Port Event in the local cache
-        Map<ByteBuffer, PortEvent> oldPortEvents =
-            discoveredAddedPortEvents.get(portEvent.getDpid());
-        if (oldPortEvents == null) {
-            oldPortEvents = new HashMap<>();
-            discoveredAddedPortEvents.put(portEvent.getDpid(), oldPortEvents);
-        }
-        ByteBuffer id = portEvent.getIDasByteBuffer();
-        oldPortEvents.put(id, portEvent);
+        ConcurrentMap<ByteBuffer, PortEvent> portEvents =
+            ConcurrentUtils.putIfAbsent(publishedPortEvents,
+                        portEvent.getDpid(),
+                        new ConcurrentHashMap<ByteBuffer, PortEvent>());
+        portEvents.put(portEvent.getIDasByteBuffer(), portEvent);
     }
 
     /**
-     * Port removed event.
+     * Publishes REMOVE Port Event.
      *
-     * @param portEvent the port event.
+     * @param portEvent the port event to publish
      */
-    private void removePortDiscoveryEvent(PortEvent portEvent) {
-        log.debug("Sending remove port: {}", portEvent);
-        // Send out notification
-        TopologyEvent topologyEvent =
-            new TopologyEvent(portEvent,
-                              registryService.getOnosInstanceId());
-        eventChannel.removeEntry(topologyEvent.getID());
-
-        // Cleanup the Port Event from the local cache
-        Map<ByteBuffer, PortEvent> oldPortEvents =
-            discoveredAddedPortEvents.get(portEvent.getDpid());
-        if (oldPortEvents != null) {
-            ByteBuffer id = portEvent.getIDasByteBuffer();
-            oldPortEvents.remove(id);
+    private void publishRemovePortEvent(PortEvent portEvent) {
+        ConcurrentMap<ByteBuffer, PortEvent> portEvents =
+            publishedPortEvents.get(portEvent.getDpid());
+        if (portEvents == null) {
+            return;     // Nothing to do
+        }
+        if (portEvents.get(portEvent.getIDasByteBuffer()) == null) {
+            return;     // Nothing to do
         }
 
-        // Cleanup for the incoming link
-        Map<ByteBuffer, LinkEvent> oldLinkEvents =
-            discoveredAddedLinkEvents.get(portEvent.getDpid());
-        if (oldLinkEvents != null) {
-            for (LinkEvent linkEvent : new ArrayList<>(oldLinkEvents.values())) {
+        // Publish the information
+        TopologyEvent topologyEvent =
+            new TopologyEvent(portEvent, getOnosInstanceId());
+        log.debug("Publishing remove port: {}", topologyEvent);
+        eventChannel.removeEntry(topologyEvent.getID());
+
+        // Cleanup for the incoming link(s)
+        ConcurrentMap<ByteBuffer, LinkEvent> linkEvents =
+            publishedLinkEvents.get(portEvent.getDpid());
+        if (linkEvents != null) {
+            for (LinkEvent linkEvent : linkEvents.values()) {
                 if (linkEvent.getDst().equals(portEvent.getSwitchPort())) {
-                    removeLinkDiscoveryEvent(linkEvent);
-                    // XXX If we change our model to allow multiple Link on
-                    // a Port, this loop must be fixed to allow continuing.
-                    break;
+                    publishRemoveLinkEvent(linkEvent);
                 }
             }
         }
 
         // Cleanup for the connected hosts
-        // TODO: The implementation below is probably wrong
-        List<HostEvent> removedHostEvents = new LinkedList<>();
-        Map<ByteBuffer, HostEvent> oldHostEvents =
-            discoveredAddedHostEvents.get(portEvent.getDpid());
-        if (oldHostEvents != null) {
-            for (HostEvent hostEvent : new ArrayList<>(oldHostEvents.values())) {
+        ConcurrentMap<ByteBuffer, HostEvent> hostEvents =
+            publishedHostEvents.get(portEvent.getDpid());
+        if (hostEvents != null) {
+            for (HostEvent hostEvent : hostEvents.values()) {
                 for (SwitchPort swp : hostEvent.getAttachmentPoints()) {
                     if (swp.equals(portEvent.getSwitchPort())) {
-                        removedHostEvents.add(hostEvent);
+                        publishRemoveHostEvent(hostEvent);
                     }
                 }
             }
-            for (HostEvent hostEvent : removedHostEvents) {
-                removeHostDiscoveryEvent(hostEvent);
-            }
         }
+
+        portEvents.remove(portEvent.getIDasByteBuffer());
     }
 
     /**
-     * Link discovered event.
+     * Publishes ADD Link Event.
      *
-     * @param linkEvent the link event.
+     * @param linkEvent the link event to publish
      */
-    private void putLinkDiscoveryEvent(LinkEvent linkEvent) {
-        log.debug("Sending add link: {}", linkEvent);
-        // Send out notification
+    private void publishAddLinkEvent(LinkEvent linkEvent) {
+        if (!registryService.hasControl(linkEvent.getOriginDpid().value())) {
+            log.debug("Not the master for dst switch {}. Suppressed link add event {}.",
+                      linkEvent.getOriginDpid(), linkEvent);
+            return;
+        }
+
+        // Publish the information
         TopologyEvent topologyEvent =
-            new TopologyEvent(linkEvent,
-                              registryService.getOnosInstanceId());
+            new TopologyEvent(linkEvent, getOnosInstanceId());
+        log.debug("Publishing add link: {}", topologyEvent);
         eventChannel.addEntry(topologyEvent.getID(), topologyEvent);
 
         // Store the new Link Event in the local cache
-        Map<ByteBuffer, LinkEvent> oldLinkEvents =
-            discoveredAddedLinkEvents.get(linkEvent.getDst().getDpid());
-        if (oldLinkEvents == null) {
-            oldLinkEvents = new HashMap<>();
-            discoveredAddedLinkEvents.put(linkEvent.getDst().getDpid(),
-                                          oldLinkEvents);
-        }
-        ByteBuffer id = linkEvent.getIDasByteBuffer();
-        oldLinkEvents.put(id, linkEvent);
+        ConcurrentMap<ByteBuffer, LinkEvent> linkEvents =
+            ConcurrentUtils.putIfAbsent(publishedLinkEvents,
+                        linkEvent.getDst().getDpid(),
+                        new ConcurrentHashMap<ByteBuffer, LinkEvent>());
+        linkEvents.put(linkEvent.getIDasByteBuffer(), linkEvent);
     }
 
     /**
-     * Link removed event.
+     * Publishes REMOVE Link Event.
      *
-     * @param linkEvent the link event.
+     * @param linkEvent the link event to publish
      */
-    private void removeLinkDiscoveryEvent(LinkEvent linkEvent) {
-        log.debug("Sending remove link: {}", linkEvent);
-        // Send out notification
+    private void publishRemoveLinkEvent(LinkEvent linkEvent) {
+        ConcurrentMap<ByteBuffer, LinkEvent> linkEvents =
+            publishedLinkEvents.get(linkEvent.getDst().getDpid());
+        if (linkEvents == null) {
+            return;     // Nothing to do
+        }
+        if (linkEvents.get(linkEvent.getIDasByteBuffer()) == null) {
+            return;     // Nothing to do
+        }
+
+        // Publish the information
         TopologyEvent topologyEvent =
-            new TopologyEvent(linkEvent,
-                              registryService.getOnosInstanceId());
+            new TopologyEvent(linkEvent, getOnosInstanceId());
+        log.debug("Publishing remove link: {}", topologyEvent);
         eventChannel.removeEntry(topologyEvent.getID());
 
-        // Cleanup the Link Event from the local cache
-        Map<ByteBuffer, LinkEvent> oldLinkEvents =
-            discoveredAddedLinkEvents.get(linkEvent.getDst().getDpid());
-        if (oldLinkEvents != null) {
-            ByteBuffer id = linkEvent.getIDasByteBuffer();
-            oldLinkEvents.remove(id);
-        }
+        linkEvents.remove(linkEvent.getIDasByteBuffer());
     }
 
     /**
-     * Host discovered event.
+     * Publishes ADD Host Event.
      *
-     * @param hostEvent the host event.
+     * @param hostEvent the host event to publish
      */
-    private void putHostDiscoveryEvent(HostEvent hostEvent) {
-        // Send out notification
+    private void publishAddHostEvent(HostEvent hostEvent) {
+        //
+        // NOTE: The implementation below assumes that there is just one
+        // attachment point stored in hostEvent. Currently, this assumption
+        // is true based on the existing implementation of the caller
+        // hostAdded().
+        //
+
+        if (!registryService.hasControl(hostEvent.getOriginDpid().value())) {
+            log.debug("Not the master for attachment switch {}. Suppressed host add event {}.",
+                      hostEvent.getOriginDpid(), hostEvent);
+            return;
+        }
+
+        // Publish the information
         TopologyEvent topologyEvent =
-            new TopologyEvent(hostEvent,
-                              registryService.getOnosInstanceId());
+            new TopologyEvent(hostEvent, getOnosInstanceId());
+        log.debug("Publishing add host: {}", topologyEvent);
         eventChannel.addEntry(topologyEvent.getID(), topologyEvent);
-        log.debug("Put the host info into the cache of the topology. mac {}",
-                  hostEvent.getMac());
 
         // Store the new Host Event in the local cache
-        // TODO: The implementation below is probably wrong
-        for (SwitchPort swp : hostEvent.getAttachmentPoints()) {
-            Map<ByteBuffer, HostEvent> oldHostEvents =
-                discoveredAddedHostEvents.get(swp.getDpid());
-            if (oldHostEvents == null) {
-                oldHostEvents = new HashMap<>();
-                discoveredAddedHostEvents.put(swp.getDpid(), oldHostEvents);
-            }
-            ByteBuffer id = hostEvent.getIDasByteBuffer();
-            oldHostEvents.put(id, hostEvent);
-        }
+        ConcurrentMap<ByteBuffer, HostEvent> hostEvents =
+            ConcurrentUtils.putIfAbsent(publishedHostEvents,
+                hostEvent.getOriginDpid(),
+                new ConcurrentHashMap<ByteBuffer, HostEvent>());
+        hostEvents.put(hostEvent.getIDasByteBuffer(), hostEvent);
     }
 
     /**
-     * Host removed event.
+     * Publishes REMOVE Host Event.
      *
-     * @param hostEvent the host event.
+     * @param hostEvent the host event to publish
      */
-    private void removeHostDiscoveryEvent(HostEvent hostEvent) {
-        // Send out notification
-        TopologyEvent topologyEvent =
-            new TopologyEvent(hostEvent,
-                              registryService.getOnosInstanceId());
-        eventChannel.removeEntry(topologyEvent.getID());
-        log.debug("Remove the host info into the cache of the topology. mac {}",
-                  hostEvent.getMac());
-
-        // Cleanup the Host Event from the local cache
-        // TODO: The implementation below is probably wrong
-        ByteBuffer id = hostEvent.getIDasByteBuffer();
-        for (SwitchPort swp : hostEvent.getAttachmentPoints()) {
-            Map<ByteBuffer, HostEvent> oldHostEvents =
-                discoveredAddedHostEvents.get(swp.getDpid());
-            if (oldHostEvents != null) {
-                oldHostEvents.remove(id);
-            }
+    private void publishRemoveHostEvent(HostEvent hostEvent) {
+        ConcurrentMap<ByteBuffer, HostEvent> hostEvents =
+            publishedHostEvents.get(hostEvent.getOriginDpid());
+        if (hostEvents == null) {
+            return;     // Nothing to do
         }
+        if (hostEvents.get(hostEvent.getIDasByteBuffer()) == null) {
+            return;     // Nothing to do
+        }
+
+        // Publish the information
+        TopologyEvent topologyEvent =
+            new TopologyEvent(hostEvent, getOnosInstanceId());
+        log.debug("Publishing remove host: {}", topologyEvent);
+        eventChannel.removeEntry(topologyEvent.getID());
+
+        hostEvents.remove(hostEvent.getIDasByteBuffer());
     }
 }
diff --git a/src/test/java/net/onrc/onos/core/topology/TopologyManagerTest.java b/src/test/java/net/onrc/onos/core/topology/TopologyManagerTest.java
index f6e6c09..d05fdda 100644
--- a/src/test/java/net/onrc/onos/core/topology/TopologyManagerTest.java
+++ b/src/test/java/net/onrc/onos/core/topology/TopologyManagerTest.java
@@ -115,6 +115,10 @@
             .andReturn(ONOS_INSTANCE_ID_1.toString()).anyTimes();
         expect(registryService.getControllerForSwitch(DPID_2.value()))
             .andReturn(ONOS_INSTANCE_ID_2.toString()).anyTimes();
+        expect(registryService.hasControl(DPID_1.value()))
+            .andReturn(true).anyTimes();
+        expect(registryService.hasControl(DPID_2.value()))
+            .andReturn(false).anyTimes();
 
         allTopologyEvents = new CopyOnWriteArrayList<>();
         expect(eventChannel.getAllEntries())
@@ -128,7 +132,7 @@
     /**
      * Setup the Topology Publisher.
      */
-    private void setupTopologyPublisher() {
+    private void setupTopologyPublisher() throws RegistryException {
         // Create a TopologyPublisher object for testing
         theTopologyPublisher = new TopologyPublisher();
 
@@ -136,6 +140,22 @@
         TestUtils.setField(theTopologyPublisher, "registryService",
                            registryService);
 
+        //
+        // Update the Registry Service, so the ONOS instance is the
+        // Master for both switches.
+        //
+        reset(registryService);
+        expect(registryService.getOnosInstanceId()).andReturn(ONOS_INSTANCE_ID_1).anyTimes();
+        expect(registryService.getControllerForSwitch(DPID_1.value()))
+            .andReturn(ONOS_INSTANCE_ID_1.toString()).anyTimes();
+        expect(registryService.getControllerForSwitch(DPID_2.value()))
+            .andReturn(ONOS_INSTANCE_ID_2.toString()).anyTimes();
+        expect(registryService.hasControl(DPID_1.value()))
+            .andReturn(true).anyTimes();
+        expect(registryService.hasControl(DPID_2.value()))
+            .andReturn(true).anyTimes();
+        replay(registryService);
+
         // Setup the event channel
         TestUtils.setField(theTopologyPublisher, "eventChannel", eventChannel);
     }
@@ -184,26 +204,26 @@
     }
 
     /**
-     * Test the Switch Mastership Updated Event.
+     * Tests the publishing of Add Switch Mastership Event.
      */
     @Test
-    public void testPutSwitchMastershipEvent() {
-        // Mock the eventChannel functions first
+    public void testPublishAddSwitchMastershipEvent() throws RegistryException {
+        setupTopologyPublisher();
+
+        // Mock the eventChannel functions
         eventChannel.addEntry(anyObject(byte[].class),
                               anyObject(TopologyEvent.class));
         expectLastCall().times(1, 1);          // 1 event
         replay(eventChannel);
 
-        setupTopologyPublisher();
-
-        // Generate a new Switch Mastership event
+        // Generate the Switch Mastership event
         Role role = Role.MASTER;
         MastershipEvent mastershipEvent =
             new MastershipEvent(DPID_1, ONOS_INSTANCE_ID_1, role);
 
         // Call the TopologyPublisher function for adding the event
         TestUtils.callMethod(theTopologyPublisher,
-                             "putSwitchMastershipEvent",
+                             "publishAddSwitchMastershipEvent",
                              MastershipEvent.class, mastershipEvent);
 
         // Verify the function calls
@@ -211,25 +231,30 @@
     }
 
     /**
-     * Test the Switch Mastership Removed Event.
+     * Tests the publishing of Remove Switch Mastership Event.
      */
     @Test
-    public void testRemoveSwitchMastershipEvent() {
-        // Mock the eventChannel functions first
+    public void testPublishRemoveSwitchMastershipEvent() throws RegistryException {
+        setupTopologyPublisher();
+
+        // Mock the eventChannel functions
         eventChannel.removeEntry(anyObject(byte[].class));
         expectLastCall().times(1, 1);          // 1 event
         replay(eventChannel);
 
-        setupTopologyPublisher();
-
-        // Generate a new Switch Mastership Event
+        // Generate the Switch Mastership Event
         Role role = Role.MASTER;
         MastershipEvent mastershipEvent =
             new MastershipEvent(DPID_1, ONOS_INSTANCE_ID_1, role);
 
+        // Call the TopologyPublisher function for adding the event
+        TestUtils.callMethod(theTopologyPublisher,
+                             "publishAddSwitchMastershipEvent",
+                             MastershipEvent.class, mastershipEvent);
+
         // Call the TopologyPublisher function for removing the event
         TestUtils.callMethod(theTopologyPublisher,
-                             "removeSwitchMastershipEvent",
+                             "publishRemoveSwitchMastershipEvent",
                              MastershipEvent.class, mastershipEvent);
 
         // Verify the function calls
@@ -237,38 +262,36 @@
     }
 
     /**
-     * Test the Switch discovered and Port discovered functions.
+     * Tests the publishing of Add Switch and Port Events.
      */
     @Test
-    public void testPutSwitchAndPortDiscoveryEvent() {
-        // Mock the eventChannel functions first
+    public void testPublishAddSwitchAndPortEvent() throws RegistryException {
+        setupTopologyPublisher();
+
+        // Mock the eventChannel functions
         eventChannel.addEntry(anyObject(byte[].class),
                               anyObject(TopologyEvent.class));
-        expectLastCall().times(3, 3);  // (1 Switch + 1 Port), 1 Port
+        expectLastCall().times(3, 3);           // (1 Switch + 1 Port), 1 Port
         replay(eventChannel);
 
-        setupTopologyPublisher();
-
         // Mock Switch has one Port
         PortNumber portNumber = PortNumber.uint32(1);
 
-        // Generate a new Switch Event along with a Port Event
+        // Generate the Switch Event along with a Port Event
         SwitchEvent switchEvent = new SwitchEvent(DPID_1);
-
         Collection<PortEvent> portEvents = new ArrayList<PortEvent>();
         portEvents.add(new PortEvent(DPID_1, portNumber));
 
         // Call the TopologyPublisher function for adding a Switch
         TestUtils.callMethod(theTopologyPublisher,
-                             "putSwitchDiscoveryEvent",
+                             "publishAddSwitchEvent",
                              new Class<?>[] {SwitchEvent.class,
                                      Collection.class},
                              switchEvent, portEvents);
-
+        // Call the TopologyPublisher function for adding a Port
         for (PortEvent portEvent : portEvents) {
-            // Call the TopologyPublisher function for adding a Port
             TestUtils.callMethod(theTopologyPublisher,
-                                 "putPortDiscoveryEvent",
+                                 "publishAddPortEvent",
                                  PortEvent.class, portEvent);
         }
 
@@ -278,34 +301,40 @@
     }
 
     /**
-     * Test the Switch and Port removed functions.
+     * Tests the publishing of Remove Switch and Port Events.
      */
     @Test
-    public void testRemoveSwitchAndPortDiscoveryEvent() {
-        // Mock the eventChannel functions first
+    public void testPublishRemoveSwitchAndPortEvent() throws RegistryException {
+        setupTopologyPublisher();
+
+        // Mock the eventChannel functions
         eventChannel.removeEntry(anyObject(byte[].class));
         expectLastCall().times(2, 2);          // 1 Switch, 1 Port
         replay(eventChannel);
 
-        setupTopologyPublisher();
-
         PortNumber portNumber = PortNumber.uint32(1);
 
-        // Generate a Port Event
+        // Generate the Switch Event along with a Port Event
+        SwitchEvent switchEvent = new SwitchEvent(DPID_1);
         Collection<PortEvent> portEvents = new ArrayList<PortEvent>();
         portEvents.add(new PortEvent(DPID_1, portNumber));
 
+        // Call the TopologyPublisher function for adding a Switch and Ports
+        TestUtils.callMethod(theTopologyPublisher,
+                             "publishAddSwitchEvent",
+                             new Class<?>[] {SwitchEvent.class,
+                                     Collection.class},
+                             switchEvent, portEvents);
+
         // Call the TopologyPublisher function for removing a Port
         for (PortEvent portEvent : portEvents) {
             TestUtils.callMethod(theTopologyPublisher,
-                                 "removePortDiscoveryEvent",
+                                 "publishRemovePortEvent",
                                  PortEvent.class, portEvent);
         }
-
         // Call the TopologyPublisher function for removing a Switch
-        SwitchEvent switchEvent = new SwitchEvent(DPID_1);
         TestUtils.callMethod(theTopologyPublisher,
-                             "removeSwitchDiscoveryEvent",
+                             "publishRemoveSwitchEvent",
                              SwitchEvent.class, switchEvent);
 
         // Verify the function calls
@@ -314,18 +343,18 @@
     }
 
     /**
-     * Test the Link discovered function.
+     * Tests the publishing of Add Link Event.
      */
     @Test
-    public void testPutLinkDiscoveryEvent() {
-        // Mock the eventChannel functions first
+    public void testPublishAddLinkEvent() throws RegistryException {
+        setupTopologyPublisher();
+
+        // Mock the eventChannel functions
         eventChannel.addEntry(anyObject(byte[].class),
                               anyObject(TopologyEvent.class));
         expectLastCall().times(5, 5);  // (2 Switch + 2 Port + 1 Link)
         replay(eventChannel);
 
-        setupTopologyPublisher();
-
         // Generate the Switch and Port Events
         PortNumber portNumber1 = PortNumber.uint32(1);
         SwitchEvent switchEvent1 = new SwitchEvent(DPID_1);
@@ -334,7 +363,7 @@
 
         // Call the TopologyPublisher function for adding a Switch
         TestUtils.callMethod(theTopologyPublisher,
-                             "putSwitchDiscoveryEvent",
+                             "publishAddSwitchEvent",
                              new Class<?>[] {SwitchEvent.class,
                                      Collection.class},
                              switchEvent1, portEvents1);
@@ -347,17 +376,19 @@
 
         // Call the TopologyPublisher function for adding a Switch
         TestUtils.callMethod(theTopologyPublisher,
-                             "putSwitchDiscoveryEvent",
+                             "publishAddSwitchEvent",
                              new Class<?>[] {SwitchEvent.class,
                                      Collection.class},
                              switchEvent2, portEvents2);
 
-        // Create the Link Event
+        // Generate the Link Event
         LinkEvent linkEvent =
             new LinkEvent(new SwitchPort(DPID_1, portNumber1),
                           new SwitchPort(DPID_2, portNumber2));
+
+        // Call the TopologyPublisher function for adding a Link
         TestUtils.callMethod(theTopologyPublisher,
-                             "putLinkDiscoveryEvent",
+                             "publishAddLinkEvent",
                              LinkEvent.class, linkEvent);
 
         // Verify the function calls
@@ -365,17 +396,17 @@
     }
 
     /**
-     * Test the Link removed function.
+     * Tests the publishing of Remove Link Event.
      */
     @Test
-    public void testRemoveLinkDiscoveryEvent() {
-        // Mock the eventChannel functions first
+    public void testPublishRemoveLinkEvent() throws RegistryException {
+        setupTopologyPublisher();
+
+        // Mock the eventChannel functions
         eventChannel.removeEntry(anyObject(byte[].class));
         expectLastCall().times(1, 1);          // (1 Link)
         replay(eventChannel);
 
-        setupTopologyPublisher();
-
         // Generate the Switch and Port Events
         PortNumber portNumber1 = PortNumber.uint32(1);
         SwitchEvent switchEvent1 = new SwitchEvent(DPID_1);
@@ -384,7 +415,7 @@
 
         // Call the TopologyPublisher function for adding a Switch
         TestUtils.callMethod(theTopologyPublisher,
-                             "putSwitchDiscoveryEvent",
+                             "publishAddSwitchEvent",
                              new Class<?>[] {SwitchEvent.class,
                                      Collection.class},
                              switchEvent1, portEvents1);
@@ -397,37 +428,57 @@
 
         // Call the TopologyPublisher function for adding a Switch
         TestUtils.callMethod(theTopologyPublisher,
-                             "putSwitchDiscoveryEvent",
+                             "publishAddSwitchEvent",
                              new Class<?>[] {SwitchEvent.class,
                                      Collection.class},
                              switchEvent2, portEvents2);
 
-        // Remove the Link
-        LinkEvent linkEventRemove =
+        // Generate the Link Event
+        LinkEvent linkEvent =
             new LinkEvent(new SwitchPort(DPID_1, portNumber1),
                           new SwitchPort(DPID_2, portNumber2));
+
+        // Call the TopologyPublisher function for adding a Link
         TestUtils.callMethod(theTopologyPublisher,
-                             "removeLinkDiscoveryEvent",
-                             LinkEvent.class, linkEventRemove);
+                             "publishAddLinkEvent",
+                             LinkEvent.class, linkEvent);
+
+        // Call the TopologyPublisher function for removing a Link
+        TestUtils.callMethod(theTopologyPublisher,
+                             "publishRemoveLinkEvent",
+                             LinkEvent.class, linkEvent);
 
         // Verify the function calls
         verify(eventChannel);
     }
 
     /**
-     * Test the Host discovered function.
+     * Tests the publishing of Add Host Event.
      */
     @Test
-    public void testPutHostDiscoveryEvent() {
-        // Mock the eventChannel functions first
-        eventChannel.addEntry(anyObject(byte[].class),
-                              anyObject(TopologyEvent.class));
-        expectLastCall().times(1, 1);          // 1 Host
-        replay(eventChannel);
-
+    public void testPublishAddHostEvent() throws RegistryException {
         setupTopologyPublisher();
 
-        // Generate a new Host Event
+        // Mock the eventChannel functions
+        eventChannel.addEntry(anyObject(byte[].class),
+                              anyObject(TopologyEvent.class));
+        expectLastCall().times(3, 3);  // (1 Switch + 1 Port + 1 Host)
+        replay(eventChannel);
+
+        // Generate the Switch and Port Events
+        PortNumber portNumber1 = PortNumber.uint32(1);
+        SwitchEvent switchEvent1 = new SwitchEvent(DPID_1);
+        Collection<PortEvent> portEvents1 = new ArrayList<PortEvent>();
+        portEvents1.add(new PortEvent(DPID_1, portNumber1));
+
+        // Call the TopologyPublisher function for adding a Switch
+        TestUtils.callMethod(theTopologyPublisher,
+                             "publishAddSwitchEvent",
+                             new Class<?>[] {SwitchEvent.class,
+                                     Collection.class},
+                             switchEvent1, portEvents1);
+
+        // Generate the Host Event
         PortNumber portNumber = PortNumber.uint32(1);
         MACAddress hostMac = MACAddress.valueOf("00:AA:11:BB:33:CC");
         SwitchPort sp = new SwitchPort(DPID_1, portNumber);
@@ -438,7 +489,7 @@
 
         // Call the TopologyPublisher function for adding a Host
         TestUtils.callMethod(theTopologyPublisher,
-                             "putHostDiscoveryEvent",
+                             "publishAddHostEvent",
                              HostEvent.class, hostEvent);
 
         // Verify the function calls
@@ -446,18 +497,31 @@
     }
 
     /**
-     * Test the Host removed function.
+     * Tests the publising of Remove Host Event.
      */
     @Test
-    public void testRemoveHostDiscoveryEvent() {
-        // Mock the eventChannel functions first
+    public void testPublishRemoveHostEvent() throws RegistryException {
+        setupTopologyPublisher();
+
+        // Mock the eventChannel functions
         eventChannel.removeEntry(anyObject(byte[].class));
         expectLastCall().times(1, 1);          // 1 Host
         replay(eventChannel);
 
-        setupTopologyPublisher();
+        // Generate the Switch and Port Events
+        PortNumber portNumber1 = PortNumber.uint32(1);
+        SwitchEvent switchEvent1 = new SwitchEvent(DPID_1);
+        Collection<PortEvent> portEvents1 = new ArrayList<PortEvent>();
+        portEvents1.add(new PortEvent(DPID_1, portNumber1));
 
-        // Generate a new Host Event
+        // Call the TopologyPublisher function for adding a Switch
+        TestUtils.callMethod(theTopologyPublisher,
+                             "publishAddSwitchEvent",
+                             new Class<?>[] {SwitchEvent.class,
+                                     Collection.class},
+                             switchEvent1, portEvents1);
+
+        // Generate the Host Event
         PortNumber portNumber = PortNumber.uint32(1);
         MACAddress hostMac = MACAddress.valueOf("00:AA:11:BB:33:CC");
         SwitchPort sp = new SwitchPort(DPID_1, portNumber);
@@ -466,9 +530,14 @@
         HostEvent hostEvent = new HostEvent(hostMac);
         hostEvent.setAttachmentPoints(spLists);
 
+        // Call the TopologyPublisher function for adding a Host
+        TestUtils.callMethod(theTopologyPublisher,
+                             "publishAddHostEvent",
+                             HostEvent.class, hostEvent);
+
         // Call the TopologyPublisher function for removing a Host
         TestUtils.callMethod(theTopologyPublisher,
-                             "removeHostDiscoveryEvent",
+                             "publishRemoveHostEvent",
                              HostEvent.class, hostEvent);
 
         // Verify the function calls