Added the mechanism between the FlowManager and the Hazelcast datagrid
to send/receive Flow-related state.
diff --git a/src/main/java/net/onrc/onos/datagrid/HazelcastDatagrid.java b/src/main/java/net/onrc/onos/datagrid/HazelcastDatagrid.java
index 7de2869..37ada35 100644
--- a/src/main/java/net/onrc/onos/datagrid/HazelcastDatagrid.java
+++ b/src/main/java/net/onrc/onos/datagrid/HazelcastDatagrid.java
@@ -4,7 +4,13 @@
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
+import java.util.LinkedList;
import java.util.Map;
+import java.util.Set;
+
+import com.esotericsoftware.kryo2.Kryo;
+import com.esotericsoftware.kryo2.io.Input;
+import com.esotericsoftware.kryo2.io.Output;
import net.floodlightcontroller.core.IFloodlightProviderService;
import net.floodlightcontroller.core.module.FloodlightModuleContext;
@@ -12,13 +18,21 @@
import net.floodlightcontroller.core.module.IFloodlightModule;
import net.floodlightcontroller.core.module.IFloodlightService;
+import net.onrc.onos.ofcontroller.flowmanager.IFlowService;
+import net.onrc.onos.ofcontroller.util.FlowId;
+import net.onrc.onos.ofcontroller.util.FlowPath;
+import net.onrc.onos.ofcontroller.util.serializers.KryoFactory;
+
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.hazelcast.config.Config;
import com.hazelcast.config.FileSystemXmlConfig;
+import com.hazelcast.core.EntryEvent;
+import com.hazelcast.core.EntryListener;
import com.hazelcast.core.Hazelcast;
import com.hazelcast.core.HazelcastInstance;
+import com.hazelcast.core.IMap;
import com.hazelcast.instance.GroupProperties;
/**
@@ -27,13 +41,99 @@
* appropriate in a multi-node cluster.
*/
public class HazelcastDatagrid implements IFloodlightModule, IDatagridService {
+ private final static int MAX_BUFFER_SIZE = 64*1024;
+
protected final static Logger log = LoggerFactory.getLogger(HazelcastDatagrid.class);
protected IFloodlightProviderService floodlightProvider;
protected static final String HazelcastConfigFile = "datagridConfig";
- private HazelcastInstance hazelcast = null;
+ private HazelcastInstance hazelcastInstance = null;
private Config hazelcastConfig = null;
+ private KryoFactory kryoFactory = new KryoFactory();
+
+ // State related to the Flow map
+ protected static final String mapFlowName = "mapFlow";
+ private IFlowService flowService = null;
+ private IMap<Long, byte[]> mapFlow = null;
+ private MapFlowListener mapFlowListener = null;
+ private String mapFlowListenerId = null;
+
+ /**
+ * Class for receiving notifications for Flow state.
+ *
+ * The datagrid map is:
+ * - Key : Flow ID (Long)
+ * - Value : Serialized Flow (byte[])
+ */
+ class MapFlowListener implements EntryListener<Long, byte[]> {
+ /**
+ * Receive a notification that an entry is added.
+ *
+ * @param event the notification event for the entry.
+ */
+ public void entryAdded(EntryEvent event) {
+ Long keyLong = (Long)event.getKey();
+ byte[] valueBytes = (byte[])event.getValue();
+
+ //
+ // Decode the value and deliver the notification
+ //
+ Kryo kryo = kryoFactory.newKryo();
+ Input input = new Input(valueBytes);
+ FlowPath flowPath = kryo.readObject(input, FlowPath.class);
+ kryoFactory.deleteKryo(kryo);
+ flowService.notificationRecvFlowAdded(flowPath);
+ }
+
+ /**
+ * Receive a notification that an entry is removed.
+ *
+ * @param event the notification event for the entry.
+ */
+ public void entryRemoved(EntryEvent event) {
+ Long keyLong = (Long)event.getKey();
+ byte[] valueBytes = (byte[])event.getValue();
+
+ //
+ // Decode the value and deliver the notification
+ //
+ Kryo kryo = kryoFactory.newKryo();
+ Input input = new Input(valueBytes);
+ FlowPath flowPath = kryo.readObject(input, FlowPath.class);
+ kryoFactory.deleteKryo(kryo);
+ flowService.notificationRecvFlowRemoved(flowPath);
+ }
+
+ /**
+ * Receive a notification that an entry is updated.
+ *
+ * @param event the notification event for the entry.
+ */
+ public void entryUpdated(EntryEvent event) {
+ Long keyLong = (Long)event.getKey();
+ byte[] valueBytes = (byte[])event.getValue();
+
+ //
+ // Decode the value and deliver the notification
+ //
+ Kryo kryo = kryoFactory.newKryo();
+ Input input = new Input(valueBytes);
+ FlowPath flowPath = kryo.readObject(input, FlowPath.class);
+ kryoFactory.deleteKryo(kryo);
+ flowService.notificationRecvFlowUpdated(flowPath);
+ }
+
+ /**
+ * Receive a notification that an entry is evicted.
+ *
+ * @param event the notification event for the entry.
+ */
+ public void entryEvicted(EntryEvent event) {
+ // NOTE: We don't use eviction for this map
+ }
+ }
+
/**
* Initialize the Hazelcast Datagrid operation.
*
@@ -143,6 +243,131 @@
*/
@Override
public void startUp(FloodlightModuleContext context) {
- hazelcast = Hazelcast.newHazelcastInstance(hazelcastConfig);
+ hazelcastInstance = Hazelcast.newHazelcastInstance(hazelcastConfig);
+ }
+
+ /**
+ * Register Flow Service for receiving Flow-related notifications.
+ *
+ * NOTE: Only a single Flow Service can be registered.
+ *
+ * @param flowService the Flow Service to register.
+ */
+ @Override
+ public void registerFlowService(IFlowService flowService) {
+ this.flowService = flowService;
+ mapFlowListener = new MapFlowListener();
+ mapFlow = hazelcastInstance.getMap(mapFlowName);
+ mapFlowListenerId = mapFlow.addEntryListener(mapFlowListener, true);
+ }
+
+ /**
+ * De-register Flow Service for receiving Flow-related notifications.
+ *
+ * NOTE: Only a single Flow Service can be registered.
+ *
+ * @param flowService the Flow Service to de-register.
+ */
+ @Override
+ public void deregisterFlowService(IFlowService flowService) {
+ mapFlow.removeEntryListener(mapFlowListenerId);
+ mapFlow = null;
+ mapFlowListener = null;
+ this.flowService = null;
+ }
+
+ /**
+ * Get all Flows that are currently in the datagrid.
+ *
+ * @return all Flows that are currently in the datagrid.
+ */
+ @Override
+ public Collection<FlowPath> getAllFlows() {
+ Collection<FlowPath> allFlows = new LinkedList<FlowPath>();
+
+ //
+ // Get all current entries
+ //
+ Collection<byte[]> values = mapFlow.values();
+ Kryo kryo = kryoFactory.newKryo();
+ for (byte[] valueBytes : values) {
+ //
+ // Decode the value
+ //
+ Input input = new Input(valueBytes);
+ FlowPath flowPath = kryo.readObject(input, FlowPath.class);
+ allFlows.add(flowPath);
+ }
+ kryoFactory.deleteKryo(kryo);
+
+ return allFlows;
+ }
+
+ /**
+ * Send a notification that a Flow is added.
+ *
+ * @param flowPath the flow that is added.
+ */
+ @Override
+ public void notificationSendFlowAdded(FlowPath flowPath) {
+ //
+ // Encode the value
+ //
+ byte[] buffer = new byte[MAX_BUFFER_SIZE];
+ Kryo kryo = kryoFactory.newKryo();
+ Output output = new Output(buffer, -1);
+ kryo.writeObject(output, flowPath);
+ byte[] valueBytes = output.toBytes();
+ kryoFactory.deleteKryo(kryo);
+
+ //
+ // Put the entry:
+ // - Key : Flow ID (Long)
+ // - Value : Serialized Flow (byte[])
+ //
+ mapFlow.putAsync(flowPath.flowId().value(), valueBytes);
+ }
+
+ /**
+ * Send a notification that a Flow is removed.
+ *
+ * @param flowId the Flow ID of the flow that is removed.
+ */
+ @Override
+ public void notificationSendFlowRemoved(FlowId flowId) {
+ //
+ // Remove the entry:
+ // - Key : Flow ID (Long)
+ // - Value : Serialized Flow (byte[])
+ //
+ mapFlow.removeAsync(flowId.value());
+ }
+
+ /**
+ * Send a notification that a Flow is updated.
+ *
+ * @param flowPath the flow that is updated.
+ */
+ @Override
+ public void notificationSendFlowUpdated(FlowPath flowPath) {
+ // NOTE: Adding an entry with an existing key automatically updates it
+ notificationSendFlowAdded(flowPath);
+ }
+
+ /**
+ * Send a notification that all Flows are removed.
+ */
+ @Override
+ public void notificationSendAllFlowsRemoved() {
+ //
+ // Remove all entries
+ // NOTE: We remove the entries one-by-one so the per-entry
+ // notifications will be delivered.
+ //
+ // mapFlow.clear();
+ Set<Long> keySet = mapFlow.keySet();
+ for (Long key : keySet) {
+ mapFlow.removeAsync(key);
+ }
}
}