Work toward cleaning up the Topology Manager and the Topology Publisher

ONOS-1890

Moved some of the methods in class TopologyManager to TopologyPublisher,
because naturally they belong to the latter.
This is also needed for the log-based mechanism.
In the process, removed the event writing to the datastore, until
it becomes clear when, where and what to write.

Change-Id: Ic1fe1db533aec66a91bf643b0989119f33c3d37e
diff --git a/src/main/java/net/onrc/onos/core/topology/ITopologyService.java b/src/main/java/net/onrc/onos/core/topology/ITopologyService.java
index 7ab5c47..ecb1212 100644
--- a/src/main/java/net/onrc/onos/core/topology/ITopologyService.java
+++ b/src/main/java/net/onrc/onos/core/topology/ITopologyService.java
@@ -30,14 +30,4 @@
      * @param listener the listener to remove.
      */
     public void removeListener(ITopologyListener listener);
-
-    /**
-     * Allows a module to get a reference to the southbound interface to
-     * the topology.
-     * TODO Figure out how to hide the southbound interface from
-     * applications/modules that shouldn't touch it
-     *
-     * @return the TopologyDiscoveryInterface object
-     */
-    public TopologyDiscoveryInterface getTopologyDiscoveryInterface();
 }
diff --git a/src/main/java/net/onrc/onos/core/topology/TopologyDiscoveryInterface.java b/src/main/java/net/onrc/onos/core/topology/TopologyDiscoveryInterface.java
deleted file mode 100644
index c373bad..0000000
--- a/src/main/java/net/onrc/onos/core/topology/TopologyDiscoveryInterface.java
+++ /dev/null
@@ -1,81 +0,0 @@
-package net.onrc.onos.core.topology;
-
-import java.util.Collection;
-
-/**
- * Interface used by the Topology Discovery module to write topology-related
- * events.
- */
-public interface TopologyDiscoveryInterface {
-    /**
-     * Switch Mastership updated event.
-     *
-     * @param mastershipEvent the mastership event.
-     */
-    public void putSwitchMastershipEvent(MastershipEvent mastershipEvent);
-
-    /**
-     * Switch Mastership removed event.
-     *
-     * @param mastershipEvent the mastership event.
-     */
-    public void removeSwitchMastershipEvent(MastershipEvent mastershipEvent);
-
-    /**
-     * Switch discovered event.
-     *
-     * @param switchEvent the switch event.
-     * @param portEvents  the corresponding port events for the switch.
-     */
-    public void putSwitchDiscoveryEvent(SwitchEvent switchEvent,
-                                        Collection<PortEvent> portEvents);
-
-    /**
-     * Switch removed event.
-     *
-     * @param switchEvent the switch event.
-     */
-    public void removeSwitchDiscoveryEvent(SwitchEvent switchEvent);
-
-    /**
-     * Port discovered event.
-     *
-     * @param portEvent the port event.
-     */
-    public void putPortDiscoveryEvent(PortEvent portEvent);
-
-    /**
-     * Port removed event.
-     *
-     * @param portEvent the port event.
-     */
-    public void removePortDiscoveryEvent(PortEvent portEvent);
-
-    /**
-     * Link discovered event.
-     *
-     * @param linkEvent the link event.
-     */
-    public void putLinkDiscoveryEvent(LinkEvent linkEvent);
-
-    /**
-     * Link removed event.
-     *
-     * @param linkEvent the link event.
-     */
-    public void removeLinkDiscoveryEvent(LinkEvent linkEvent);
-
-    /**
-     * Host discovered event.
-     *
-     * @param hostEvent the host event.
-     */
-    public void putHostDiscoveryEvent(HostEvent hostEvent);
-
-    /**
-     * Host removed event.
-     *
-     * @param hostEvent the host event.
-     */
-    public void removeHostDiscoveryEvent(HostEvent hostEvent);
-}
diff --git a/src/main/java/net/onrc/onos/core/topology/TopologyManager.java b/src/main/java/net/onrc/onos/core/topology/TopologyManager.java
index 1280cf4..daa334b 100644
--- a/src/main/java/net/onrc/onos/core/topology/TopologyManager.java
+++ b/src/main/java/net/onrc/onos/core/topology/TopologyManager.java
@@ -22,17 +22,12 @@
 import net.onrc.onos.core.datagrid.IDatagridService;
 import net.onrc.onos.core.datagrid.IEventChannel;
 import net.onrc.onos.core.datagrid.IEventChannelListener;
-import net.onrc.onos.core.datastore.topology.KVDevice;
-import net.onrc.onos.core.datastore.topology.KVLink;
-import net.onrc.onos.core.datastore.topology.KVPort;
-import net.onrc.onos.core.datastore.topology.KVSwitch;
 import net.onrc.onos.core.metrics.OnosMetrics;
 import net.onrc.onos.core.metrics.OnosMetrics.MetricsComponent;
 import net.onrc.onos.core.metrics.OnosMetrics.MetricsFeature;
 import net.onrc.onos.core.registry.IControllerRegistryService;
 import net.onrc.onos.core.util.Dpid;
 import net.onrc.onos.core.util.EventEntry;
-import net.onrc.onos.core.util.PortNumber;
 import net.onrc.onos.core.util.SwitchPort;
 import net.onrc.onos.core.util.serializers.KryoFactory;
 
@@ -58,7 +53,7 @@
  * TODO TBD: This class may delay the requested change to handle event
  * re-ordering. e.g.) Link Add came in, but Switch was not there.
  */
