Added the mechanism between the FlowManager and the Hazelcast datagrid
to send/receive Flow-related state.
diff --git a/src/main/java/net/onrc/onos/datagrid/HazelcastDatagrid.java b/src/main/java/net/onrc/onos/datagrid/HazelcastDatagrid.java
index 7de2869..37ada35 100644
--- a/src/main/java/net/onrc/onos/datagrid/HazelcastDatagrid.java
+++ b/src/main/java/net/onrc/onos/datagrid/HazelcastDatagrid.java
@@ -4,7 +4,13 @@
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
+import java.util.LinkedList;
import java.util.Map;
+import java.util.Set;
+
+import com.esotericsoftware.kryo2.Kryo;
+import com.esotericsoftware.kryo2.io.Input;
+import com.esotericsoftware.kryo2.io.Output;
import net.floodlightcontroller.core.IFloodlightProviderService;
import net.floodlightcontroller.core.module.FloodlightModuleContext;
@@ -12,13 +18,21 @@
import net.floodlightcontroller.core.module.IFloodlightModule;
import net.floodlightcontroller.core.module.IFloodlightService;
+import net.onrc.onos.ofcontroller.flowmanager.IFlowService;
+import net.onrc.onos.ofcontroller.util.FlowId;
+import net.onrc.onos.ofcontroller.util.FlowPath;
+import net.onrc.onos.ofcontroller.util.serializers.KryoFactory;
+
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.hazelcast.config.Config;
import com.hazelcast.config.FileSystemXmlConfig;
+import com.hazelcast.core.EntryEvent;
+import com.hazelcast.core.EntryListener;
import com.hazelcast.core.Hazelcast;
import com.hazelcast.core.HazelcastInstance;
+import com.hazelcast.core.IMap;
import com.hazelcast.instance.GroupProperties;
/**
@@ -27,13 +41,99 @@
* appropriate in a multi-node cluster.
*/
public class HazelcastDatagrid implements IFloodlightModule, IDatagridService {
+ private final static int MAX_BUFFER_SIZE = 64*1024;
+
protected final static Logger log = LoggerFactory.getLogger(HazelcastDatagrid.class);
protected IFloodlightProviderService floodlightProvider;
protected static final String HazelcastConfigFile = "datagridConfig";
- private HazelcastInstance hazelcast = null;
+ private HazelcastInstance hazelcastInstance = null;
private Config hazelcastConfig = null;
+ private KryoFactory kryoFactory = new KryoFactory();
+
+ // State related to the Flow map
+ protected static final String mapFlowName = "mapFlow";
+ private IFlowService flowService = null;
+ private IMap<Long, byte[]> mapFlow = null;
+ private MapFlowListener mapFlowListener = null;
+ private String mapFlowListenerId = null;
+
+ /**
+ * Class for receiving notifications for Flow state.
+ *
+ * The datagrid map is:
+ * - Key : Flow ID (Long)
+ * - Value : Serialized Flow (byte[])
+ */
+ class MapFlowListener implements EntryListener<Long, byte[]> {
+ /**
+ * Receive a notification that an entry is added.
+ *
+ * @param event the notification event for the entry.
+ */
+ public void entryAdded(EntryEvent event) {
+ Long keyLong = (Long)event.getKey();
+ byte[] valueBytes = (byte[])event.getValue();
+
+ //
+ // Decode the value and deliver the notification
+ //
+ Kryo kryo = kryoFactory.newKryo();
+ Input input = new Input(valueBytes);
+ FlowPath flowPath = kryo.readObject(input, FlowPath.class);
+ kryoFactory.deleteKryo(kryo);
+ flowService.notificationRecvFlowAdded(flowPath);
+ }
+
+ /**
+ * Receive a notification that an entry is removed.
+ *
+ * @param event the notification event for the entry.
+ */
+ public void entryRemoved(EntryEvent event) {
+ Long keyLong = (Long)event.getKey();
+ byte[] valueBytes = (byte[])event.getValue();
+
+ //
+ // Decode the value and deliver the notification
+ //
+ Kryo kryo = kryoFactory.newKryo();
+ Input input = new Input(valueBytes);
+ FlowPath flowPath = kryo.readObject(input, FlowPath.class);
+ kryoFactory.deleteKryo(kryo);
+ flowService.notificationRecvFlowRemoved(flowPath);
+ }
+
+ /**
+ * Receive a notification that an entry is updated.
+ *
+ * @param event the notification event for the entry.
+ */
+ public void entryUpdated(EntryEvent event) {
+ Long keyLong = (Long)event.getKey();
+ byte[] valueBytes = (byte[])event.getValue();
+
+ //
+ // Decode the value and deliver the notification
+ //
+ Kryo kryo = kryoFactory.newKryo();
+ Input input = new Input(valueBytes);
+ FlowPath flowPath = kryo.readObject(input, FlowPath.class);
+ kryoFactory.deleteKryo(kryo);
+ flowService.notificationRecvFlowUpdated(flowPath);
+ }
+
+ /**
+ * Receive a notification that an entry is evicted.
+ *
+ * @param event the notification event for the entry.
+ */
+ public void entryEvicted(EntryEvent event) {
+ // NOTE: We don't use eviction for this map
+ }
+ }
+
/**
* Initialize the Hazelcast Datagrid operation.
*
@@ -143,6 +243,131 @@
*/
@Override
public void startUp(FloodlightModuleContext context) {
- hazelcast = Hazelcast.newHazelcastInstance(hazelcastConfig);
+ hazelcastInstance = Hazelcast.newHazelcastInstance(hazelcastConfig);
+ }
+
+ /**
+ * Register Flow Service for receiving Flow-related notifications.
+ *
+ * NOTE: Only a single Flow Service can be registered.
+ *
+ * @param flowService the Flow Service to register.
+ */
+ @Override
+ public void registerFlowService(IFlowService flowService) {
+ this.flowService = flowService;
+ mapFlowListener = new MapFlowListener();
+ mapFlow = hazelcastInstance.getMap(mapFlowName);
+ mapFlowListenerId = mapFlow.addEntryListener(mapFlowListener, true);
+ }
+
+ /**
+ * De-register Flow Service for receiving Flow-related notifications.
+ *
+ * NOTE: Only a single Flow Service can be registered.
+ *
+ * @param flowService the Flow Service to de-register.
+ */
+ @Override
+ public void deregisterFlowService(IFlowService flowService) {
+ mapFlow.removeEntryListener(mapFlowListenerId);
+ mapFlow = null;
+ mapFlowListener = null;
+ this.flowService = null;
+ }
+
+ /**
+ * Get all Flows that are currently in the datagrid.
+ *
+ * @return all Flows that are currently in the datagrid.
+ */
+ @Override
+ public Collection<FlowPath> getAllFlows() {
+ Collection<FlowPath> allFlows = new LinkedList<FlowPath>();
+
+ //
+ // Get all current entries
+ //
+ Collection<byte[]> values = mapFlow.values();
+ Kryo kryo = kryoFactory.newKryo();
+ for (byte[] valueBytes : values) {
+ //
+ // Decode the value
+ //
+ Input input = new Input(valueBytes);
+ FlowPath flowPath = kryo.readObject(input, FlowPath.class);
+ allFlows.add(flowPath);
+ }
+ kryoFactory.deleteKryo(kryo);
+
+ return allFlows;
+ }
+
+ /**
+ * Send a notification that a Flow is added.
+ *
+ * @param flowPath the flow that is added.
+ */
+ @Override
+ public void notificationSendFlowAdded(FlowPath flowPath) {
+ //
+ // Encode the value
+ //
+ byte[] buffer = new byte[MAX_BUFFER_SIZE];
+ Kryo kryo = kryoFactory.newKryo();
+ Output output = new Output(buffer, -1);
+ kryo.writeObject(output, flowPath);
+ byte[] valueBytes = output.toBytes();
+ kryoFactory.deleteKryo(kryo);
+
+ //
+ // Put the entry:
+ // - Key : Flow ID (Long)
+ // - Value : Serialized Flow (byte[])
+ //
+ mapFlow.putAsync(flowPath.flowId().value(), valueBytes);
+ }
+
+ /**
+ * Send a notification that a Flow is removed.
+ *
+ * @param flowId the Flow ID of the flow that is removed.
+ */
+ @Override
+ public void notificationSendFlowRemoved(FlowId flowId) {
+ //
+ // Remove the entry:
+ // - Key : Flow ID (Long)
+ // - Value : Serialized Flow (byte[])
+ //
+ mapFlow.removeAsync(flowId.value());
+ }
+
+ /**
+ * Send a notification that a Flow is updated.
+ *
+ * @param flowPath the flow that is updated.
+ */
+ @Override
+ public void notificationSendFlowUpdated(FlowPath flowPath) {
+ // NOTE: Adding an entry with an existing key automatically updates it
+ notificationSendFlowAdded(flowPath);
+ }
+
+ /**
+ * Send a notification that all Flows are removed.
+ */
+ @Override
+ public void notificationSendAllFlowsRemoved() {
+ //
+ // Remove all entries
+ // NOTE: We remove the entries one-by-one so the per-entry
+ // notifications will be delivered.
+ //
+ // mapFlow.clear();
+ Set<Long> keySet = mapFlow.keySet();
+ for (Long key : keySet) {
+ mapFlow.removeAsync(key);
+ }
}
}
diff --git a/src/main/java/net/onrc/onos/datagrid/IDatagridService.java b/src/main/java/net/onrc/onos/datagrid/IDatagridService.java
index 1c7f3ab..498f58b 100644
--- a/src/main/java/net/onrc/onos/datagrid/IDatagridService.java
+++ b/src/main/java/net/onrc/onos/datagrid/IDatagridService.java
@@ -1,10 +1,65 @@
package net.onrc.onos.datagrid;
+import java.util.Collection;
+
import net.floodlightcontroller.core.module.IFloodlightService;
+import net.onrc.onos.ofcontroller.flowmanager.IFlowService;
+import net.onrc.onos.ofcontroller.util.FlowId;
+import net.onrc.onos.ofcontroller.util.FlowPath;
+
/**
* Interface for providing Datagrid Service to other modules.
*/
public interface IDatagridService extends IFloodlightService {
- // TODO
+ /**
+ * Register Flow Service for receiving Flow-related notifications.
+ *
+ * NOTE: Only a single Flow Service can be registered.
+ *
+ * @param flowService the Flow Service to register.
+ */
+ void registerFlowService(IFlowService flowService);
+
+ /**
+ * De-register Flow Service for receiving Flow-related notifications.
+ *
+ * NOTE: Only a single Flow Service can be registered.
+ *
+ * @param flowService the Flow Service to de-register.
+ */
+ void deregisterFlowService(IFlowService flowService);
+
+ /**
+ * Get all Flows that are currently in the datagrid.
+ *
+ * @return all Flows that are currently in the datagrid.
+ */
+ Collection<FlowPath> getAllFlows();
+
+ /**
+ * Send a notification that a Flow is added.
+ *
+ * @param flowPath the flow that is added.
+ */
+ void notificationSendFlowAdded(FlowPath flowPath);
+
+ /**
+ * Send a notification that a Flow is removed.
+ *
+ * @param flowId the Flow ID of the flow that is removed.
+ */
+ void notificationSendFlowRemoved(FlowId flowId);
+
+ /**
+ * Send a notification that a Flow is updated.
+ *
+ * @param flowPath the flow that is updated.
+ */
+ void notificationSendFlowUpdated(FlowPath flowPath);
+
+ /**
+ * Send a notification that all Flows are removed.
+ */
+ void notificationSendAllFlowsRemoved();
}
diff --git a/src/main/java/net/onrc/onos/ofcontroller/flowmanager/FlowManager.java b/src/main/java/net/onrc/onos/ofcontroller/flowmanager/FlowManager.java
index cad5d9c..5a4edc3 100644
--- a/src/main/java/net/onrc/onos/ofcontroller/flowmanager/FlowManager.java
+++ b/src/main/java/net/onrc/onos/ofcontroller/flowmanager/FlowManager.java
@@ -379,6 +379,7 @@
*/
@Override
public void close() {
+ datagridService.deregisterFlowService(this);
dbHandler.close();
}
@@ -487,6 +488,10 @@
// Initialize the Flow Entry ID generator
nextFlowEntryIdPrefix = randomGenerator.nextInt();
+ datagridService.registerFlowService(this);
+ // TODO: Flow Paths not used yet
+ Collection<FlowPath> flowPaths = datagridService.getAllFlows();
+
mapReaderScheduler.scheduleAtFixedRate(
mapReader, 3, 3, TimeUnit.SECONDS);
shortestPathReconcileScheduler.scheduleAtFixedRate(
@@ -505,8 +510,12 @@
@Override
public boolean addFlow(FlowPath flowPath, FlowId flowId,
String dataPathSummaryStr) {
- return FlowDatabaseOperation.addFlow(this, dbHandler, flowPath, flowId,
- dataPathSummaryStr);
+ if (FlowDatabaseOperation.addFlow(this, dbHandler, flowPath, flowId,
+ dataPathSummaryStr)) {
+ datagridService.notificationSendFlowAdded(flowPath);
+ return true;
+ }
+ return false;
}
/**
@@ -528,7 +537,11 @@
*/
@Override
public boolean deleteAllFlows() {
- return FlowDatabaseOperation.deleteAllFlows(dbHandler);
+ if (FlowDatabaseOperation.deleteAllFlows(dbHandler)) {
+ datagridService.notificationSendAllFlowsRemoved();
+ return true;
+ }
+ return false;
}
/**
@@ -539,7 +552,11 @@
*/
@Override
public boolean deleteFlow(FlowId flowId) {
- return FlowDatabaseOperation.deleteFlow(dbHandler, flowId);
+ if (FlowDatabaseOperation.deleteFlow(dbHandler, flowId)) {
+ datagridService.notificationSendFlowRemoved(flowId);
+ return true;
+ }
+ return false;
}
/**
@@ -549,7 +566,11 @@
*/
@Override
public boolean clearAllFlows() {
- return FlowDatabaseOperation.clearAllFlows(dbHandler);
+ if (FlowDatabaseOperation.clearAllFlows(dbHandler)) {
+ datagridService.notificationSendAllFlowsRemoved();
+ return true;
+ }
+ return false;
}
/**
@@ -560,7 +581,11 @@
*/
@Override
public boolean clearFlow(FlowId flowId) {
- return FlowDatabaseOperation.clearFlow(dbHandler, flowId);
+ if (FlowDatabaseOperation.clearFlow(dbHandler, flowId)) {
+ datagridService.notificationSendFlowRemoved(flowId);
+ return true;
+ }
+ return false;
}
/**
@@ -682,7 +707,7 @@
* @param newDataPath the new data path to use.
* @return true on success, otherwise false.
*/
- public boolean reconcileFlow(IFlowPath flowObj, DataPath newDataPath) {
+ private boolean reconcileFlow(IFlowPath flowObj, DataPath newDataPath) {
//
// Set the incoming port matching and the outgoing port output
@@ -745,7 +770,7 @@
*
* @param flowObjSet the set of flows that need to be reconciliated.
*/
- public void reconcileFlows(Iterable<IFlowPath> flowObjSet) {
+ private void reconcileFlows(Iterable<IFlowPath> flowObjSet) {
if (! flowObjSet.iterator().hasNext())
return;
// TODO: Not implemented/used yet.
@@ -759,7 +784,7 @@
* @param flowEntryObj the flow entry object to install.
* @return true on success, otherwise false.
*/
- public boolean installFlowEntry(IOFSwitch mySwitch, IFlowPath flowObj,
+ private boolean installFlowEntry(IOFSwitch mySwitch, IFlowPath flowObj,
IFlowEntry flowEntryObj) {
return FlowSwitchOperation.installFlowEntry(
floodlightProvider.getOFMessageFactory(),
@@ -774,7 +799,7 @@
* @param flowEntry the flow entry to install.
* @return true on success, otherwise false.
*/
- public boolean installFlowEntry(IOFSwitch mySwitch, FlowPath flowPath,
+ private boolean installFlowEntry(IOFSwitch mySwitch, FlowPath flowPath,
FlowEntry flowEntry) {
return FlowSwitchOperation.installFlowEntry(
floodlightProvider.getOFMessageFactory(),
@@ -789,7 +814,7 @@
* @param flowEntry the flow entry to remove.
* @return true on success, otherwise false.
*/
- public boolean removeFlowEntry(IOFSwitch mySwitch, FlowPath flowPath,
+ private boolean removeFlowEntry(IOFSwitch mySwitch, FlowPath flowPath,
FlowEntry flowEntry) {
//
// The installFlowEntry() method implements both installation
@@ -797,4 +822,34 @@
//
return (installFlowEntry(mySwitch, flowPath, flowEntry));
}
+
+ /**
+ * Receive a notification that a Flow is added.
+ *
+ * @param flowPath the flow that is added.
+ */
+ @Override
+ public void notificationRecvFlowAdded(FlowPath flowPath) {
+ // TODO
+ }
+
+ /**
+ * Receive a notification that a Flow is removed.
+ *
+ * @param flowPath the flow that is removed.
+ */
+ @Override
+ public void notificationRecvFlowRemoved(FlowPath flowPath) {
+ // TODO
+ }
+
+ /**
+ * Receive a notification that a Flow is updated.
+ *
+ * @param flowPath the flow that is updated.
+ */
+ @Override
+ public void notificationRecvFlowUpdated(FlowPath flowPath) {
+ // TODO
+ }
}
diff --git a/src/main/java/net/onrc/onos/ofcontroller/flowmanager/IFlowService.java b/src/main/java/net/onrc/onos/ofcontroller/flowmanager/IFlowService.java
index 73f86b6..8c908cf 100644
--- a/src/main/java/net/onrc/onos/ofcontroller/flowmanager/IFlowService.java
+++ b/src/main/java/net/onrc/onos/ofcontroller/flowmanager/IFlowService.java
@@ -114,5 +114,26 @@
* conditions to install.
* @return the added shortest-path flow on success, otherwise null.
*/
- public FlowPath addAndMaintainShortestPathFlow(FlowPath flowPath);
+ FlowPath addAndMaintainShortestPathFlow(FlowPath flowPath);
+
+ /**
+ * Receive a notification that a Flow is added.
+ *
+ * @param flowPath the flow that is added.
+ */
+ void notificationRecvFlowAdded(FlowPath flowPath);
+
+ /**
+ * Receive a notification that a Flow is removed.
+ *
+ * @param flowPath the flow that is removed.
+ */
+ void notificationRecvFlowRemoved(FlowPath flowPath);
+
+ /**
+ * Receive a notification that a Flow is updated.
+ *
+ * @param flowPath the flow that is updated.
+ */
+ void notificationRecvFlowUpdated(FlowPath flowPath);
}
diff --git a/src/test/java/net/onrc/onos/ofcontroller/flowmanager/FlowManagerTest.java b/src/test/java/net/onrc/onos/ofcontroller/flowmanager/FlowManagerTest.java
index 8a8779b..ea09c17 100644
--- a/src/test/java/net/onrc/onos/ofcontroller/flowmanager/FlowManagerTest.java
+++ b/src/test/java/net/onrc/onos/ofcontroller/flowmanager/FlowManagerTest.java
@@ -5,6 +5,7 @@
import static org.easymock.EasyMock.cmpEq;
import static org.powermock.api.easymock.PowerMock.*;
+import java.lang.reflect.Method;
import java.util.*;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
@@ -904,7 +905,15 @@
replayAll();
fm.init(context);
- Boolean result = fm.reconcileFlow(iFlowPath1, dataPath);
+ // Use reflection to test the private method
+ // Boolean result = fm.reconcileFlow(iFlowPath1, dataPath);
+ Class fmClass = FlowManager.class;
+ Method method = fmClass.getDeclaredMethod(
+ "reconcileFlow",
+ new Class[] { IFlowPath.class, DataPath.class });
+ method.setAccessible(true);
+ Boolean result = (Boolean)method.invoke(fm,
+ new Object[] { iFlowPath1, dataPath });
// verify the test
verifyAll();
@@ -958,7 +967,16 @@
replayAll();
fm.init(context);
- Boolean result = fm.installFlowEntry(iofSwitch, iFlowPath, iFlowEntry);
+ // Use reflection to test the private method
+ // Boolean result = fm.installFlowEntry(iofSwitch, iFlowPath, iFlowEntry);
+ Class fmClass = FlowManager.class;
+ Method method = fmClass.getDeclaredMethod(
+ "installFlowEntry",
+ new Class[] { IOFSwitch.class, IFlowPath.class, IFlowEntry.class });
+ method.setAccessible(true);
+ Boolean result = (Boolean)method.invoke(fm,
+ new Object[] { iofSwitch, iFlowPath, iFlowEntry });
+
// verify the test
verifyAll();