Added the mechanism between the FlowManager and the Hazelcast datagrid
to send/receive Flow-related state.
diff --git a/src/main/java/net/onrc/onos/datagrid/HazelcastDatagrid.java b/src/main/java/net/onrc/onos/datagrid/HazelcastDatagrid.java
index 7de2869..37ada35 100644
--- a/src/main/java/net/onrc/onos/datagrid/HazelcastDatagrid.java
+++ b/src/main/java/net/onrc/onos/datagrid/HazelcastDatagrid.java
@@ -4,7 +4,13 @@
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.HashMap;
+import java.util.LinkedList;
 import java.util.Map;
+import java.util.Set;
+
+import com.esotericsoftware.kryo2.Kryo;
+import com.esotericsoftware.kryo2.io.Input;
+import com.esotericsoftware.kryo2.io.Output;
 
 import net.floodlightcontroller.core.IFloodlightProviderService;
 import net.floodlightcontroller.core.module.FloodlightModuleContext;
@@ -12,13 +18,21 @@
 import net.floodlightcontroller.core.module.IFloodlightModule;
 import net.floodlightcontroller.core.module.IFloodlightService;
 
+import net.onrc.onos.ofcontroller.flowmanager.IFlowService;
+import net.onrc.onos.ofcontroller.util.FlowId;
+import net.onrc.onos.ofcontroller.util.FlowPath;
+import net.onrc.onos.ofcontroller.util.serializers.KryoFactory;
+
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import com.hazelcast.config.Config;
 import com.hazelcast.config.FileSystemXmlConfig;
+import com.hazelcast.core.EntryEvent;
+import com.hazelcast.core.EntryListener;
 import com.hazelcast.core.Hazelcast;
 import com.hazelcast.core.HazelcastInstance;
