Work toward cleaning up the Topology Manager and the Topology Publisher

ONOS-1890

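Moved some of the methods in class TopologyManager (the put/remove
discovery-event methods for mastership, switch, port, link and host
events) to TopologyPublisher, because they naturally belong to the
latter. As a result, TopologyManager no longer implements
TopologyDiscoveryInterface.
This move is also needed for the log-based mechanism.
In the process, removed the writing of events to the datastore until
it becomes clear when, where, and what to write.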

Change-Id: Ic1fe1db533aec66a91bf643b0989119f33c3d37e
diff --git a/src/main/java/net/onrc/onos/core/topology/TopologyManager.java b/src/main/java/net/onrc/onos/core/topology/TopologyManager.java
index 1280cf4..daa334b 100644
--- a/src/main/java/net/onrc/onos/core/topology/TopologyManager.java
+++ b/src/main/java/net/onrc/onos/core/topology/TopologyManager.java
@@ -22,17 +22,12 @@
 import net.onrc.onos.core.datagrid.IDatagridService;
 import net.onrc.onos.core.datagrid.IEventChannel;
 import net.onrc.onos.core.datagrid.IEventChannelListener;
-import net.onrc.onos.core.datastore.topology.KVDevice;
-import net.onrc.onos.core.datastore.topology.KVLink;
-import net.onrc.onos.core.datastore.topology.KVPort;
-import net.onrc.onos.core.datastore.topology.KVSwitch;
 import net.onrc.onos.core.metrics.OnosMetrics;
 import net.onrc.onos.core.metrics.OnosMetrics.MetricsComponent;
 import net.onrc.onos.core.metrics.OnosMetrics.MetricsFeature;
 import net.onrc.onos.core.registry.IControllerRegistryService;
 import net.onrc.onos.core.util.Dpid;
 import net.onrc.onos.core.util.EventEntry;
-import net.onrc.onos.core.util.PortNumber;
 import net.onrc.onos.core.util.SwitchPort;
 import net.onrc.onos.core.util.serializers.KryoFactory;
 
@@ -58,7 +53,7 @@
  * TODO TBD: This class may delay the requested change to handle event
  * re-ordering. e.g.) Link Add came in, but Switch was not there.
  */
-public class TopologyManager implements TopologyDiscoveryInterface {
+public class TopologyManager {
 
     private static final Logger log = LoggerFactory
             .getLogger(TopologyManager.class);
@@ -67,9 +62,7 @@
     public static final String EVENT_CHANNEL_NAME = "onos.topology";
     private EventHandler eventHandler = new EventHandler();
 
-    private TopologyDatastore datastore;
     private final TopologyImpl topology = new TopologyImpl();
-    private final IControllerRegistryService registryService;
     private Kryo kryo = KryoFactory.newKryoObject();
     private TopologyEventPreprocessor eventPreprocessor;
     private CopyOnWriteArrayList<ITopologyListener> topologyListeners =
@@ -104,44 +97,6 @@
                                 "ListenerEventRate");
 
     //
-    // Local state for keeping track of locally discovered events so we can
-    // cleanup properly when a Switch or Port is removed.
-    //
-    // We keep all Port, (incoming) Link and Host events per Switch DPID:
-    //  - If a switch goes down, we remove all corresponding Port, Link and
-    //    Host events.
-    //  - If a port on a switch goes down, we remove all corresponding Link
-    //    and Host events discovered by this instance.
-    //
-    // How to handle side-effect of remote events.
-    //  - Remote Port Down event -> Link Down
-    //      Not handled. (XXX Shouldn't it be removed from discovered.. Map)
-    //  - Remote Host Added -> lose ownership of Host)
-    //      Not handled. (XXX Shouldn't it be removed from discovered.. Map)
-    //
-    // XXX Domain knowledge based invariant maintenance should be moved to
-    //     driver module, since the invariant may be different on optical, etc.
-    //
-    // What happens on leadership change?
-    //  - Probably should: remove from discovered.. Maps, but not send DELETE
-    //    events
-    //    XXX Switch/Port can be rediscovered by new leader, but Link, Host?
-    //  - Current: There is no way to recognize leadership change?
-    //      ZookeeperRegistry.requestControl(long, ControlChangeCallback)
-    //      is the only way to register listener, and it allows only one
-    //      listener, which is already used by Controller class.
-    //
-    // FIXME Replace with concurrent variant.
-    //   #removeSwitchDiscoveryEvent(SwitchEvent) runs in different thread.
-    //
-    private Map<Dpid, Map<ByteBuffer, PortEvent>> discoveredAddedPortEvents =
-            new HashMap<>();
-    private Map<Dpid, Map<ByteBuffer, LinkEvent>> discoveredAddedLinkEvents =
-            new HashMap<>();
-    private Map<Dpid, Map<ByteBuffer, HostEvent>> discoveredAddedHostEvents =
-            new HashMap<>();
-
-    //
     // Local state for keeping the last ADD Mastership Event entries.
     // TODO: In the future, we might have to keep this state somewhere else.
     //
@@ -173,8 +128,6 @@
      * @param registryService the Registry Service to use.
      */
     public TopologyManager(IControllerRegistryService registryService) {
-        datastore = new TopologyDatastore();
-        this.registryService = registryService;
         this.eventPreprocessor =
             new TopologyEventPreprocessor(registryService);
     }
@@ -201,12 +154,7 @@
          */
         private void startup() {
             //
-            // TODO: Read all state from the database:
-            //
-            // Collection<EventEntry<TopologyEvent>> collection =
-            //    readWholeTopologyFromDB();
-            //
-            // For now, as a shortcut we read it from the datagrid
+            // Read all topology state
             //
             Collection<TopologyEvent> allTopologyEvents =
                     eventChannel.getAllEntries();
@@ -611,359 +559,6 @@
         apiRemovedHostEvents.clear();
     }
 
-    /**
-     * Mastership updated event.
-     *
-     * @param mastershipEvent the mastership event.
-     */
-    @Override
-    public void putSwitchMastershipEvent(MastershipEvent mastershipEvent) {
-        // Send out notification
-        TopologyEvent topologyEvent =
-            new TopologyEvent(mastershipEvent,
-                              registryService.getOnosInstanceId());
-        eventChannel.addEntry(topologyEvent.getID(), topologyEvent);
-    }
-
-    /**
-     * Mastership removed event.
-     *
-     * @param mastershipEvent the mastership event.
-     */
-    @Override
-    public void removeSwitchMastershipEvent(MastershipEvent mastershipEvent) {
-        // Send out notification
-        TopologyEvent topologyEvent =
-            new TopologyEvent(mastershipEvent,
-                              registryService.getOnosInstanceId());
-        eventChannel.removeEntry(topologyEvent.getID());
-    }
-
-    /**
-     * Switch discovered event.
-     *
-     * @param switchEvent the switch event.
-     * @param portEvents  the corresponding port events for the switch.
-     */
-    @Override
-    public void putSwitchDiscoveryEvent(SwitchEvent switchEvent,
-                                        Collection<PortEvent> portEvents) {
-        if (datastore.addSwitch(switchEvent, portEvents)) {
-            log.debug("Sending add switch: {}", switchEvent);
-            // Send out notification
-            TopologyEvent topologyEvent =
-                new TopologyEvent(switchEvent,
-                                  registryService.getOnosInstanceId());
-            eventChannel.addEntry(topologyEvent.getID(), topologyEvent);
-
-            // Send out notification for each port
-            for (PortEvent portEvent : portEvents) {
-                log.debug("Sending add port: {}", portEvent);
-                topologyEvent =
-                    new TopologyEvent(portEvent,
-                                      registryService.getOnosInstanceId());
-                eventChannel.addEntry(topologyEvent.getID(), topologyEvent);
-            }
-
-            //
-            // Keep track of the added ports
-            //
-            // Get the old Port Events
-            Map<ByteBuffer, PortEvent> oldPortEvents =
-                    discoveredAddedPortEvents.get(switchEvent.getDpid());
-            if (oldPortEvents == null) {
-                oldPortEvents = new HashMap<>();
-            }
-
-            // Store the new Port Events in the local cache
-            Map<ByteBuffer, PortEvent> newPortEvents = new HashMap<>();
-            for (PortEvent portEvent : portEvents) {
-                ByteBuffer id = portEvent.getIDasByteBuffer();
-                newPortEvents.put(id, portEvent);
-            }
-            discoveredAddedPortEvents.put(switchEvent.getDpid(),
-                    newPortEvents);
-
-            //
-            // Extract the removed ports
-            //
-            List<PortEvent> removedPortEvents = new LinkedList<>();
-            for (Map.Entry<ByteBuffer, PortEvent> entry : oldPortEvents.entrySet()) {
-                ByteBuffer key = entry.getKey();
-                PortEvent portEvent = entry.getValue();
-                if (!newPortEvents.containsKey(key)) {
-                    removedPortEvents.add(portEvent);
-                }
-            }
-
-            // Cleanup old removed ports
-            for (PortEvent portEvent : removedPortEvents) {
-                removePortDiscoveryEvent(portEvent);
-            }
-        }
-    }
-
-    /**
-     * {@inheritDoc}
-     * <p/>
-     * Called by {@link TopologyPublisher.SwitchCleanup} thread.
-     */
-    @Override
-    public void removeSwitchDiscoveryEvent(SwitchEvent switchEvent) {
-        TopologyEvent topologyEvent;
-
-        // Get the old Port Events
-        Map<ByteBuffer, PortEvent> oldPortEvents =
-                discoveredAddedPortEvents.get(switchEvent.getDpid());
-        if (oldPortEvents == null) {
-            oldPortEvents = new HashMap<>();
-        }
-
-        if (datastore.deactivateSwitch(switchEvent, oldPortEvents.values())) {
-            log.debug("Sending remove switch: {}", switchEvent);
-            // Send out notification
-            topologyEvent =
-                new TopologyEvent(switchEvent,
-                                  registryService.getOnosInstanceId());
-            eventChannel.removeEntry(topologyEvent.getID());
-
-            //
-            // Send out notification for each port.
-            //
-            // NOTE: We don't use removePortDiscoveryEvent() for the cleanup,
-            // because it will attempt to remove the port from the database,
-            // and the deactiveSwitch() call above already removed all ports.
-            //
-            for (PortEvent portEvent : oldPortEvents.values()) {
-                log.debug("Sending remove port:", portEvent);
-                topologyEvent =
-                    new TopologyEvent(portEvent,
-                                      registryService.getOnosInstanceId());
-                eventChannel.removeEntry(topologyEvent.getID());
-            }
-            discoveredAddedPortEvents.remove(switchEvent.getDpid());
-
-            // Cleanup for each link
-            Map<ByteBuffer, LinkEvent> oldLinkEvents =
-                    discoveredAddedLinkEvents.get(switchEvent.getDpid());
-            if (oldLinkEvents != null) {
-                for (LinkEvent linkEvent : new ArrayList<>(oldLinkEvents.values())) {
-                    removeLinkDiscoveryEvent(linkEvent);
-                }
-                discoveredAddedLinkEvents.remove(switchEvent.getDpid());
-            }
-
-            // Cleanup for each host
-            Map<ByteBuffer, HostEvent> oldHostEvents =
-                    discoveredAddedHostEvents.get(switchEvent.getDpid());
-            if (oldHostEvents != null) {
-                for (HostEvent hostEvent : new ArrayList<>(oldHostEvents.values())) {
-                    removeHostDiscoveryEvent(hostEvent);
-                }
-                discoveredAddedHostEvents.remove(switchEvent.getDpid());
-            }
-        }
-    }
-
-    /**
-     * Port discovered event.
-     *
-     * @param portEvent the port event.
-     */
-    @Override
-    public void putPortDiscoveryEvent(PortEvent portEvent) {
-        if (datastore.addPort(portEvent)) {
-            log.debug("Sending add port: {}", portEvent);
-            // Send out notification
-            TopologyEvent topologyEvent =
-                new TopologyEvent(portEvent,
-                                  registryService.getOnosInstanceId());
-            eventChannel.addEntry(topologyEvent.getID(), topologyEvent);
-
-            // Store the new Port Event in the local cache
-            Map<ByteBuffer, PortEvent> oldPortEvents =
-                    discoveredAddedPortEvents.get(portEvent.getDpid());
-            if (oldPortEvents == null) {
-                oldPortEvents = new HashMap<>();
-                discoveredAddedPortEvents.put(portEvent.getDpid(),
-                        oldPortEvents);
-            }
-            ByteBuffer id = portEvent.getIDasByteBuffer();
-            oldPortEvents.put(id, portEvent);
-        }
-    }
-
-    /**
-     * Port removed event.
-     *
-     * @param portEvent the port event.
-     */
-    @Override
-    public void removePortDiscoveryEvent(PortEvent portEvent) {
-        if (datastore.deactivatePort(portEvent)) {
-            log.debug("Sending remove port: {}", portEvent);
-            // Send out notification
-            TopologyEvent topologyEvent =
-                new TopologyEvent(portEvent,
-                                  registryService.getOnosInstanceId());
-            eventChannel.removeEntry(topologyEvent.getID());
-
-            // Cleanup the Port Event from the local cache
-            Map<ByteBuffer, PortEvent> oldPortEvents =
-                    discoveredAddedPortEvents.get(portEvent.getDpid());
-            if (oldPortEvents != null) {
-                ByteBuffer id = portEvent.getIDasByteBuffer();
-                oldPortEvents.remove(id);
-            }
-
-            // Cleanup for the incoming link
-            Map<ByteBuffer, LinkEvent> oldLinkEvents =
-                    discoveredAddedLinkEvents.get(portEvent.getDpid());
-            if (oldLinkEvents != null) {
-                for (LinkEvent linkEvent : new ArrayList<>(oldLinkEvents.values())) {
-                    if (linkEvent.getDst().equals(portEvent.getSwitchPort())) {
-                        removeLinkDiscoveryEvent(linkEvent);
-                        // XXX If we change our model to allow multiple Link on
-                        // a Port, this loop must be fixed to allow continuing.
-                        break;
-                    }
-                }
-            }
-
-            // Cleanup for the connected hosts
-            // TODO: The implementation below is probably wrong
-            List<HostEvent> removedHostEvents = new LinkedList<>();
-            Map<ByteBuffer, HostEvent> oldHostEvents =
-                    discoveredAddedHostEvents.get(portEvent.getDpid());
-            if (oldHostEvents != null) {
-                for (HostEvent hostEvent : new ArrayList<>(oldHostEvents.values())) {
-                    for (SwitchPort swp : hostEvent.getAttachmentPoints()) {
-                        if (swp.equals(portEvent.getSwitchPort())) {
-                            removedHostEvents.add(hostEvent);
-                        }
-                    }
-                }
-                for (HostEvent hostEvent : removedHostEvents) {
-                    removeHostDiscoveryEvent(hostEvent);
-                }
-            }
-        }
-    }
-
-    /**
-     * Link discovered event.
-     *
-     * @param linkEvent the link event.
-     */
-    @Override
-    public void putLinkDiscoveryEvent(LinkEvent linkEvent) {
-        if (datastore.addLink(linkEvent)) {
-            log.debug("Sending add link: {}", linkEvent);
-            // Send out notification
-            TopologyEvent topologyEvent =
-                new TopologyEvent(linkEvent,
-                                  registryService.getOnosInstanceId());
-            eventChannel.addEntry(topologyEvent.getID(), topologyEvent);
-
-            // Store the new Link Event in the local cache
-            Map<ByteBuffer, LinkEvent> oldLinkEvents =
-                    discoveredAddedLinkEvents.get(linkEvent.getDst().getDpid());
-            if (oldLinkEvents == null) {
-                oldLinkEvents = new HashMap<>();
-                discoveredAddedLinkEvents.put(linkEvent.getDst().getDpid(),
-                        oldLinkEvents);
-            }
-            ByteBuffer id = linkEvent.getIDasByteBuffer();
-            oldLinkEvents.put(id, linkEvent);
-        }
-    }
-
-    /**
-     * Link removed event.
-     *
-     * @param linkEvent the link event.
-     */
-    @Override
-    public void removeLinkDiscoveryEvent(LinkEvent linkEvent) {
-        if (datastore.removeLink(linkEvent)) {
-            log.debug("Sending remove link: {}", linkEvent);
-            // Send out notification
-            TopologyEvent topologyEvent =
-                new TopologyEvent(linkEvent,
-                                  registryService.getOnosInstanceId());
-            eventChannel.removeEntry(topologyEvent.getID());
-
-            // Cleanup the Link Event from the local cache
-            Map<ByteBuffer, LinkEvent> oldLinkEvents =
-                    discoveredAddedLinkEvents.get(linkEvent.getDst().getDpid());
-            if (oldLinkEvents != null) {
-                ByteBuffer id = linkEvent.getIDasByteBuffer();
-                oldLinkEvents.remove(id);
-            }
-        }
-    }
-
-    /**
-     * Host discovered event.
-     *
-     * @param hostEvent the host event.
-     */
-    @Override
-    public void putHostDiscoveryEvent(HostEvent hostEvent) {
-        if (datastore.addHost(hostEvent)) {
-            // Send out notification
-            TopologyEvent topologyEvent =
-                new TopologyEvent(hostEvent,
-                                  registryService.getOnosInstanceId());
-            eventChannel.addEntry(topologyEvent.getID(), topologyEvent);
-            log.debug("Put the host info into the cache of the topology. mac {}",
-                      hostEvent.getMac());
-
-            // Store the new Host Event in the local cache
-            // TODO: The implementation below is probably wrong
-            for (SwitchPort swp : hostEvent.getAttachmentPoints()) {
-                Map<ByteBuffer, HostEvent> oldHostEvents =
-                        discoveredAddedHostEvents.get(swp.getDpid());
-                if (oldHostEvents == null) {
-                    oldHostEvents = new HashMap<>();
-                    discoveredAddedHostEvents.put(swp.getDpid(),
-                            oldHostEvents);
-                }
-                ByteBuffer id = hostEvent.getIDasByteBuffer();
-                oldHostEvents.put(id, hostEvent);
-            }
-        }
-    }
-
-    /**
-     * Host removed event.
-     *
-     * @param hostEvent the host event.
-     */
-    @Override
-    public void removeHostDiscoveryEvent(HostEvent hostEvent) {
-        if (datastore.removeHost(hostEvent)) {
-            // Send out notification
-            TopologyEvent topologyEvent =
-                new TopologyEvent(hostEvent,
-                                  registryService.getOnosInstanceId());
-            eventChannel.removeEntry(topologyEvent.getID());
-            log.debug("Remove the host info into the cache of the topology. mac {}",
-                      hostEvent.getMac());
-
-            // Cleanup the Host Event from the local cache
-            // TODO: The implementation below is probably wrong
-            ByteBuffer id = hostEvent.getIDasByteBuffer();
-            for (SwitchPort swp : hostEvent.getAttachmentPoints()) {
-                Map<ByteBuffer, HostEvent> oldHostEvents =
-                        discoveredAddedHostEvents.get(swp.getDpid());
-                if (oldHostEvents != null) {
-                    oldHostEvents.remove(id);
-                }
-            }
-        }
-    }
-
     //
     // Methods to update topology replica
     //
@@ -1390,96 +985,4 @@
         topology.removeHost(mac);
         apiRemovedHostEvents.add(hostInTopo);
     }
-
-    /**
-     * Read the whole topology from the database.
-     *
-     * @return a list of EventEntry-encapsulated Topology Events for
-     * the whole topology.
-     */
-    private List<EventEntry<TopologyEvent>> readWholeTopologyFromDB() {
-        List<EventEntry<TopologyEvent>> events =
-                new LinkedList<EventEntry<TopologyEvent>>();
-
-        // XXX May need to clear whole topology first, depending on
-        // how we initially subscribe to replication events
-
-        // Add all active switches
-        for (KVSwitch sw : KVSwitch.getAllSwitches()) {
-            if (sw.getStatus() != KVSwitch.STATUS.ACTIVE) {
-                continue;
-            }
-
-            //
-            // TODO: Using the local ONOS Instance ID below is incorrect.
-            // Currently, this code is not used, and it might go away in the
-            // future.
-            //
-            SwitchEvent switchEvent = new SwitchEvent(new Dpid(sw.getDpid()));
-            TopologyEvent topologyEvent =
-                new TopologyEvent(switchEvent,
-                                  registryService.getOnosInstanceId());
-            EventEntry<TopologyEvent> eventEntry =
-                    new EventEntry<TopologyEvent>(EventEntry.Type.ENTRY_ADD,
-                            topologyEvent);
-            events.add(eventEntry);
-        }
-
-        // Add all active ports
-        for (KVPort p : KVPort.getAllPorts()) {
-            if (p.getStatus() != KVPort.STATUS.ACTIVE) {
-                continue;
-            }
-
-            //
-            // TODO: Using the local ONOS Instance ID below is incorrect.
-            // Currently, this code is not used, and it might go away in the
-            // future.
-            //
-            PortEvent portEvent =
-                new PortEvent(new Dpid(p.getDpid()),
-                              new PortNumber(p.getNumber().shortValue()));
-            TopologyEvent topologyEvent =
-                new TopologyEvent(portEvent,
-                                  registryService.getOnosInstanceId());
-            EventEntry<TopologyEvent> eventEntry =
-                    new EventEntry<TopologyEvent>(EventEntry.Type.ENTRY_ADD,
-                            topologyEvent);
-            events.add(eventEntry);
-        }
-
-        for (KVDevice d : KVDevice.getAllDevices()) {
-            //
-            // TODO: Using the local ONOS Instance ID below is incorrect.
-            // Currently, this code is not used, and it might go away in the
-            // future.
-            //
-            HostEvent devEvent = new HostEvent(MACAddress.valueOf(d.getMac()));
-            for (byte[] portId : d.getAllPortIds()) {
-                devEvent.addAttachmentPoint(
-                        new SwitchPort(KVPort.getDpidFromKey(portId),
-                        KVPort.getNumberFromKey(portId)));
-            }
-        }
-
-        for (KVLink l : KVLink.getAllLinks()) {
-            //
-            // TODO: Using the local ONOS Instance ID below is incorrect.
-            // Currently, this code is not used, and it might go away in the
-            // future.
-            //
-            LinkEvent linkEvent = new LinkEvent(
-                        new SwitchPort(l.getSrc().dpid, l.getSrc().number),
-                        new SwitchPort(l.getDst().dpid, l.getDst().number));
-            TopologyEvent topologyEvent =
-                new TopologyEvent(linkEvent,
-                                  registryService.getOnosInstanceId());
-            EventEntry<TopologyEvent> eventEntry =
-                    new EventEntry<TopologyEvent>(EventEntry.Type.ENTRY_ADD,
-                            topologyEvent);
-            events.add(eventEntry);
-        }
-
-        return events;
-    }
 }