Renamed networkgraph package to topology.
Moved NetworkGraphPublisher into new topology package.

net.onrc.onos.ofcontroller.networkgraph.* => net.onrc.onos.core.topology.*
net.onrc.onos.ofcontroller.floodlightlistener.NetworkGraphPublisher => net.onrc.onos.core.topology.NetworkGraphPublisher

Change-Id: I8b156d0fcbba520fee61e92ab659bb02cfa704ac
diff --git a/src/main/java/net/onrc/onos/core/topology/TopologyManager.java b/src/main/java/net/onrc/onos/core/topology/TopologyManager.java
new file mode 100644
index 0000000..9699c6c
--- /dev/null
+++ b/src/main/java/net/onrc/onos/core/topology/TopologyManager.java
@@ -0,0 +1,1199 @@
+package net.onrc.onos.core.topology;
+
+import java.net.InetAddress;
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.CopyOnWriteArrayList;
+import java.util.concurrent.LinkedBlockingQueue;
+
+import net.onrc.onos.core.datagrid.IDatagridService;
+import net.onrc.onos.core.datagrid.IEventChannel;
+import net.onrc.onos.core.datagrid.IEventChannelListener;
+import net.onrc.onos.core.datastore.topology.KVLink;
+import net.onrc.onos.core.datastore.topology.KVPort;
+import net.onrc.onos.core.datastore.topology.KVSwitch;
+import net.onrc.onos.core.topology.PortEvent.SwitchPort;
+import net.onrc.onos.core.util.EventEntry;
+import net.onrc.onos.registry.controller.IControllerRegistryService;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * The "NB" read-only Network Map.
+ *
+ * - Maintain Invariant/Relationships between Topology Objects.
+ *
+ * TODO To be synchronized based on TopologyEvent Notification.
+ *
+ * TODO TBD: Caller is expected to maintain parent/child calling order. Parent
+ * Object must exist before adding sub component(Add Switch -> Port).
+ *
+ * TODO TBD: This class may delay the requested change to handle event
+ * re-ordering, e.g., a Link Add that arrives before its Switch was added.
+ *
+ */
+public class TopologyManager implements NetworkGraphDiscoveryInterface {
+
+    private static final Logger log = LoggerFactory
+	    .getLogger(TopologyManager.class);
+
+    private IEventChannel<byte[], TopologyEvent> eventChannel;
+    public static final String EVENT_CHANNEL_NAME = "onos.topology";
+    private EventHandler eventHandler = new EventHandler();
+
+    private final NetworkGraphDatastore datastore;
+    private final NetworkGraphImpl networkGraph = new NetworkGraphImpl();
+    private final IControllerRegistryService registryService;
+    private CopyOnWriteArrayList<INetworkGraphListener> networkGraphListeners;
+
+    //
+    // Local state for keeping track of reordered events.
+    // NOTE: Switch Events are not affected by the event reordering.
+    //
+    private Map<ByteBuffer, PortEvent> reorderedAddedPortEvents =
+	new HashMap<ByteBuffer, PortEvent>();
+    private Map<ByteBuffer, LinkEvent> reorderedAddedLinkEvents =
+	new HashMap<ByteBuffer, LinkEvent>();
+    private Map<ByteBuffer, DeviceEvent> reorderedAddedDeviceEvents =
+	new HashMap<ByteBuffer, DeviceEvent>();
+
+    //
+    // Local state for keeping track of locally discovered events so we can
+    // cleanup properly when a Switch or Port is removed.
+    //
+    // We keep all Port, Link and Device events per Switch DPID:
+    //  - If a switch goes down, we remove all corresponding Port, Link and
+    //    Device events.
+    //  - If a port on a switch goes down, we remove all corresponding Link
+    //    and Device events.
+    //
+    private Map<Long, Map<ByteBuffer, PortEvent>> discoveredAddedPortEvents =
+	new HashMap<>();
+    private Map<Long, Map<ByteBuffer, LinkEvent>> discoveredAddedLinkEvents =
+	new HashMap<>();
+    private Map<Long, Map<ByteBuffer, DeviceEvent>> discoveredAddedDeviceEvents =
+	new HashMap<>();
+
+    //
+    // Local state for keeping track of the application event notifications
+    //
+    List<SwitchEvent> apiAddedSwitchEvents = new LinkedList<SwitchEvent>();
+    List<SwitchEvent> apiRemovedSwitchEvents = new LinkedList<SwitchEvent>();
+    List<PortEvent> apiAddedPortEvents = new LinkedList<PortEvent>();
+    List<PortEvent> apiRemovedPortEvents = new LinkedList<PortEvent>();
+    List<LinkEvent> apiAddedLinkEvents = new LinkedList<LinkEvent>();
+    List<LinkEvent> apiRemovedLinkEvents = new LinkedList<LinkEvent>();
+    List<DeviceEvent> apiAddedDeviceEvents = new LinkedList<DeviceEvent>();
+    List<DeviceEvent> apiRemovedDeviceEvents = new LinkedList<DeviceEvent>();
+
+    /**
+     * Creates a Topology Manager backed by a new NetworkGraphDatastore.
+     *
+     * @param registryService the Registry Service kept by this manager.
+     * @param networkGraphListeners the listeners that receive the dispatched
+     * Network Graph events.
+     */
+    public TopologyManager(IControllerRegistryService registryService,
+			   CopyOnWriteArrayList<INetworkGraphListener> networkGraphListeners) {
+	this.registryService = registryService;
+	this.networkGraphListeners = networkGraphListeners;
+	this.datastore = new NetworkGraphDatastore();
+    }
+
+    /**
+     * Accessor for the Network Graph maintained by this manager.
+     *
+     * @return the Network Graph instance owned by this manager.
+     */
+    NetworkGraph getNetworkGraph() {
+	return this.networkGraph;
+    }
+
+    /**
+     * Event handler class.
+     */
+    private class EventHandler extends Thread implements
+	IEventChannelListener<byte[], TopologyEvent> {
+	private BlockingQueue<EventEntry<TopologyEvent>> topologyEvents =
+	    new LinkedBlockingQueue<EventEntry<TopologyEvent>>();
+
+	/**
+	 * Bootstrap the handler: replay every entry currently held by the
+	 * event channel as an ENTRY_ADD event.
+	 */
+	private void startup() {
+	    //
+	    // TODO: Read all state from the database:
+	    //
+	    // Collection<EventEntry<TopologyEvent>> collection =
+	    //    readWholeTopologyFromDB();
+	    //
+	    // For now, as a shortcut we read it from the datagrid
+	    //
+	    Collection<EventEntry<TopologyEvent>> initialEntries =
+		new LinkedList<EventEntry<TopologyEvent>>();
+
+	    // NOTE: use a local name that does not shadow the event queue
+	    Collection<TopologyEvent> snapshot = eventChannel.getAllEntries();
+	    for (TopologyEvent snapshotEvent : snapshot) {
+		initialEntries.add(
+		    new EventEntry<TopologyEvent>(EventEntry.Type.ENTRY_ADD,
+						  snapshotEvent));
+	    }
+	    processEvents(initialEntries);
+	}
+
+	/**
+	 * Run the thread: replay current state, then process queued events.
+	 */
+	@Override
+	public void run() {
+	    Collection<EventEntry<TopologyEvent>> collection =
+		new LinkedList<EventEntry<TopologyEvent>>();
+
+	    this.setName("TopologyManager.EventHandler " + this.getId());
+	    startup();
+
+	    // The main loop: block for one event, then drain the backlog
+	    try {
+		while (true) {
+		    EventEntry<TopologyEvent> eventEntry = topologyEvents.take();
+		    collection.add(eventEntry);
+		    topologyEvents.drainTo(collection);
+
+		    processEvents(collection);
+		    collection.clear();
+		}
+	    } catch (Exception exception) {
+		// The loop is not resumed after this point, so the failure
+		// must be visible at the default log level.
+		log.error("Exception processing Topology Events: ", exception);
+	    }
+	}
+
+	/**
+	 * Process a batch of topology events: coalesce them by ID, apply the
+	 * result under the Network Graph write lock, then notify listeners.
+	 * @param events the events to process.
+	 */
+	private void processEvents(Collection<EventEntry<TopologyEvent>> events) {
+	    // Local state for computing the final set of events
+	    Map<ByteBuffer, SwitchEvent> addedSwitchEvents = new HashMap<>();
+	    Map<ByteBuffer, SwitchEvent> removedSwitchEvents = new HashMap<>();
+	    Map<ByteBuffer, PortEvent> addedPortEvents = new HashMap<>();
+	    Map<ByteBuffer, PortEvent> removedPortEvents = new HashMap<>();
+	    Map<ByteBuffer, LinkEvent> addedLinkEvents = new HashMap<>();
+	    Map<ByteBuffer, LinkEvent> removedLinkEvents = new HashMap<>();
+	    Map<ByteBuffer, DeviceEvent> addedDeviceEvents = new HashMap<>();
+	    Map<ByteBuffer, DeviceEvent> removedDeviceEvents = new HashMap<>();
+
+	    //
+	    // Classify events: an ADD cancels a pending REMOVE of the same ID (and vice versa)
+	    //
+	    for (EventEntry<TopologyEvent> event : events) {
+		TopologyEvent topologyEvent = event.eventData();
+		SwitchEvent switchEvent = topologyEvent.switchEvent;
+		PortEvent portEvent = topologyEvent.portEvent;
+		LinkEvent linkEvent = topologyEvent.linkEvent;
+		DeviceEvent deviceEvent = topologyEvent.deviceEvent;
+
+		//
+		// Extract the events
+		//
+		switch (event.eventType()) {
+		case ENTRY_ADD:
+		    log.debug("Topology event ENTRY_ADD: {}", topologyEvent);
+		    if (switchEvent != null) {
+			ByteBuffer id = switchEvent.getIDasByteBuffer();
+			addedSwitchEvents.put(id, switchEvent);
+			removedSwitchEvents.remove(id);
+			// Switch Events are not affected by event reordering
+		    }
+		    if (portEvent != null) {
+			ByteBuffer id = portEvent.getIDasByteBuffer();
+			addedPortEvents.put(id, portEvent);
+			removedPortEvents.remove(id);
+			reorderedAddedPortEvents.remove(id);
+		    }
+		    if (linkEvent != null) {
+			ByteBuffer id = linkEvent.getIDasByteBuffer();
+			addedLinkEvents.put(id, linkEvent);
+			removedLinkEvents.remove(id);
+			reorderedAddedLinkEvents.remove(id);
+		    }
+		    if (deviceEvent != null) {
+			ByteBuffer id = deviceEvent.getIDasByteBuffer();
+			addedDeviceEvents.put(id, deviceEvent);
+			removedDeviceEvents.remove(id);
+			reorderedAddedDeviceEvents.remove(id);
+		    }
+		    break;
+		case ENTRY_REMOVE:
+		    log.debug("Topology event ENTRY_REMOVE: {}", topologyEvent);
+		    if (switchEvent != null) {
+			ByteBuffer id = switchEvent.getIDasByteBuffer();
+			addedSwitchEvents.remove(id);
+			removedSwitchEvents.put(id, switchEvent);
+			// Switch Events are not affected by event reordering
+		    }
+		    if (portEvent != null) {
+			ByteBuffer id = portEvent.getIDasByteBuffer();
+			addedPortEvents.remove(id);
+			removedPortEvents.put(id, portEvent);
+			reorderedAddedPortEvents.remove(id);
+		    }
+		    if (linkEvent != null) {
+			ByteBuffer id = linkEvent.getIDasByteBuffer();
+			addedLinkEvents.remove(id);
+			removedLinkEvents.put(id, linkEvent);
+			reorderedAddedLinkEvents.remove(id);
+		    }
+		    if (deviceEvent != null) {
+			ByteBuffer id = deviceEvent.getIDasByteBuffer();
+			addedDeviceEvents.remove(id);
+			removedDeviceEvents.put(id, deviceEvent);
+			reorderedAddedDeviceEvents.remove(id);
+		    }
+		    break;
+		}
+	    }
+
+	    //
+	    // Lock the Network Graph while it is modified
+	    //
+	    networkGraph.acquireWriteLock();
+
+	    try {
+    	    	//
+		// Apply the classified events.
+		//
+		// Apply the "add" events in the proper order:
+		//   switch, port, link, device
+		//
+    	    	for (SwitchEvent switchEvent : addedSwitchEvents.values())
+    	    	    addSwitch(switchEvent);
+    	    	for (PortEvent portEvent : addedPortEvents.values())
+    	    	    addPort(portEvent);
+    	    	for (LinkEvent linkEvent : addedLinkEvents.values())
+    	    	    addLink(linkEvent);
+    	    	for (DeviceEvent deviceEvent : addedDeviceEvents.values())
+    	    	    addDevice(deviceEvent);
+    	    	//
+    	    	// Apply the "remove" events in the reverse order:
+    	    	//   device, link, port, switch
+    	    	//
+    	    	for (DeviceEvent deviceEvent : removedDeviceEvents.values())
+    	    	    removeDevice(deviceEvent);
+    	    	for (LinkEvent linkEvent : removedLinkEvents.values())
+    	    	    removeLink(linkEvent);
+    	    	for (PortEvent portEvent : removedPortEvents.values())
+    	    	    removePort(portEvent);
+    	    	for (SwitchEvent switchEvent : removedSwitchEvents.values())
+    	    	    removeSwitch(switchEvent);
+
+    	    	//
+    	    	// Retry the reordered events if this batch added Switches or Ports
+    	    	//
+    	    	applyReorderedEvents(! addedSwitchEvents.isEmpty(),
+    	    				! addedPortEvents.isEmpty());
+
+	    }
+    	    finally {
+    		//
+    		// Network Graph modifications completed: Release the lock
+    		//
+    		networkGraph.releaseWriteLock();
+	    }
+
+	    //
+	    // Dispatch the Topology Notification Events (write lock already released)
+	    //
+	    dispatchNetworkGraphEvents();
+	}
+
+	/**
+	 * Queue an ENTRY_ADD event for an entry added to the channel.
+	 *
+	 * @param value the value for the entry.
+	 */
+	@Override
+	public void entryAdded(TopologyEvent value) {
+	    // Only enqueue here; the handler thread does the processing
+	    topologyEvents.add(
+		new EventEntry<TopologyEvent>(EventEntry.Type.ENTRY_ADD,
+					      value));
+	}
+
+	/**
+	 * Queue an ENTRY_REMOVE event for an entry removed from the channel.
+	 *
+	 * @param value the value for the entry.
+	 */
+	@Override
+	public void entryRemoved(TopologyEvent value) {
+	    // Only enqueue here; the handler thread does the processing
+	    topologyEvents.add(
+		new EventEntry<TopologyEvent>(EventEntry.Type.ENTRY_REMOVE,
+					      value));
+	}
+
+	/**
+	 * Queue an updated entry; updates are handled exactly like additions.
+	 * @param value the value for the entry.
+	 */
+	@Override
+	public void entryUpdated(TopologyEvent value) {
+	    // NOTE: The ADD and UPDATE events are processed in same way
+	    topologyEvents.add(
+		new EventEntry<TopologyEvent>(EventEntry.Type.ENTRY_ADD, value));
+	}
+    }
+
+    /**
+     * Register the event handler on the topology event channel and start
+     * its thread.
+     *
+     * @param datagridService the datagrid service providing the channel.
+     */
+    void startup(IDatagridService datagridService) {
+	// The channel is assigned before the handler thread is started
+	this.eventChannel = datagridService.addListener(
+	    EVENT_CHANNEL_NAME, eventHandler, byte[].class, TopologyEvent.class);
+	this.eventHandler.start();
+    }
+
+    /**
+     * Dispatch Network Graph Events to the listeners and reset the
+     * pending event lists afterwards.
+     */
+    private void dispatchNetworkGraphEvents() {
+	boolean nothingPending =
+	    apiAddedSwitchEvents.isEmpty() &&
+	    apiRemovedSwitchEvents.isEmpty() &&
+	    apiAddedPortEvents.isEmpty() &&
+	    apiRemovedPortEvents.isEmpty() &&
+	    apiAddedLinkEvents.isEmpty() &&
+	    apiRemovedLinkEvents.isEmpty() &&
+	    apiAddedDeviceEvents.isEmpty() &&
+	    apiRemovedDeviceEvents.isEmpty();
+	if (nothingPending)
+	    return;		// No events to dispatch
+
+	if (log.isDebugEnabled()) {
+	    // Debug statements
+	    // TODO: Those statements should be removed in the future
+	    final String added = "Dispatch Network Graph Event: ADDED {}";
+	    final String removed = "Dispatch Network Graph Event: REMOVED {}";
+	    for (SwitchEvent pending : apiAddedSwitchEvents)
+		log.debug(added, pending);
+	    for (SwitchEvent pending : apiRemovedSwitchEvents)
+		log.debug(removed, pending);
+	    for (PortEvent pending : apiAddedPortEvents)
+		log.debug(added, pending);
+	    for (PortEvent pending : apiRemovedPortEvents)
+		log.debug(removed, pending);
+	    for (LinkEvent pending : apiAddedLinkEvents)
+		log.debug(added, pending);
+	    for (LinkEvent pending : apiRemovedLinkEvents)
+		log.debug(removed, pending);
+	    for (DeviceEvent pending : apiAddedDeviceEvents)
+		log.debug(added, pending);
+	    for (DeviceEvent pending : apiRemovedDeviceEvents)
+		log.debug(removed, pending);
+	}
+
+	// Deliver the events
+	// TODO: Should copy before handing them over to listener?
+	for (INetworkGraphListener listener : this.networkGraphListeners) {
+	    listener.networkGraphEvents(apiAddedSwitchEvents,
+					apiRemovedSwitchEvents,
+					apiAddedPortEvents,
+					apiRemovedPortEvents,
+					apiAddedLinkEvents,
+					apiRemovedLinkEvents,
+					apiAddedDeviceEvents,
+					apiRemovedDeviceEvents);
+	}
+
+	// Cleanup: the same lists are reused for the next batch
+	apiAddedSwitchEvents.clear();
+	apiRemovedSwitchEvents.clear();
+	apiAddedPortEvents.clear();
+	apiRemovedPortEvents.clear();
+	apiAddedLinkEvents.clear();
+	apiRemovedLinkEvents.clear();
+	apiAddedDeviceEvents.clear();
+	apiRemovedDeviceEvents.clear();
+    }
+
+    /**
+     * Retry the "add" events that were postponed because of reordering.
+     *
+     * @param hasAddedSwitchEvents true if there were Added Switch Events.
+     * @param hasAddedPortEvents true if there were Added Port Events.
+     */
+    private void applyReorderedEvents(boolean hasAddedSwitchEvents,
+				      boolean hasAddedPortEvents) {
+	if (!hasAddedSwitchEvents && !hasAddedPortEvents)
+	    return;		// Nothing to do
+
+	//
+	// NOTE: For simplicity we try to apply all events of a particular
+	// type if any "parent" type event was processed:
+	//  - Apply reordered Port Events if Switches were added
+	//  - Apply reordered Link and Device Events if Switches or Ports
+	//    were added
+	//
+	// Each cache is swapped for an empty map before the replay; e.g.
+	// addPort() and addLink() put an event back if it is still too early.
+	//
+
+	//
+	// Ports: retried only if Switches were added
+	//
+	if (hasAddedSwitchEvents) {
+	    Map<ByteBuffer, PortEvent> pendingPorts = reorderedAddedPortEvents;
+	    reorderedAddedPortEvents = new HashMap<>();
+	    for (PortEvent pendingPort : pendingPorts.values())
+		addPort(pendingPort);
+	}
+
+	// Links and Devices: retried if Switches or Ports were added
+	// (the guard at the top guarantees that at least one of them was).
+	Map<ByteBuffer, LinkEvent> pendingLinks = reorderedAddedLinkEvents;
+	reorderedAddedLinkEvents = new HashMap<>();
+	for (LinkEvent pendingLink : pendingLinks.values())
+	    addLink(pendingLink);
+
+	Map<ByteBuffer, DeviceEvent> pendingDevices = reorderedAddedDeviceEvents;
+	reorderedAddedDeviceEvents = new HashMap<>();
+	for (DeviceEvent pendingDevice : pendingDevices.values())
+	    addDevice(pendingDevice);
+    }
+
+    /**
+     * Switch discovered event.
+     *
+     * If the datastore accepts the switch, the switch and its ports are
+     * published on the event channel, the per-switch port cache is replaced,
+     * and previously cached ports absent from portEvents are removed.
+     *
+     * @param switchEvent the switch event.
+     * @param portEvents the corresponding port events for the switch.
+     */
+    @Override
+    public void putSwitchDiscoveryEvent(SwitchEvent switchEvent,
+					Collection<PortEvent> portEvents) {
+	if (!datastore.addSwitch(switchEvent, portEvents))
+	    return;		// Nothing to publish or to track
+
+	// Send out notification for the switch ...
+	TopologyEvent switchTopologyEvent = new TopologyEvent(switchEvent);
+	eventChannel.addEntry(switchTopologyEvent.getID(), switchTopologyEvent);
+
+	// ... and for each of its ports
+	for (PortEvent portEvent : portEvents) {
+	    TopologyEvent portTopologyEvent = new TopologyEvent(portEvent);
+	    eventChannel.addEntry(portTopologyEvent.getID(), portTopologyEvent);
+	}
+
+	//
+	// Keep track of the added ports: swap the cached Port Events of the
+	// switch for the new ones, remembering the old ones for the cleanup.
+	//
+	Map<ByteBuffer, PortEvent> previousPorts =
+	    discoveredAddedPortEvents.get(switchEvent.getDpid());
+	Map<ByteBuffer, PortEvent> currentPorts = new HashMap<>();
+	for (PortEvent portEvent : portEvents)
+	    currentPorts.put(portEvent.getIDasByteBuffer(), portEvent);
+	discoveredAddedPortEvents.put(switchEvent.getDpid(), currentPorts);
+	if (previousPorts == null)
+	    return;		// No ports were cached: nothing to clean up
+
+	//
+	// Extract the removed ports: cached earlier, missing from this update.
+	// They are collected first and removed afterwards.
+	//
+	List<PortEvent> stalePorts = new LinkedList<>();
+	for (Map.Entry<ByteBuffer, PortEvent> entry : previousPorts.entrySet()) {
+	    if (!currentPorts.containsKey(entry.getKey()))
+		stalePorts.add(entry.getValue());
+	}
+
+	// Cleanup old removed ports
+	for (PortEvent stalePort : stalePorts) {
+	    removePortDiscoveryEvent(stalePort);
+	}
+    }
+
+    /**
+     * Switch removed event.
+     *
+     * @param switchEvent the switch event.
+     */
+    @Override
+    public void removeSwitchDiscoveryEvent(SwitchEvent switchEvent) {
+	// Ports known for this switch (none if nothing was cached)
+	Map<ByteBuffer, PortEvent> cachedPorts =
+	    discoveredAddedPortEvents.get(switchEvent.getDpid());
+	if (cachedPorts == null)
+	    cachedPorts = new HashMap<>();
+
+	if (!datastore.deactivateSwitch(switchEvent, cachedPorts.values()))
+	    return;		// Nothing to publish or to clean up
+
+	// Send out notification
+	eventChannel.removeEntry(switchEvent.getID());
+
+	//
+	// Send out notification for each port.
+	// NOTE: We don't use removePortDiscoveryEvent() for the cleanup,
+	// because it will attempt to remove the port from the database,
+	// and the deactivateSwitch() call above already removed all ports.
+	//
+	for (PortEvent cachedPort : cachedPorts.values())
+	    eventChannel.removeEntry(cachedPort.getID());
+	discoveredAddedPortEvents.remove(switchEvent.getDpid());
+
+	// Cleanup for each link (iterate over a copy of the cached values)
+	Map<ByteBuffer, LinkEvent> cachedLinks =
+	    discoveredAddedLinkEvents.get(switchEvent.getDpid());
+	if (cachedLinks != null) {
+	    List<LinkEvent> linksSnapshot = new ArrayList<>(cachedLinks.values());
+	    for (LinkEvent cachedLink : linksSnapshot)
+		removeLinkDiscoveryEvent(cachedLink);
+	    discoveredAddedLinkEvents.remove(switchEvent.getDpid());
+	}
+
+	// Cleanup for each device (same copy-then-remove approach)
+	Map<ByteBuffer, DeviceEvent> cachedDevices =
+	    discoveredAddedDeviceEvents.get(switchEvent.getDpid());
+	if (cachedDevices != null) {
+	    List<DeviceEvent> devicesSnapshot = new ArrayList<>(cachedDevices.values());
+	    for (DeviceEvent cachedDevice : devicesSnapshot)
+		removeDeviceDiscoveryEvent(cachedDevice);
+	    discoveredAddedDeviceEvents.remove(switchEvent.getDpid());
+	}
+    }
+
+    /**
+     * Port discovered event.
+     *
+     * @param portEvent the port event.
+     */
+    @Override
+    public void putPortDiscoveryEvent(PortEvent portEvent) {
+	if (!datastore.addPort(portEvent))
+	    return;		// Nothing to publish or to track
+
+	// Send out notification
+	TopologyEvent notification = new TopologyEvent(portEvent);
+	eventChannel.addEntry(notification.getID(), notification);
+
+	// Remember the port under its switch, creating the per-switch
+	// map on first use
+	Map<ByteBuffer, PortEvent> switchPorts =
+	    discoveredAddedPortEvents.get(portEvent.getDpid());
+	if (switchPorts == null) {
+	    switchPorts = new HashMap<>();
+	    discoveredAddedPortEvents.put(portEvent.getDpid(), switchPorts);
+	}
+	switchPorts.put(portEvent.getIDasByteBuffer(), portEvent);
+    }
+
+    /**
+     * Port removed event.
+     *
+     * @param portEvent the port event.
+     */
+    @Override
+    public void removePortDiscoveryEvent(PortEvent portEvent) {
+	if (!datastore.deactivatePort(portEvent))
+	    return;		// Nothing to publish or to clean up
+
+	// Send out notification
+	eventChannel.removeEntry(portEvent.getID());
+
+	// Cleanup the Port Event from the local cache
+	Map<ByteBuffer, PortEvent> switchPorts =
+	    discoveredAddedPortEvents.get(portEvent.getDpid());
+	if (switchPorts != null)
+	    switchPorts.remove(portEvent.getIDasByteBuffer());
+
+	// Cleanup for the incoming link
+	Map<ByteBuffer, LinkEvent> switchLinks =
+	    discoveredAddedLinkEvents.get(portEvent.getDpid());
+	if (switchLinks != null) {
+	    // Iterate over a copy: the removal below edits the link cache
+	    List<LinkEvent> linksSnapshot = new ArrayList<>(switchLinks.values());
+	    for (LinkEvent cachedLink : linksSnapshot) {
+		if (!cachedLink.getDst().equals(portEvent.id))
+		    continue;
+		removeLinkDiscoveryEvent(cachedLink);
+		// XXX If we change our model to allow multiple Link on
+		// a Port, this loop must be fixed to allow continuing.
+		break;
+	    }
+	}
+
+	// Cleanup for the connected devices
+	// TODO: The implementation below is probably wrong
+	Map<ByteBuffer, DeviceEvent> switchDevices =
+	    discoveredAddedDeviceEvents.get(portEvent.getDpid());
+	if (switchDevices == null)
+	    return;
+	List<DeviceEvent> attachedDevices = new LinkedList<>();
+	for (DeviceEvent cachedDevice : new ArrayList<>(switchDevices.values())) {
+	    for (SwitchPort attachmentPoint : cachedDevice.getAttachmentPoints()) {
+		if (attachmentPoint.equals(portEvent.id))
+		    attachedDevices.add(cachedDevice);
+	    }
+	}
+	for (DeviceEvent attachedDevice : attachedDevices)
+	    removeDeviceDiscoveryEvent(attachedDevice);
+    }
+
+    /**
+     * Link discovered event.
+     *
+     * @param linkEvent the link event.
+     */
+    @Override
+    public void putLinkDiscoveryEvent(LinkEvent linkEvent) {
+	if (!datastore.addLink(linkEvent))
+	    return;		// Nothing to publish or to track
+
+	// Send out notification
+	TopologyEvent notification = new TopologyEvent(linkEvent);
+	eventChannel.addEntry(notification.getID(), notification);
+
+	// Remember the link under the switch of its destination port
+	Map<ByteBuffer, LinkEvent> switchLinks =
+	    discoveredAddedLinkEvents.get(linkEvent.getDst().getDpid());
+	if (switchLinks == null) {
+	    switchLinks = new HashMap<>();
+	    discoveredAddedLinkEvents.put(linkEvent.getDst().getDpid(),
+					  switchLinks);
+	}
+	switchLinks.put(linkEvent.getIDasByteBuffer(), linkEvent);
+    }
+
+    /**
+     * Link removed event.
+     *
+     * @param linkEvent the link event.
+     */
+    @Override
+    public void removeLinkDiscoveryEvent(LinkEvent linkEvent) {
+	if (!datastore.removeLink(linkEvent))
+	    return;		// Nothing to publish or to clean up
+
+	// Send out notification
+	eventChannel.removeEntry(linkEvent.getID());
+
+	// Forget the link cached under the switch of its destination port
+	Map<ByteBuffer, LinkEvent> switchLinks =
+	    discoveredAddedLinkEvents.get(linkEvent.getDst().getDpid());
+	if (switchLinks != null) {
+	    switchLinks.remove(linkEvent.getIDasByteBuffer());
+	}
+    }
+
+    /**
+     * Device discovered event.
+     *
+     * @param deviceEvent the device event.
+     */
+    @Override
+    public void putDeviceDiscoveryEvent(DeviceEvent deviceEvent) {
+	if (!datastore.addDevice(deviceEvent))
+	    return;		// Nothing to publish or to track
+
+	// Send out notification
+	TopologyEvent notification = new TopologyEvent(deviceEvent);
+	eventChannel.addEntry(notification.getID(), notification);
+	log.debug("Put the device info into the cache of the graph. mac {}", deviceEvent.getMac());
+
+	// Remember the device under the switch of each attachment point
+	// TODO: The implementation below is probably wrong
+	for (SwitchPort attachmentPoint : deviceEvent.getAttachmentPoints()) {
+	    Map<ByteBuffer, DeviceEvent> switchDevices =
+		discoveredAddedDeviceEvents.get(attachmentPoint.getDpid());
+	    if (switchDevices == null) {
+		switchDevices = new HashMap<>();
+		discoveredAddedDeviceEvents.put(attachmentPoint.getDpid(),
+						switchDevices);
+	    }
+	    switchDevices.put(deviceEvent.getIDasByteBuffer(), deviceEvent);
+	}
+    }
+
+    /**
+     * Device removed event.
+     *
+     * @param deviceEvent the device event.
+     */
+    @Override
+    public void removeDeviceDiscoveryEvent(DeviceEvent deviceEvent) {
+	if (datastore.removeDevice(deviceEvent)) {
+	    // Send out notification
+	    eventChannel.removeEntry(deviceEvent.getID());
+	    log.debug("Remove the device info from the cache of the graph. mac {}", deviceEvent.getMac());
+
+	    // Cleanup from the local cache, using the same key as the put side
+	    // TODO: The implementation below is probably wrong
+	    ByteBuffer id = deviceEvent.getIDasByteBuffer();
+	    for (SwitchPort swp : deviceEvent.getAttachmentPoints()) {
+		Map<ByteBuffer, DeviceEvent> oldDeviceEvents =
+		    discoveredAddedDeviceEvents.get(swp.getDpid());
+		if (oldDeviceEvents != null) {
+		    oldDeviceEvents.remove(id);
+		}
+	    }
+	}
+    }
+
+    /**
+     * Add a switch to the Network Graph.
+     *
+     * @param switchEvent the Switch Event with the switch to add.
+     */
+    private void addSwitch(SwitchEvent switchEvent) {
+	// An event for a known switch is still reported to the listeners
+	if (networkGraph.getSwitch(switchEvent.getDpid()) == null) {
+	    Switch created = new SwitchImpl(networkGraph, switchEvent.getDpid());
+	    networkGraph.putSwitch(created);
+	}
+	// TODO: Update the switch attributes when the switch already exists
+	// TODO: Nothing to do for now
+
+	apiAddedSwitchEvents.add(switchEvent);
+    }
+
+    /**
+     * Remove a switch from the Network Graph.
+     *
+     * @param switchEvent the Switch Event with the switch to remove.
+     */
+    private void removeSwitch(SwitchEvent switchEvent) {
+	Switch doomedSwitch = networkGraph.getSwitch(switchEvent.getDpid());
+	if (doomedSwitch == null) {
+	    log.warn("Switch {} already removed, ignoring", switchEvent);
+	    return;
+	}
+
+	//
+	// Remove all Ports on the Switch. The Port Events are collected
+	// first and the ports are removed in a second pass.
+	//
+	List<PortEvent> leftoverPorts = new ArrayList<>();
+	for (Port leftoverPort : doomedSwitch.getPorts()) {
+	    log.warn("Port {} on Switch {} should be removed prior to removing Switch. Removing Port now.",
+		     leftoverPort, switchEvent);
+	    leftoverPorts.add(new PortEvent(leftoverPort.getDpid(),
+					    leftoverPort.getNumber()));
+	}
+	for (PortEvent leftoverPortEvent : leftoverPorts)
+	    removePort(leftoverPortEvent);
+
+	networkGraph.removeSwitch(switchEvent.getDpid());
+	apiRemovedSwitchEvents.add(switchEvent);
+    }
+
+    /**
+     * Add a port to the Network Graph.
+     *
+     * @param portEvent the Port Event with the port to add.
+     */
+    private void addPort(PortEvent portEvent) {
+	Switch parentSwitch = networkGraph.getSwitch(portEvent.getDpid());
+	if (parentSwitch == null) {
+	    // Reordered event: delay the event in local cache
+	    reorderedAddedPortEvents.put(portEvent.getIDasByteBuffer(),
+					 portEvent);
+	    return;
+	}
+	SwitchImpl parentSwitchImpl = getSwitchImpl(parentSwitch);
+
+	if (parentSwitch.getPort(portEvent.getNumber()) == null) {
+	    Port created =
+		new PortImpl(networkGraph, parentSwitch, portEvent.getNumber());
+	    parentSwitchImpl.addPort(created);
+	}
+	// TODO: Update the port attributes when the port already exists
+
+	apiAddedPortEvents.add(portEvent);
+    }
+
+    /**
+     * Remove a port from the Network Graph.
+     *
+     * @param portEvent the Port Event with the port to remove.
+     */
+    private void removePort(PortEvent portEvent) {
+	Switch parentSwitch = networkGraph.getSwitch(portEvent.getDpid());
+	if (parentSwitch == null) {
+	    log.warn("Parent Switch for Port {} already removed, ignoring",
+		     portEvent);
+	    return;
+	}
+
+	Port doomedPort = parentSwitch.getPort(portEvent.getNumber());
+	if (doomedPort == null) {
+	    log.warn("Port {} already removed, ignoring", portEvent);
+	    return;
+	}
+
+	//
+	// Remove all Devices attached to the Port (collect, then remove)
+	//
+	List<DeviceEvent> attachedDevices = new ArrayList<>();
+	for (Device attached : doomedPort.getDevices()) {
+	    log.debug("Removing Device {} on Port {}", attached, portEvent);
+	    DeviceEvent attachedEvent = new DeviceEvent(attached.getMacAddress());
+	    attachedEvent.addAttachmentPoint(
+		new SwitchPort(doomedPort.getSwitch().getDpid(),
+			       doomedPort.getNumber()));
+	    attachedDevices.add(attachedEvent);
+	}
+	for (DeviceEvent attachedEvent : attachedDevices)
+	    removeDevice(attachedEvent);
+
+	//
+	// Remove all Links connected to the Port. A missing link shows up
+	// as null in the set and is skipped.
+	//
+	Set<Link> connectedLinks = new HashSet<>();
+	connectedLinks.add(doomedPort.getOutgoingLink());
+	connectedLinks.add(doomedPort.getIncomingLink());
+	List<LinkEvent> connectedLinkEvents = new ArrayList<>();
+	for (Link connected : connectedLinks) {
+	    if (connected != null) {
+		log.debug("Removing Link {} on Port {}", connected, portEvent);
+		connectedLinkEvents.add(
+		    new LinkEvent(connected.getSrcSwitch().getDpid(),
+				  connected.getSrcPort().getNumber(),
+				  connected.getDstSwitch().getDpid(),
+				  connected.getDstPort().getNumber()));
+	    }
+	}
+	for (LinkEvent connectedLinkEvent : connectedLinkEvents)
+	    removeLink(connectedLinkEvent);
+
+	// Remove the Port from the Switch
+	getSwitchImpl(parentSwitch).removePort(doomedPort);
+
+	apiRemovedPortEvents.add(portEvent);
+    }
+
+    /**
+     * Add a link to the Network Graph.
+     *
+     * @param linkEvent the Link Event with the link to add.
+     */
+    private void addLink(LinkEvent linkEvent) {
+	Port srcPort = networkGraph.getPort(linkEvent.getSrc().dpid,
+					    linkEvent.getSrc().number);
+	Port dstPort = networkGraph.getPort(linkEvent.getDst().dpid,
+					    linkEvent.getDst().number);
+	if (srcPort == null || dstPort == null) {
+	    // Reordered event: delay the event in local cache
+	    reorderedAddedLinkEvents.put(linkEvent.getIDasByteBuffer(),
+					 linkEvent);
+	    return;
+	}
+
+	// Get the Link instance from the Destination Port Incoming Link
+	Link existingLink = dstPort.getIncomingLink();
+	assert(existingLink == srcPort.getOutgoingLink());
+	if (existingLink != null) {
+	    // TODO: Update the link attributes
+	    apiAddedLinkEvents.add(linkEvent);
+	    return;
+	}
+
+	//
+	// New link: create it and attach it to both of its ports
+	//
+	Link createdLink = new LinkImpl(networkGraph, srcPort, dstPort);
+	PortImpl srcPortImpl = getPortImpl(srcPort);
+	PortImpl dstPortImpl = getPortImpl(dstPort);
+	srcPortImpl.setOutgoingLink(createdLink);
+	dstPortImpl.setIncomingLink(createdLink);
+
+	// Remove all Devices attached to the Ports (collect, then remove)
+	List<DeviceEvent> strayDevices = new ArrayList<>();
+	for (Port endpoint : new Port[] {srcPort, dstPort}) {
+	    for (Device stray : endpoint.getDevices()) {
+		log.error("Device {} on Port {} should have been removed prior to adding Link {}",
+			  stray, endpoint, linkEvent);
+		DeviceEvent strayEvent = new DeviceEvent(stray.getMacAddress());
+		strayEvent.addAttachmentPoint(
+		    new SwitchPort(endpoint.getSwitch().getDpid(),
+				   endpoint.getNumber()));
+		strayDevices.add(strayEvent);
+	    }
+	}
+	for (DeviceEvent strayEvent : strayDevices)
+	    removeDevice(strayEvent);
+
+	apiAddedLinkEvents.add(linkEvent);
+    }
+
+    /**
+     * Remove a link from the Network Graph.
+     *
+     * @param linkEvent the Link Event with the link to remove.
+     */
+    private void removeLink(LinkEvent linkEvent) {
+	// Both end points must still exist, source Port is checked first
+	Port from = networkGraph.getPort(linkEvent.getSrc().dpid,
+					 linkEvent.getSrc().number);
+	if (from == null) {
+	    log.warn("Src Port for Link {} already removed, ignoring",
+		     linkEvent);
+	    return;
+	}
+	Port to = networkGraph.getPort(linkEvent.getDst().dpid,
+				       linkEvent.getDst().number);
+	if (to == null) {
+	    log.warn("Dst Port for Link {} already removed, ignoring",
+		     linkEvent);
+	    return;
+	}
+
+	// A missing half of the Link is only reported: execution continues
+	// and the removal is still announced below.
+	boolean missingOnDst = (to.getIncomingLink() == null);
+	if (missingOnDst) {
+	    log.warn("Link {} already removed on destination Port", linkEvent);
+	}
+	boolean missingOnSrc = (from.getOutgoingLink() == null);
+	if (missingOnSrc) {
+	    log.warn("Link {} already removed on src Port", linkEvent);
+	}
+
+	// Detach the Link: first the destination Port Incoming Link, then
+	// the source Port Outgoing Link.
+	getPortImpl(to).setIncomingLink(null);
+	getPortImpl(from).setOutgoingLink(null);
+	apiRemovedLinkEvents.add(linkEvent);
+    }
+
+    /**
+     * Add a device to the Network Graph.
+     *
+     * TODO: Device-related work is incomplete.
+     * TODO: Eventually, we might need to consider reordering
+     * of addLink() and addDevice() events on the same port.
+     *
+     * @param deviceEvent the Device Event with the device to add.
+     */
+    private void addDevice(DeviceEvent deviceEvent) {
+	// Reuse the Device already known for this MAC, or create a new one
+	Device device = networkGraph.getDeviceByMac(deviceEvent.getMac());
+	if (device == null) {
+	    log.debug("Existing device was not found in networkGraph. New device. mac {}", deviceEvent.getMac());
+	    device = new DeviceImpl(networkGraph, deviceEvent.getMac());
+	}
+	DeviceImpl deviceImpl = getDeviceImpl(device);
+
+	// Merge the IP addresses carried by the event into the Device
+	for (InetAddress ipAddr : deviceEvent.getIpAddresses()) {
+	    deviceImpl.addIpAddress(ipAddr);
+	}
+
+	// Attach the Device to every usable attachment point and count them
+	int attached = 0;
+	for (SwitchPort swp : deviceEvent.getAttachmentPoints()) {
+	    Port port = networkGraph.getPort(swp.dpid, swp.number);
+	    if (port == null) {
+		// The Port is not known yet: park the event in the local
+		// reorder cache and move on to the next attachment point
+		reorderedAddedDeviceEvents.put(deviceEvent.getIDasByteBuffer(),
+					       deviceEvent);
+		continue;
+	    }
+	    // A Port that carries a Link cannot be an attachment point
+	    boolean hasLink = (port.getOutgoingLink() != null)
+		|| (port.getIncomingLink() != null);
+	    if (hasLink) {
+		log.warn("Link (Out:{},In:{}) exist on the attachment point, skipping mutation.",
+			 port.getOutgoingLink(), port.getIncomingLink());
+		continue;
+	    }
+
+	    // Record the attachment on both sides: Port and Device
+	    getPortImpl(port).addDevice(device);
+	    deviceImpl.addAttachmentPoint(port);
+	    attached++;
+	}
+
+	if (attached == 0) {
+	    // Nothing was attached: the Device is not stored, no event queued
+	    return;
+	}
+	log.debug("Storing the info into networkGraph. mac {}", deviceEvent.getMac());
+	networkGraph.putDevice(device);
+	apiAddedDeviceEvents.add(deviceEvent);
+    }
+
+    /**
+     * Remove a device from the Network Graph.
+     *
+     * TODO: Device-related work is incomplete.
+     *
+     * @param deviceEvent the Device Event with the device to remove.
+     */
+    private void removeDevice(DeviceEvent deviceEvent) {
+	// Nothing to do if the MAC is unknown to the Network Graph
+	Device device = networkGraph.getDeviceByMac(deviceEvent.getMac());
+	if (device == null) {
+	    log.warn("Device {} already removed, ignoring", deviceEvent);
+	    return;
+	}
+	DeviceImpl deviceImpl = getDeviceImpl(device);
+
+	// Undo the attachment on each attachment point listed in the event
+	for (SwitchPort swp : deviceEvent.getAttachmentPoints()) {
+	    Port port = networkGraph.getPort(swp.dpid, swp.number);
+	    if (port != null) {
+		// Remove the attachment on both sides: Port and Device
+		getPortImpl(port).removeDevice(device);
+		deviceImpl.removeAttachmentPoint(port);
+	    } else {
+		log.warn("Port for the attachment point {} did not exist. skipping attachment point mutation", swp);
+	    }
+	}
+
+	// The Device itself is always dropped from the Network Graph here,
+	// whatever the loop above found, and the removal is queued
+	networkGraph.removeDevice(device);
+	apiRemovedDeviceEvents.add(deviceEvent);
+    }
+
+    /**
+     * Get the SwitchImpl-casted switch implementation.
+     *
+     * @param sw the Switch to cast.
+     * @return the SwitchImpl-casted switch implementation.
+     */
+    private SwitchImpl getSwitchImpl(Switch sw) {
+	if (!(sw instanceof SwitchImpl)) {  // a null sw ends up here too
+	    throw new ClassCastException("SwitchImpl expected, but found: " + sw);
+	}
+	return (SwitchImpl)sw;
+    }
+
+    /**
+     * Get the PortImpl-casted port implementation.
+     *
+     * @param port the Port to cast.
+     * @return the PortImpl-casted port implementation.
+     */
+    private PortImpl getPortImpl(Port port) {
+	if (!(port instanceof PortImpl)) {  // a null port ends up here too
+	    throw new ClassCastException("PortImpl expected, but found: " + port);
+	}
+	return (PortImpl)port;
+    }
+
+    /**
+     * Get the LinkImpl-casted link implementation.
+     *
+     * @param link the Link to cast.
+     * @return the LinkImpl-casted link implementation.
+     */
+    private LinkImpl getLinkImpl(Link link) {
+	if (!(link instanceof LinkImpl)) {  // a null link ends up here too
+	    throw new ClassCastException("LinkImpl expected, but found: " + link);
+	}
+	return (LinkImpl)link;
+    }
+
+    /**
+     * Get the DeviceImpl-casted device implementation.
+     *
+     * @param device the Device to cast.
+     * @return the DeviceImpl-casted device implementation.
+     */
+    private DeviceImpl getDeviceImpl(Device device) {
+	if (!(device instanceof DeviceImpl)) {  // a null device ends up here too
+	    throw new ClassCastException("DeviceImpl expected, but found: " + device);
+	}
+	return (DeviceImpl)device;
+    }
+
+    /**
+     * Read the whole topology from the database.
+     *
+     * @return a collection of EventEntry-encapsulated Topology Events for
+     * the whole topology.
+     */
+    private Collection<EventEntry<TopologyEvent>> readWholeTopologyFromDB() {
+	// Build the snapshot as a list of ENTRY_ADD entries, in this order:
+	// Switches first, then Ports, then Links. Devices are not read
+	// (see the TODO below).
+	Collection<EventEntry<TopologyEvent>> snapshot = new LinkedList<>();
+
+	// XXX May need to clear whole topology first, depending on
+	// how we initially subscribe to replication events
+
+	// Switches: only the ones whose status is ACTIVE are reported
+	for (KVSwitch sw : KVSwitch.getAllSwitches()) {
+	    if (sw.getStatus() == KVSwitch.STATUS.ACTIVE) {
+		TopologyEvent topologyEvent = new TopologyEvent(
+		    new SwitchEvent(sw.getDpid()));
+		EventEntry<TopologyEvent> entry =
+		    new EventEntry<TopologyEvent>(EventEntry.Type.ENTRY_ADD,
+						  topologyEvent);
+		snapshot.add(entry);
+	    }
+	}
+
+	// Ports: only the ones whose status is ACTIVE are reported
+	for (KVPort p : KVPort.getAllPorts()) {
+	    if (p.getStatus() == KVPort.STATUS.ACTIVE) {
+		TopologyEvent topologyEvent = new TopologyEvent(
+		    new PortEvent(p.getDpid(), p.getNumber()));
+		EventEntry<TopologyEvent> entry =
+		    new EventEntry<TopologyEvent>(EventEntry.Type.ENTRY_ADD,
+						  topologyEvent);
+		snapshot.add(entry);
+	    }
+	}
+
+	// TODO Is Device going to be in DB? If so, read from DB.
+	//	for (KVDevice d : KVDevice.getAllDevices()) {
+	//	    DeviceEvent devEvent = new DeviceEvent( MACAddress.valueOf(d.getMac()) );
+	//	    for (byte[] portId : d.getAllPortIds() ) {
+	//		devEvent.addAttachmentPoint( new SwitchPort( KVPort.getDpidFromKey(portId), KVPort.getNumberFromKey(portId) ));
+	//	    }
+	//	}
+
+	// Links: every Link returned by the data store is reported,
+	// no status check is applied to them
+	for (KVLink l : KVLink.getAllLinks()) {
+	    LinkEvent linkEvent = new LinkEvent(l.getSrc().dpid,
+						l.getSrc().number,
+						l.getDst().dpid,
+						l.getDst().number);
+	    TopologyEvent topologyEvent = new TopologyEvent(linkEvent);
+	    EventEntry<TopologyEvent> entry =
+		new EventEntry<TopologyEvent>(EventEntry.Type.ENTRY_ADD,
+					      topologyEvent);
+	    snapshot.add(entry);
+	}
+
+	return snapshot;
+    }
+}