-public class TopologyManager implements TopologyDiscoveryInterface {
+public class TopologyManager {
 
     private static final Logger log = LoggerFactory
             .getLogger(TopologyManager.class);
@@ -67,9 +62,7 @@
     public static final String EVENT_CHANNEL_NAME = "onos.topology";
     private EventHandler eventHandler = new EventHandler();
 
-    private TopologyDatastore datastore;
     private final TopologyImpl topology = new TopologyImpl();
-    private final IControllerRegistryService registryService;
     private Kryo kryo = KryoFactory.newKryoObject();
     private TopologyEventPreprocessor eventPreprocessor;
     private CopyOnWriteArrayList<ITopologyListener> topologyListeners =
@@ -104,44 +97,6 @@
                                 "ListenerEventRate");
 
     //
-    // Local state for keeping track of locally discovered events so we can
-    // cleanup properly when a Switch or Port is removed.
-    //
-    // We keep all Port, (incoming) Link and Host events per Switch DPID:
-    //  - If a switch goes down, we remove all corresponding Port, Link and
-    //    Host events.
-    //  - If a port on a switch goes down, we remove all corresponding Link
-    //    and Host events discovered by this instance.
-    //
-    // How to handle side-effect of remote events.
-    //  - Remote Port Down event -> Link Down
-    //      Not handled. (XXX Shouldn't it be removed from discovered.. Map)
-    //  - Remote Host Added -> lose ownership of Host)
-    //      Not handled. (XXX Shouldn't it be removed from discovered.. Map)
-    //
-    // XXX Domain knowledge based invariant maintenance should be moved to
-    //     driver module, since the invariant may be different on optical, etc.
-    //
-    // What happens on leadership change?
-    //  - Probably should: remove from discovered.. Maps, but not send DELETE
-    //    events
-    //    XXX Switch/Port can be rediscovered by new leader, but Link, Host?
-    //  - Current: There is no way to recognize leadership change?
-    //      ZookeeperRegistry.requestControl(long, ControlChangeCallback)
-    //      is the only way to register listener, and it allows only one
-    //      listener, which is already used by Controller class.
-    //
-    // FIXME Replace with concurrent variant.
-    //   #removeSwitchDiscoveryEvent(SwitchEvent) runs in different thread.
-    //
-    private Map<Dpid, Map<ByteBuffer, PortEvent>> discoveredAddedPortEvents =
-            new HashMap<>();
-    private Map<Dpid, Map<ByteBuffer, LinkEvent>> discoveredAddedLinkEvents =
-            new HashMap<>();
-    private Map<Dpid, Map<ByteBuffer, HostEvent>> discoveredAddedHostEvents =
-            new HashMap<>();
-
-    //
     // Local state for keeping the last ADD Mastership Event entries.
     // TODO: In the future, we might have to keep this state somewhere else.
     //
@@ -173,8 +128,6 @@
      * @param registryService the Registry Service to use.
      */
     public TopologyManager(IControllerRegistryService registryService) {
-        datastore = new TopologyDatastore();
-        this.registryService = registryService;
         this.eventPreprocessor =
             new TopologyEventPreprocessor(registryService);
     }
@@ -201,12 +154,7 @@
          */
         private void startup() {
             //
-            // TODO: Read all state from the database:
-            //
-            // Collection<EventEntry<TopologyEvent>> collection =
-            //    readWholeTopologyFromDB();
-            //
-            // For now, as a shortcut we read it from the datagrid
+            // Read all topology state
             //
             Collection<TopologyEvent> allTopologyEvents =
                     eventChannel.getAllEntries();
@@ -611,359 +559,6 @@
         apiRemovedHostEvents.clear();
     }
 
-    /**
-     * Mastership updated event.
-     *
-     * @param mastershipEvent the mastership event.
-     */
-    @Override
-    public void putSwitchMastershipEvent(MastershipEvent mastershipEvent) {
-        // Send out notification
-        TopologyEvent topologyEvent =
-            new TopologyEvent(mastershipEvent,
-                              registryService.getOnosInstanceId());
-        eventChannel.addEntry(topologyEvent.getID(), topologyEvent);
-    }
-
-    /**
-     * Mastership removed event.
-     *
-     * @param mastershipEvent the mastership event.
-     */
-    @Override
-    public void removeSwitchMastershipEvent(MastershipEvent mastershipEvent) {
-        // Send out notification
-        TopologyEvent topologyEvent =
-            new TopologyEvent(mastershipEvent,
-                              registryService.getOnosInstanceId());
-        eventChannel.removeEntry(topologyEvent.getID());
-    }
-
-    /**
-     * Switch discovered event.
-     *
-     * @param switchEvent the switch event.
-     * @param portEvents  the corresponding port events for the switch.
-     */
-    @Override
-    public void putSwitchDiscoveryEvent(SwitchEvent switchEvent,
-                                        Collection<PortEvent> portEvents) {
-        if (datastore.addSwitch(switchEvent, portEvents)) {
-            log.debug("Sending add switch: {}", switchEvent);
-            // Send out notification
-            TopologyEvent topologyEvent =
-                new TopologyEvent(switchEvent,
-                                  registryService.getOnosInstanceId());
-            eventChannel.addEntry(topologyEvent.getID(), topologyEvent);
-
-            // Send out notification for each port
-            for (PortEvent portEvent : portEvents) {
-                log.debug("Sending add port: {}", portEvent);
-                topologyEvent =
-                    new TopologyEvent(portEvent,
-                                      registryService.getOnosInstanceId());
-                eventChannel.addEntry(topologyEvent.getID(), topologyEvent);
-            }
-
-            //
-            // Keep track of the added ports
-            //
-            // Get the old Port Events
-            Map<ByteBuffer, PortEvent> oldPortEvents =
-                    discoveredAddedPortEvents.get(switchEvent.getDpid());
-            if (oldPortEvents == null) {
-                oldPortEvents = new HashMap<>();
-            }
-
-            // Store the new Port Events in the local cache
-            Map<ByteBuffer, PortEvent> newPortEvents = new HashMap<>();
-            for (PortEvent portEvent : portEvents) {
-                ByteBuffer id = portEvent.getIDasByteBuffer();
-                newPortEvents.put(id, portEvent);
-            }
-            discoveredAddedPortEvents.put(switchEvent.getDpid(),
-                    newPortEvents);
-
-            //
-            // Extract the removed ports
-            //
-            List<PortEvent> removedPortEvents = new LinkedList<>();
-            for (Map.Entry<ByteBuffer, PortEvent> entry : oldPortEvents.entrySet()) {
-                ByteBuffer key = entry.getKey();
-                PortEvent portEvent = entry.getValue();
-                if (!newPortEvents.containsKey(key)) {
-                    removedPortEvents.add(portEvent);
-                }
-            }
-
-            // Cleanup old removed ports
-            for (PortEvent portEvent : removedPortEvents) {
-                removePortDiscoveryEvent(portEvent);
-            }
-        }
-    }
-
-    /**
-     * {@inheritDoc}
-     * <p/>
-     * Called by {@link TopologyPublisher.SwitchCleanup} thread.
-     */
-    @Override
-    public void removeSwitchDiscoveryEvent(SwitchEvent switchEvent) {
-        TopologyEvent topologyEvent;
-
-        // Get the old Port Events
-        Map<ByteBuffer, PortEvent> oldPortEvents =
-                discoveredAddedPortEvents.get(switchEvent.getDpid());
-        if (oldPortEvents == null) {
-            oldPortEvents = new HashMap<>();
-        }
-
-        if (datastore.deactivateSwitch(switchEvent, oldPortEvents.values())) {
-            log.debug("Sending remove switch: {}", switchEvent);
-            // Send out notification
-            topologyEvent =
-                new TopologyEvent(switchEvent,
-                                  registryService.getOnosInstanceId());
-            eventChannel.removeEntry(topologyEvent.getID());
-
-            //
-            // Send out notification for each port.
-            //
-            // NOTE: We don't use removePortDiscoveryEvent() for the cleanup,
-            // because it will attempt to remove the port from the database,
-            // and the deactiveSwitch() call above already removed all ports.
-            //
-            for (PortEvent portEvent : oldPortEvents.values()) {
-                log.debug("Sending remove port:", portEvent);
-                topologyEvent =
-                    new TopologyEvent(portEvent,
-                                      registryService.getOnosInstanceId());
-                eventChannel.removeEntry(topologyEvent.getID());
-            }
-            discoveredAddedPortEvents.remove(switchEvent.getDpid());
-
-            // Cleanup for each link
-            Map<ByteBuffer, LinkEvent> oldLinkEvents =
-                    discoveredAddedLinkEvents.get(switchEvent.getDpid());
-            if (oldLinkEvents != null) {
-                for (LinkEvent linkEvent : new ArrayList<>(oldLinkEvents.values())) {
-                    removeLinkDiscoveryEvent(linkEvent);
-                }
-                discoveredAddedLinkEvents.remove(switchEvent.getDpid());
-            }
-
-            // Cleanup for each host
-            Map<ByteBuffer, HostEvent> oldHostEvents =
-                    discoveredAddedHostEvents.get(switchEvent.getDpid());
-            if (oldHostEvents != null) {
-                for (HostEvent hostEvent : new ArrayList<>(oldHostEvents.values())) {
-                    removeHostDiscoveryEvent(hostEvent);
-                }
-                discoveredAddedHostEvents.remove(switchEvent.getDpid());
-            }
-        }
-    }
-
-    /**
-     * Port discovered event.
-     *
-     * @param portEvent the port event.
-     */
-    @Override
-    public void putPortDiscoveryEvent(PortEvent portEvent) {
-        if (datastore.addPort(portEvent)) {
-            log.debug("Sending add port: {}", portEvent);
-            // Send out notification
-            TopologyEvent topologyEvent =
-                new TopologyEvent(portEvent,
-                                  registryService.getOnosInstanceId());
-            eventChannel.addEntry(topologyEvent.getID(), topologyEvent);
-
-            // Store the new Port Event in the local cache
-            Map<ByteBuffer, PortEvent> oldPortEvents =
-                    discoveredAddedPortEvents.get(portEvent.getDpid());
-            if (oldPortEvents == null) {
-                oldPortEvents = new HashMap<>();
-                discoveredAddedPortEvents.put(portEvent.getDpid(),
-                        oldPortEvents);
-            }
-            ByteBuffer id = portEvent.getIDasByteBuffer();
-            oldPortEvents.put(id, portEvent);
-        }
-    }
-
-    /**
-     * Port removed event.
-     *
-     * @param portEvent the port event.
-     */
-    @Override
-    public void removePortDiscoveryEvent(PortEvent portEvent) {
-        if (datastore.deactivatePort(portEvent)) {
-            log.debug("Sending remove port: {}", portEvent);
-            // Send out notification
-            TopologyEvent topologyEvent =
-                new TopologyEvent(portEvent,
-                                  registryService.getOnosInstanceId());
-            eventChannel.removeEntry(topologyEvent.getID());
-
-            // Cleanup the Port Event from the local cache
-            Map<ByteBuffer, PortEvent> oldPortEvents =
-                    discoveredAddedPortEvents.get(portEvent.getDpid());
-            if (oldPortEvents != null) {
-                ByteBuffer id = portEvent.getIDasByteBuffer();
-                oldPortEvents.remove(id);
-            }
-
-            // Cleanup for the incoming link
-            Map<ByteBuffer, LinkEvent> oldLinkEvents =
-                    discoveredAddedLinkEvents.get(portEvent.getDpid());
-            if (oldLinkEvents != null) {
-                for (LinkEvent linkEvent : new ArrayList<>(oldLinkEvents.values())) {
-                    if (linkEvent.getDst().equals(portEvent.getSwitchPort())) {
-                        removeLinkDiscoveryEvent(linkEvent);
-                        // XXX If we change our model to allow multiple Link on
-                        // a Port, this loop must be fixed to allow continuing.
-                        break;
-                    }
-                }
-            }
-
-            // Cleanup for the connected hosts
-            // TODO: The implementation below is probably wrong
-            List<HostEvent> removedHostEvents = new LinkedList<>();
-            Map<ByteBuffer, HostEvent> oldHostEvents =
-                    discoveredAddedHostEvents.get(portEvent.getDpid());
-            if (oldHostEvents != null) {
-                for (HostEvent hostEvent : new ArrayList<>(oldHostEvents.values())) {
-                    for (SwitchPort swp : hostEvent.getAttachmentPoints()) {
-                        if (swp.equals(portEvent.getSwitchPort())) {
-                            removedHostEvents.add(hostEvent);
-                        }
-                    }
-                }
-                for (HostEvent hostEvent : removedHostEvents) {
-                    removeHostDiscoveryEvent(hostEvent);
-                }
-            }
-        }
-    }
-
-    /**
-     * Link discovered event.
-     *
-     * @param linkEvent the link event.
-     */
-    @Override
-    public void putLinkDiscoveryEvent(LinkEvent linkEvent) {
-        if (datastore.addLink(linkEvent)) {
-            log.debug("Sending add link: {}", linkEvent);
-            // Send out notification
-            TopologyEvent topologyEvent =
-                new TopologyEvent(linkEvent,
-                                  registryService.getOnosInstanceId());
-            eventChannel.addEntry(topologyEvent.getID(), topologyEvent);
-
-            // Store the new Link Event in the local cache
-            Map<ByteBuffer, LinkEvent> oldLinkEvents =
-                    discoveredAddedLinkEvents.get(linkEvent.getDst().getDpid());
-            if (oldLinkEvents == null) {
-                oldLinkEvents = new HashMap<>();
-                discoveredAddedLinkEvents.put(linkEvent.getDst().getDpid(),
-                        oldLinkEvents);
-            }
-            ByteBuffer id = linkEvent.getIDasByteBuffer();
-            oldLinkEvents.put(id, linkEvent);
-        }
-    }
-
-    /**
-     * Link removed event.
-     *
-     * @param linkEvent the link event.
-     */
-    @Override
-    public void removeLinkDiscoveryEvent(LinkEvent linkEvent) {
-        if (datastore.removeLink(linkEvent)) {
-            log.debug("Sending remove link: {}", linkEvent);
-            // Send out notification
-            TopologyEvent topologyEvent =
-                new TopologyEvent(linkEvent,
-                                  registryService.getOnosInstanceId());
-            eventChannel.removeEntry(topologyEvent.getID());
-
-            // Cleanup the Link Event from the local cache
-            Map<ByteBuffer, LinkEvent> oldLinkEvents =
-                    discoveredAddedLinkEvents.get(linkEvent.getDst().getDpid());
-            if (oldLinkEvents != null) {
-                ByteBuffer id = linkEvent.getIDasByteBuffer();
-                oldLinkEvents.remove(id);
-            }
-        }
-    }
-
-    /**
-     * Host discovered event.
-     *
-     * @param hostEvent the host event.
-     */
-    @Override
-    public void putHostDiscoveryEvent(HostEvent hostEvent) {
-        if (datastore.addHost(hostEvent)) {
-            // Send out notification
-            TopologyEvent topologyEvent =
-                new TopologyEvent(hostEvent,
-                                  registryService.getOnosInstanceId());
-            eventChannel.addEntry(topologyEvent.getID(), topologyEvent);
-            log.debug("Put the host info into the cache of the topology. mac {}",
-                      hostEvent.getMac());
-
-            // Store the new Host Event in the local cache
-            // TODO: The implementation below is probably wrong
-            for (SwitchPort swp : hostEvent.getAttachmentPoints()) {
-                Map<ByteBuffer, HostEvent> oldHostEvents =
-                        discoveredAddedHostEvents.get(swp.getDpid());
-                if (oldHostEvents == null) {
-                    oldHostEvents = new HashMap<>();
-                    discoveredAddedHostEvents.put(swp.getDpid(),
-                            oldHostEvents);
-                }
-                ByteBuffer id = hostEvent.getIDasByteBuffer();
-                oldHostEvents.put(id, hostEvent);
-            }
-        }
-    }
-
-    /**
-     * Host removed event.
-     *
-     * @param hostEvent the host event.
-     */
-    @Override
-    public void removeHostDiscoveryEvent(HostEvent hostEvent) {
-        if (datastore.removeHost(hostEvent)) {
-            // Send out notification
-            TopologyEvent topologyEvent =
-                new TopologyEvent(hostEvent,
-                                  registryService.getOnosInstanceId());
-            eventChannel.removeEntry(topologyEvent.getID());
-            log.debug("Remove the host info into the cache of the topology. mac {}",
-                      hostEvent.getMac());
-
-            // Cleanup the Host Event from the local cache
-            // TODO: The implementation below is probably wrong
-            ByteBuffer id = hostEvent.getIDasByteBuffer();
-            for (SwitchPort swp : hostEvent.getAttachmentPoints()) {
-                Map<ByteBuffer, HostEvent> oldHostEvents =
-                        discoveredAddedHostEvents.get(swp.getDpid());
-                if (oldHostEvents != null) {
-                    oldHostEvents.remove(id);
-                }
-            }
-        }
-    }
-
     //
     // Methods to update topology replica
     //
@@ -1390,96 +985,4 @@
         topology.removeHost(mac);
         apiRemovedHostEvents.add(hostInTopo);
     }
