Network Graph Refactoring: WIP: extract the events by canceling matching events

 * Extract the events by canceling matching events
 * Don't embed the PortEvent inside the SwitchEvent, because with such
   with such embedding we cannot handle events reordering. E.g.,
   If ADD(SwitchEvent) with embedded ADD(PortEvent) is followed by
   REMOVE(PortEvent), then reordering of those two events is problematic
   and cannot be detected easily.

 * Adjust some of the API to reflect the removal of the embedded
  PortEvent events within the SwitchEvent.

 * Renamed TopologyManage.loadWholeTopologyFromDB() to
   readWholeTopologyFromDB() and change its implementation to
   return a collection of Topology Events.
   Not used for now.

Change-Id: I456514e1ef997c5b50684448193fe47d7a46b141
diff --git a/src/main/java/net/onrc/onos/ofcontroller/networkgraph/TopologyManager.java b/src/main/java/net/onrc/onos/ofcontroller/networkgraph/TopologyManager.java
index f620614..ebe9900 100644
--- a/src/main/java/net/onrc/onos/ofcontroller/networkgraph/TopologyManager.java
+++ b/src/main/java/net/onrc/onos/ofcontroller/networkgraph/TopologyManager.java
@@ -1,11 +1,14 @@
 package net.onrc.onos.ofcontroller.networkgraph;
 
 import java.net.InetAddress;
+import java.nio.ByteBuffer;
 import java.util.ArrayList;
 import java.util.Collection;
+import java.util.HashMap;
 import java.util.HashSet;
 import java.util.LinkedList;
 import java.util.List;
+import java.util.Map;
 import java.util.Set;
 import java.util.concurrent.BlockingQueue;
 import java.util.concurrent.CopyOnWriteArrayList;
@@ -53,6 +56,24 @@
     private final IControllerRegistryService registryService;
     private CopyOnWriteArrayList<INetworkGraphListener> networkGraphListeners;
 