+import com.hazelcast.core.IMap;
 import com.hazelcast.instance.GroupProperties;
 
 /**
@@ -27,13 +41,99 @@
  * appropriate in a multi-node cluster.
  */
 public class HazelcastDatagrid implements IFloodlightModule, IDatagridService {
+    private final static int MAX_BUFFER_SIZE = 64*1024;
+
     protected final static Logger log = LoggerFactory.getLogger(HazelcastDatagrid.class);
     protected IFloodlightProviderService floodlightProvider;
 
     protected static final String HazelcastConfigFile = "datagridConfig";
-    private HazelcastInstance hazelcast = null;
+    private HazelcastInstance hazelcastInstance = null;
     private Config hazelcastConfig = null;
 
+    private KryoFactory kryoFactory = new KryoFactory();
+
+    // State related to the Flow map
+    protected static final String mapFlowName = "mapFlow";
+    private IFlowService flowService = null;
+    private IMap<Long, byte[]> mapFlow = null;
+    private MapFlowListener mapFlowListener = null;
+    private String mapFlowListenerId = null;
+
+    /**
+     * Class for receiving notifications for Flow state.
+     *
+     * The datagrid map is:
+     *  - Key : Flow ID (Long)
+     *  - Value : Serialized Flow (byte[])
+     */
+    class MapFlowListener implements EntryListener<Long, byte[]> {
+	/**
+	 * Receive a notification that an entry is added.
+	 *
+	 * @param event the notification event for the entry.
+	 */
+	public void entryAdded(EntryEvent event) {
+	    Long keyLong = (Long)event.getKey();
+	    byte[] valueBytes = (byte[])event.getValue();
+
+	    //
+	    // Decode the value and deliver the notification
+	    //
+	    Kryo kryo = kryoFactory.newKryo();
+	    Input input = new Input(valueBytes);
+	    FlowPath flowPath = kryo.readObject(input, FlowPath.class);
+	    kryoFactory.deleteKryo(kryo);
+	    flowService.notificationRecvFlowAdded(flowPath);
+	}
+
+	/**
+	 * Receive a notification that an entry is removed.
+	 *
+	 * @param event the notification event for the entry.
+	 */
+	public void entryRemoved(EntryEvent event) {
+	    Long keyLong = (Long)event.getKey();
+	    byte[] valueBytes = (byte[])event.getValue();
+
+	    //
+	    // Decode the value and deliver the notification
+	    //
+	    Kryo kryo = kryoFactory.newKryo();
+	    Input input = new Input(valueBytes);
+	    FlowPath flowPath = kryo.readObject(input, FlowPath.class);
+	    kryoFactory.deleteKryo(kryo);
+	    flowService.notificationRecvFlowRemoved(flowPath);
+	}
+
+	/**
+	 * Receive a notification that an entry is updated.
+	 *
+	 * @param event the notification event for the entry.
+	 */
+	public void entryUpdated(EntryEvent event) {
+	    Long keyLong = (Long)event.getKey();
+	    byte[] valueBytes = (byte[])event.getValue();
+
+	    //
+	    // Decode the value and deliver the notification
+	    //
+	    Kryo kryo = kryoFactory.newKryo();
+	    Input input = new Input(valueBytes);
+	    FlowPath flowPath = kryo.readObject(input, FlowPath.class);
+	    kryoFactory.deleteKryo(kryo);
+	    flowService.notificationRecvFlowUpdated(flowPath);
+	}
+
+	/**
+	 * Receive a notification that an entry is evicted.
+	 *
+	 * @param event the notification event for the entry.
+	 */
+	public void entryEvicted(EntryEvent event) {
+	    // NOTE: We don't use eviction for this map
+	}
+    }
+
     /**
      * Initialize the Hazelcast Datagrid operation.
      *
@@ -143,6 +243,131 @@
      */
     @Override
     public void startUp(FloodlightModuleContext context) {
-	hazelcast = Hazelcast.newHazelcastInstance(hazelcastConfig);
+	hazelcastInstance = Hazelcast.newHazelcastInstance(hazelcastConfig);
+    }
+
+    /**
+     * Register Flow Service for receiving Flow-related notifications.
+     *
+     * NOTE: Only a single Flow Service can be registered.
+     *
+     * @param flowService the Flow Service to register.
+     */
+    @Override
+    public void registerFlowService(IFlowService flowService) {
+	this.flowService = flowService;
+	mapFlowListener = new MapFlowListener();
+	mapFlow = hazelcastInstance.getMap(mapFlowName);
+	mapFlowListenerId = mapFlow.addEntryListener(mapFlowListener, true);
+    }
+
+    /**
+     * De-register the Flow Service from receiving Flow-related notifications.
+     *
+     * NOTE: Only a single Flow Service can be registered.
+     *
+     * @param flowService the Flow Service to de-register.
+     */
+    @Override
+    public void deregisterFlowService(IFlowService flowService) {
+	mapFlow.removeEntryListener(mapFlowListenerId);
+	mapFlow = null;
+	mapFlowListener = null;
+	this.flowService = null;
+    }
+
+    /**
+     * Get all Flows that are currently in the datagrid.
+     *
+     * @return all Flows that are currently in the datagrid.
+     */
+    @Override
+    public Collection<FlowPath> getAllFlows() {
+	Collection<FlowPath> allFlows = new LinkedList<FlowPath>();
+
+	//
+	// Get all current entries
+	//
+	Collection<byte[]> values = mapFlow.values();
+	Kryo kryo = kryoFactory.newKryo();
+	for (byte[] valueBytes : values) {
+	    //
+	    // Decode the value
+	    //
+	    Input input = new Input(valueBytes);
+	    FlowPath flowPath = kryo.readObject(input, FlowPath.class);
+	    allFlows.add(flowPath);
+	}
+	kryoFactory.deleteKryo(kryo);
+
+	return allFlows;
+    }
+
+    /**
+     * Send a notification that a Flow is added.
+     *
+     * @param flowPath the flow that is added.
+     */
+    @Override
+    public void notificationSendFlowAdded(FlowPath flowPath) {
+	//
+	// Encode the value
+	//
+	byte[] buffer = new byte[MAX_BUFFER_SIZE];
+	Kryo kryo = kryoFactory.newKryo();
+	Output output = new Output(buffer, -1);
+	kryo.writeObject(output, flowPath);
+	byte[] valueBytes = output.toBytes();
+	kryoFactory.deleteKryo(kryo);
+
+	//
+	// Put the entry:
+	//  - Key : Flow ID (Long)
+	//  - Value : Serialized Flow (byte[])
+	//
+	mapFlow.putAsync(flowPath.flowId().value(), valueBytes);
+    }
+
+    /**
+     * Send a notification that a Flow is removed.
+     *
+     * @param flowId the Flow ID of the flow that is removed.
+     */
+    @Override
+    public void notificationSendFlowRemoved(FlowId flowId) {
+	//
+	// Remove the entry:
+	//  - Key : Flow ID (Long)
+	//  - Value : Serialized Flow (byte[])
+	//
+	mapFlow.removeAsync(flowId.value());
+    }
+
+    /**
+     * Send a notification that a Flow is updated.
+     *
+     * @param flowPath the flow that is updated.
+     */
+    @Override
+    public void notificationSendFlowUpdated(FlowPath flowPath) {
+	// NOTE: Adding an entry with an existing key automatically updates it
+	notificationSendFlowAdded(flowPath);
+    }
+
+    /**
+     * Send a notification that all Flows are removed.
+     */
+    @Override
+    public void notificationSendAllFlowsRemoved() {
+	//
+	// Remove all entries
+	// NOTE: We remove the entries one-by-one (instead of calling
+	// mapFlow.clear()) so the per-entry notifications will be
+	// delivered.
+	//
+	Set<Long> keySet = mapFlow.keySet();
+	for (Long key : keySet) {
+	    mapFlow.removeAsync(key);
+	}
     }
 }