-
-    /**
-     * Read the whole topology from the database.
-     *
-     * @return a list of EventEntry-encapsulated Topology Events for
-     * the whole topology.
-     */
-    private List<EventEntry<TopologyEvent>> readWholeTopologyFromDB() {
-        List<EventEntry<TopologyEvent>> events =
-                new LinkedList<EventEntry<TopologyEvent>>();
-
-        // XXX May need to clear whole topology first, depending on
-        // how we initially subscribe to replication events
-
-        // Add all active switches
-        for (KVSwitch sw : KVSwitch.getAllSwitches()) {
-            if (sw.getStatus() != KVSwitch.STATUS.ACTIVE) {
-                continue;
-            }
-
-            //
-            // TODO: Using the local ONOS Instance ID below is incorrect.
-            // Currently, this code is not used, and it might go away in the
-            // future.
-            //
-            SwitchEvent switchEvent = new SwitchEvent(new Dpid(sw.getDpid()));
-            TopologyEvent topologyEvent =
-                new TopologyEvent(switchEvent,
-                                  registryService.getOnosInstanceId());
-            EventEntry<TopologyEvent> eventEntry =
-                    new EventEntry<TopologyEvent>(EventEntry.Type.ENTRY_ADD,
-                            topologyEvent);
-            events.add(eventEntry);
-        }
-
-        // Add all active ports
-        for (KVPort p : KVPort.getAllPorts()) {
-            if (p.getStatus() != KVPort.STATUS.ACTIVE) {
-                continue;
-            }
-
-            //
-            // TODO: Using the local ONOS Instance ID below is incorrect.
-            // Currently, this code is not used, and it might go away in the
-            // future.
-            //
-            PortEvent portEvent =
-                new PortEvent(new Dpid(p.getDpid()),
-                              new PortNumber(p.getNumber().shortValue()));
-            TopologyEvent topologyEvent =
-                new TopologyEvent(portEvent,
-                                  registryService.getOnosInstanceId());
-            EventEntry<TopologyEvent> eventEntry =
-                    new EventEntry<TopologyEvent>(EventEntry.Type.ENTRY_ADD,
-                            topologyEvent);
-            events.add(eventEntry);
-        }
-
-        for (KVDevice d : KVDevice.getAllDevices()) {
-            //
-            // TODO: Using the local ONOS Instance ID below is incorrect.
-            // Currently, this code is not used, and it might go away in the
-            // future.
-            //
-            HostEvent devEvent = new HostEvent(MACAddress.valueOf(d.getMac()));
-            for (byte[] portId : d.getAllPortIds()) {
-                devEvent.addAttachmentPoint(
-                        new SwitchPort(KVPort.getDpidFromKey(portId),
-                        KVPort.getNumberFromKey(portId)));
-            }
-        }
-
-        for (KVLink l : KVLink.getAllLinks()) {
-            //
-            // TODO: Using the local ONOS Instance ID below is incorrect.
-            // Currently, this code is not used, and it might go away in the
-            // future.
-            //
-            LinkEvent linkEvent = new LinkEvent(
-                        new SwitchPort(l.getSrc().dpid, l.getSrc().number),
-                        new SwitchPort(l.getDst().dpid, l.getDst().number));
-            TopologyEvent topologyEvent =
-                new TopologyEvent(linkEvent,
-                                  registryService.getOnosInstanceId());
-            EventEntry<TopologyEvent> eventEntry =
-                    new EventEntry<TopologyEvent>(EventEntry.Type.ENTRY_ADD,
-                            topologyEvent);
-            events.add(eventEntry);
-        }
-
-        return events;
-    }
 }
