Integrate the new notification framework with the new NetworkGraph
implementation.
Change-Id: I93033a5747c216dd336f68ac25f9c5664dd0f688
diff --git a/src/main/java/net/onrc/onos/ofcontroller/networkgraph/NetworkGraphImpl.java b/src/main/java/net/onrc/onos/ofcontroller/networkgraph/NetworkGraphImpl.java
index 8df13e0..feb0f57 100644
--- a/src/main/java/net/onrc/onos/ofcontroller/networkgraph/NetworkGraphImpl.java
+++ b/src/main/java/net/onrc/onos/ofcontroller/networkgraph/NetworkGraphImpl.java
@@ -2,14 +2,22 @@
import java.net.InetAddress;
import java.util.ArrayList;
+import java.util.Collection;
import java.util.HashSet;
+import java.util.LinkedList;
import java.util.List;
import java.util.Set;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.LinkedBlockingQueue;
+import net.onrc.onos.datagrid.IDatagridService;
+import net.onrc.onos.datagrid.IEventChannel;
+import net.onrc.onos.datagrid.IEventChannelListener;
import net.onrc.onos.datastore.topology.RCLink;
import net.onrc.onos.datastore.topology.RCPort;
import net.onrc.onos.datastore.topology.RCSwitch;
import net.onrc.onos.ofcontroller.networkgraph.PortEvent.SwitchPort;
+import net.onrc.onos.ofcontroller.util.EventEntry;
import net.onrc.onos.ofcontroller.util.Dpid;
import org.slf4j.Logger;
@@ -35,6 +43,10 @@
private static final Logger log = LoggerFactory
.getLogger(NetworkGraphImpl.class);
+ private IEventChannel<byte[], TopologyEvent> eventChannel;
+ private static final String EVENT_CHANNEL_NAME = "onos.topology";
+ private EventHandler eventHandler = new EventHandler();
+
private final NetworkGraphDatastore datastore;
public NetworkGraphImpl() {
@@ -43,6 +55,149 @@
}
/**
+ * Event handler class.
+ */
+ private class EventHandler extends Thread implements
+ IEventChannelListener<byte[], TopologyEvent> {
+ private BlockingQueue<EventEntry<TopologyEvent>> topologyEvents =
+ new LinkedBlockingQueue<EventEntry<TopologyEvent>>();
+
+ /**
+ * Startup processing.
+ */
+ private void startup() {
+ //
+ // TODO: Read all state from the database
+ // For now, as a shortcut we read it from the datagrid
+ //
+ Collection<TopologyEvent> topologyEvents =
+ eventChannel.getAllEntries();
+ Collection<EventEntry<TopologyEvent>> collection =
+ new LinkedList<EventEntry<TopologyEvent>>();
+
+ for (TopologyEvent topologyEvent : topologyEvents) {
+ EventEntry<TopologyEvent> eventEntry =
+ new EventEntry<TopologyEvent>(EventEntry.Type.ENTRY_ADD,
+ topologyEvent);
+ collection.add(eventEntry);
+ }
+ processEvents(collection);
+ }
+
+ /**
+ * Run the thread.
+ */
+ public void run() {
+ Collection<EventEntry<TopologyEvent>> collection =
+ new LinkedList<EventEntry<TopologyEvent>>();
+
+ this.setName("NetworkGraphImpl.EventHandler " + this.getId());
+ startup();
+
+ //
+ // The main loop
+ //
+ try {
+ while (true) {
+ EventEntry<TopologyEvent> eventEntry = topologyEvents.take();
+ collection.add(eventEntry);
+ topologyEvents.drainTo(collection);
+
+ processEvents(collection);
+ collection.clear();
+ }
+ } catch (Exception exception) {
+ log.debug("Exception processing Topology Events: ", exception);
+ }
+ }
+
+ /**
+ * Process all topology events.
+ *
+ * @param events the events to process.
+ */
+ private void processEvents(Collection<EventEntry<TopologyEvent>> events) {
+ for (EventEntry<TopologyEvent> event : events) {
+ TopologyEvent topologyEvent = event.eventData();
+ switch (event.eventType()) {
+ case ENTRY_ADD:
+ log.debug("Topology event ENTRY_ADD: {}", topologyEvent);
+ if (topologyEvent.switchEvent != null)
+ putSwitchReplicationEvent(topologyEvent.switchEvent);
+ if (topologyEvent.portEvent != null)
+ putPortReplicationEvent(topologyEvent.portEvent);
+ if (topologyEvent.linkEvent != null)
+ putLinkReplicationEvent(topologyEvent.linkEvent);
+ if (topologyEvent.deviceEvent != null)
+ putDeviceReplicationEvent(topologyEvent.deviceEvent);
+ break;
+ case ENTRY_REMOVE:
+ log.debug("Topology event ENTRY_REMOVE: {}", topologyEvent);
+ if (topologyEvent.switchEvent != null)
+ removeSwitchReplicationEvent(topologyEvent.switchEvent);
+ if (topologyEvent.portEvent != null)
+ removePortReplicationEvent(topologyEvent.portEvent);
+ if (topologyEvent.linkEvent != null)
+ removeLinkReplicationEvent(topologyEvent.linkEvent);
+ if (topologyEvent.deviceEvent != null)
+ removeDeviceReplicationEvent(topologyEvent.deviceEvent);
+ break;
+ }
+ }
+ }
+
+ /**
+ * Receive a notification that an entry is added.
+ *
+ * @param value the value for the entry.
+ */
+ @Override
+ public void entryAdded(TopologyEvent value) {
+ EventEntry<TopologyEvent> eventEntry =
+ new EventEntry<TopologyEvent>(EventEntry.Type.ENTRY_ADD,
+ value);
+ topologyEvents.add(eventEntry);
+ }
+
+ /**
+ * Receive a notification that an entry is removed.
+ *
+ * @param value the value for the entry.
+ */
+ @Override
+ public void entryRemoved(TopologyEvent value) {
+ EventEntry<TopologyEvent> eventEntry =
+ new EventEntry<TopologyEvent>(EventEntry.Type.ENTRY_REMOVE,
+ value);
+ topologyEvents.add(eventEntry);
+ }
+
+ /**
+ * Receive a notification that an entry is updated.
+ *
+ * @param value the value for the entry.
+ */
+ @Override
+ public void entryUpdated(TopologyEvent value) {
+ // NOTE: The ADD and UPDATE events are processed in same way
+ entryAdded(value);
+ }
+ }
+
+ /**
+ * Startup processing.
+ *
+ * @param datagridService the datagrid service to use.
+ */
+ void startup(IDatagridService datagridService) {
+ eventChannel = datagridService.addListener(EVENT_CHANNEL_NAME,
+ eventHandler,
+ byte[].class,
+ TopologyEvent.class);
+ eventHandler.start();
+ }
+
+ /**
* Exception to be thrown when Modification to the Network Graph cannot be continued due to broken invariant.
*
* XXX Should this be checked exception or RuntimeException
@@ -68,7 +223,11 @@
if (prepareForAddSwitchEvent(switchEvent)) {
datastore.addSwitch(switchEvent);
putSwitch(switchEvent);
- // TODO send out notification
+ // Send out notification
+ TopologyEvent topologyEvent =
+ new TopologyEvent(switchEvent);
+ eventChannel.addEntry(topologyEvent.getID(),
+ topologyEvent);
}
// TODO handle invariant violation
}
@@ -78,7 +237,8 @@
if (prepareForRemoveSwitchEvent(switchEvent)) {
datastore.deactivateSwitch(switchEvent);
removeSwitch(switchEvent);
- // TODO send out notification
+ // Send out notification
+ eventChannel.removeEntry(switchEvent.getID());
}
// TODO handle invariant violation
}
@@ -88,7 +248,11 @@
if (prepareForAddPortEvent(portEvent)) {
datastore.addPort(portEvent);
putPort(portEvent);
- // TODO send out notification
+ // Send out notification
+ TopologyEvent topologyEvent =
+ new TopologyEvent(portEvent);
+ eventChannel.addEntry(topologyEvent.getID(),
+ topologyEvent);
}
// TODO handle invariant violation
}
@@ -98,7 +262,8 @@
if (prepareForRemovePortEvent(portEvent)) {
datastore.deactivatePort(portEvent);
removePort(portEvent);
- // TODO send out notification
+ // Send out notification
+ eventChannel.removeEntry(portEvent.getID());
}
// TODO handle invariant violation
}
@@ -108,7 +273,11 @@
if (prepareForAddLinkEvent(linkEvent)) {
datastore.addLink(linkEvent);
putLink(linkEvent);
- // TODO send out notification
+ // Send out notification
+ TopologyEvent topologyEvent =
+ new TopologyEvent(linkEvent);
+ eventChannel.addEntry(topologyEvent.getID(),
+ topologyEvent);
}
// TODO handle invariant violation
}
@@ -118,7 +287,8 @@
if (prepareForRemoveLinkEvent(linkEvent)) {
datastore.removeLink(linkEvent);
removeLink(linkEvent);
- // TODO send out notification
+ // Send out notification
+ eventChannel.removeEntry(linkEvent.getID());
}
// TODO handle invariant violation
}
@@ -127,7 +297,11 @@
public void putDeviceEvent(DeviceEvent deviceEvent) {
if (prepareForAddDeviceEvent(deviceEvent)) {
// datastore.addDevice(deviceEvent);
- // TODO send out notification
+ // Send out notification
+ TopologyEvent topologyEvent =
+ new TopologyEvent(deviceEvent);
+ eventChannel.addEntry(topologyEvent.getID(),
+ topologyEvent);
}
// TODO handle invariant violation
// XXX if prepareFor~ method returned false, event should be dropped
@@ -137,7 +311,8 @@
public void removeDeviceEvent(DeviceEvent deviceEvent) {
if (prepareForRemoveDeviceEvent(deviceEvent)) {
// datastore.removeDevice(deviceEvent);
- // TODO send out notification
+ // Send out notification
+ eventChannel.removeEntry(deviceEvent.getID());
}
// TODO handle invariant violation
// XXX if prepareFor~ method returned false, event should be dropped
diff --git a/src/main/java/net/onrc/onos/ofcontroller/networkgraph/NetworkGraphModule.java b/src/main/java/net/onrc/onos/ofcontroller/networkgraph/NetworkGraphModule.java
index 7315f1c..3edf5c2 100644
--- a/src/main/java/net/onrc/onos/ofcontroller/networkgraph/NetworkGraphModule.java
+++ b/src/main/java/net/onrc/onos/ofcontroller/networkgraph/NetworkGraphModule.java
@@ -11,6 +11,7 @@
import net.floodlightcontroller.core.module.IFloodlightModule;
import net.floodlightcontroller.core.module.IFloodlightService;
import net.floodlightcontroller.restserver.IRestApiService;
+import net.onrc.onos.datagrid.IDatagridService;
import net.onrc.onos.ofcontroller.networkgraph.web.NetworkGraphWebRoutable;
public class NetworkGraphModule implements IFloodlightModule, INetworkGraphService {
@@ -20,6 +21,7 @@
private NetworkGraphImpl networkGraph;
//private NetworkGraphDatastore southboundNetworkGraph;
+ private IDatagridService datagridService;
private IRestApiService restApi;
@@ -44,6 +46,7 @@
public Collection<Class<? extends IFloodlightService>> getModuleDependencies() {
List<Class<? extends IFloodlightService>> dependencies =
new ArrayList<Class<? extends IFloodlightService>>();
+ dependencies.add(IDatagridService.class);
dependencies.add(IRestApiService.class);
return dependencies;
}
@@ -52,6 +55,7 @@
public void init(FloodlightModuleContext context)
throws FloodlightModuleException {
restApi = context.getServiceImpl(IRestApiService.class);
+ datagridService = context.getServiceImpl(IDatagridService.class);
networkGraph = new NetworkGraphImpl();
//southboundNetworkGraph = new NetworkGraphDatastore(networkGraph);
@@ -60,6 +64,7 @@
@Override
public void startUp(FloodlightModuleContext context) {
restApi.addRestletRoutable(new NetworkGraphWebRoutable());
+ networkGraph.startup(datagridService);
}
@Override