Added the Hazelcast and notification-related code to send/receive
FlowId notifications. Those notifications will be used in the experimental
testing of sending notifications with only the added/updated/deleted FlowId,
and reading the Flow state itself from the database.
diff --git a/src/main/java/net/onrc/onos/ofcontroller/flowmanager/FlowEventHandler.java b/src/main/java/net/onrc/onos/ofcontroller/flowmanager/FlowEventHandler.java
index 8b1f7c0..0728f9a 100644
--- a/src/main/java/net/onrc/onos/ofcontroller/flowmanager/FlowEventHandler.java
+++ b/src/main/java/net/onrc/onos/ofcontroller/flowmanager/FlowEventHandler.java
@@ -63,6 +63,8 @@
new LinkedList<EventEntry<FlowPath>>();
private List<EventEntry<FlowEntry>> flowEntryEvents =
new LinkedList<EventEntry<FlowEntry>>();
+ private List<EventEntry<FlowId>> flowIdEvents =
+ new LinkedList<EventEntry<FlowId>>();
// All internally computed Flow Paths
private Map<Long, FlowPath> allFlowPaths = new HashMap<Long, FlowPath>();
@@ -137,6 +139,16 @@
flowEntryEvents.add(eventEntry);
}
+ //
+ // Obtain the initial FlowId state
+ //
+ Collection<FlowId> flowIds = datagridService.getAllFlowIds();
+ for (FlowId flowId : flowIds) {
+ EventEntry<FlowId> eventEntry =
+ new EventEntry<FlowId>(EventEntry.Type.ENTRY_ADD, flowId);
+ flowIdEvents.add(eventEntry);
+ }
+
// Process the initial events (if any)
synchronized (allFlowPaths) {
processEvents();
@@ -166,6 +178,7 @@
// - EventEntry<TopologyElement>
// - EventEntry<FlowPath>
// - EventEntry<FlowEntry>
+ // - EventEntry<FlowId>
//
for (EventEntry<?> event : collection) {
// Topology event
@@ -191,6 +204,14 @@
flowEntryEvents.add(flowEntryEventEntry);
continue;
}
+
+ // FlowId event
+ if (event.eventData() instanceof FlowId) {
+ EventEntry<FlowId> flowIdEventEntry =
+ (EventEntry<FlowId>)event;
+ flowIdEvents.add(flowIdEventEntry);
+ continue;
+ }
}
collection.clear();
@@ -215,6 +236,7 @@
return; // Nothing to do
}
+ processFlowIdEvents();
processFlowPathEvents();
processTopologyEvents();
processUnmatchedFlowEntryAdd();
@@ -254,6 +276,7 @@
topologyEvents.clear();
flowPathEvents.clear();
flowEntryEvents.clear();
+ flowIdEvents.clear();
//
shouldRecomputeFlowPaths.clear();
modifiedFlowPaths.clear();
@@ -346,6 +369,41 @@
}
/**
+ * Process the Flow ID events.
+ */
+ private void processFlowIdEvents() {
+ //
+ // Process all Flow ID events and update the appropriate state
+ //
+ for (EventEntry<FlowId> eventEntry : flowIdEvents) {
+ FlowId flowId = eventEntry.eventData();
+
+ log.debug("Flow ID Event: {} {}", eventEntry.eventType(), flowId);
+
+ switch (eventEntry.eventType()) {
+ case ENTRY_ADD: {
+ //
+ // Add a new Flow ID
+ //
+ // TODO: Implement it!
+
+ break;
+ }
+
+ case ENTRY_REMOVE: {
+ //
+ // Remove an existing Flow ID.
+ //
+ // TODO: Implement it!
+
+ break;
+ }
+ }
+ }
+ }
+
+
+ /**
* Process the Flow Path events.
*/
private void processFlowPathEvents() {
@@ -972,6 +1030,43 @@
}
/**
+ * Receive a notification that a FlowId is added.
+ *
+ * @param flowId the FlowId that is added.
+ */
+ @Override
+ public void notificationRecvFlowIdAdded(FlowId flowId) {
+ EventEntry<FlowId> eventEntry =
+ new EventEntry<FlowId>(EventEntry.Type.ENTRY_ADD, flowId);
+ networkEvents.add(eventEntry);
+ }
+
+ /**
+ * Receive a notification that a FlowId is removed.
+ *
+ * @param flowId the FlowId that is removed.
+ */
+ @Override
+ public void notificationRecvFlowIdRemoved(FlowId flowId) {
+ EventEntry<FlowId> eventEntry =
+ new EventEntry<FlowId>(EventEntry.Type.ENTRY_REMOVE, flowId);
+ networkEvents.add(eventEntry);
+ }
+
+ /**
+ * Receive a notification that a FlowId is updated.
+ *
+ * @param flowId the FlowId that is updated.
+ */
+ @Override
+ public void notificationRecvFlowIdUpdated(FlowId flowId) {
+ // NOTE: The ADD and UPDATE events are processed in same way
+ EventEntry<FlowId> eventEntry =
+ new EventEntry<FlowId>(EventEntry.Type.ENTRY_ADD, flowId);
+ networkEvents.add(eventEntry);
+ }
+
+ /**
* Receive a notification that a Topology Element is added.
*
* @param topologyElement the Topology Element that is added.
diff --git a/src/main/java/net/onrc/onos/ofcontroller/flowmanager/IFlowEventHandlerService.java b/src/main/java/net/onrc/onos/ofcontroller/flowmanager/IFlowEventHandlerService.java
index 78562e1..62edf70 100644
--- a/src/main/java/net/onrc/onos/ofcontroller/flowmanager/IFlowEventHandlerService.java
+++ b/src/main/java/net/onrc/onos/ofcontroller/flowmanager/IFlowEventHandlerService.java
@@ -2,6 +2,7 @@
import net.onrc.onos.ofcontroller.topology.TopologyElement;
import net.onrc.onos.ofcontroller.util.FlowEntry;
+import net.onrc.onos.ofcontroller.util.FlowId;
import net.onrc.onos.ofcontroller.util.FlowPath;
/**
@@ -51,6 +52,27 @@
void notificationRecvFlowEntryUpdated(FlowEntry flowEntry);
/**
+ * Receive a notification that a FlowId is added.
+ *
+ * @param flowId the FlowId that is added.
+ */
+ void notificationRecvFlowIdAdded(FlowId flowId);
+
+ /**
+ * Receive a notification that a FlowId is removed.
+ *
+ * @param flowId the FlowId that is removed.
+ */
+ void notificationRecvFlowIdRemoved(FlowId flowId);
+
+ /**
+ * Receive a notification that a FlowId is updated.
+ *
+ * @param flowId the FlowId that is updated.
+ */
+ void notificationRecvFlowIdUpdated(FlowId flowId);
+
+ /**
* Receive a notification that a Topology Element is added.
*
* @param topologyElement the Topology Element that is added.