diff --git a/src/main/java/net/onrc/onos/core/topology/TopologyModule.java b/src/main/java/net/onrc/onos/core/topology/TopologyModule.java
index fee98b4..07ba2cf 100644
--- a/src/main/java/net/onrc/onos/core/topology/TopologyModule.java
+++ b/src/main/java/net/onrc/onos/core/topology/TopologyModule.java
@@ -72,11 +72,6 @@
     }
 
     @Override
-    public TopologyDiscoveryInterface getTopologyDiscoveryInterface() {
-        return topologyManager;
-    }
-
-    @Override
     public void addListener(ITopologyListener listener,
                             boolean startFromSnapshot) {
         topologyManager.addListener(listener, startFromSnapshot);
diff --git a/src/main/java/net/onrc/onos/core/topology/TopologyPublisher.java b/src/main/java/net/onrc/onos/core/topology/TopologyPublisher.java
index b1ea740..56ca8fc 100644
--- a/src/main/java/net/onrc/onos/core/topology/TopologyPublisher.java
+++ b/src/main/java/net/onrc/onos/core/topology/TopologyPublisher.java
@@ -1,7 +1,10 @@
 package net.onrc.onos.core.topology;
 
+import java.nio.ByteBuffer;
 import java.util.ArrayList;
 import java.util.Collection;
+import java.util.HashMap;
+import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
 import java.util.concurrent.TimeUnit;
@@ -17,6 +20,9 @@
 import net.floodlightcontroller.core.module.IFloodlightService;
 import net.floodlightcontroller.core.util.SingletonTask;
 import net.floodlightcontroller.threadpool.IThreadPoolService;
+
+import net.onrc.onos.core.datagrid.IDatagridService;
+import net.onrc.onos.core.datagrid.IEventChannel;
 import net.onrc.onos.core.hostmanager.Host;
 import net.onrc.onos.core.hostmanager.IHostListener;
 import net.onrc.onos.core.hostmanager.IHostService;
@@ -51,17 +57,58 @@
     private ILinkDiscoveryService linkDiscovery;
     private IControllerRegistryService registryService;
     private ITopologyService topologyService;
+    private IDatagridService datagridService;
 
     private IHostService hostService;
 
     private Topology topology;
-    private TopologyDiscoveryInterface topologyDiscoveryInterface;
 
     private static final String ENABLE_CLEANUP_PROPERTY = "EnableCleanup";
     private boolean cleanupEnabled = true;
     private static final int CLEANUP_TASK_INTERVAL = 60; // in seconds
     private SingletonTask cleanupTask;
 
+    private IEventChannel<byte[], TopologyEvent> eventChannel;
+
+    //
+    // Local state for keeping track of locally discovered events so we can
+    // cleanup properly when a Switch or Port is removed.
+    //
+    // We keep all Port, (incoming) Link and Host events per Switch DPID:
+    //  - If a switch goes down, we remove all corresponding Port, Link and
+    //    Host events.
+    //  - If a port on a switch goes down, we remove all corresponding Link
+    //    and Host events discovered by this instance.
+    //
+    // How to handle side-effect of remote events.
+    //  - Remote Port Down event -> Link Down
+    //      Not handled. (XXX Shouldn't it be removed from discovered.. Map)
+    //  - Remote Host Added -> lose ownership of Host)
+    //      Not handled. (XXX Shouldn't it be removed from discovered.. Map)
+    //
+    // XXX Domain knowledge based invariant maintenance should be moved to
+    //     driver module, since the invariant may be different on optical, etc.
+    //
+    // What happens on leadership change?
+    //  - Probably should: remove from discovered.. Maps, but not send DELETE
+    //    events
+    //    XXX Switch/Port can be rediscovered by new leader, but Link, Host?
+    //  - Current: There is no way to recognize leadership change?
+    //      ZookeeperRegistry.requestControl(long, ControlChangeCallback)
+    //      is the only way to register listener, and it allows only one
+    //      listener, which is already used by Controller class.
+    //
+    // FIXME Replace with concurrent variant.
+    //   #removeSwitchDiscoveryEvent(SwitchEvent) runs in different thread.
+    //
+    private Map<Dpid, Map<ByteBuffer, PortEvent>> discoveredAddedPortEvents =
+            new HashMap<>();
+    private Map<Dpid, Map<ByteBuffer, LinkEvent>> discoveredAddedLinkEvents =
+            new HashMap<>();
+    private Map<Dpid, Map<ByteBuffer, HostEvent>> discoveredAddedHostEvents =
+            new HashMap<>();
+
+
     /**
      * Cleanup old switches from the topology. Old switches are those which have
      * no controller in the registry.
@@ -132,8 +179,7 @@
                         HexString.toHexString(dpid));
 
                 SwitchEvent switchEvent = new SwitchEvent(new Dpid(dpid));
-                topologyDiscoveryInterface.
-                        removeSwitchDiscoveryEvent(switchEvent);
+                removeSwitchDiscoveryEvent(switchEvent);
                 registryService.releaseControl(dpid);
             }
         }
@@ -163,7 +209,7 @@
                     link.getDst(), linkEvent);
             return;
         }
-        topologyDiscoveryInterface.putLinkDiscoveryEvent(linkEvent);
+        putLinkDiscoveryEvent(linkEvent);
     }
 
     @Override
@@ -187,7 +233,7 @@
                     link.getDst(), linkEvent);
             return;
         }
-        topologyDiscoveryInterface.removeLinkDiscoveryEvent(linkEvent);
+        removeLinkDiscoveryEvent(linkEvent);
     }
 
     /* *****************
@@ -242,7 +288,7 @@
             portEvent.freeze();
             portEvents.add(portEvent);
         }
-        topologyDiscoveryInterface.putSwitchDiscoveryEvent(switchEvent, portEvents);
+        putSwitchDiscoveryEvent(switchEvent, portEvents);
 
         for (OFPortDesc port : sw.getPorts()) {
             // Allow links to be discovered on this port now that it's
@@ -287,7 +333,7 @@
         mastershipEvent.createStringAttribute(TopologyElement.TYPE,
                 TopologyElement.TYPE_ALL_LAYERS);
         mastershipEvent.freeze();
-        topologyDiscoveryInterface.removeSwitchMastershipEvent(mastershipEvent);
+        removeSwitchMastershipEvent(mastershipEvent);
     }
 
     @Override
@@ -330,7 +376,7 @@
         portEvent.freeze();
 
         if (registryService.hasControl(switchId)) {
-            topologyDiscoveryInterface.putPortDiscoveryEvent(portEvent);
+            putPortDiscoveryEvent(portEvent);
             linkDiscovery.enableDiscoveryOnPort(switchId,
                     port.getPortNo().getShortPortNumber());
         } else {
@@ -354,7 +400,7 @@
         portEvent.freeze();
 
         if (registryService.hasControl(switchId)) {
-            topologyDiscoveryInterface.removePortDiscoveryEvent(portEvent);
+            removePortDiscoveryEvent(portEvent);
         } else {
             log.debug("Not the master for switch {}. Suppressed port del event {}.",
                     dpid, portEvent);
@@ -390,6 +436,7 @@
         l.add(ILinkDiscoveryService.class);
         l.add(IThreadPoolService.class);
         l.add(IControllerRegistryService.class);
+        l.add(IDatagridService.class);
         l.add(ITopologyService.class);
         l.add(IHostService.class);
         return l;
@@ -401,6 +448,7 @@
         floodlightProvider = context.getServiceImpl(IFloodlightProviderService.class);
         linkDiscovery = context.getServiceImpl(ILinkDiscoveryService.class);
         registryService = context.getServiceImpl(IControllerRegistryService.class);
+        datagridService = context.getServiceImpl(IDatagridService.class);
         hostService = context.getServiceImpl(IHostService.class);
 
         topologyService = context.getServiceImpl(ITopologyService.class);
@@ -412,9 +460,12 @@
         linkDiscovery.addListener(this);
         hostService.addHostListener(this);
 
+        eventChannel = datagridService.createChannel(
+                                TopologyManager.EVENT_CHANNEL_NAME,
+                                byte[].class,
+                                TopologyEvent.class);
+
         topology = topologyService.getTopology();
-        topologyDiscoveryInterface =
-                topologyService.getTopologyDiscoveryInterface();
 
         // Run the cleanup thread
         String enableCleanup =
@@ -449,7 +500,7 @@
         // Does not use vlan info now.
         event.freeze();
 
-        topologyDiscoveryInterface.putHostDiscoveryEvent(event);
+        putHostDiscoveryEvent(event);
     }
 
     @Override
@@ -458,7 +509,7 @@
         HostEvent event = new HostEvent(host.getMacAddress());
         // XXX shouldn't we be setting attachment points?
         event.freeze();
-        topologyDiscoveryInterface.removeHostDiscoveryEvent(event);
+        removeHostDiscoveryEvent(event);
     }
 
     private void controllerRoleChanged(Dpid dpid, Role role) {
@@ -473,6 +524,330 @@
         mastershipEvent.createStringAttribute(TopologyElement.TYPE,
                 TopologyElement.TYPE_ALL_LAYERS);
         mastershipEvent.freeze();
-        topologyDiscoveryInterface.putSwitchMastershipEvent(mastershipEvent);
+        putSwitchMastershipEvent(mastershipEvent);
+    }
+
+    /**
+     * Mastership updated event.
+     *
+     * @param mastershipEvent the mastership event.
+     */
+    private void putSwitchMastershipEvent(MastershipEvent mastershipEvent) {
+        // Send out notification
+        TopologyEvent topologyEvent =
+            new TopologyEvent(mastershipEvent,
+                              registryService.getOnosInstanceId());
+        eventChannel.addEntry(topologyEvent.getID(), topologyEvent);
+    }
+
+    /**
+     * Mastership removed event.
+     *
+     * @param mastershipEvent the mastership event.
+     */
+    private void removeSwitchMastershipEvent(MastershipEvent mastershipEvent) {
+        // Send out notification
+        TopologyEvent topologyEvent =
+            new TopologyEvent(mastershipEvent,
+                              registryService.getOnosInstanceId());
+        eventChannel.removeEntry(topologyEvent.getID());
+    }
+
+    /**
+     * Switch discovered event.
+     *
+     * @param switchEvent the switch event.
+     * @param portEvents  the corresponding port events for the switch.
+     */
+    private void putSwitchDiscoveryEvent(SwitchEvent switchEvent,
+                                         Collection<PortEvent> portEvents) {
+        log.debug("Sending add switch: {}", switchEvent);
+        // Send out notification
+        TopologyEvent topologyEvent =
+            new TopologyEvent(switchEvent,
+                              registryService.getOnosInstanceId());
+        eventChannel.addEntry(topologyEvent.getID(), topologyEvent);
+
+        // Send out notification for each port
+        for (PortEvent portEvent : portEvents) {
+            log.debug("Sending add port: {}", portEvent);
+            topologyEvent =
+                new TopologyEvent(portEvent,
+                                  registryService.getOnosInstanceId());
+            eventChannel.addEntry(topologyEvent.getID(), topologyEvent);
+        }
+
+        //
+        // Keep track of the added ports
+        //
+        // Get the old Port Events
+        Map<ByteBuffer, PortEvent> oldPortEvents =
+            discoveredAddedPortEvents.get(switchEvent.getDpid());
+        if (oldPortEvents == null) {
+            oldPortEvents = new HashMap<>();
+        }
+
+        // Store the new Port Events in the local cache
+        Map<ByteBuffer, PortEvent> newPortEvents = new HashMap<>();
+        for (PortEvent portEvent : portEvents) {
+            ByteBuffer id = portEvent.getIDasByteBuffer();
+            newPortEvents.put(id, portEvent);
+        }
+        discoveredAddedPortEvents.put(switchEvent.getDpid(), newPortEvents);
+
+        //
+        // Extract the removed ports
+        //
+        List<PortEvent> removedPortEvents = new LinkedList<>();
+        for (Map.Entry<ByteBuffer, PortEvent> entry : oldPortEvents.entrySet()) {
+            ByteBuffer key = entry.getKey();
+            PortEvent portEvent = entry.getValue();
+            if (!newPortEvents.containsKey(key)) {
+                removedPortEvents.add(portEvent);
+            }
+        }
+
+        // Cleanup old removed ports
+        for (PortEvent portEvent : removedPortEvents) {
+            removePortDiscoveryEvent(portEvent);
+        }
+    }
+
+    /**
+     * Switch removed event.
+     *
+     * @param switchEvent the switch event.
+     */
+    private void removeSwitchDiscoveryEvent(SwitchEvent switchEvent) {
+        TopologyEvent topologyEvent;
+
+        // Get the old Port Events
+        Map<ByteBuffer, PortEvent> oldPortEvents =
+                discoveredAddedPortEvents.get(switchEvent.getDpid());
+        if (oldPortEvents == null) {
+            oldPortEvents = new HashMap<>();
+        }
+
+        log.debug("Sending remove switch: {}", switchEvent);
+        // Send out notification
+        topologyEvent =
+            new TopologyEvent(switchEvent,
+                              registryService.getOnosInstanceId());
+        eventChannel.removeEntry(topologyEvent.getID());
+
+        //
+        // Send out notification for each port.
+        //
+        // NOTE: We don't use removePortDiscoveryEvent() for the cleanup,
+        // because it will attempt to remove the port from the database,
+        // and the deactiveSwitch() call above already removed all ports.
+        //
+        for (PortEvent portEvent : oldPortEvents.values()) {
+            log.debug("Sending remove port:", portEvent);
+            topologyEvent =
+                new TopologyEvent(portEvent,
+                                  registryService.getOnosInstanceId());
+            eventChannel.removeEntry(topologyEvent.getID());
+        }
+        discoveredAddedPortEvents.remove(switchEvent.getDpid());
+
+        // Cleanup for each link
+        Map<ByteBuffer, LinkEvent> oldLinkEvents =
+            discoveredAddedLinkEvents.get(switchEvent.getDpid());
+        if (oldLinkEvents != null) {
+            for (LinkEvent linkEvent : new ArrayList<>(oldLinkEvents.values())) {
+                removeLinkDiscoveryEvent(linkEvent);
+            }
+            discoveredAddedLinkEvents.remove(switchEvent.getDpid());
+        }
+
+        // Cleanup for each host
+        Map<ByteBuffer, HostEvent> oldHostEvents =
+            discoveredAddedHostEvents.get(switchEvent.getDpid());
+        if (oldHostEvents != null) {
+            for (HostEvent hostEvent : new ArrayList<>(oldHostEvents.values())) {
+                removeHostDiscoveryEvent(hostEvent);
+            }
+            discoveredAddedHostEvents.remove(switchEvent.getDpid());
+        }
+    }
+
+    /**
+     * Port discovered event.
+     *
+     * @param portEvent the port event.
+     */
+    private void putPortDiscoveryEvent(PortEvent portEvent) {
+        log.debug("Sending add port: {}", portEvent);
+        // Send out notification
+        TopologyEvent topologyEvent =
+            new TopologyEvent(portEvent,
+                              registryService.getOnosInstanceId());
+        eventChannel.addEntry(topologyEvent.getID(), topologyEvent);
+
+        // Store the new Port Event in the local cache
+        Map<ByteBuffer, PortEvent> oldPortEvents =
+            discoveredAddedPortEvents.get(portEvent.getDpid());
+        if (oldPortEvents == null) {
+            oldPortEvents = new HashMap<>();
+            discoveredAddedPortEvents.put(portEvent.getDpid(), oldPortEvents);
+        }
+        ByteBuffer id = portEvent.getIDasByteBuffer();
+        oldPortEvents.put(id, portEvent);
+    }
+
+    /**
+     * Port removed event.
+     *
+     * @param portEvent the port event.
+     */
+    private void removePortDiscoveryEvent(PortEvent portEvent) {
+        log.debug("Sending remove port: {}", portEvent);
+        // Send out notification
+        TopologyEvent topologyEvent =
+            new TopologyEvent(portEvent,
+                              registryService.getOnosInstanceId());
+        eventChannel.removeEntry(topologyEvent.getID());
+
+        // Cleanup the Port Event from the local cache
+        Map<ByteBuffer, PortEvent> oldPortEvents =
+            discoveredAddedPortEvents.get(portEvent.getDpid());
+        if (oldPortEvents != null) {
+            ByteBuffer id = portEvent.getIDasByteBuffer();
+            oldPortEvents.remove(id);
+        }
+
+        // Cleanup for the incoming link
+        Map<ByteBuffer, LinkEvent> oldLinkEvents =
+            discoveredAddedLinkEvents.get(portEvent.getDpid());
+        if (oldLinkEvents != null) {
+            for (LinkEvent linkEvent : new ArrayList<>(oldLinkEvents.values())) {
+                if (linkEvent.getDst().equals(portEvent.getSwitchPort())) {
+                    removeLinkDiscoveryEvent(linkEvent);
+                    // XXX If we change our model to allow multiple Link on
+                    // a Port, this loop must be fixed to allow continuing.
+                    break;
+                }
+            }
+        }
+
+        // Cleanup for the connected hosts
+        // TODO: The implementation below is probably wrong
+        List<HostEvent> removedHostEvents = new LinkedList<>();
+        Map<ByteBuffer, HostEvent> oldHostEvents =
+            discoveredAddedHostEvents.get(portEvent.getDpid());
+        if (oldHostEvents != null) {
+            for (HostEvent hostEvent : new ArrayList<>(oldHostEvents.values())) {
+                for (SwitchPort swp : hostEvent.getAttachmentPoints()) {
+                    if (swp.equals(portEvent.getSwitchPort())) {
+                        removedHostEvents.add(hostEvent);
+                    }
+                }
+            }
+            for (HostEvent hostEvent : removedHostEvents) {
+                removeHostDiscoveryEvent(hostEvent);
+            }
+        }
+    }
+
+    /**
+     * Link discovered event.
+     *
+     * @param linkEvent the link event.
+     */
+    private void putLinkDiscoveryEvent(LinkEvent linkEvent) {
+        log.debug("Sending add link: {}", linkEvent);
+        // Send out notification
+        TopologyEvent topologyEvent =
+            new TopologyEvent(linkEvent,
+                              registryService.getOnosInstanceId());
+        eventChannel.addEntry(topologyEvent.getID(), topologyEvent);
+
+        // Store the new Link Event in the local cache
+        Map<ByteBuffer, LinkEvent> oldLinkEvents =
+            discoveredAddedLinkEvents.get(linkEvent.getDst().getDpid());
+        if (oldLinkEvents == null) {
+            oldLinkEvents = new HashMap<>();
+            discoveredAddedLinkEvents.put(linkEvent.getDst().getDpid(),
+                                          oldLinkEvents);
+        }
+        ByteBuffer id = linkEvent.getIDasByteBuffer();
+        oldLinkEvents.put(id, linkEvent);
+    }
+
+    /**
+     * Link removed event.
+     *
+     * @param linkEvent the link event.
+     */
+    private void removeLinkDiscoveryEvent(LinkEvent linkEvent) {
+        log.debug("Sending remove link: {}", linkEvent);
+        // Send out notification
+        TopologyEvent topologyEvent =
+            new TopologyEvent(linkEvent,
+                              registryService.getOnosInstanceId());
+        eventChannel.removeEntry(topologyEvent.getID());
+
+        // Cleanup the Link Event from the local cache
+        Map<ByteBuffer, LinkEvent> oldLinkEvents =
+            discoveredAddedLinkEvents.get(linkEvent.getDst().getDpid());
+        if (oldLinkEvents != null) {
+            ByteBuffer id = linkEvent.getIDasByteBuffer();
+            oldLinkEvents.remove(id);
+        }
+    }
+
+    /**
+     * Host discovered event.
+     *
+     * @param hostEvent the host event.
+     */
+    private void putHostDiscoveryEvent(HostEvent hostEvent) {
+        // Send out notification
+        TopologyEvent topologyEvent =
+            new TopologyEvent(hostEvent,
+                              registryService.getOnosInstanceId());
+        eventChannel.addEntry(topologyEvent.getID(), topologyEvent);
+        log.debug("Put the host info into the cache of the topology. mac {}",
+                  hostEvent.getMac());
+
+        // Store the new Host Event in the local cache
+        // TODO: The implementation below is probably wrong
+        for (SwitchPort swp : hostEvent.getAttachmentPoints()) {
+            Map<ByteBuffer, HostEvent> oldHostEvents =
+                discoveredAddedHostEvents.get(swp.getDpid());
+            if (oldHostEvents == null) {
+                oldHostEvents = new HashMap<>();
+                discoveredAddedHostEvents.put(swp.getDpid(), oldHostEvents);
+            }
+            ByteBuffer id = hostEvent.getIDasByteBuffer();
+            oldHostEvents.put(id, hostEvent);
+        }
+    }
+
+    /**
+     * Host removed event.
+     *
+     * @param hostEvent the host event.
+     */
+    private void removeHostDiscoveryEvent(HostEvent hostEvent) {
+        // Send out notification
+        TopologyEvent topologyEvent =
+            new TopologyEvent(hostEvent,
+                              registryService.getOnosInstanceId());
+        eventChannel.removeEntry(topologyEvent.getID());
+        log.debug("Remove the host info into the cache of the topology. mac {}",
+                  hostEvent.getMac());
+
+        // Cleanup the Host Event from the local cache
+        // TODO: The implementation below is probably wrong
+        ByteBuffer id = hostEvent.getIDasByteBuffer();
+        for (SwitchPort swp : hostEvent.getAttachmentPoints()) {
+            Map<ByteBuffer, HostEvent> oldHostEvents =
+                discoveredAddedHostEvents.get(swp.getDpid());
+            if (oldHostEvents != null) {
+                oldHostEvents.remove(id);
+            }
+        }
     }
 }
