Added the mechanism between the FlowManager and the Hazelcast datagrid
to send/receive Flow-related state.
diff --git a/src/main/java/net/onrc/onos/datagrid/HazelcastDatagrid.java b/src/main/java/net/onrc/onos/datagrid/HazelcastDatagrid.java
index 7de2869..37ada35 100644
--- a/src/main/java/net/onrc/onos/datagrid/HazelcastDatagrid.java
+++ b/src/main/java/net/onrc/onos/datagrid/HazelcastDatagrid.java
@@ -4,7 +4,13 @@
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.HashMap;
+import java.util.LinkedList;
 import java.util.Map;
+import java.util.Set;
+
+import com.esotericsoftware.kryo2.Kryo;
+import com.esotericsoftware.kryo2.io.Input;
+import com.esotericsoftware.kryo2.io.Output;
 
 import net.floodlightcontroller.core.IFloodlightProviderService;
 import net.floodlightcontroller.core.module.FloodlightModuleContext;
@@ -12,13 +18,21 @@
 import net.floodlightcontroller.core.module.IFloodlightModule;
 import net.floodlightcontroller.core.module.IFloodlightService;
 
+import net.onrc.onos.ofcontroller.flowmanager.IFlowService;
+import net.onrc.onos.ofcontroller.util.FlowId;
+import net.onrc.onos.ofcontroller.util.FlowPath;
+import net.onrc.onos.ofcontroller.util.serializers.KryoFactory;
+
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import com.hazelcast.config.Config;
 import com.hazelcast.config.FileSystemXmlConfig;
+import com.hazelcast.core.EntryEvent;
+import com.hazelcast.core.EntryListener;
 import com.hazelcast.core.Hazelcast;
 import com.hazelcast.core.HazelcastInstance;