+    // Local state for computing the final set of events
+    private Map<ByteBuffer, SwitchEvent> addedSwitchEvents =
+	new HashMap<ByteBuffer, SwitchEvent>();
+    private Map<ByteBuffer, SwitchEvent> removedSwitchEvents =
+	new HashMap<ByteBuffer, SwitchEvent>();
+    private Map<ByteBuffer, PortEvent> addedPortEvents =
+	new HashMap<ByteBuffer, PortEvent>();
+    private Map<ByteBuffer, PortEvent> removedPortEvents =
+	new HashMap<ByteBuffer, PortEvent>();
+    private Map<ByteBuffer, LinkEvent> addedLinkEvents =
+	new HashMap<ByteBuffer, LinkEvent>();
+    private Map<ByteBuffer, LinkEvent> removedLinkEvents =
+	new HashMap<ByteBuffer, LinkEvent>();
+    private Map<ByteBuffer, DeviceEvent> addedDeviceEvents =
+	new HashMap<ByteBuffer, DeviceEvent>();
+    private Map<ByteBuffer, DeviceEvent> removedDeviceEvents =
+	new HashMap<ByteBuffer, DeviceEvent>();
+
     /**
      * Constructor.
      *
@@ -89,7 +110,11 @@
 	 */
 	private void startup() {
 	    //
-	    // TODO: Read all state from the database
+	    // TODO: Read all state from the database:
+	    //
+	    // Collection<EventEntry<TopologyEvent>> collection =
+	    //    readWholeTopologyFromDB();
+	    //
 	    // For now, as a shortcut we read it from the datagrid
 	    //
 	    Collection<TopologyEvent> topologyEvents =
@@ -142,28 +167,60 @@
 	private void processEvents(Collection<EventEntry<TopologyEvent>> events) {
 	    for (EventEntry<TopologyEvent> event : events) {
 		TopologyEvent topologyEvent = event.eventData();
+		SwitchEvent switchEvent = topologyEvent.switchEvent;
+		PortEvent portEvent = topologyEvent.portEvent;
+		LinkEvent linkEvent = topologyEvent.linkEvent;
+		DeviceEvent deviceEvent = topologyEvent.deviceEvent;
+
+		//
+		// Extract the events by cancelling matching events
+		//
 		switch (event.eventType()) {
 		case ENTRY_ADD:
 		    log.debug("Topology event ENTRY_ADD: {}", topologyEvent);
-		    if (topologyEvent.switchEvent != null)
-			putSwitchReplicationEvent(topologyEvent.switchEvent);
-		    if (topologyEvent.portEvent != null)
-			putPortReplicationEvent(topologyEvent.portEvent);
-		    if (topologyEvent.linkEvent != null)
-			putLinkReplicationEvent(topologyEvent.linkEvent);
-		    if (topologyEvent.deviceEvent != null)
-			putDeviceReplicationEvent(topologyEvent.deviceEvent);
+		    if (switchEvent != null) {
+			ByteBuffer id = ByteBuffer.wrap(switchEvent.getID());
+			addedSwitchEvents.put(id, switchEvent);
+			removedSwitchEvents.remove(id);
+		    }
+		    if (portEvent != null) {
+			ByteBuffer id = ByteBuffer.wrap(portEvent.getID());
+			addedPortEvents.put(id, portEvent);
+			removedPortEvents.remove(id);
+		    }
+		    if (linkEvent != null) {
+			ByteBuffer id = ByteBuffer.wrap(linkEvent.getID());
+			addedLinkEvents.put(id, linkEvent);
+			removedLinkEvents.remove(id);
+		    }
+		    if (deviceEvent != null) {
+			ByteBuffer id = ByteBuffer.wrap(deviceEvent.getID());
+			addedDeviceEvents.put(id, deviceEvent);
+			removedDeviceEvents.remove(id);
+		    }
 		    break;
 		case ENTRY_REMOVE:
 		    log.debug("Topology event ENTRY_REMOVE: {}", topologyEvent);
-		    if (topologyEvent.switchEvent != null)
-			removeSwitchReplicationEvent(topologyEvent.switchEvent);
-		    if (topologyEvent.portEvent != null)
-			removePortReplicationEvent(topologyEvent.portEvent);
-		    if (topologyEvent.linkEvent != null)
-			removeLinkReplicationEvent(topologyEvent.linkEvent);
-		    if (topologyEvent.deviceEvent != null)
-			removeDeviceReplicationEvent(topologyEvent.deviceEvent);
+		    if (switchEvent != null) {
+			ByteBuffer id = ByteBuffer.wrap(switchEvent.getID());
+			addedSwitchEvents.remove(id);
+			removedSwitchEvents.put(id, switchEvent);
+		    }
+		    if (portEvent != null) {
+			ByteBuffer id = ByteBuffer.wrap(portEvent.getID());
+			addedPortEvents.remove(id);
+			removedPortEvents.put(id, portEvent);
+		    }
+		    if (linkEvent != null) {
+			ByteBuffer id = ByteBuffer.wrap(linkEvent.getID());
+			addedLinkEvents.remove(id);
+			removedLinkEvents.put(id, linkEvent);
+		    }
+		    if (deviceEvent != null) {
+			ByteBuffer id = ByteBuffer.wrap(deviceEvent.getID());
+			addedDeviceEvents.remove(id);
+			removedDeviceEvents.put(id, deviceEvent);
+		    }
 		    break;
 		}
 	    }
@@ -243,19 +300,32 @@
      * ******************************/
 
     @Override
-    public void putSwitchDiscoveryEvent(SwitchEvent switchEvent) {
-	if (datastore.addSwitch(switchEvent)) {
+    public void putSwitchDiscoveryEvent(SwitchEvent switchEvent,
+					Collection<PortEvent> portEvents) {
+	if (datastore.addSwitch(switchEvent, portEvents)) {
 	    // Send out notification
 	    TopologyEvent topologyEvent = new TopologyEvent(switchEvent);
 	    eventChannel.addEntry(topologyEvent.getID(), topologyEvent);
+
+	    for (PortEvent portEvent : portEvents) {
+		topologyEvent = new TopologyEvent(portEvent);
+		eventChannel.addEntry(topologyEvent.getID(), topologyEvent);
+	    }
 	}
     }
 
     @Override
     public void removeSwitchDiscoveryEvent(SwitchEvent switchEvent) {
-	if (datastore.deactivateSwitch(switchEvent)) {
+	// TODO: Use a copy of the port events previously added for that switch
+	Collection<PortEvent> portEvents = new LinkedList<PortEvent>();
+
+	if (datastore.deactivateSwitch(switchEvent, portEvents)) {
 	    // Send out notification
 	    eventChannel.removeEntry(switchEvent.getID());
+
+	    for (PortEvent portEvent : portEvents) {
+		eventChannel.removeEntry(portEvent.getID());
+	    }
 	}
     }
 
@@ -264,6 +334,7 @@
 	if (datastore.addPort(portEvent)) {
 	    // Send out notification
 	    TopologyEvent topologyEvent = new TopologyEvent(portEvent);
+	    eventChannel.addEntry(topologyEvent.getID(), topologyEvent);
 	}
     }
 
@@ -309,82 +380,6 @@
 	}
     }
 
-    /* ************************************************
-     * Internal methods for processing the replication events
-     * ************************************************/
-
-    private void putSwitchReplicationEvent(SwitchEvent switchEvent) {
-	if (prepareForAddSwitchEvent(switchEvent)) {
-	    putSwitch(switchEvent);
-	}
-	// TODO handle invariant violation
-	// trigger instance local topology event handler
-	dispatchPutSwitchEvent(switchEvent);
-    }
-
-    private void removeSwitchReplicationEvent(SwitchEvent switchEvent) {
-	if (prepareForRemoveSwitchEvent(switchEvent)) {
-	    removeSwitch(switchEvent);
-	}
-	// TODO handle invariant violation
-	// trigger instance local topology event handler
-	dispatchRemoveSwitchEvent(switchEvent);
-    }
-
-    private void putPortReplicationEvent(PortEvent portEvent) {
-	if (prepareForAddPortEvent(portEvent)) {
-	    putPort(portEvent);
-	}
-	// TODO handle invariant violation
-	// trigger instance local topology event handler
-	dispatchPutPortEvent(portEvent);
-    }
-
-    private void removePortReplicationEvent(PortEvent portEvent) {
-	if (prepareForRemovePortEvent(portEvent)) {
-	    removePort(portEvent);
-	}
-	// TODO handle invariant violation
-	// trigger instance local topology event handler
-	dispatchRemovePortEvent(portEvent);
-    }
-
-    private void putLinkReplicationEvent(LinkEvent linkEvent) {
-	if (prepareForAddLinkEvent(linkEvent)) {
-	    putLink(linkEvent);
-	}
-	// TODO handle invariant violation
-	// trigger instance local topology event handler
-	dispatchPutLinkEvent(linkEvent);
-    }
-
-    private void removeLinkReplicationEvent(LinkEvent linkEvent) {
-	if (prepareForRemoveLinkEvent(linkEvent)) {
-	    removeLink(linkEvent);
-	}
-	// TODO handle invariant violation
-	// trigger instance local topology event handler
-	dispatchRemoveLinkEvent(linkEvent);
-    }
-
-    private void putDeviceReplicationEvent(DeviceEvent deviceEvent) {
-	if (prepareForAddDeviceEvent(deviceEvent)) {
-	    putDevice(deviceEvent);
-	}
-	// TODO handle invariant violation
-	// trigger instance local topology event handler
-	dispatchPutDeviceEvent(deviceEvent);
-    }
-
-    private void removeDeviceReplicationEvent(DeviceEvent deviceEvent) {
-	if (prepareForRemoveDeviceEvent(deviceEvent)) {
-	    removeDevice(deviceEvent);
-	}
-	// TODO handle invariant violation
-	// trigger instance local topology event handler
-	dispatchRemoveDeviceEvent(deviceEvent);
-    }
-
     /* *****************
      * Internal methods to maintain invariants of the network graph
      * *****************/
@@ -396,50 +391,14 @@
      */
     private boolean prepareForAddSwitchEvent(SwitchEvent swEvent) {
 	// No show stopping precondition
-	// Prep: remove(deactivate) Ports on Switch, which is not on event
-	removePortsNotOnEvent(swEvent);
 	return true;
     }
 
     private boolean prepareForRemoveSwitchEvent(SwitchEvent swEvent) {
 	// No show stopping precondition
-	// Prep: remove(deactivate) Ports on Switch, which is not on event
-	// XXX may be remove switch should imply wipe all ports
-	removePortsNotOnEvent(swEvent);
 	return true;
     }
 
-    private void removePortsNotOnEvent(SwitchEvent swEvent) {
-	Switch sw = networkGraph.getSwitch(swEvent.getDpid());
-	if (sw != null) {
-	    Set<Long> port_noOnEvent = new HashSet<>();
-	    for (PortEvent portEvent : swEvent.getPorts()) {
-		port_noOnEvent.add(portEvent.getNumber());
-	    }
-	    // Existing ports not on event should be removed.
-	    // TODO Should batch eventually for performance?
-	    List<Port> portsToRemove = new ArrayList<Port>();
-	    for (Port p : sw.getPorts()) {
-		if (!port_noOnEvent.contains(p.getNumber())) {
-		    //PortEvent rmEvent = new PortEvent(p.getSwitch().getDpid(), p.getNumber());
-		    // calling Discovery removePort() API to wipe from DB, etc.
-		    //removePortDiscoveryEvent(rmEvent);
-
-		    // We can't remove ports here because this will trigger a remove
-		    // from the switch's port list, which we are currently iterating
-		    // over.
-		    portsToRemove.add(p);
-		}
-	    }
-	    for (Port p : portsToRemove) {
-		PortEvent rmEvent = new PortEvent(p.getSwitch().getDpid(),
-						  p.getNumber());
-		// calling Discovery removePort() API to wipe from DB, etc.
-		removePortDiscoveryEvent(rmEvent);
-	    }
-	}
-    }
-
     private boolean prepareForAddPortEvent(PortEvent portEvent) {
 	// Parent Switch must exist
 	if (networkGraph.getSwitch(portEvent.getDpid()) == null) {
@@ -627,11 +586,6 @@
 
 	// Update when more attributes are added to Event object
 	// no attribute to update for now
-
-	// TODO handle child Port event properly for performance
-	for (PortEvent portEvent : swEvent.getPorts() ) {
-	    putPort(portEvent);
-	}
     }
 
     void removeSwitch(SwitchEvent swEvent) {
@@ -639,11 +593,6 @@
 	    throw new IllegalArgumentException("Switch cannot be null");
 	}
 
-	// TODO handle child Port event properly for performance
-	for (PortEvent portEvent : swEvent.getPorts() ) {
-	    removePort(portEvent);
-	}
-
 	Switch sw = networkGraph.getSwitch(swEvent.getDpid());
 	if (sw == null) {
 	    log.warn("Switch {} already removed, ignoring", swEvent);
@@ -1027,22 +976,39 @@
     }
 
     @Deprecated
-    public void loadWholeTopologyFromDB() {
+    private Collection<EventEntry<TopologyEvent>> readWholeTopologyFromDB() {
+	Collection<EventEntry<TopologyEvent>> collection =
+	    new LinkedList<EventEntry<TopologyEvent>>();
+
 	// XXX May need to clear whole topology first, depending on
 	// how we initially subscribe to replication events
 
+	// Add all active switches
 	for (RCSwitch sw : RCSwitch.getAllSwitches()) {
 	    if (sw.getStatus() != RCSwitch.STATUS.ACTIVE) {
 		continue;
 	    }
-	    putSwitchReplicationEvent(new SwitchEvent(sw.getDpid()));
+
+	    SwitchEvent switchEvent = new SwitchEvent(sw.getDpid());
+	    TopologyEvent topologyEvent = new TopologyEvent(switchEvent);
+	    EventEntry<TopologyEvent> eventEntry =
+		new EventEntry<TopologyEvent>(EventEntry.Type.ENTRY_ADD,
+					      topologyEvent);
+	    collection.add(eventEntry);
 	}
 
+	// Add all active ports
 	for (RCPort p : RCPort.getAllPorts()) {
 	    if (p.getStatus() != RCPort.STATUS.ACTIVE) {
 		continue;
 	    }
-	    putPortReplicationEvent(new PortEvent(p.getDpid(), p.getNumber()));
+
+	    PortEvent portEvent = new PortEvent(p.getDpid(), p.getNumber());
+	    TopologyEvent topologyEvent = new TopologyEvent(portEvent);
+	    EventEntry<TopologyEvent> eventEntry =
+		new EventEntry<TopologyEvent>(EventEntry.Type.ENTRY_ADD,
+					      topologyEvent);
+	    collection.add(eventEntry);
 	}
 
 	// TODO Is Device going to be in DB? If so, read from DB.
@@ -1062,11 +1028,19 @@
 	    if (srcPort == null || dstPort == null) {
 		continue;
 	    }
-	    putLinkReplicationEvent(new LinkEvent(l.getSrc().dpid,
-						  l.getSrc().number,
-						  l.getDst().dpid,
-						  l.getDst().number));
+
+	    LinkEvent linkEvent = new LinkEvent(l.getSrc().dpid,
+						l.getSrc().number,
+						l.getDst().dpid,
+						l.getDst().number);
+	    TopologyEvent topologyEvent = new TopologyEvent(linkEvent);
+	    EventEntry<TopologyEvent> eventEntry =
+		new EventEntry<TopologyEvent>(EventEntry.Type.ENTRY_ADD,
+					      topologyEvent);
+	    collection.add(eventEntry);
 	}
+
+	return collection;
     }
 
     @Deprecated