diff --git a/src/test/java/net/onrc/onos/core/topology/TopologyManagerTest.java b/src/test/java/net/onrc/onos/core/topology/TopologyManagerTest.java
index 2f8f377..f6e6c09 100644
--- a/src/test/java/net/onrc/onos/core/topology/TopologyManagerTest.java
+++ b/src/test/java/net/onrc/onos/core/topology/TopologyManagerTest.java
@@ -49,10 +49,10 @@
  * These test cases only check the sanity of functions in the TopologyManager.
  * Note that we do not test the eventHandler functions in the TopologyManager
  * class.
- * DatagridService, DataStoreService, eventChannel, and
- * controllerRegistryService are mocked out.
+ * DatagridService, eventChannel, and controllerRegistryService are mocked out.
  */
 public class TopologyManagerTest extends UnitTest {
+    private TopologyPublisher theTopologyPublisher;
     private TopologyManager theTopologyManager;
     private TopologyManager.EventHandler theEventHandler;
     private TopologyListenerTest theTopologyListener =
@@ -60,7 +60,6 @@
     private final String eventChannelName = "onos.topology";
     private IEventChannel<byte[], TopologyEvent> eventChannel;
     private IDatagridService datagridService;
-    private TopologyDatastore dataStoreService;
     private IControllerRegistryService registryService;
     private Collection<TopologyEvent> allTopologyEvents;
     private static final OnosInstanceId ONOS_INSTANCE_ID_1 =
@@ -94,7 +93,6 @@
     public void setUp() throws Exception {
         // Mock objects for testing
         datagridService = createNiceMock(IDatagridService.class);
-        dataStoreService = createNiceMock(TopologyDatastore.class);
         registryService = createMock(IControllerRegistryService.class);
         eventChannel = createNiceMock(IEventChannel.class);
 
@@ -111,40 +109,6 @@
                 eq(TopologyEvent.class)))
                 .andReturn(eventChannel).once();
 