diff --git a/src/main/java/net/onrc/onos/datagrid/IDatagridService.java b/src/main/java/net/onrc/onos/datagrid/IDatagridService.java
index 1c7f3ab..498f58b 100644
--- a/src/main/java/net/onrc/onos/datagrid/IDatagridService.java
+++ b/src/main/java/net/onrc/onos/datagrid/IDatagridService.java
@@ -1,10 +1,65 @@
 package net.onrc.onos.datagrid;
 
+import java.util.Collection;
+
 import net.floodlightcontroller.core.module.IFloodlightService;
 
+import net.onrc.onos.ofcontroller.flowmanager.IFlowService;
+import net.onrc.onos.ofcontroller.util.FlowId;
+import net.onrc.onos.ofcontroller.util.FlowPath;
+
 /**
  * Interface for providing Datagrid Service to other modules.
  */
 public interface IDatagridService extends IFloodlightService {
-    // TODO
+    /**
+     * Register Flow Service for receiving Flow-related notifications.
+     *
+     * NOTE: Only a single Flow Service can be registered.
+     *
+     * @param flowService the Flow Service to register.
+     */
+    void registerFlowService(IFlowService flowService);
+
+    /**
+     * De-register the Flow Service from receiving Flow-related notifications.
+     *
+     * NOTE: Only a single Flow Service can be registered.
+     *
+     * @param flowService the Flow Service to de-register.
+     */
+    void deregisterFlowService(IFlowService flowService);
+
+    /**
+     * Get all Flows that are currently in the datagrid.
+     *
+     * @return all Flows that are currently in the datagrid.
+     */
+    Collection<FlowPath> getAllFlows();
+
+    /**
+     * Send a notification that a Flow is added.
+     *
+     * @param flowPath the flow that is added.
+     */
+    void notificationSendFlowAdded(FlowPath flowPath);
+
+    /**
+     * Send a notification that a Flow is removed.
+     *
+     * @param flowId the Flow ID of the flow that is removed.
+     */
+    void notificationSendFlowRemoved(FlowId flowId);
+
+    /**
+     * Send a notification that a Flow is updated.
+     *
+     * @param flowPath the flow that is updated.
+     */
+    void notificationSendFlowUpdated(FlowPath flowPath);
+
+    /**
+     * Send a notification that all Flows are removed.
+     */
+    void notificationSendAllFlowsRemoved();
 }
diff --git a/src/main/java/net/onrc/onos/ofcontroller/flowmanager/FlowManager.java b/src/main/java/net/onrc/onos/ofcontroller/flowmanager/FlowManager.java
index cad5d9c..5a4edc3 100644
--- a/src/main/java/net/onrc/onos/ofcontroller/flowmanager/FlowManager.java
+++ b/src/main/java/net/onrc/onos/ofcontroller/flowmanager/FlowManager.java
@@ -379,6 +379,7 @@
      */
     @Override
     public void close() {
+	datagridService.deregisterFlowService(this);
     	dbHandler.close();
     }
 
@@ -487,6 +488,10 @@
 	// Initialize the Flow Entry ID generator
 	nextFlowEntryIdPrefix = randomGenerator.nextInt();
 
