Integrate the new notification framework with the new NetworkGraph
implementation.

Change-Id: I93033a5747c216dd336f68ac25f9c5664dd0f688
diff --git a/src/main/java/net/onrc/onos/ofcontroller/networkgraph/NetworkGraphImpl.java b/src/main/java/net/onrc/onos/ofcontroller/networkgraph/NetworkGraphImpl.java
index 8df13e0..feb0f57 100644
--- a/src/main/java/net/onrc/onos/ofcontroller/networkgraph/NetworkGraphImpl.java
+++ b/src/main/java/net/onrc/onos/ofcontroller/networkgraph/NetworkGraphImpl.java
@@ -2,14 +2,22 @@
 
 import java.net.InetAddress;
 import java.util.ArrayList;
+import java.util.Collection;
 import java.util.HashSet;
+import java.util.LinkedList;
 import java.util.List;
 import java.util.Set;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.LinkedBlockingQueue;
 
+import net.onrc.onos.datagrid.IDatagridService;
+import net.onrc.onos.datagrid.IEventChannel;
+import net.onrc.onos.datagrid.IEventChannelListener;
 import net.onrc.onos.datastore.topology.RCLink;
 import net.onrc.onos.datastore.topology.RCPort;
 import net.onrc.onos.datastore.topology.RCSwitch;
 import net.onrc.onos.ofcontroller.networkgraph.PortEvent.SwitchPort;
+import net.onrc.onos.ofcontroller.util.EventEntry;
 import net.onrc.onos.ofcontroller.util.Dpid;
 
 import org.slf4j.Logger;
@@ -35,6 +43,10 @@
     private static final Logger log = LoggerFactory
 	    .getLogger(NetworkGraphImpl.class);
 