-        expect(dataStoreService.addSwitch(
-                anyObject(SwitchEvent.class),
-                anyObject(Collection.class)))
-                .andReturn(true).anyTimes();
-
-        expect(dataStoreService.deactivateSwitch(
-                anyObject(SwitchEvent.class),
-                anyObject(Collection.class)))
-                .andReturn(true).anyTimes();
-
-        expect(dataStoreService.addPort(
-                anyObject(PortEvent.class)))
-                .andReturn(true).anyTimes();
-
-        expect(dataStoreService.deactivatePort(
-                anyObject(PortEvent.class)))
-                .andReturn(true).anyTimes();
-
-        expect(dataStoreService.addHost(
-                anyObject(HostEvent.class)))
-                .andReturn(true).anyTimes();
-
-        expect(dataStoreService.removeHost(
-                anyObject(HostEvent.class)))
-                .andReturn(true).anyTimes();
-
-        expect(dataStoreService.addLink(
-                anyObject(LinkEvent.class)))
-                .andReturn(true).anyTimes();
-
-        expect(dataStoreService.removeLink(
-                anyObject(LinkEvent.class)))
-                .andReturn(true).anyTimes();
-
         // Setup the Registry Service
         expect(registryService.getOnosInstanceId()).andReturn(ONOS_INSTANCE_ID_1).anyTimes();
         expect(registryService.getControllerForSwitch(DPID_1.value()))
