Added a placeholder for propagating Flow Entries through the datagrid.
Not used for now.
diff --git a/src/main/java/net/onrc/onos/datagrid/HazelcastDatagrid.java b/src/main/java/net/onrc/onos/datagrid/HazelcastDatagrid.java
index 481002f..484bb14 100644
--- a/src/main/java/net/onrc/onos/datagrid/HazelcastDatagrid.java
+++ b/src/main/java/net/onrc/onos/datagrid/HazelcastDatagrid.java
@@ -20,6 +20,8 @@
import net.onrc.onos.ofcontroller.flowmanager.IPathComputationService;
import net.onrc.onos.ofcontroller.topology.TopologyElement;
+import net.onrc.onos.ofcontroller.util.FlowEntry;
+import net.onrc.onos.ofcontroller.util.FlowEntryId;
import net.onrc.onos.ofcontroller.util.FlowId;
import net.onrc.onos.ofcontroller.util.FlowPath;
import net.onrc.onos.ofcontroller.util.serializers.KryoFactory;
@@ -52,14 +54,20 @@
private Config hazelcastConfig = null;
private KryoFactory kryoFactory = new KryoFactory();
+ private IPathComputationService pathComputationService = null;
// State related to the Flow map
protected static final String mapFlowName = "mapFlow";
- private IPathComputationService pathComputationService = null;
private IMap<Long, byte[]> mapFlow = null;
private MapFlowListener mapFlowListener = null;
private String mapFlowListenerId = null;
+ // State related to the Flow Entry map
+ protected static final String mapFlowEntryName = "mapFlowEntry";
+ private IMap<Long, byte[]> mapFlowEntry = null;
+ private MapFlowEntryListener mapFlowEntryListener = null;
+ private String mapFlowEntryListenerId = null;
+
// State related to the Network Topology map
protected static final String mapTopologyName = "mapTopology";
private IMap<String, byte[]> mapTopology = null;
@@ -142,6 +150,81 @@
}
/**
+ * Class for receiving notifications for FlowEntry state.
+ *
+ * The datagrid map is:
+ * - Key : FlowEntry ID (Long)
+ * - Value : Serialized FlowEntry (byte[])
+ */
+ class MapFlowEntryListener implements EntryListener<Long, byte[]> {
+     /**
+      * Receive a notification that an entry is added.
+      *
+      * @param event the notification event for the entry.
+      */
+     @Override
+     public void entryAdded(EntryEvent<Long, byte[]> event) {
+         // Decode the value and deliver the notification
+         FlowEntry flowEntry = decodeFlowEntry(event.getValue());
+         pathComputationService.notificationRecvFlowEntryAdded(flowEntry);
+     }
+
+     /**
+      * Receive a notification that an entry is removed.
+      *
+      * @param event the notification event for the entry.
+      */
+     @Override
+     public void entryRemoved(EntryEvent<Long, byte[]> event) {
+         // Decode the value and deliver the notification
+         FlowEntry flowEntry = decodeFlowEntry(event.getValue());
+         pathComputationService.notificationRecvFlowEntryRemoved(flowEntry);
+     }
+
+     /**
+      * Receive a notification that an entry is updated.
+      *
+      * @param event the notification event for the entry.
+      */
+     @Override
+     public void entryUpdated(EntryEvent<Long, byte[]> event) {
+         // Decode the value and deliver the notification
+         FlowEntry flowEntry = decodeFlowEntry(event.getValue());
+         pathComputationService.notificationRecvFlowEntryUpdated(flowEntry);
+     }
+
+     /**
+      * Receive a notification that an entry is evicted.
+      *
+      * @param event the notification event for the entry.
+      */
+     @Override
+     public void entryEvicted(EntryEvent<Long, byte[]> event) {
+         // NOTE: We don't use eviction for this map
+     }
+
+     /**
+      * Decode the serialized FlowEntry carried as an event value.
+      *
+      * The Kryo instance obtained from the factory is handed back to
+      * it even if decoding throws.
+      *
+      * @param valueBytes the serialized FlowEntry (the map value).
+      * @return the decoded FlowEntry.
+      */
+     private FlowEntry decodeFlowEntry(byte[] valueBytes) {
+         Kryo kryo = kryoFactory.newKryo();
+         try {
+             Input input = new Input(valueBytes);
+             return kryo.readObject(input, FlowEntry.class);
+         } finally {
+             // Release the Kryo instance whatever the decode outcome
+             kryoFactory.deleteKryo(kryo);
+         }
+     }
+ }
+
+ /**
* Class for receiving notifications for Network Topology state.
*
* The datagrid map is:
@@ -348,6 +431,11 @@
mapFlow = hazelcastInstance.getMap(mapFlowName);
mapFlowListenerId = mapFlow.addEntryListener(mapFlowListener, true);
+ // Initialize the FlowEntry-related map state
+ mapFlowEntryListener = new MapFlowEntryListener();
+ mapFlowEntry = hazelcastInstance.getMap(mapFlowEntryName);
+ mapFlowEntryListenerId = mapFlowEntry.addEntryListener(mapFlowEntryListener, true);
+
// Initialize the Topology-related map state
mapTopologyListener = new MapTopologyListener();
mapTopology = hazelcastInstance.getMap(mapTopologyName);
@@ -370,6 +458,11 @@
mapFlow = null;
mapFlowListener = null;
+ // Clear the FlowEntry-related map state
+ mapFlowEntry.removeEntryListener(mapFlowEntryListenerId);
+ mapFlowEntry = null;
+ mapFlowEntryListener = null;
+
// Clear the Topology-related map state
mapTopology.removeEntryListener(mapTopologyListenerId);
mapTopology = null;
@@ -408,7 +501,7 @@
/**
* Send a notification that a Flow is added.
*
- * @param flowPath the flow that is added.
+ * @param flowPath the Flow that is added.
*/
@Override
public void notificationSendFlowAdded(FlowPath flowPath) {
@@ -433,7 +526,7 @@
/**
* Send a notification that a Flow is removed.
*
- * @param flowId the Flow ID of the flow that is removed.
+ * @param flowId the Flow ID of the Flow that is removed.
*/
@Override
public void notificationSendFlowRemoved(FlowId flowId) {
@@ -448,7 +541,7 @@
/**
* Send a notification that a Flow is updated.
*
- * @param flowPath the flow that is updated.
+ * @param flowPath the Flow that is updated.
*/
@Override
public void notificationSendFlowUpdated(FlowPath flowPath) {
@@ -474,6 +567,101 @@
}
/**
+ * Get all Flow Entries that are currently in the datagrid.
+ *
+ * @return all Flow Entries that are currently in the datagrid.
+ */
+ @Override
+ public Collection<FlowEntry> getAllFlowEntries() {
+     Collection<FlowEntry> allFlowEntries = new LinkedList<FlowEntry>();
+
+     //
+     // Decode every serialized value currently stored in the map.
+     // A single Kryo instance is reused for all values and is always
+     // handed back to the factory, even if decoding a value throws.
+     //
+     Kryo kryo = kryoFactory.newKryo();
+     try {
+         for (byte[] valueBytes : mapFlowEntry.values()) {
+             Input input = new Input(valueBytes);
+             allFlowEntries.add(kryo.readObject(input, FlowEntry.class));
+         }
+     } finally {
+         kryoFactory.deleteKryo(kryo);
+     }
+
+     return allFlowEntries;
+ }
+
+ /**
+ * Send a notification that a FlowEntry is added.
+ *
+ * @param flowEntry the FlowEntry that is added.
+ */
+ @Override
+ public void notificationSendFlowEntryAdded(FlowEntry flowEntry) {
+     // Encode the value; the Kryo instance is always handed back to
+     // the factory, even if serialization throws.
+     byte[] valueBytes;
+     Kryo kryo = kryoFactory.newKryo();
+     try {
+         Output output = new Output(new byte[MAX_BUFFER_SIZE], -1);
+         kryo.writeObject(output, flowEntry);
+         valueBytes = output.toBytes();
+     } finally {
+         kryoFactory.deleteKryo(kryo);
+     }
+
+     // Put the entry:
+     //  - Key   : FlowEntry ID (Long)
+     //  - Value : Serialized FlowEntry (byte[])
+     mapFlowEntry.putAsync(flowEntry.flowEntryId().value(), valueBytes);
+ }
+
+ /**
+ * Send a notification that a FlowEntry is removed.
+ *
+ * @param flowEntryId the FlowEntry ID of the FlowEntry that is removed.
+ */
+ @Override
+ public void notificationSendFlowEntryRemoved(FlowEntryId flowEntryId) {
+     //
+     // Remove the entry stored under the FlowEntry ID (the map key);
+     // the map value is the serialized FlowEntry (byte[]).
+     //
+     Long key = flowEntryId.value();
+     mapFlowEntry.removeAsync(key);
+ }
+
+ /**
+ * Send a notification that a FlowEntry is updated.
+ *
+ * @param flowEntry the FlowEntry that is updated.
+ */
+ @Override
+ public void notificationSendFlowEntryUpdated(FlowEntry flowEntry) {
+     // NOTE: Re-adding under an existing key automatically updates the entry
+     this.notificationSendFlowEntryAdded(flowEntry);
+ }
+
+ /**
+ * Send a notification that all Flow Entries are removed.
+ */
+ @Override
+ public void notificationSendAllFlowEntriesRemoved() {
+     //
+     // Remove all entries.
+     // NOTE: mapFlowEntry.clear() is intentionally not used: the
+     // entries are removed one-by-one so the per-entry notifications
+     // will be delivered.
+     //
+     for (Long flowEntryKey : mapFlowEntry.keySet()) {
+         // Asynchronous removal: the returned result is not awaited
+         mapFlowEntry.removeAsync(flowEntryKey);
+     }
+ }
+
+ /**
* Get all Topology Elements that are currently in the datagrid.
*
* @return all Topology Elements that are currently in the datagrid.
diff --git a/src/main/java/net/onrc/onos/datagrid/IDatagridService.java b/src/main/java/net/onrc/onos/datagrid/IDatagridService.java
index 10cd1e4..7735033 100644
--- a/src/main/java/net/onrc/onos/datagrid/IDatagridService.java
+++ b/src/main/java/net/onrc/onos/datagrid/IDatagridService.java
@@ -6,6 +6,8 @@
import net.onrc.onos.ofcontroller.flowmanager.IPathComputationService;
import net.onrc.onos.ofcontroller.topology.TopologyElement;
+import net.onrc.onos.ofcontroller.util.FlowEntry;
+import net.onrc.onos.ofcontroller.util.FlowEntryId;
import net.onrc.onos.ofcontroller.util.FlowId;
import net.onrc.onos.ofcontroller.util.FlowPath;
@@ -44,21 +46,21 @@
/**
* Send a notification that a Flow is added.
*
- * @param flowPath the flow that is added.
+ * @param flowPath the Flow that is added.
*/
void notificationSendFlowAdded(FlowPath flowPath);
/**
* Send a notification that a Flow is removed.
*
- * @param flowId the Flow ID of the flow that is removed.
+ * @param flowId the Flow ID of the Flow that is removed.
*/
void notificationSendFlowRemoved(FlowId flowId);
/**
* Send a notification that a Flow is updated.
*
- * @param flowPath the flow that is updated.
+ * @param flowPath the Flow that is updated.
*/
void notificationSendFlowUpdated(FlowPath flowPath);
@@ -68,6 +70,39 @@
void notificationSendAllFlowsRemoved();
/**
+ * Get all Flow Entries that are currently in the datagrid.
+ *
+ * @return all Flow Entries that are currently in the datagrid.
+ */
+ Collection<FlowEntry> getAllFlowEntries();
+
+ /**
+ * Send a notification that a FlowEntry is added.
+ *
+ * @param flowEntry the FlowEntry that is added.
+ */
+ void notificationSendFlowEntryAdded(FlowEntry flowEntry);
+
+ /**
+ * Send a notification that a FlowEntry is removed.
+ *
+ * @param flowEntryId the FlowEntry ID of the FlowEntry that is removed.
+ */
+ void notificationSendFlowEntryRemoved(FlowEntryId flowEntryId);
+
+ /**
+ * Send a notification that a FlowEntry is updated.
+ *
+ * @param flowEntry the FlowEntry that is updated.
+ */
+ void notificationSendFlowEntryUpdated(FlowEntry flowEntry);
+
+ /**
+ * Send a notification that all Flow Entries are removed.
+ */
+ void notificationSendAllFlowEntriesRemoved();
+
+ /**
* Get all Topology Elements that are currently in the datagrid.
*
* @return all Topology Elements that are currently in the datagrid.
diff --git a/src/main/java/net/onrc/onos/ofcontroller/flowmanager/IPathComputationService.java b/src/main/java/net/onrc/onos/ofcontroller/flowmanager/IPathComputationService.java
index 1bc0be1..1966683 100644
--- a/src/main/java/net/onrc/onos/ofcontroller/flowmanager/IPathComputationService.java
+++ b/src/main/java/net/onrc/onos/ofcontroller/flowmanager/IPathComputationService.java
@@ -1,6 +1,7 @@
package net.onrc.onos.ofcontroller.flowmanager;
import net.onrc.onos.ofcontroller.topology.TopologyElement;
+import net.onrc.onos.ofcontroller.util.FlowEntry;
import net.onrc.onos.ofcontroller.util.FlowPath;
/**
@@ -10,25 +11,46 @@
/**
* Receive a notification that a Flow is added.
*
- * @param flowPath the flow that is added.
+ * @param flowPath the Flow that is added.
*/
void notificationRecvFlowAdded(FlowPath flowPath);
/**
* Receive a notification that a Flow is removed.
*
- * @param flowPath the flow that is removed.
+ * @param flowPath the Flow that is removed.
*/
void notificationRecvFlowRemoved(FlowPath flowPath);
/**
* Receive a notification that a Flow is updated.
*
- * @param flowPath the flow that is updated.
+ * @param flowPath the Flow that is updated.
*/
void notificationRecvFlowUpdated(FlowPath flowPath);
/**
+ * Receive a notification that a FlowEntry is added.
+ *
+ * @param flowEntry the FlowEntry that is added.
+ */
+ void notificationRecvFlowEntryAdded(FlowEntry flowEntry);
+
+ /**
+ * Receive a notification that a FlowEntry is removed.
+ *
+ * @param flowEntry the FlowEntry that is removed.
+ */
+ void notificationRecvFlowEntryRemoved(FlowEntry flowEntry);
+
+ /**
+ * Receive a notification that a FlowEntry is updated.
+ *
+ * @param flowEntry the FlowEntry that is updated.
+ */
+ void notificationRecvFlowEntryUpdated(FlowEntry flowEntry);
+
+ /**
* Receive a notification that a Topology Element is added.
*
* @param topologyElement the Topology Element that is added.
diff --git a/src/main/java/net/onrc/onos/ofcontroller/flowmanager/PathComputation.java b/src/main/java/net/onrc/onos/ofcontroller/flowmanager/PathComputation.java
index ae14e09..b9806b5 100644
--- a/src/main/java/net/onrc/onos/ofcontroller/flowmanager/PathComputation.java
+++ b/src/main/java/net/onrc/onos/ofcontroller/flowmanager/PathComputation.java
@@ -46,11 +46,13 @@
private BlockingQueue<EventEntry<?>> networkEvents =
new LinkedBlockingQueue<EventEntry<?>>();
- // The pending Topology and Flow Path events
+ // The pending Topology, FlowPath, and FlowEntry events
private List<EventEntry<TopologyElement>> topologyEvents =
new LinkedList<EventEntry<TopologyElement>>();
private List<EventEntry<FlowPath>> flowPathEvents =
new LinkedList<EventEntry<FlowPath>>();
+ private List<EventEntry<FlowEntry>> flowEntryEvents =
+ new LinkedList<EventEntry<FlowEntry>>();
/**
* Constructor for a given Flow Manager and Datagrid Service.
@@ -89,6 +91,16 @@
new EventEntry<FlowPath>(EventEntry.Type.ENTRY_ADD, flowPath);
flowPathEvents.add(eventEntry);
}
+ //
+ // Obtain the initial FlowEntry state
+ //
+ Collection<FlowEntry> flowEntries = datagridService.getAllFlowEntries();
+ for (FlowEntry flowEntry : flowEntries) {
+ EventEntry<FlowEntry> eventEntry =
+ new EventEntry<FlowEntry>(EventEntry.Type.ENTRY_ADD, flowEntry);
+ flowEntryEvents.add(eventEntry);
+ }
+
// Process the events (if any)
processEvents();
@@ -106,6 +118,7 @@
// Demultiplex all events:
// - EventEntry<TopologyElement>
// - EventEntry<FlowPath>
+ // - EventEntry<FlowEntry>
//
for (EventEntry<?> event : collection) {
if (event.eventData() instanceof TopologyElement) {
@@ -116,6 +129,10 @@
EventEntry<FlowPath> flowPathEventEntry =
(EventEntry<FlowPath>)event;
flowPathEvents.add(flowPathEventEntry);
+ } else if (event.eventData() instanceof FlowEntry) {
+ EventEntry<FlowEntry> flowEntryEventEntry =
+ (EventEntry<FlowEntry>)event;
+ flowEntryEvents.add(flowEntryEventEntry);
}
}
collection.clear();
@@ -136,8 +153,13 @@
List<FlowPath> recomputeFlowPaths = new LinkedList<FlowPath>();
List<FlowPath> modifiedFlowPaths = new LinkedList<FlowPath>();
- if (topologyEvents.isEmpty() && flowPathEvents.isEmpty())
+ // TODO: For now we don't use/process the FlowEntry events
+ flowEntryEvents.clear();
+
+ if (topologyEvents.isEmpty() && flowPathEvents.isEmpty() &&
+ flowEntryEvents.isEmpty()) {
return; // Nothing to do
+ }
//
// Process the Flow Path events
@@ -422,7 +444,7 @@
/**
* Receive a notification that a Flow is added.
*
- * @param flowPath the flow that is added.
+ * @param flowPath the Flow that is added.
*/
@Override
public void notificationRecvFlowAdded(FlowPath flowPath) {
@@ -434,7 +456,7 @@
/**
* Receive a notification that a Flow is removed.
*
- * @param flowPath the flow that is removed.
+ * @param flowPath the Flow that is removed.
*/
@Override
public void notificationRecvFlowRemoved(FlowPath flowPath) {
@@ -446,7 +468,7 @@
/**
* Receive a notification that a Flow is updated.
*
- * @param flowPath the flow that is updated.
+ * @param flowPath the Flow that is updated.
*/
@Override
public void notificationRecvFlowUpdated(FlowPath flowPath) {
@@ -457,6 +479,43 @@
}
/**
+ * Receive a notification that a FlowEntry is added.
+ *
+ * @param flowEntry the FlowEntry that is added.
+ */
+ @Override
+ public void notificationRecvFlowEntryAdded(FlowEntry flowEntry) {
+     // Queue the addition as an ENTRY_ADD event on networkEvents
+     networkEvents.add(
+         new EventEntry<FlowEntry>(EventEntry.Type.ENTRY_ADD, flowEntry));
+ }
+
+ /**
+ * Receive a notification that a FlowEntry is removed.
+ *
+ * @param flowEntry the FlowEntry that is removed.
+ */
+ @Override
+ public void notificationRecvFlowEntryRemoved(FlowEntry flowEntry) {
+     // Queue the removal as an ENTRY_REMOVE event on networkEvents
+     networkEvents.add(
+         new EventEntry<FlowEntry>(EventEntry.Type.ENTRY_REMOVE, flowEntry));
+ }
+
+ /**
+ * Receive a notification that a FlowEntry is updated.
+ *
+ * @param flowEntry the FlowEntry that is updated.
+ */
+ @Override
+ public void notificationRecvFlowEntryUpdated(FlowEntry flowEntry) {
+     // NOTE: An update is deliberately queued as ENTRY_ADD, because
+     // the ADD and UPDATE events are processed in the same way.
+     networkEvents.add(
+         new EventEntry<FlowEntry>(EventEntry.Type.ENTRY_ADD, flowEntry));
+ }
+
+ /**
* Receive a notification that a Topology Element is added.
*
* @param topologyElement the Topology Element that is added.