Add the Hazelcast datagrid and notification-handling code needed to
send/receive FlowId notifications. These notifications will be used to
experiment with sending only the added/updated/removed FlowId in each
notification and having the receiver read the Flow state itself from the database.
diff --git a/src/main/java/net/onrc/onos/datagrid/HazelcastDatagrid.java b/src/main/java/net/onrc/onos/datagrid/HazelcastDatagrid.java
index 6483121..d6c870d 100644
--- a/src/main/java/net/onrc/onos/datagrid/HazelcastDatagrid.java
+++ b/src/main/java/net/onrc/onos/datagrid/HazelcastDatagrid.java
@@ -73,6 +73,12 @@
private MapFlowEntryListener mapFlowEntryListener = null;
private String mapFlowEntryListenerId = null;
+ // State related to the Flow ID map (key: FlowId as Long, value: serialized FlowId)
+ protected static final String mapFlowIdName = "mapFlowId";
+ private IMap<Long, byte[]> mapFlowId = null;
+ private MapFlowIdListener mapFlowIdListener = null;
+ private String mapFlowIdListenerId = null;
+
// State related to the Network Topology map
protected static final String mapTopologyName = "mapTopology";
private IMap<String, byte[]> mapTopology = null;
@@ -230,6 +236,78 @@
}
/**
+ * Class for receiving notifications for FlowId state.
+ *
+ * The datagrid map is:
+ * - Key : FlowId (Long)
+ * - Value : Serialized FlowId (byte[])
+ *
+ * Added, removed and updated events are decoded into a FlowId and
+ * forwarded to the Flow Event Handler service. Eviction events are
+ * ignored.
+ */
+ class MapFlowIdListener implements EntryListener<Long, byte[]> {
+     /**
+      * Decode the FlowId carried in the value of a datagrid entry event.
+      *
+      * The Kryo instance obtained from the factory is always handed
+      * back, even if decoding throws, so that a malformed value cannot
+      * leak it.
+      *
+      * @param event the notification event for the entry.
+      * @return the FlowId decoded from the event value.
+      */
+     private FlowId decodeFlowId(EntryEvent<Long, byte[]> event) {
+         byte[] valueBytes = event.getValue();
+
+         Kryo kryo = kryoFactory.newKryo();
+         try {
+             Input input = new Input(valueBytes);
+             return kryo.readObject(input, FlowId.class);
+         } finally {
+             kryoFactory.deleteKryo(kryo);
+         }
+     }
+
+     /**
+      * Receive a notification that an entry is added.
+      *
+      * @param event the notification event for the entry.
+      */
+     @Override
+     public void entryAdded(EntryEvent<Long, byte[]> event) {
+         FlowId flowId = decodeFlowId(event);
+         flowEventHandlerService.notificationRecvFlowIdAdded(flowId);
+     }
+
+     /**
+      * Receive a notification that an entry is removed.
+      *
+      * @param event the notification event for the entry.
+      */
+     @Override
+     public void entryRemoved(EntryEvent<Long, byte[]> event) {
+         // NOTE(review): this relies on the removed value being available
+         // through event.getValue() - confirm for the Hazelcast version
+         // in use.
+         FlowId flowId = decodeFlowId(event);
+         flowEventHandlerService.notificationRecvFlowIdRemoved(flowId);
+     }
+
+     /**
+      * Receive a notification that an entry is updated.
+      *
+      * @param event the notification event for the entry.
+      */
+     @Override
+     public void entryUpdated(EntryEvent<Long, byte[]> event) {
+         FlowId flowId = decodeFlowId(event);
+         flowEventHandlerService.notificationRecvFlowIdUpdated(flowId);
+     }
+
+     /**
+      * Receive a notification that an entry is evicted.
+      *
+      * @param event the notification event for the entry.
+      */
+     @Override
+     public void entryEvicted(EntryEvent<Long, byte[]> event) {
+         // NOTE: We don't use eviction for this map
+     }
+ }
+
+ /**
* Class for receiving notifications for Network Topology state.
*
* The datagrid map is:
@@ -504,6 +582,11 @@
mapFlowEntry = hazelcastInstance.getMap(mapFlowEntryName);
mapFlowEntryListenerId = mapFlowEntry.addEntryListener(mapFlowEntryListener, true);
+ // Initialize the FlowId-related map state
+ mapFlowIdListener = new MapFlowIdListener();
+ mapFlowId = hazelcastInstance.getMap(mapFlowIdName);
+ mapFlowIdListenerId = mapFlowId.addEntryListener(mapFlowIdListener, true);
+
// Initialize the Topology-related map state
mapTopologyListener = new MapTopologyListener();
mapTopology = hazelcastInstance.getMap(mapTopologyName);
@@ -531,6 +614,11 @@
mapFlowEntry = null;
mapFlowEntryListener = null;
+ // Clear the FlowId-related map state
+ mapFlowId.removeEntryListener(mapFlowIdListenerId);
+ mapFlowId = null;
+ mapFlowIdListener = null;
+
// Clear the Topology-related map state
mapTopology.removeEntryListener(mapTopologyListenerId);
mapTopology = null;
@@ -788,6 +876,101 @@
}
/**
+ * Get all Flow IDs that are currently in the datagrid.
+ *
+ * @return all Flow IDs that are currently in the datagrid. The
+ * collection is empty if the FlowId map has no entries.
+ */
+ @Override
+ public Collection<FlowId> getAllFlowIds() {
+     Collection<FlowId> allFlowIds = new LinkedList<FlowId>();
+
+     //
+     // Get all current entries
+     //
+     Collection<byte[]> values = mapFlowId.values();
+
+     //
+     // Decode the values. A single Kryo instance is reused for all of
+     // them, and it is always handed back to the factory, even if
+     // decoding one of the values throws.
+     //
+     Kryo kryo = kryoFactory.newKryo();
+     try {
+         for (byte[] valueBytes : values) {
+             Input input = new Input(valueBytes);
+             FlowId flowId = kryo.readObject(input, FlowId.class);
+             allFlowIds.add(flowId);
+         }
+     } finally {
+         kryoFactory.deleteKryo(kryo);
+     }
+
+     return allFlowIds;
+ }
+
+ /**
+ * Send a notification that a FlowId is added.
+ *
+ * The FlowId is serialized and stored asynchronously in the FlowId
+ * map, keyed by the FlowId value.
+ *
+ * @param flowId the FlowId that is added.
+ */
+ @Override
+ public void notificationSendFlowIdAdded(FlowId flowId) {
+     //
+     // Encode the value. The Kryo instance is always handed back to the
+     // factory, even if serialization throws.
+     //
+     byte[] buffer = new byte[MAX_BUFFER_SIZE];
+     byte[] valueBytes;
+     Kryo kryo = kryoFactory.newKryo();
+     try {
+         // A maximum buffer size of -1 places no upper limit on growth
+         Output output = new Output(buffer, -1);
+         kryo.writeObject(output, flowId);
+         valueBytes = output.toBytes();
+     } finally {
+         kryoFactory.deleteKryo(kryo);
+     }
+
+     //
+     // Put the entry:
+     // - Key : FlowId (Long)
+     // - Value : Serialized FlowId (byte[])
+     //
+     mapFlowId.putAsync(flowId.value(), valueBytes);
+ }
+
+ /**
+ * Send a notification that a FlowId is removed.
+ *
+ * The entry for the FlowId is removed asynchronously from the FlowId map.
+ *
+ * @param flowId the FlowId that is removed.
+ */
+ @Override
+ public void notificationSendFlowIdRemoved(FlowId flowId) {
+     // The map is keyed by the FlowId value (Long) and holds the
+     // serialized FlowId (byte[]) as the entry value.
+     final Long flowKey = flowId.value();
+     mapFlowId.removeAsync(flowKey);
+ }
+
+ /**
+ * Send a notification that a FlowId is updated.
+ *
+ * An update is sent exactly like an add.
+ *
+ * @param flowId the FlowId that is updated.
+ */
+ @Override
+ public void notificationSendFlowIdUpdated(FlowId flowId) {
+     // NOTE: Putting an entry whose key already exists automatically
+     // updates that entry, so the "added" path covers updates as well.
+     this.notificationSendFlowIdAdded(flowId);
+ }
+
+ /**
+ * Send a notification that all Flow IDs are removed.
+ *
+ * Every key currently in the FlowId map is removed asynchronously.
+ */
+ @Override
+ public void notificationSendAllFlowIdsRemoved() {
+     //
+     // NOTE: mapFlowId.clear() is deliberately not used here. The
+     // entries are removed one-by-one so the per-entry notifications
+     // will be delivered.
+     //
+     for (Long flowKey : mapFlowId.keySet()) {
+         mapFlowId.removeAsync(flowKey);
+     }
+ }
+
+ /**
* Get all Topology Elements that are currently in the datagrid.
*
* @return all Topology Elements that are currently in the datagrid.
diff --git a/src/main/java/net/onrc/onos/datagrid/IDatagridService.java b/src/main/java/net/onrc/onos/datagrid/IDatagridService.java
index 0f03d77..d4e7b00 100644
--- a/src/main/java/net/onrc/onos/datagrid/IDatagridService.java
+++ b/src/main/java/net/onrc/onos/datagrid/IDatagridService.java
@@ -134,6 +134,39 @@
void notificationSendAllFlowEntriesRemoved();
/**
+ * Get all Flow IDs that are currently in the datagrid.
+ *
+ * The Hazelcast implementation returns an empty collection when the
+ * FlowId map has no entries.
+ *
+ * @return all Flow IDs that are currently in the datagrid.
+ */
+ Collection<FlowId> getAllFlowIds();
+
+ /**
+ * Send a notification that a FlowId is added.
+ *
+ * Only the FlowId itself is carried by the notification.
+ *
+ * @param flowId the FlowId that is added.
+ */
+ void notificationSendFlowIdAdded(FlowId flowId);
+
+ /**
+ * Send a notification that a FlowId is removed.
+ *
+ * @param flowId the FlowId that is removed.
+ */
+ void notificationSendFlowIdRemoved(FlowId flowId);
+
+ /**
+ * Send a notification that a FlowId is updated.
+ *
+ * The Hazelcast implementation sends an update the same way as an add.
+ *
+ * @param flowId the FlowId that is updated.
+ */
+ void notificationSendFlowIdUpdated(FlowId flowId);
+
+ /**
+ * Send a notification that all Flow IDs are removed.
+ *
+ * The Hazelcast implementation removes the entries one-by-one so that
+ * the per-entry notifications are delivered.
+ */
+ void notificationSendAllFlowIdsRemoved();
+
+ /**
* Get all Topology Elements that are currently in the datagrid.
*
* @return all Topology Elements that are currently in the datagrid.
diff --git a/src/main/java/net/onrc/onos/ofcontroller/flowmanager/FlowEventHandler.java b/src/main/java/net/onrc/onos/ofcontroller/flowmanager/FlowEventHandler.java
index 8b1f7c0..0728f9a 100644
--- a/src/main/java/net/onrc/onos/ofcontroller/flowmanager/FlowEventHandler.java
+++ b/src/main/java/net/onrc/onos/ofcontroller/flowmanager/FlowEventHandler.java
@@ -63,6 +63,8 @@
new LinkedList<EventEntry<FlowPath>>();
private List<EventEntry<FlowEntry>> flowEntryEvents =
new LinkedList<EventEntry<FlowEntry>>();
+ private List<EventEntry<FlowId>> flowIdEvents =
+ new LinkedList<EventEntry<FlowId>>();
// All internally computed Flow Paths
private Map<Long, FlowPath> allFlowPaths = new HashMap<Long, FlowPath>();
@@ -137,6 +139,16 @@
flowEntryEvents.add(eventEntry);
}
+ //
+ // Obtain the initial FlowId state
+ //
+ Collection<FlowId> flowIds = datagridService.getAllFlowIds();
+ for (FlowId flowId : flowIds) {
+ EventEntry<FlowId> eventEntry =
+ new EventEntry<FlowId>(EventEntry.Type.ENTRY_ADD, flowId);
+ flowIdEvents.add(eventEntry);
+ }
+
// Process the initial events (if any)
synchronized (allFlowPaths) {
processEvents();
@@ -166,6 +178,7 @@
// - EventEntry<TopologyElement>
// - EventEntry<FlowPath>
// - EventEntry<FlowEntry>
+ // - EventEntry<FlowId>
//
for (EventEntry<?> event : collection) {
// Topology event
@@ -191,6 +204,14 @@
flowEntryEvents.add(flowEntryEventEntry);
continue;
}
+
+ // FlowId event
+ if (event.eventData() instanceof FlowId) {
+ EventEntry<FlowId> flowIdEventEntry =
+ (EventEntry<FlowId>)event;
+ flowIdEvents.add(flowIdEventEntry);
+ continue;
+ }
}
collection.clear();
@@ -215,6 +236,7 @@
return; // Nothing to do
}
+ processFlowIdEvents();
processFlowPathEvents();
processTopologyEvents();
processUnmatchedFlowEntryAdd();
@@ -254,6 +276,7 @@
topologyEvents.clear();
flowPathEvents.clear();
flowEntryEvents.clear();
+ flowIdEvents.clear();
//
shouldRecomputeFlowPaths.clear();
modifiedFlowPaths.clear();
@@ -346,6 +369,41 @@
}
/**
+ * Process the Flow ID events.
+ *
+ * Each pending Flow ID event is logged at debug level. The handling of
+ * the individual event types is not implemented yet.
+ */
+ private void processFlowIdEvents() {
+     // Walk the pending Flow ID events and update the appropriate state
+     for (EventEntry<FlowId> flowIdEvent : flowIdEvents) {
+         FlowId eventFlowId = flowIdEvent.eventData();
+
+         log.debug("Flow ID Event: {} {}", flowIdEvent.eventType(), eventFlowId);
+
+         switch (flowIdEvent.eventType()) {
+         case ENTRY_ADD:
+             // A new Flow ID is added.
+             // TODO: Implement it!
+             break;
+
+         case ENTRY_REMOVE:
+             // An existing Flow ID is removed.
+             // TODO: Implement it!
+             break;
+
+         default:
+             // No other event types are handled here
+             break;
+         }
+     }
+ }
+
+
+ /**
* Process the Flow Path events.
*/
private void processFlowPathEvents() {
@@ -972,6 +1030,43 @@
}
/**
+ * Receive a notification that a FlowId is added.
+ *
+ * The FlowId is wrapped in an ENTRY_ADD event and appended to the
+ * network events.
+ *
+ * @param flowId the FlowId that is added.
+ */
+ @Override
+ public void notificationRecvFlowIdAdded(FlowId flowId) {
+     networkEvents.add(
+         new EventEntry<FlowId>(EventEntry.Type.ENTRY_ADD, flowId));
+ }
+
+ /**
+ * Receive a notification that a FlowId is removed.
+ *
+ * The FlowId is wrapped in an ENTRY_REMOVE event and appended to the
+ * network events.
+ *
+ * @param flowId the FlowId that is removed.
+ */
+ @Override
+ public void notificationRecvFlowIdRemoved(FlowId flowId) {
+     networkEvents.add(
+         new EventEntry<FlowId>(EventEntry.Type.ENTRY_REMOVE, flowId));
+ }
+
+ /**
+ * Receive a notification that a FlowId is updated.
+ *
+ * The FlowId is wrapped in an ENTRY_ADD event and appended to the
+ * network events.
+ *
+ * @param flowId the FlowId that is updated.
+ */
+ @Override
+ public void notificationRecvFlowIdUpdated(FlowId flowId) {
+     // NOTE: An UPDATE is queued as ENTRY_ADD because the ADD and
+     // UPDATE events are processed in same way
+     networkEvents.add(
+         new EventEntry<FlowId>(EventEntry.Type.ENTRY_ADD, flowId));
+ }
+
+ /**
* Receive a notification that a Topology Element is added.
*
* @param topologyElement the Topology Element that is added.
diff --git a/src/main/java/net/onrc/onos/ofcontroller/flowmanager/IFlowEventHandlerService.java b/src/main/java/net/onrc/onos/ofcontroller/flowmanager/IFlowEventHandlerService.java
index 78562e1..62edf70 100644
--- a/src/main/java/net/onrc/onos/ofcontroller/flowmanager/IFlowEventHandlerService.java
+++ b/src/main/java/net/onrc/onos/ofcontroller/flowmanager/IFlowEventHandlerService.java
@@ -2,6 +2,7 @@
import net.onrc.onos.ofcontroller.topology.TopologyElement;
import net.onrc.onos.ofcontroller.util.FlowEntry;
+import net.onrc.onos.ofcontroller.util.FlowId;
import net.onrc.onos.ofcontroller.util.FlowPath;
/**
@@ -51,6 +52,27 @@
void notificationRecvFlowEntryUpdated(FlowEntry flowEntry);
/**
+ * Receive a notification that a FlowId is added.
+ *
+ * Only the FlowId itself is carried by the notification.
+ *
+ * @param flowId the FlowId that is added.
+ */
+ void notificationRecvFlowIdAdded(FlowId flowId);
+
+ /**
+ * Receive a notification that a FlowId is removed.
+ *
+ * @param flowId the FlowId that is removed.
+ */
+ void notificationRecvFlowIdRemoved(FlowId flowId);
+
+ /**
+ * Receive a notification that a FlowId is updated.
+ *
+ * The FlowEventHandler implementation queues an update the same way
+ * as an add.
+ *
+ * @param flowId the FlowId that is updated.
+ */
+ void notificationRecvFlowIdUpdated(FlowId flowId);
+
+ /**
* Receive a notification that a Topology Element is added.
*
* @param topologyElement the Topology Element that is added.