+import com.hazelcast.core.IMap;
 import com.hazelcast.instance.GroupProperties;
 
 /**
@@ -27,13 +41,99 @@
  * appropriate in a multi-node cluster.
  */
 public class HazelcastDatagrid implements IFloodlightModule, IDatagridService {
+    private final static int MAX_BUFFER_SIZE = 64*1024;
+
     protected final static Logger log = LoggerFactory.getLogger(HazelcastDatagrid.class);
     protected IFloodlightProviderService floodlightProvider;
 
     protected static final String HazelcastConfigFile = "datagridConfig";
-    private HazelcastInstance hazelcast = null;
+    private HazelcastInstance hazelcastInstance = null;
     private Config hazelcastConfig = null;
 
+    private KryoFactory kryoFactory = new KryoFactory();
+
+    // State related to the Flow map
+    protected static final String mapFlowName = "mapFlow";
+    private IFlowService flowService = null;
+    private IMap<Long, byte[]> mapFlow = null;
+    private MapFlowListener mapFlowListener = null;
+    private String mapFlowListenerId = null;
+
+    /**
+     * Class for receiving notifications for Flow state.
+     *
+     * The datagrid map is:
+     *  - Key : Flow ID (Long)
+     *  - Value : Serialized Flow (byte[])
+     */
+    class MapFlowListener implements EntryListener<Long, byte[]> {
+	/**
+	 * Receive a notification that an entry is added.
+	 *
+	 * @param event the notification event for the entry.
+	 */
+	public void entryAdded(EntryEvent event) {
+	    Long keyLong = (Long)event.getKey();
+	    byte[] valueBytes = (byte[])event.getValue();
+
+	    //
+	    // Decode the value and deliver the notification
+	    //
+	    Kryo kryo = kryoFactory.newKryo();
+	    Input input = new Input(valueBytes);
+	    FlowPath flowPath = kryo.readObject(input, FlowPath.class);
+	    kryoFactory.deleteKryo(kryo);
+	    flowService.notificationRecvFlowAdded(flowPath);
+	}
+
+	/**
+	 * Receive a notification that an entry is removed.
+	 *
+	 * @param event the notification event for the entry.
+	 */
+	public void entryRemoved(EntryEvent event) {
+	    Long keyLong = (Long)event.getKey();
+	    byte[] valueBytes = (byte[])event.getValue();
+
+	    //
+	    // Decode the value and deliver the notification
+	    //
+	    Kryo kryo = kryoFactory.newKryo();
+	    Input input = new Input(valueBytes);
+	    FlowPath flowPath = kryo.readObject(input, FlowPath.class);
+	    kryoFactory.deleteKryo(kryo);
+	    flowService.notificationRecvFlowRemoved(flowPath);
+	}
+
+	/**
+	 * Receive a notification that an entry is updated.
+	 *
+	 * @param event the notification event for the entry.
+	 */
+	public void entryUpdated(EntryEvent event) {
+	    Long keyLong = (Long)event.getKey();
+	    byte[] valueBytes = (byte[])event.getValue();
+
+	    //
+	    // Decode the value and deliver the notification
+	    //
+	    Kryo kryo = kryoFactory.newKryo();
+	    Input input = new Input(valueBytes);
+	    FlowPath flowPath = kryo.readObject(input, FlowPath.class);
+	    kryoFactory.deleteKryo(kryo);
+	    flowService.notificationRecvFlowUpdated(flowPath);
+	}
+
+	/**
+	 * Receive a notification that an entry is evicted.
+	 *
+	 * @param event the notification event for the entry.
+	 */
+	public void entryEvicted(EntryEvent event) {
+	    // NOTE: We don't use eviction for this map
+	}
+    }
+
     /**
      * Initialize the Hazelcast Datagrid operation.
      *
@@ -143,6 +243,131 @@
      */
     @Override
     public void startUp(FloodlightModuleContext context) {
-	hazelcast = Hazelcast.newHazelcastInstance(hazelcastConfig);
+	hazelcastInstance = Hazelcast.newHazelcastInstance(hazelcastConfig);
+    }
+
+    /**
+     * Register Flow Service for receiving Flow-related notifications.
+     *
+     * NOTE: Only a single Flow Service can be registered.
+     *
+     * @param flowService the Flow Service to register.
+     */
+    @Override
+    public void registerFlowService(IFlowService flowService) {
+	this.flowService = flowService;
+	mapFlowListener = new MapFlowListener();
+	mapFlow = hazelcastInstance.getMap(mapFlowName);
+	mapFlowListenerId = mapFlow.addEntryListener(mapFlowListener, true);
+    }
+
+    /**
+     * De-register Flow Service for receiving Flow-related notifications.
+     *
+     * NOTE: Only a single Flow Service can be registered.
+     *
+     * @param flowService the Flow Service to de-register.
+     */
+    @Override
+    public void deregisterFlowService(IFlowService flowService) {
+	mapFlow.removeEntryListener(mapFlowListenerId);
+	mapFlow = null;
+	mapFlowListener = null;
+	this.flowService = null;
+    }
+
+    /**
+     * Get all Flows that are currently in the datagrid.
+     *
+     * @return all Flows that are currently in the datagrid.
+     */
+    @Override
+    public Collection<FlowPath> getAllFlows() {
+	Collection<FlowPath> allFlows = new LinkedList<FlowPath>();
+
+	//
+	// Get all current entries
+	//
+	Collection<byte[]> values = mapFlow.values();
+	Kryo kryo = kryoFactory.newKryo();
+	for (byte[] valueBytes : values) {
+	    //
+	    // Decode the value
+	    //
+	    Input input = new Input(valueBytes);
+	    FlowPath flowPath = kryo.readObject(input, FlowPath.class);
+	    allFlows.add(flowPath);
+	}
+	kryoFactory.deleteKryo(kryo);
+
+	return allFlows;
+    }
+
+    /**
+     * Send a notification that a Flow is added.
+     *
+     * @param flowPath the flow that is added.
+     */
+    @Override
+    public void notificationSendFlowAdded(FlowPath flowPath) {
+	//
+	// Encode the value
+	//
+	byte[] buffer = new byte[MAX_BUFFER_SIZE];
+	Kryo kryo = kryoFactory.newKryo();
+	Output output = new Output(buffer, -1);
+	kryo.writeObject(output, flowPath);
+	byte[] valueBytes = output.toBytes();
+	kryoFactory.deleteKryo(kryo);
+
+	//
+	// Put the entry:
+	//  - Key : Flow ID (Long)
+	//  - Value : Serialized Flow (byte[])
+	//
+	mapFlow.putAsync(flowPath.flowId().value(), valueBytes);
+    }
+
+    /**
+     * Send a notification that a Flow is removed.
+     *
+     * @param flowId the Flow ID of the flow that is removed.
+     */
+    @Override
+    public void notificationSendFlowRemoved(FlowId flowId) {
+	//
+	// Remove the entry:
+	//  - Key : Flow ID (Long)
+	//  - Value : Serialized Flow (byte[])
+	//
+	mapFlow.removeAsync(flowId.value());
+    }
+
+    /**
+     * Send a notification that a Flow is updated.
+     *
+     * @param flowPath the flow that is updated.
+     */
+    @Override
+    public void notificationSendFlowUpdated(FlowPath flowPath) {
+	// NOTE: Adding an entry with an existing key automatically updates it
+	notificationSendFlowAdded(flowPath);
+    }
+
+    /**
+     * Send a notification that all Flows are removed.
+     */
+    @Override
+    public void notificationSendAllFlowsRemoved() {
+	//
+	// Remove all entries
+	// NOTE: We remove the entries one-by-one so the per-entry
+	// notifications will be delivered.
+	//
+	// mapFlow.clear();
+	Set<Long> keySet = mapFlow.keySet();
+	for (Long key : keySet) {
+	    mapFlow.removeAsync(key);
+	}
     }
 }