Added the Hazelcast and notification-related code to send/receive
FlowId notifications. Those notifications will be used in the experimental
testing of sending notifications with only the added/updated/deleted FlowId,
and reading the Flow state itself from the database.
diff --git a/src/main/java/net/onrc/onos/datagrid/HazelcastDatagrid.java b/src/main/java/net/onrc/onos/datagrid/HazelcastDatagrid.java
index 6483121..d6c870d 100644
--- a/src/main/java/net/onrc/onos/datagrid/HazelcastDatagrid.java
+++ b/src/main/java/net/onrc/onos/datagrid/HazelcastDatagrid.java
@@ -73,6 +73,12 @@
     private MapFlowEntryListener mapFlowEntryListener = null;
     private String mapFlowEntryListenerId = null;
 
+    // State related to the Flow ID map
+    protected static final String mapFlowIdName = "mapFlowId";
+    private IMap<Long, byte[]> mapFlowId = null;
+    private MapFlowIdListener mapFlowIdListener = null;
+    private String mapFlowIdListenerId = null;
+
     // State related to the Network Topology map
     protected static final String mapTopologyName = "mapTopology";
     private IMap<String, byte[]> mapTopology = null;
@@ -230,6 +236,78 @@
     }
 
     /**
+     * Class for receiving notifications for FlowId state.
+     *
+     * The datagrid map is:
+     *  - Key : FlowId (Long)
+     *  - Value : Serialized FlowId (byte[])
+     */
+    class MapFlowIdListener implements EntryListener<Long, byte[]> {
+	/**
+	 * Receive a notification that an entry is added.
+	 *
+	 * @param event the notification event for the entry.
+	 */
+	public void entryAdded(EntryEvent<Long, byte[]> event) {
+	    byte[] valueBytes = (byte[])event.getValue();
+
+	    //
+	    // Decode the value and deliver the notification
+	    //
+	    Kryo kryo = kryoFactory.newKryo();
+	    Input input = new Input(valueBytes);
+	    FlowId flowId = kryo.readObject(input, FlowId.class);
+	    kryoFactory.deleteKryo(kryo);
+	    flowEventHandlerService.notificationRecvFlowIdAdded(flowId);
+	}
+
+	/**
+	 * Receive a notification that an entry is removed.
+	 *
+	 * @param event the notification event for the entry.
+	 */
+	public void entryRemoved(EntryEvent<Long, byte[]> event) {
+	    byte[] valueBytes = (byte[])event.getValue();
+
+	    //
+	    // Decode the value and deliver the notification
+	    //
+	    Kryo kryo = kryoFactory.newKryo();
+	    Input input = new Input(valueBytes);
+	    FlowId flowId = kryo.readObject(input, FlowId.class);
+	    kryoFactory.deleteKryo(kryo);
+	    flowEventHandlerService.notificationRecvFlowIdRemoved(flowId);
+	}
+
+	/**
+	 * Receive a notification that an entry is updated.
+	 *
+	 * @param event the notification event for the entry.
+	 */
+	public void entryUpdated(EntryEvent<Long, byte[]> event) {
+	    byte[] valueBytes = (byte[])event.getValue();
+
+	    //
+	    // Decode the value and deliver the notification
+	    //
+	    Kryo kryo = kryoFactory.newKryo();
+	    Input input = new Input(valueBytes);
+	    FlowId flowId = kryo.readObject(input, FlowId.class);
+	    kryoFactory.deleteKryo(kryo);
+	    flowEventHandlerService.notificationRecvFlowIdUpdated(flowId);
+	}
+
+	/**
+	 * Receive a notification that an entry is evicted.
+	 *
+	 * @param event the notification event for the entry.
+	 */
+	public void entryEvicted(EntryEvent<Long, byte[]> event) {
+	    // NOTE: We don't use eviction for this map
+	}
+    }
+
+    /**
      * Class for receiving notifications for Network Topology state.
      *
      * The datagrid map is:
@@ -504,6 +582,11 @@
 	mapFlowEntry = hazelcastInstance.getMap(mapFlowEntryName);
 	mapFlowEntryListenerId = mapFlowEntry.addEntryListener(mapFlowEntryListener, true);
 
+	// Initialize the FlowId-related map state
+	mapFlowIdListener = new MapFlowIdListener();
+	mapFlowId = hazelcastInstance.getMap(mapFlowIdName);
+	mapFlowIdListenerId = mapFlowId.addEntryListener(mapFlowIdListener, true);
+
 	// Initialize the Topology-related map state
 	mapTopologyListener = new MapTopologyListener();
 	mapTopology = hazelcastInstance.getMap(mapTopologyName);
@@ -531,6 +614,11 @@
 	mapFlowEntry = null;
 	mapFlowEntryListener = null;
 
+	// Clear the FlowId-related map state
+	mapFlowId.removeEntryListener(mapFlowIdListenerId);
+	mapFlowId = null;
+	mapFlowIdListener = null;
+
 	// Clear the Topology-related map state
 	mapTopology.removeEntryListener(mapTopologyListenerId);
 	mapTopology = null;
@@ -788,6 +876,101 @@
     }
 
     /**
+     * Get all Flow IDs that are currently in the datagrid.
+     *
+     * @return all Flow IDs that are currently in the datagrid.
+     */
+    @Override
+    public Collection<FlowId> getAllFlowIds() {
+	Collection<FlowId> allFlowIds = new LinkedList<FlowId>();
+
+	//
+	// Get all current entries
+	//
+	Collection<byte[]> values = mapFlowId.values();
+	Kryo kryo = kryoFactory.newKryo();
+	for (byte[] valueBytes : values) {
+	    //
+	    // Decode the value
+	    //
+	    Input input = new Input(valueBytes);
+	    FlowId flowId = kryo.readObject(input, FlowId.class);
+	    allFlowIds.add(flowId);
+	}
+	kryoFactory.deleteKryo(kryo);
+
+	return allFlowIds;
+    }
+
+    /**
+     * Send a notification that a FlowId is added.
+     *
+     * @param flowId the FlowId that is added.
+     */
+    @Override
+    public void notificationSendFlowIdAdded(FlowId flowId) {
+	//
+	// Encode the value
+	//
+	byte[] buffer = new byte[MAX_BUFFER_SIZE];
+	Kryo kryo = kryoFactory.newKryo();
+	Output output = new Output(buffer, -1);
+	kryo.writeObject(output, flowId);
+	byte[] valueBytes = output.toBytes();
+	kryoFactory.deleteKryo(kryo);
+
+	//
+	// Put the entry:
+	//  - Key : FlowId (Long)
+	//  - Value : Serialized FlowId (byte[])
+	//
+	mapFlowId.putAsync(flowId.value(), valueBytes);
+    }
+
+    /**
+     * Send a notification that a FlowId is removed.
+     *
+     * @param flowId the FlowId that is removed.
+     */
+    @Override
+    public void notificationSendFlowIdRemoved(FlowId flowId) {
+	//
+	// Remove the entry:
+	//  - Key : FlowId (Long)
+	//  - Value : Serialized FlowId (byte[])
+	//
+	mapFlowId.removeAsync(flowId.value());
+    }
+
+    /**
+     * Send a notification that a FlowId is updated.
+     *
+     * @param flowId the FlowId that is updated.
+     */
+    @Override
+    public void notificationSendFlowIdUpdated(FlowId flowId) {
+	// NOTE: Adding an entry with an existing key automatically updates it
+	notificationSendFlowIdAdded(flowId);
+    }
+
+    /**
+     * Send a notification that all Flow IDs are removed.
+     */
+    @Override
+    public void notificationSendAllFlowIdsRemoved() {
+	//
+	// Remove all entries
+	// NOTE: We remove the entries one-by-one so the per-entry
+	// notifications will be delivered.
+	//
+	// mapFlowId.clear();
+	Set<Long> keySet = mapFlowId.keySet();
+	for (Long key : keySet) {
+	    mapFlowId.removeAsync(key);
+	}
+    }
+
+    /**
      * Get all Topology Elements that are currently in the datagrid.
      *
      * @return all Topology Elements that are currently in the datagrid.