Implemented the mechanism to send notifications with the Flow Entry ID
and the Switch DPID when a Flow Entry needs to be pushed into a switch.
diff --git a/src/main/java/net/onrc/onos/datagrid/HazelcastDatagrid.java b/src/main/java/net/onrc/onos/datagrid/HazelcastDatagrid.java
index effbe81..e54dfc5 100644
--- a/src/main/java/net/onrc/onos/datagrid/HazelcastDatagrid.java
+++ b/src/main/java/net/onrc/onos/datagrid/HazelcastDatagrid.java
@@ -21,6 +21,7 @@
import net.onrc.onos.ofcontroller.proxyarp.ArpMessage;
import net.onrc.onos.ofcontroller.proxyarp.IArpEventHandler;
import net.onrc.onos.ofcontroller.topology.TopologyElement;
+import net.onrc.onos.ofcontroller.util.Dpid;
import net.onrc.onos.ofcontroller.util.FlowEntry;
import net.onrc.onos.ofcontroller.util.FlowEntryId;
import net.onrc.onos.ofcontroller.util.FlowId;
@@ -79,6 +80,12 @@
private MapFlowIdListener mapFlowIdListener = null;
private String mapFlowIdListenerId = null;
+ // State related to the Flow Entry ID map
+ protected static final String mapFlowEntryIdName = "mapFlowEntryId";
+ private IMap<Long, byte[]> mapFlowEntryId = null;
+ private MapFlowEntryIdListener mapFlowEntryIdListener = null;
+ private String mapFlowEntryIdListenerId = null;
+
// State related to the Network Topology map
protected static final String mapTopologyName = "mapTopology";
private IMap<String, byte[]> mapTopology = null;
@@ -257,7 +264,7 @@
* @param event the notification event for the entry.
*/
public void entryAdded(EntryEvent<Long, byte[]> event) {
- byte[] valueBytes = (byte[])event.getValue();
+ byte[] valueBytes = event.getValue();
//
// Decode the value and deliver the notification
@@ -275,7 +282,7 @@
* @param event the notification event for the entry.
*/
public void entryRemoved(EntryEvent<Long, byte[]> event) {
- byte[] valueBytes = (byte[])event.getValue();
+ byte[] valueBytes = event.getValue();
//
// Decode the value and deliver the notification
@@ -293,7 +300,7 @@
* @param event the notification event for the entry.
*/
public void entryUpdated(EntryEvent<Long, byte[]> event) {
- byte[] valueBytes = (byte[])event.getValue();
+ byte[] valueBytes = event.getValue();
//
// Decode the value and deliver the notification
@@ -316,6 +323,87 @@
}
/**
+ * Class for receiving notifications for FlowEntryId state.
+ *
+ * The datagrid map is:
+ * - Key : FlowEntryId (Long)
+ * - Value : Serialized Switch Dpid (byte[])
+ */
+ class MapFlowEntryIdListener implements EntryListener<Long, byte[]> {
+ /**
+ * Receive a notification that an entry is added.
+ *
+ * @param event the notification event for the entry.
+ */
+ public void entryAdded(EntryEvent<Long, byte[]> event) {
+ Long keyLong = event.getKey();
+ FlowEntryId flowEntryId = new FlowEntryId(keyLong);
+
+ byte[] valueBytes = event.getValue();
+
+ //
+ // Decode the value and deliver the notification
+ //
+ Kryo kryo = kryoFactory.newKryo();
+ Input input = new Input(valueBytes);
+ Dpid dpid = kryo.readObject(input, Dpid.class);
+ kryoFactory.deleteKryo(kryo);
+ flowEventHandlerService.notificationRecvFlowEntryIdAdded(flowEntryId, dpid);
+ }
+
+ /**
+ * Receive a notification that an entry is removed.
+ *
+ * @param event the notification event for the entry.
+ */
+ public void entryRemoved(EntryEvent<Long, byte[]> event) {
+ Long keyLong = event.getKey();
+ FlowEntryId flowEntryId = new FlowEntryId(keyLong);
+
+ byte[] valueBytes = event.getValue();
+
+ //
+ // Decode the value and deliver the notification
+ //
+ Kryo kryo = kryoFactory.newKryo();
+ Input input = new Input(valueBytes);
+ Dpid dpid = kryo.readObject(input, Dpid.class);
+ kryoFactory.deleteKryo(kryo);
+ flowEventHandlerService.notificationRecvFlowEntryIdRemoved(flowEntryId, dpid);
+ }
+
+ /**
+ * Receive a notification that an entry is updated.
+ *
+ * @param event the notification event for the entry.
+ */
+ public void entryUpdated(EntryEvent<Long, byte[]> event) {
+ Long keyLong = event.getKey();
+ FlowEntryId flowEntryId = new FlowEntryId(keyLong);
+
+ byte[] valueBytes = event.getValue();
+
+ //
+ // Decode the value and deliver the notification
+ //
+ Kryo kryo = kryoFactory.newKryo();
+ Input input = new Input(valueBytes);
+ Dpid dpid = kryo.readObject(input, Dpid.class);
+ kryoFactory.deleteKryo(kryo);
+ flowEventHandlerService.notificationRecvFlowEntryIdUpdated(flowEntryId, dpid);
+ }
+
+ /**
+ * Receive a notification that an entry is evicted.
+ *
+ * @param event the notification event for the entry.
+ */
+ public void entryEvicted(EntryEvent<Long, byte[]> event) {
+ // NOTE: We don't use eviction for this map
+ }
+ }
+
+ /**
* Class for receiving notifications for Network Topology state.
*
* The datagrid map is:
@@ -604,6 +692,11 @@
mapFlowId = hazelcastInstance.getMap(mapFlowIdName);
mapFlowIdListenerId = mapFlowId.addEntryListener(mapFlowIdListener, true);
+ // Initialize the FlowEntryId-related map state
+ mapFlowEntryIdListener = new MapFlowEntryIdListener();
+ mapFlowEntryId = hazelcastInstance.getMap(mapFlowEntryIdName);
+ mapFlowEntryIdListenerId = mapFlowEntryId.addEntryListener(mapFlowEntryIdListener, true);
+
// Initialize the Topology-related map state
mapTopologyListener = new MapTopologyListener();
mapTopology = hazelcastInstance.getMap(mapTopologyName);
@@ -636,6 +729,11 @@
mapFlowId = null;
mapFlowIdListener = null;
+ // Clear the FlowEntryId-related map state
+ mapFlowEntryId.removeEntryListener(mapFlowEntryIdListenerId);
+ mapFlowEntryId = null;
+ mapFlowEntryIdListener = null;
+
// Clear the Topology-related map state
mapTopology.removeEntryListener(mapTopologyListenerId);
mapTopology = null;
@@ -988,6 +1086,78 @@
}
/**
+ * Send a notification that a FlowEntryId is added.
+ *
+ * @param flowEntryId the FlowEntryId that is added.
+ * @param dpid the Switch Dpid.
+ */
+ @Override
+ public void notificationSendFlowEntryIdAdded(FlowEntryId flowEntryId,
+ Dpid dpid) {
+ //
+ // Encode the value
+ //
+ byte[] buffer = new byte[MAX_BUFFER_SIZE];
+ Kryo kryo = kryoFactory.newKryo();
+ Output output = new Output(buffer, -1);
+ kryo.writeObject(output, dpid);
+ byte[] valueBytes = output.toBytes();
+ kryoFactory.deleteKryo(kryo);
+
+ //
+ // Put the entry:
+ // - Key : FlowEntryId (Long)
+ // - Value : Serialized Switch Dpid (byte[])
+ //
+ mapFlowEntryId.putAsync(flowEntryId.value(), valueBytes);
+ }
+
+ /**
+ * Send a notification that a FlowEntryId is removed.
+ *
+ * @param flowEntryId the FlowEntryId that is removed.
+ */
+ @Override
+ public void notificationSendFlowEntryIdRemoved(FlowEntryId flowEntryId) {
+ //
+ // Remove the entry:
+ // - Key : FlowEntryId (Long)
+ // - Value : Serialized Dpid (byte[])
+ //
+ mapFlowEntryId.removeAsync(flowEntryId.value());
+ }
+
+ /**
+ * Send a notification that a FlowEntryId is updated.
+ *
+ * @param flowEntryId the FlowEntryId that is updated.
+ * @param dpid the Switch Dpid.
+ */
+ @Override
+ public void notificationSendFlowEntryIdUpdated(FlowEntryId flowEntryId,
+ Dpid dpid) {
+ // NOTE: Adding an entry with an existing key automatically updates it
+ notificationSendFlowEntryIdAdded(flowEntryId, dpid);
+ }
+
+ /**
+ * Send a notification that all Flow Entry IDs are removed.
+ */
+ @Override
+ public void notificationSendAllFlowEntryIdsRemoved() {
+ //
+ // Remove all entries
+ // NOTE: We remove the entries one-by-one so the per-entry
+ // notifications will be delivered.
+ //
+ // mapFlowEntryId.clear();
+ Set<Long> keySet = mapFlowEntryId.keySet();
+ for (Long key : keySet) {
+ mapFlowEntryId.removeAsync(key);
+ }
+ }
+
+ /**
* Get all Topology Elements that are currently in the datagrid.
*
* @return all Topology Elements that are currently in the datagrid.
diff --git a/src/main/java/net/onrc/onos/datagrid/IDatagridService.java b/src/main/java/net/onrc/onos/datagrid/IDatagridService.java
index d4e7b00..20a43d2 100644
--- a/src/main/java/net/onrc/onos/datagrid/IDatagridService.java
+++ b/src/main/java/net/onrc/onos/datagrid/IDatagridService.java
@@ -7,6 +7,7 @@
import net.onrc.onos.ofcontroller.proxyarp.ArpMessage;
import net.onrc.onos.ofcontroller.proxyarp.IArpEventHandler;
import net.onrc.onos.ofcontroller.topology.TopologyElement;
+import net.onrc.onos.ofcontroller.util.Dpid;
import net.onrc.onos.ofcontroller.util.FlowEntry;
import net.onrc.onos.ofcontroller.util.FlowEntryId;
import net.onrc.onos.ofcontroller.util.FlowId;
@@ -167,6 +168,35 @@
void notificationSendAllFlowIdsRemoved();
/**
+ * Send a notification that a FlowEntryId is added.
+ *
+ * @param flowEntryId the FlowEntryId that is added.
+ * @param dpid the Switch Dpid.
+ */
+ void notificationSendFlowEntryIdAdded(FlowEntryId flowEntryId, Dpid dpid);
+
+ /**
+ * Send a notification that a FlowEntryId is removed.
+ *
+ * @param flowEntryId the FlowEntryId that is removed.
+ */
+ void notificationSendFlowEntryIdRemoved(FlowEntryId flowEntryId);
+
+ /**
+ * Send a notification that a FlowEntryId is updated.
+ *
+ * @param flowEntryId the FlowEntryId that is updated.
+ * @param dpid the Switch Dpid.
+ */
+ void notificationSendFlowEntryIdUpdated(FlowEntryId flowEntryId,
+ Dpid dpid);
+
+ /**
+ * Send a notification that all Flow Entry IDs are removed.
+ */
+ void notificationSendAllFlowEntryIdsRemoved();
+
+ /**
* Get all Topology Elements that are currently in the datagrid.
*
* @return all Topology Elements that are currently in the datagrid.
diff --git a/src/main/java/net/onrc/onos/ofcontroller/flowmanager/FlowDatabaseOperation.java b/src/main/java/net/onrc/onos/ofcontroller/flowmanager/FlowDatabaseOperation.java
index d4a563b..ab8432c 100644
--- a/src/main/java/net/onrc/onos/ofcontroller/flowmanager/FlowDatabaseOperation.java
+++ b/src/main/java/net/onrc/onos/ofcontroller/flowmanager/FlowDatabaseOperation.java
@@ -25,6 +25,8 @@
* Class for performing Flow-related operations on the Database.
*/
public class FlowDatabaseOperation {
+ static private boolean enableOnrc2014MeasurementsFlows = true;
+
private final static Logger log = LoggerFactory.getLogger(FlowDatabaseOperation.class);
/**
@@ -169,8 +171,10 @@
// flowPath.dataPath().flowEntries()
//
for (FlowEntry flowEntry : flowPath.dataPath().flowEntries()) {
- if (flowEntry.flowEntryUserState() == FlowEntryUserState.FE_USER_DELETE)
- continue; // Skip: all Flow Entries were deleted earlier
+ if (! enableOnrc2014MeasurementsFlows) {
+ if (flowEntry.flowEntryUserState() == FlowEntryUserState.FE_USER_DELETE)
+ continue; // Skip: all Flow Entries were deleted earlier
+ }
if (addFlowEntry(dbHandler, flowObj, flowEntry) == null) {
dbHandler.rollback();
diff --git a/src/main/java/net/onrc/onos/ofcontroller/flowmanager/FlowEventHandler.java b/src/main/java/net/onrc/onos/ofcontroller/flowmanager/FlowEventHandler.java
index 0f421c3..d4b57f8 100644
--- a/src/main/java/net/onrc/onos/ofcontroller/flowmanager/FlowEventHandler.java
+++ b/src/main/java/net/onrc/onos/ofcontroller/flowmanager/FlowEventHandler.java
@@ -1217,6 +1217,58 @@
}
/**
+ * Receive a notification that a FlowEntryId is added.
+ *
+ * @param flowEntryId the FlowEntryId that is added.
+ * @param dpid the Switch Dpid for the corresponding Flow Entry.
+ */
+ @Override
+ public void notificationRecvFlowEntryIdAdded(FlowEntryId flowEntryId,
+ Dpid dpid) {
+ // TODO: Implement it!
+ /*
+ EventEntry<FlowEntryId> eventEntry =
+ new EventEntry<FlowEntryId>(EventEntry.Type.ENTRY_ADD, flowEntryId);
+ networkEvents.add(eventEntry);
+ */
+ }
+
+ /**
+ * Receive a notification that a FlowEntryId is removed.
+ *
+ * @param flowEntryId the FlowEntryId that is removed.
+ * @param dpid the Switch Dpid for the corresponding Flow Entry.
+ */
+ @Override
+ public void notificationRecvFlowEntryIdRemoved(FlowEntryId flowEntryId,
+ Dpid dpid) {
+ // TODO: Implement it!
+ /*
+ EventEntry<FlowEntryId> eventEntry =
+ new EventEntry<FlowEntryId>(EventEntry.Type.ENTRY_REMOVE, flowEntryId);
+ networkEvents.add(eventEntry);
+ */
+ }
+
+ /**
+ * Receive a notification that a FlowEntryId is updated.
+ *
+ * @param flowEntryId the FlowEntryId that is updated.
+ * @param dpid the Switch Dpid for the corresponding Flow Entry.
+ */
+ @Override
+ public void notificationRecvFlowEntryIdUpdated(FlowEntryId flowEntryId,
+ Dpid dpid) {
+ // TODO: Implement it!
+ /*
+ // NOTE: The ADD and UPDATE events are processed in same way
+ EventEntry<FlowEntryId> eventEntry =
+ new EventEntry<FlowEntryId>(EventEntry.Type.ENTRY_ADD, flowEntryId);
+ networkEvents.add(eventEntry);
+ */
+ }
+
+ /**
* Receive a notification that a Topology Element is added.
*
* @param topologyElement the Topology Element that is added.
diff --git a/src/main/java/net/onrc/onos/ofcontroller/flowmanager/IFlowEventHandlerService.java b/src/main/java/net/onrc/onos/ofcontroller/flowmanager/IFlowEventHandlerService.java
index 62edf70..04f92bc 100644
--- a/src/main/java/net/onrc/onos/ofcontroller/flowmanager/IFlowEventHandlerService.java
+++ b/src/main/java/net/onrc/onos/ofcontroller/flowmanager/IFlowEventHandlerService.java
@@ -1,7 +1,9 @@
package net.onrc.onos.ofcontroller.flowmanager;
import net.onrc.onos.ofcontroller.topology.TopologyElement;
+import net.onrc.onos.ofcontroller.util.Dpid;
import net.onrc.onos.ofcontroller.util.FlowEntry;
+import net.onrc.onos.ofcontroller.util.FlowEntryId;
import net.onrc.onos.ofcontroller.util.FlowId;
import net.onrc.onos.ofcontroller.util.FlowPath;
@@ -73,6 +75,32 @@
void notificationRecvFlowIdUpdated(FlowId flowId);
/**
+ * Receive a notification that a FlowEntryId is added.
+ *
+ * @param flowEntryId the FlowEntryId that is added.
+ * @param dpid the Switch Dpid for the corresponding Flow Entry.
+ */
+ void notificationRecvFlowEntryIdAdded(FlowEntryId flowEntryId, Dpid dpid);
+
+ /**
+ * Receive a notification that a FlowEntryId is removed.
+ *
+ * @param flowEntryId the FlowEntryId that is removed.
+ * @param dpid the Switch Dpid for the corresponding Flow Entry.
+ */
+ void notificationRecvFlowEntryIdRemoved(FlowEntryId flowEntryId,
+ Dpid dpid);
+
+ /**
+ * Receive a notification that a FlowEntryId is updated.
+ *
+ * @param flowEntryId the FlowEntryId that is updated.
+ * @param dpid the Switch Dpid for the corresponding Flow Entry.
+ */
+ void notificationRecvFlowEntryIdUpdated(FlowEntryId flowEntryId,
+ Dpid dpid);
+
+ /**
* Receive a notification that a Topology Element is added.
*
* @param topologyElement the Topology Element that is added.