+	datagridService.registerFlowService(this);
+	// TODO: Flow Paths not used yet
+	Collection<FlowPath> flowPaths = datagridService.getAllFlows();
+
 	mapReaderScheduler.scheduleAtFixedRate(
 			mapReader, 3, 3, TimeUnit.SECONDS);
 	shortestPathReconcileScheduler.scheduleAtFixedRate(
@@ -505,8 +510,12 @@
     @Override
     public boolean addFlow(FlowPath flowPath, FlowId flowId,
 			   String dataPathSummaryStr) {
-	return FlowDatabaseOperation.addFlow(this, dbHandler, flowPath, flowId,
-					     dataPathSummaryStr);
+	if (FlowDatabaseOperation.addFlow(this, dbHandler, flowPath, flowId,
+					  dataPathSummaryStr)) {
+	    datagridService.notificationSendFlowAdded(flowPath);
+	    return true;
+	}
+	return false;
     }
 
     /**
@@ -528,7 +537,11 @@
      */
     @Override
     public boolean deleteAllFlows() {
-	return FlowDatabaseOperation.deleteAllFlows(dbHandler);
+	if (FlowDatabaseOperation.deleteAllFlows(dbHandler)) {
+	    datagridService.notificationSendAllFlowsRemoved();
+	    return true;
+	}
+	return false;
     }
 
     /**
@@ -539,7 +552,11 @@
      */
     @Override
     public boolean deleteFlow(FlowId flowId) {
-	return FlowDatabaseOperation.deleteFlow(dbHandler, flowId);
+	if (FlowDatabaseOperation.deleteFlow(dbHandler, flowId)) {
+	    datagridService.notificationSendFlowRemoved(flowId);
+	    return true;
+	}
+	return false;
     }
 
     /**
@@ -549,7 +566,11 @@
      */
     @Override
     public boolean clearAllFlows() {
-	return FlowDatabaseOperation.clearAllFlows(dbHandler);
+	if (FlowDatabaseOperation.clearAllFlows(dbHandler)) {
+	    datagridService.notificationSendAllFlowsRemoved();
+	    return true;
+	}
+	return false;
     }
 
     /**
@@ -560,7 +581,11 @@
      */
     @Override
     public boolean clearFlow(FlowId flowId) {
-	return FlowDatabaseOperation.clearFlow(dbHandler, flowId);
+	if (FlowDatabaseOperation.clearFlow(dbHandler, flowId)) {
+	    datagridService.notificationSendFlowRemoved(flowId);
+	    return true;
+	}
+	return false;
     }
 
     /**
@@ -682,7 +707,7 @@
      * @param newDataPath the new data path to use.
      * @return true on success, otherwise false.
      */
-    public boolean reconcileFlow(IFlowPath flowObj, DataPath newDataPath) {
+    private boolean reconcileFlow(IFlowPath flowObj, DataPath newDataPath) {
 
 	//
 	// Set the incoming port matching and the outgoing port output
@@ -745,7 +770,7 @@
      *
      * @param flowObjSet the set of flows that need to be reconciliated.
      */
-    public void reconcileFlows(Iterable<IFlowPath> flowObjSet) {
+    private void reconcileFlows(Iterable<IFlowPath> flowObjSet) {
 	if (! flowObjSet.iterator().hasNext())
 	    return;
 	// TODO: Not implemented/used yet.
@@ -759,7 +784,7 @@
      * @param flowEntryObj the flow entry object to install.
      * @return true on success, otherwise false.
      */
-    public boolean installFlowEntry(IOFSwitch mySwitch, IFlowPath flowObj,
+    private boolean installFlowEntry(IOFSwitch mySwitch, IFlowPath flowObj,
 				    IFlowEntry flowEntryObj) {
 	return FlowSwitchOperation.installFlowEntry(
 		floodlightProvider.getOFMessageFactory(),
@@ -774,7 +799,7 @@
      * @param flowEntry the flow entry to install.
      * @return true on success, otherwise false.
      */
-    public boolean installFlowEntry(IOFSwitch mySwitch, FlowPath flowPath,
+    private boolean installFlowEntry(IOFSwitch mySwitch, FlowPath flowPath,
 				    FlowEntry flowEntry) {
 	return FlowSwitchOperation.installFlowEntry(
 		floodlightProvider.getOFMessageFactory(),
@@ -789,7 +814,7 @@
      * @param flowEntry the flow entry to remove.
      * @return true on success, otherwise false.
      */
-    public boolean removeFlowEntry(IOFSwitch mySwitch, FlowPath flowPath,
+    private boolean removeFlowEntry(IOFSwitch mySwitch, FlowPath flowPath,
 				   FlowEntry flowEntry) {
 	//
 	// The installFlowEntry() method implements both installation
@@ -797,4 +822,34 @@
 	//
 	return (installFlowEntry(mySwitch, flowPath, flowEntry));
     }
+
+    /**
+     * Receive a notification that a Flow is added.
+     *
+     * @param flowPath the flow that is added.
+     */
+    @Override
+    public void notificationRecvFlowAdded(FlowPath flowPath) {
+	// TODO
+    }
+
+    /**
+     * Receive a notification that a Flow is removed.
+     *
+     * @param flowPath the flow that is removed.
+     */
+    @Override
+    public void notificationRecvFlowRemoved(FlowPath flowPath) {
+	// TODO
+    }
+
+    /**
+     * Receive a notification that a Flow is updated.
+     *
+     * @param flowPath the flow that is updated.
+     */
+    @Override
+    public void notificationRecvFlowUpdated(FlowPath flowPath) {
+	// TODO
+    }
 }
diff --git a/src/main/java/net/onrc/onos/ofcontroller/flowmanager/IFlowService.java b/src/main/java/net/onrc/onos/ofcontroller/flowmanager/IFlowService.java
index 73f86b6..8c908cf 100644
--- a/src/main/java/net/onrc/onos/ofcontroller/flowmanager/IFlowService.java
+++ b/src/main/java/net/onrc/onos/ofcontroller/flowmanager/IFlowService.java
@@ -114,5 +114,26 @@
      * conditions to install.
      * @return the added shortest-path flow on success, otherwise null.
      */
-    public FlowPath addAndMaintainShortestPathFlow(FlowPath flowPath);
+    FlowPath addAndMaintainShortestPathFlow(FlowPath flowPath);
+
+    /**
+     * Receive a notification that a Flow is added.
+     *
+     * @param flowPath the flow that is added.
+     */
+    void notificationRecvFlowAdded(FlowPath flowPath);
+
+    /**
+     * Receive a notification that a Flow is removed.
+     *
+     * @param flowPath the flow that is removed.
+     */
+    void notificationRecvFlowRemoved(FlowPath flowPath);
+
+    /**
+     * Receive a notification that a Flow is updated.
+     *
+     * @param flowPath the flow that is updated.
+     */
+    void notificationRecvFlowUpdated(FlowPath flowPath);
 }
diff --git a/src/test/java/net/onrc/onos/ofcontroller/flowmanager/FlowManagerTest.java b/src/test/java/net/onrc/onos/ofcontroller/flowmanager/FlowManagerTest.java
index 8a8779b..ea09c17 100644
--- a/src/test/java/net/onrc/onos/ofcontroller/flowmanager/FlowManagerTest.java
+++ b/src/test/java/net/onrc/onos/ofcontroller/flowmanager/FlowManagerTest.java
@@ -5,6 +5,7 @@
 import static org.easymock.EasyMock.cmpEq;
 import static org.powermock.api.easymock.PowerMock.*;
 
+import java.lang.reflect.Method;
 import java.util.*;
 import java.util.concurrent.Executors;
 import java.util.concurrent.ScheduledExecutorService;
@@ -904,7 +905,15 @@
 		replayAll();
 		
 		fm.init(context);
-		Boolean result = fm.reconcileFlow(iFlowPath1, dataPath);
+		// Use reflection to test the private method
+		// Boolean result = fm.reconcileFlow(iFlowPath1, dataPath);
+		Class fmClass = FlowManager.class;
+		Method method = fmClass.getDeclaredMethod(
+			"reconcileFlow",
+			new Class[] { IFlowPath.class, DataPath.class });
+		method.setAccessible(true);
+		Boolean result = (Boolean)method.invoke(fm,
+			new Object[] { iFlowPath1, dataPath });
 		
 		// verify the test
 		verifyAll();
@@ -958,7 +967,16 @@
 		replayAll();
 		
 		fm.init(context);
-		Boolean result = fm.installFlowEntry(iofSwitch, iFlowPath, iFlowEntry);
+		// Use reflection to test the private method
+		// Boolean result = fm.installFlowEntry(iofSwitch, iFlowPath, iFlowEntry);
+		Class fmClass = FlowManager.class;
+		Method method = fmClass.getDeclaredMethod(
+			"installFlowEntry",
+			new Class[] { IOFSwitch.class, IFlowPath.class, IFlowEntry.class });
+		method.setAccessible(true);
+		Boolean result = (Boolean)method.invoke(fm,
+			new Object[] { iofSwitch, iFlowPath, iFlowEntry });
+
 		
 		// verify the test
 		verifyAll();