+    private IEventChannel<byte[], TopologyEvent> eventChannel;
+    private static final String EVENT_CHANNEL_NAME = "onos.topology";
+    private EventHandler eventHandler = new EventHandler();
+
     private final NetworkGraphDatastore datastore;
 
     public NetworkGraphImpl() {
@@ -43,6 +55,149 @@
     }
 
     /**
+     * Event handler class.
+     */
+    private class EventHandler extends Thread implements
+	IEventChannelListener<byte[], TopologyEvent> {
+	private BlockingQueue<EventEntry<TopologyEvent>> topologyEvents =
+	    new LinkedBlockingQueue<EventEntry<TopologyEvent>>();
+
+	/**
+	 * Startup processing.
+	 */
+	private void startup() {
+	    //
+	    // TODO: Read all state from the database
+	    // For now, as a shortcut we read it from the datagrid
+	    //
+	    Collection<TopologyEvent> topologyEvents =
+		eventChannel.getAllEntries();
+	    Collection<EventEntry<TopologyEvent>> collection =
+		new LinkedList<EventEntry<TopologyEvent>>();
+
+	    for (TopologyEvent topologyEvent : topologyEvents) {
+		EventEntry<TopologyEvent> eventEntry =
+		    new EventEntry<TopologyEvent>(EventEntry.Type.ENTRY_ADD,
+						  topologyEvent);
+		collection.add(eventEntry);
+	    }
+	    processEvents(collection);
+	}
+
+	/**
+	 * Run the thread.
+	 */
+	public void run() {
+	    Collection<EventEntry<TopologyEvent>> collection =
+		new LinkedList<EventEntry<TopologyEvent>>();
+
+	    this.setName("NetworkGraphImpl.EventHandler " + this.getId());
+	    startup();
+
+	    //
+	    // The main loop
+	    //
+	    try {
+		while (true) {
+		    EventEntry<TopologyEvent> eventEntry = topologyEvents.take();
+		    collection.add(eventEntry);
+		    topologyEvents.drainTo(collection);
+
+		    processEvents(collection);
+		    collection.clear();
+		}
+	    } catch (Exception exception) {
+		log.debug("Exception processing Topology Events: ", exception);
+	    }
+	}
+
+	/**
+	 * Process all topology events.
+	 *
+	 * @param events the events to process.
+	 */
+	private void processEvents(Collection<EventEntry<TopologyEvent>> events) {
+	    for (EventEntry<TopologyEvent> event : events) {
+		TopologyEvent topologyEvent = event.eventData();
+		switch (event.eventType()) {
+		case ENTRY_ADD:
+		    log.debug("Topology event ENTRY_ADD: {}", topologyEvent);
+		    if (topologyEvent.switchEvent != null)
+			putSwitchReplicationEvent(topologyEvent.switchEvent);
+		    if (topologyEvent.portEvent != null)
+			putPortReplicationEvent(topologyEvent.portEvent);
+		    if (topologyEvent.linkEvent != null)
+			putLinkReplicationEvent(topologyEvent.linkEvent);
+		    if (topologyEvent.deviceEvent != null)
+			putDeviceReplicationEvent(topologyEvent.deviceEvent);
+		    break;
+		case ENTRY_REMOVE:
+		    log.debug("Topology event ENTRY_REMOVE: {}", topologyEvent);
+		    if (topologyEvent.switchEvent != null)
+			removeSwitchReplicationEvent(topologyEvent.switchEvent);
+		    if (topologyEvent.portEvent != null)
+			removePortReplicationEvent(topologyEvent.portEvent);
+		    if (topologyEvent.linkEvent != null)
+			removeLinkReplicationEvent(topologyEvent.linkEvent);
+		    if (topologyEvent.deviceEvent != null)
+			removeDeviceReplicationEvent(topologyEvent.deviceEvent);
+		    break;
+		}
+	    }
+	}
+
+	/**
+	 * Receive a notification that an entry is added.
+	 *
+	 * @param value the value for the entry.
+	 */
+	@Override
+	public void entryAdded(TopologyEvent value) {
+	    EventEntry<TopologyEvent> eventEntry =
+		new EventEntry<TopologyEvent>(EventEntry.Type.ENTRY_ADD,
+					      value);
+	    topologyEvents.add(eventEntry);
+	}
+
+	/**
+	 * Receive a notification that an entry is removed.
+	 *
+	 * @param value the value for the entry.
+	 */
+	@Override
+	public void entryRemoved(TopologyEvent value) {
+	    EventEntry<TopologyEvent> eventEntry =
+		new EventEntry<TopologyEvent>(EventEntry.Type.ENTRY_REMOVE,
+					      value);
+	    topologyEvents.add(eventEntry);
+	}
+
+	/**
+	 * Receive a notification that an entry is updated.
+	 *
+	 * @param value the value for the entry.
+	 */
+	@Override
+	public void entryUpdated(TopologyEvent value) {
+	    // NOTE: The ADD and UPDATE events are processed in same way
+	    entryAdded(value);
+	}
+    }
+
+    /**
+     * Startup processing.
+     *
+     * @param datagridService the datagrid service to use.
+     */
+    void startup(IDatagridService datagridService) {
+	eventChannel = datagridService.addListener(EVENT_CHANNEL_NAME,
+						   eventHandler,
+						   byte[].class,
+						   TopologyEvent.class);
+	eventHandler.start();
+    }
+
+    /**
      * Exception to be thrown when Modification to the Network Graph cannot be continued due to broken invariant.
      *
      * XXX Should this be checked exception or RuntimeException
@@ -68,7 +223,11 @@
 		if (prepareForAddSwitchEvent(switchEvent)) {
 			datastore.addSwitch(switchEvent);
 			putSwitch(switchEvent);
-			// TODO send out notification
+			// Send out notification
+			TopologyEvent topologyEvent =
+			    new TopologyEvent(switchEvent);
+			eventChannel.addEntry(topologyEvent.getID(),
+					      topologyEvent);
 		}
 		// TODO handle invariant violation
 	}
@@ -78,7 +237,8 @@
 		if (prepareForRemoveSwitchEvent(switchEvent)) {
 			datastore.deactivateSwitch(switchEvent);
 			removeSwitch(switchEvent);
-			// TODO send out notification
+			// Send out notification
+			eventChannel.removeEntry(switchEvent.getID());
 		}
 		// TODO handle invariant violation
 	}
@@ -88,7 +248,11 @@
 		if (prepareForAddPortEvent(portEvent)) {
 			datastore.addPort(portEvent);
 			putPort(portEvent);
-			// TODO send out notification
+			// Send out notification
+			TopologyEvent topologyEvent =
+			    new TopologyEvent(portEvent);
+			eventChannel.addEntry(topologyEvent.getID(),
+					      topologyEvent);
 		}
 		// TODO handle invariant violation
 	}
@@ -98,7 +262,8 @@
 		if (prepareForRemovePortEvent(portEvent)) {
 			datastore.deactivatePort(portEvent);
 			removePort(portEvent);
-			// TODO send out notification
+			// Send out notification
+			eventChannel.removeEntry(portEvent.getID());
 		}
 		// TODO handle invariant violation
 	}
@@ -108,7 +273,11 @@
 		if (prepareForAddLinkEvent(linkEvent)) {
 			datastore.addLink(linkEvent);
 			putLink(linkEvent);
-			// TODO send out notification
+			// Send out notification
+			TopologyEvent topologyEvent =
+			    new TopologyEvent(linkEvent);
+			eventChannel.addEntry(topologyEvent.getID(),
+					      topologyEvent);
 		}
 		// TODO handle invariant violation
 	}
@@ -118,7 +287,8 @@
 		if (prepareForRemoveLinkEvent(linkEvent)) {
 			datastore.removeLink(linkEvent);
 			removeLink(linkEvent);
-			// TODO send out notification
+			// Send out notification
+			eventChannel.removeEntry(linkEvent.getID());
 		}
 		// TODO handle invariant violation
 	}
@@ -127,7 +297,11 @@
 	public void putDeviceEvent(DeviceEvent deviceEvent) {
 		if (prepareForAddDeviceEvent(deviceEvent)) {
 //			datastore.addDevice(deviceEvent);
-			// TODO send out notification
+			// Send out notification
+			TopologyEvent topologyEvent =
+			    new TopologyEvent(deviceEvent);
+			eventChannel.addEntry(topologyEvent.getID(),
+					      topologyEvent);
 		}
 		// TODO handle invariant violation
 		// XXX if prepareFor~ method returned false, event should be dropped
@@ -137,7 +311,8 @@
 	public void removeDeviceEvent(DeviceEvent deviceEvent) {
 		if (prepareForRemoveDeviceEvent(deviceEvent)) {
 //			datastore.removeDevice(deviceEvent);
-			// TODO send out notification
+			// Send out notification
+			eventChannel.removeEntry(deviceEvent.getID());
 		}
 		// TODO handle invariant violation
 		// XXX if prepareFor~ method returned false, event should be dropped