* Added the missing method HazelcastDatagr.digetAllFlowEntryIds()
* Implemented the glue to pass the Flow Entry ID events to the event
processing routing.
diff --git a/src/main/java/net/onrc/onos/datagrid/HazelcastDatagrid.java b/src/main/java/net/onrc/onos/datagrid/HazelcastDatagrid.java
index e54dfc5..632cc38 100644
--- a/src/main/java/net/onrc/onos/datagrid/HazelcastDatagrid.java
+++ b/src/main/java/net/onrc/onos/datagrid/HazelcastDatagrid.java
@@ -26,6 +26,7 @@
import net.onrc.onos.ofcontroller.util.FlowEntryId;
import net.onrc.onos.ofcontroller.util.FlowId;
import net.onrc.onos.ofcontroller.util.FlowPath;
+import net.onrc.onos.ofcontroller.util.Pair;
import net.onrc.onos.ofcontroller.util.serializers.KryoFactory;
import org.slf4j.Logger;
@@ -1018,6 +1019,40 @@
}
/**
+ * Get all Flow Entry IDs that are currently in the datagrid.
+ *
+ * @return all Flow Entry IDs that ae currently in the datagrid.
+ */
+ @Override
+ public Collection<Pair<FlowEntryId, Dpid>> getAllFlowEntryIds() {
+ Collection<Pair<FlowEntryId, Dpid>> allFlowEntryIds =
+ new LinkedList<Pair<FlowEntryId, Dpid>>();
+
+ //
+ // Get all current entries
+ //
+ Kryo kryo = kryoFactory.newKryo();
+ for (Map.Entry<Long, byte[]> entry : mapFlowEntryId.entrySet()) {
+ Long key = entry.getKey();
+ byte[] valueBytes = entry.getValue();
+
+ FlowEntryId flowEntryId = new FlowEntryId(key);
+
+ //
+ // Decode the value
+ //
+ Input input = new Input(valueBytes);
+ Dpid dpid = kryo.readObject(input, Dpid.class);
+
+ Pair<FlowEntryId, Dpid> pair = new Pair(flowEntryId, dpid);
+ allFlowEntryIds.add(pair);
+ }
+ kryoFactory.deleteKryo(kryo);
+
+ return allFlowEntryIds;
+ }
+
+ /**
* Send a notification that a FlowId is added.
*
* @param flowId the FlowId that is added.
diff --git a/src/main/java/net/onrc/onos/datagrid/IDatagridService.java b/src/main/java/net/onrc/onos/datagrid/IDatagridService.java
index 20a43d2..cfc10bc 100644
--- a/src/main/java/net/onrc/onos/datagrid/IDatagridService.java
+++ b/src/main/java/net/onrc/onos/datagrid/IDatagridService.java
@@ -12,6 +12,7 @@
import net.onrc.onos.ofcontroller.util.FlowEntryId;
import net.onrc.onos.ofcontroller.util.FlowId;
import net.onrc.onos.ofcontroller.util.FlowPath;
+import net.onrc.onos.ofcontroller.util.Pair;
/**
* Interface for providing Datagrid Service to other modules.
@@ -137,7 +138,7 @@
/**
* Get all Flow IDs that are currently in the datagrid.
*
- * @return all Flow IDs that are currently in the datagrid.
+ * @return all Flow IDs that ae currently in the datagrid.
*/
Collection<FlowId> getAllFlowIds();
@@ -168,6 +169,13 @@
void notificationSendAllFlowIdsRemoved();
/**
+ * Get all Flow Entry IDs that are currently in the datagrid.
+ *
+ * @return all Flow Entry IDs that ae currently in the datagrid.
+ */
+ Collection<Pair<FlowEntryId, Dpid>> getAllFlowEntryIds();
+
+ /**
* Send a notification that a FlowEntryId is added.
*
* @param flowEntryId the FlowEntryId that is added.
diff --git a/src/main/java/net/onrc/onos/ofcontroller/flowmanager/FlowEventHandler.java b/src/main/java/net/onrc/onos/ofcontroller/flowmanager/FlowEventHandler.java
index d4b57f8..8000f49 100644
--- a/src/main/java/net/onrc/onos/ofcontroller/flowmanager/FlowEventHandler.java
+++ b/src/main/java/net/onrc/onos/ofcontroller/flowmanager/FlowEventHandler.java
@@ -31,6 +31,7 @@
import net.onrc.onos.ofcontroller.util.FlowId;
import net.onrc.onos.ofcontroller.util.FlowPath;
import net.onrc.onos.ofcontroller.util.FlowPathUserState;
+import net.onrc.onos.ofcontroller.util.Pair;
import net.onrc.onos.ofcontroller.util.Port;
import net.onrc.onos.ofcontroller.util.serializers.KryoFactory;
@@ -73,6 +74,8 @@
new LinkedList<EventEntry<FlowEntry>>();
private List<EventEntry<FlowId>> flowIdEvents =
new LinkedList<EventEntry<FlowId>>();
+ private List<EventEntry<Pair<FlowEntryId, Dpid>>> flowEntryIdEvents =
+ new LinkedList<EventEntry<Pair<FlowEntryId, Dpid>>>();
// All internally computed Flow Paths
private Map<Long, FlowPath> allFlowPaths = new HashMap<Long, FlowPath>();
@@ -159,6 +162,17 @@
flowIdEvents.add(eventEntry);
}
+ //
+ // Obtain the initial FlowEntryId state
+ //
+ Collection<Pair<FlowEntryId, Dpid>> flowEntryIds =
+ datagridService.getAllFlowEntryIds();
+ for (Pair<FlowEntryId, Dpid> pair : flowEntryIds) {
+ EventEntry<Pair<FlowEntryId, Dpid>> eventEntry =
+ new EventEntry<Pair<FlowEntryId, Dpid>>(EventEntry.Type.ENTRY_ADD, pair);
+ flowEntryIdEvents.add(eventEntry);
+ }
+
// Process the initial events (if any)
synchronized (allFlowPaths) {
processEvents();
@@ -189,6 +203,7 @@
// - EventEntry<FlowPath>
// - EventEntry<FlowEntry>
// - EventEntry<FlowId>
+ // - EventEntry<Pair<FlowEntryId, Dpid>>
//
for (EventEntry<?> event : collection) {
// Topology event
@@ -223,6 +238,13 @@
flowIdEvents.add(flowIdEventEntry);
continue;
}
+ // FlowEntryId event
+ if (event.eventData() instanceof Pair) {
+ EventEntry<Pair<FlowEntryId, Dpid>> flowEntryIdEventEntry =
+ (EventEntry<Pair<FlowEntryId, Dpid>>)event;
+ flowEntryIdEvents.add(flowEntryIdEventEntry);
+ continue;
+ }
}
collection.clear();
@@ -244,7 +266,8 @@
if (enableOnrc2014MeasurementsFlows) {
- if (topologyEvents.isEmpty() && flowIdEvents.isEmpty()) {
+ if (topologyEvents.isEmpty() && flowIdEvents.isEmpty() &&
+ flowEntryIdEvents.isEmpty()) {
return; // Nothing to do
}
@@ -284,6 +307,7 @@
// Cleanup
topologyEvents.clear();
flowIdEvents.clear();
+ flowEntryIdEvents.clear();
//
allFlowPaths.clear();
shouldRecomputeFlowPaths.clear();
@@ -1225,12 +1249,11 @@
@Override
public void notificationRecvFlowEntryIdAdded(FlowEntryId flowEntryId,
Dpid dpid) {
- // TODO: Implement it!
- /*
- EventEntry<FlowEntryId> eventEntry =
- new EventEntry<FlowEntryId>(EventEntry.Type.ENTRY_ADD, flowEntryId);
+ Pair flowEntryIdPair = new Pair(flowEntryId, dpid);
+
+ EventEntry<Pair<FlowEntryId, Dpid>> eventEntry =
+ new EventEntry<Pair<FlowEntryId, Dpid>>(EventEntry.Type.ENTRY_ADD, flowEntryIdPair);
networkEvents.add(eventEntry);
- */
}
/**
@@ -1242,12 +1265,11 @@
@Override
public void notificationRecvFlowEntryIdRemoved(FlowEntryId flowEntryId,
Dpid dpid) {
- // TODO: Implement it!
- /*
- EventEntry<FlowEntryId> eventEntry =
- new EventEntry<FlowEntryId>(EventEntry.Type.ENTRY_REMOVE, flowEntryId);
+ Pair flowEntryIdPair = new Pair(flowEntryId, dpid);
+
+ EventEntry<Pair<FlowEntryId, Dpid>> eventEntry =
+ new EventEntry<Pair<FlowEntryId, Dpid>>(EventEntry.Type.ENTRY_REMOVE, flowEntryIdPair);
networkEvents.add(eventEntry);
- */
}
/**
@@ -1259,13 +1281,12 @@
@Override
public void notificationRecvFlowEntryIdUpdated(FlowEntryId flowEntryId,
Dpid dpid) {
- // TODO: Implement it!
- /*
+ Pair flowEntryIdPair = new Pair(flowEntryId, dpid);
+
// NOTE: The ADD and UPDATE events are processed in same way
- EventEntry<FlowEntryId> eventEntry =
- new EventEntry<FlowEntryId>(EventEntry.Type.ENTRY_ADD, flowEntryId);
+ EventEntry<Pair<FlowEntryId, Dpid>> eventEntry =
+ new EventEntry<Pair<FlowEntryId, Dpid>>(EventEntry.Type.ENTRY_ADD, flowEntryIdPair);
networkEvents.add(eventEntry);
- */
}
/**