@@ -158,11 +122,25 @@
 
         replay(datagridService);
         replay(registryService);
-        replay(dataStoreService);
         // replay(eventChannel);
     }
 
     /**
+     * Setup the Topology Publisher.
+     */
+    private void setupTopologyPublisher() {
+        // Create a TopologyPublisher object for testing
+        theTopologyPublisher = new TopologyPublisher();
+
+        // Setup the registry service
+        TestUtils.setField(theTopologyPublisher, "registryService",
+                           registryService);
+
+        // Setup the event channel
+        TestUtils.setField(theTopologyPublisher, "eventChannel", eventChannel);
+    }
+
+    /**
      * Setup the Topology Manager.
      */
     private void setupTopologyManager() {
@@ -173,9 +151,6 @@
         TestUtils.setField(theTopologyManager, "eventHandler",
             createNiceMock(TopologyManager.EventHandler.class));
         theTopologyManager.startup(datagridService);
-
-        // Replace the data store with a mocked object
-        TestUtils.setField(theTopologyManager, "datastore", dataStoreService);
     }
 
     /**
@@ -191,9 +166,6 @@
         TestUtils.setField(theTopologyManager, "eventHandler",
                            theEventHandler);
 
-        // Replace the data store with a mocked object
-        TestUtils.setField(theTopologyManager, "datastore", dataStoreService);
-
         replay(eventChannel);
         //
         // NOTE: Uncomment-out the line below if the startup() method needs
@@ -222,15 +194,17 @@
         expectLastCall().times(1, 1);          // 1 event
         replay(eventChannel);
 
-        setupTopologyManager();
+        setupTopologyPublisher();
 
         // Generate a new Switch Mastership event
         Role role = Role.MASTER;
         MastershipEvent mastershipEvent =
             new MastershipEvent(DPID_1, ONOS_INSTANCE_ID_1, role);
 
-        // Call the topologyManager function for adding the event
-        theTopologyManager.putSwitchMastershipEvent(mastershipEvent);
+        // Call the TopologyPublisher function for adding the event
+        TestUtils.callMethod(theTopologyPublisher,
+                             "putSwitchMastershipEvent",
+                             MastershipEvent.class, mastershipEvent);
 
         // Verify the function calls
         verify(eventChannel);
@@ -246,15 +220,17 @@
         expectLastCall().times(1, 1);          // 1 event
         replay(eventChannel);
 
-        setupTopologyManager();
+        setupTopologyPublisher();
 
         // Generate a new Switch Mastership Event
         Role role = Role.MASTER;
         MastershipEvent mastershipEvent =
             new MastershipEvent(DPID_1, ONOS_INSTANCE_ID_1, role);
 
-        // Call the topologyManager function for removing the event
-        theTopologyManager.removeSwitchMastershipEvent(mastershipEvent);
+        // Call the TopologyPublisher function for removing the event
+        TestUtils.callMethod(theTopologyPublisher,
+                             "removeSwitchMastershipEvent",
+                             MastershipEvent.class, mastershipEvent);
 
         // Verify the function calls
         verify(eventChannel);
@@ -271,7 +247,7 @@
         expectLastCall().times(3, 3);  // (1 Switch + 1 Port), 1 Port
         replay(eventChannel);
 
-        setupTopologyManager();
+        setupTopologyPublisher();
 
         // Mock Switch has one Port
         PortNumber portNumber = PortNumber.uint32(1);
@@ -282,12 +258,18 @@
         Collection<PortEvent> portEvents = new ArrayList<PortEvent>();
         portEvents.add(new PortEvent(DPID_1, portNumber));
 
-        // Call the topologyManager function for adding a Switch
-        theTopologyManager.putSwitchDiscoveryEvent(switchEvent, portEvents);
+        // Call the TopologyPublisher function for adding a Switch
+        TestUtils.callMethod(theTopologyPublisher,
+                             "putSwitchDiscoveryEvent",
+                             new Class<?>[] {SwitchEvent.class,
+                                     Collection.class},
+                             switchEvent, portEvents);
 
         for (PortEvent portEvent : portEvents) {
-            // Call the topologyManager function for adding a Port
-            theTopologyManager.putPortDiscoveryEvent(portEvent);
+            // Call the TopologyPublisher function for adding a Port
+            TestUtils.callMethod(theTopologyPublisher,
+                                 "putPortDiscoveryEvent",
+                                 PortEvent.class, portEvent);
         }
 
         // Verify the function calls
@@ -305,7 +287,7 @@
         expectLastCall().times(2, 2);          // 1 Switch, 1 Port
         replay(eventChannel);
 
-        setupTopologyManager();
+        setupTopologyPublisher();
 
         PortNumber portNumber = PortNumber.uint32(1);
 
@@ -313,14 +295,18 @@
         Collection<PortEvent> portEvents = new ArrayList<PortEvent>();
         portEvents.add(new PortEvent(DPID_1, portNumber));
 
-        // Call the topologyManager function for removing a Port
+        // Call the TopologyPublisher function for removing a Port
         for (PortEvent portEvent : portEvents) {
-            theTopologyManager.removePortDiscoveryEvent(portEvent);
+            TestUtils.callMethod(theTopologyPublisher,
+                                 "removePortDiscoveryEvent",
+                                 PortEvent.class, portEvent);
         }
 
-        // Call the topologyManager function for removing a Switch
+        // Call the TopologyPublisher function for removing a Switch
         SwitchEvent switchEvent = new SwitchEvent(DPID_1);
-        theTopologyManager.removeSwitchDiscoveryEvent(switchEvent);
+        TestUtils.callMethod(theTopologyPublisher,
+                             "removeSwitchDiscoveryEvent",
+                             SwitchEvent.class, switchEvent);
 
         // Verify the function calls
         verify(eventChannel);
@@ -338,7 +324,7 @@
         expectLastCall().times(5, 5);  // (2 Switch + 2 Port + 1 Link)
         replay(eventChannel);
 
-        setupTopologyManager();
+        setupTopologyPublisher();
 
         // Generate the Switch and Port Events
         PortNumber portNumber1 = PortNumber.uint32(1);
@@ -346,8 +332,12 @@
         Collection<PortEvent> portEvents1 = new ArrayList<PortEvent>();
         portEvents1.add(new PortEvent(DPID_1, portNumber1));
 
-        // Call the topologyManager function for adding a Switch
-        theTopologyManager.putSwitchDiscoveryEvent(switchEvent1, portEvents1);
+        // Call the TopologyPublisher function for adding a Switch
+        TestUtils.callMethod(theTopologyPublisher,
+                             "putSwitchDiscoveryEvent",
+                             new Class<?>[] {SwitchEvent.class,
+                                     Collection.class},
+                             switchEvent1, portEvents1);
 
         // Generate the Switch and Port Events
         PortNumber portNumber2 = PortNumber.uint32(2);
@@ -355,14 +345,20 @@
         Collection<PortEvent> portEvents2 = new ArrayList<PortEvent>();
         portEvents2.add(new PortEvent(DPID_2, portNumber2));
 
-        // Call the topologyManager function for adding a Switch
-        theTopologyManager.putSwitchDiscoveryEvent(switchEvent2, portEvents2);
+        // Call the TopologyPublisher function for adding a Switch
+        TestUtils.callMethod(theTopologyPublisher,
+                             "putSwitchDiscoveryEvent",
+                             new Class<?>[] {SwitchEvent.class,
+                                     Collection.class},
+                             switchEvent2, portEvents2);
 
         // Create the Link Event
         LinkEvent linkEvent =
             new LinkEvent(new SwitchPort(DPID_1, portNumber1),
                           new SwitchPort(DPID_2, portNumber2));
-        theTopologyManager.putLinkDiscoveryEvent(linkEvent);
+        TestUtils.callMethod(theTopologyPublisher,
+                             "putLinkDiscoveryEvent",
+                             LinkEvent.class, linkEvent);
 
         // Verify the function calls
         verify(eventChannel);
@@ -378,7 +374,7 @@
         expectLastCall().times(1, 1);          // (1 Link)
         replay(eventChannel);
 
-        setupTopologyManager();
+        setupTopologyPublisher();
 
         // Generate the Switch and Port Events
         PortNumber portNumber1 = PortNumber.uint32(1);
@@ -386,8 +382,12 @@
         Collection<PortEvent> portEvents1 = new ArrayList<PortEvent>();
         portEvents1.add(new PortEvent(DPID_1, portNumber1));
 
-        // Call the topologyManager function for adding a Switch
-        theTopologyManager.putSwitchDiscoveryEvent(switchEvent1, portEvents1);
+        // Call the TopologyPublisher function for adding a Switch
+        TestUtils.callMethod(theTopologyPublisher,
+                             "putSwitchDiscoveryEvent",
+                             new Class<?>[] {SwitchEvent.class,
+                                     Collection.class},
+                             switchEvent1, portEvents1);
 
         // Generate the Switch and Port Events
         PortNumber portNumber2 = PortNumber.uint32(2);
@@ -395,14 +395,20 @@
         Collection<PortEvent> portEvents2 = new ArrayList<PortEvent>();
         portEvents2.add(new PortEvent(DPID_2, portNumber2));
 
-        // Call the topologyManager function for adding a Switch
-        theTopologyManager.putSwitchDiscoveryEvent(switchEvent2, portEvents2);
+        // Call the TopologyPublisher function for adding a Switch
+        TestUtils.callMethod(theTopologyPublisher,
+                             "putSwitchDiscoveryEvent",
+                             new Class<?>[] {SwitchEvent.class,
+                                     Collection.class},
+                             switchEvent2, portEvents2);
 
         // Remove the Link
         LinkEvent linkEventRemove =
             new LinkEvent(new SwitchPort(DPID_1, portNumber1),
                           new SwitchPort(DPID_2, portNumber2));
-        theTopologyManager.removeLinkDiscoveryEvent(linkEventRemove);
+        TestUtils.callMethod(theTopologyPublisher,
+                             "removeLinkDiscoveryEvent",
+                             LinkEvent.class, linkEventRemove);
 
         // Verify the function calls
         verify(eventChannel);
@@ -419,7 +425,7 @@
         expectLastCall().times(1, 1);          // 1 Host
         replay(eventChannel);
 
-        setupTopologyManager();
+        setupTopologyPublisher();
 
         // Generate a new Host Event
         PortNumber portNumber = PortNumber.uint32(1);
@@ -430,8 +436,10 @@
         HostEvent hostEvent = new HostEvent(hostMac);
         hostEvent.setAttachmentPoints(spLists);
 
-        // Call the topologyManager function for adding a Host
-        theTopologyManager.putHostDiscoveryEvent(hostEvent);
+        // Call the TopologyPublisher function for adding a Host
+        TestUtils.callMethod(theTopologyPublisher,
+                             "putHostDiscoveryEvent",
+                             HostEvent.class, hostEvent);
 
         // Verify the function calls
         verify(eventChannel);
@@ -447,7 +455,7 @@
         expectLastCall().times(1, 1);          // 1 Host
         replay(eventChannel);
 
-        setupTopologyManager();
+        setupTopologyPublisher();
 
         // Generate a new Host Event
         PortNumber portNumber = PortNumber.uint32(1);
@@ -458,8 +466,10 @@
         HostEvent hostEvent = new HostEvent(hostMac);
         hostEvent.setAttachmentPoints(spLists);
 
-        // Call the topologyManager function for removing a Host
-        theTopologyManager.removeHostDiscoveryEvent(hostEvent);
+        // Call the TopologyPublisher function for removing a Host
+        TestUtils.callMethod(theTopologyPublisher,
+                             "removeHostDiscoveryEvent",
+                             HostEvent.class, hostEvent);
 
         // Verify the function calls
         verify(eventChannel);