Added the Hazelcast and notification-related code to send/receive
FlowId notifications. Those notifications will be used in the experimental
testing of sending notifications with only the added/updated/deleted FlowId,
and reading the Flow state itself from the database.
diff --git a/src/main/java/net/onrc/onos/datagrid/HazelcastDatagrid.java b/src/main/java/net/onrc/onos/datagrid/HazelcastDatagrid.java
index 6483121..d6c870d 100644
--- a/src/main/java/net/onrc/onos/datagrid/HazelcastDatagrid.java
+++ b/src/main/java/net/onrc/onos/datagrid/HazelcastDatagrid.java
@@ -73,6 +73,12 @@
     private MapFlowEntryListener mapFlowEntryListener = null;
     private String mapFlowEntryListenerId = null;
 
+    // State related to the Flow ID map
+    protected static final String mapFlowIdName = "mapFlowId";
+    private IMap<Long, byte[]> mapFlowId = null;
+    private MapFlowIdListener mapFlowIdListener = null;
+    private String mapFlowIdListenerId = null;
+
     // State related to the Network Topology map
     protected static final String mapTopologyName = "mapTopology";
     private IMap<String, byte[]> mapTopology = null;
@@ -230,6 +236,78 @@
     }
 
     /**
+     * Class for receiving notifications for FlowId state.
+     *
+     * The datagrid map is:
+     *  - Key : FlowId (Long)
+     *  - Value : Serialized FlowId (byte[])
+     */
+    class MapFlowIdListener implements EntryListener<Long, byte[]> {
+	/**
+	 * Receive a notification that an entry is added.
+	 *
+	 * @param event the notification event for the entry.
+	 */
+	public void entryAdded(EntryEvent<Long, byte[]> event) {
+	    byte[] valueBytes = (byte[])event.getValue();
+
+	    //
+	    // Decode the value and deliver the notification
+	    //
+	    Kryo kryo = kryoFactory.newKryo();
+	    Input input = new Input(valueBytes);
+	    FlowId flowId = kryo.readObject(input, FlowId.class);
+	    kryoFactory.deleteKryo(kryo);
+	    flowEventHandlerService.notificationRecvFlowIdAdded(flowId);
+	}
+
+	/**
+	 * Receive a notification that an entry is removed.
+	 *
+	 * @param event the notification event for the entry.
+	 */
+	public void entryRemoved(EntryEvent<Long, byte[]> event) {
+	    byte[] valueBytes = (byte[])event.getValue();
+
+	    //
+	    // Decode the value and deliver the notification
+	    //
+	    Kryo kryo = kryoFactory.newKryo();
+	    Input input = new Input(valueBytes);
+	    FlowId flowId = kryo.readObject(input, FlowId.class);
+	    kryoFactory.deleteKryo(kryo);
+	    flowEventHandlerService.notificationRecvFlowIdRemoved(flowId);
+	}
+
+	/**
+	 * Receive a notification that an entry is updated.
+	 *
+	 * @param event the notification event for the entry.
+	 */
+	public void entryUpdated(EntryEvent<Long, byte[]> event) {
+	    byte[] valueBytes = (byte[])event.getValue();
+
+	    //
+	    // Decode the value and deliver the notification
+	    //
+	    Kryo kryo = kryoFactory.newKryo();
+	    Input input = new Input(valueBytes);
+	    FlowId flowId = kryo.readObject(input, FlowId.class);
+	    kryoFactory.deleteKryo(kryo);
+	    flowEventHandlerService.notificationRecvFlowIdUpdated(flowId);
+	}
+
+	/**
+	 * Receive a notification that an entry is evicted.
+	 *
+	 * @param event the notification event for the entry.
+	 */
+	public void entryEvicted(EntryEvent<Long, byte[]> event) {
+	    // NOTE: We don't use eviction for this map
+	}
+    }
+
+    /**
      * Class for receiving notifications for Network Topology state.
      *
      * The datagrid map is:
@@ -504,6 +582,11 @@
 	mapFlowEntry = hazelcastInstance.getMap(mapFlowEntryName);
 	mapFlowEntryListenerId = mapFlowEntry.addEntryListener(mapFlowEntryListener, true);
 
+	// Initialize the FlowId-related map state
+	mapFlowIdListener = new MapFlowIdListener();
+	mapFlowId = hazelcastInstance.getMap(mapFlowIdName);
+	mapFlowIdListenerId = mapFlowId.addEntryListener(mapFlowIdListener, true);
+
 	// Initialize the Topology-related map state
 	mapTopologyListener = new MapTopologyListener();
 	mapTopology = hazelcastInstance.getMap(mapTopologyName);
@@ -531,6 +614,11 @@
 	mapFlowEntry = null;
 	mapFlowEntryListener = null;
 
+	// Clear the FlowId-related map state
+	mapFlowId.removeEntryListener(mapFlowIdListenerId);
+	mapFlowId = null;
+	mapFlowIdListener = null;
+
 	// Clear the Topology-related map state
 	mapTopology.removeEntryListener(mapTopologyListenerId);
 	mapTopology = null;
@@ -788,6 +876,101 @@
     }
 
     /**
+     * Get all Flow IDs that are currently in the datagrid.
+     *
+     * @return all Flow IDs that are currently in the datagrid.
+     */
+    @Override
+    public Collection<FlowId> getAllFlowIds() {
+	Collection<FlowId> allFlowIds = new LinkedList<FlowId>();
+
+	//
+	// Get all current entries
+	//
+	Collection<byte[]> values = mapFlowId.values();
+	Kryo kryo = kryoFactory.newKryo();
+	for (byte[] valueBytes : values) {
+	    //
+	    // Decode the value
+	    //
+	    Input input = new Input(valueBytes);
+	    FlowId flowId = kryo.readObject(input, FlowId.class);
+	    allFlowIds.add(flowId);
+	}
+	kryoFactory.deleteKryo(kryo);
+
+	return allFlowIds;
+    }
+
+    /**
+     * Send a notification that a FlowId is added.
+     *
+     * @param flowId the FlowId that is added.
+     */
+    @Override
+    public void notificationSendFlowIdAdded(FlowId flowId) {
+	//
+	// Encode the value
+	//
+	byte[] buffer = new byte[MAX_BUFFER_SIZE];
+	Kryo kryo = kryoFactory.newKryo();
+	Output output = new Output(buffer, -1);
+	kryo.writeObject(output, flowId);
+	byte[] valueBytes = output.toBytes();
+	kryoFactory.deleteKryo(kryo);
+
+	//
+	// Put the entry:
+	//  - Key : FlowId (Long)
+	//  - Value : Serialized FlowId (byte[])
+	//
+	mapFlowId.putAsync(flowId.value(), valueBytes);
+    }
+
+    /**
+     * Send a notification that a FlowId is removed.
+     *
+     * @param flowId the FlowId that is removed.
+     */
+    @Override
+    public void notificationSendFlowIdRemoved(FlowId flowId) {
+	//
+	// Remove the entry:
+	//  - Key : FlowId (Long)
+	//  - Value : Serialized FlowId (byte[])
+	//
+	mapFlowId.removeAsync(flowId.value());
+    }
+
+    /**
+     * Send a notification that a FlowId is updated.
+     *
+     * @param flowId the FlowId that is updated.
+     */
+    @Override
+    public void notificationSendFlowIdUpdated(FlowId flowId) {
+	// NOTE: Adding an entry with an existing key automatically updates it
+	notificationSendFlowIdAdded(flowId);
+    }
+
+    /**
+     * Send a notification that all Flow IDs are removed.
+     */
+    @Override
+    public void notificationSendAllFlowIdsRemoved() {
+	//
+	// Remove all entries
+	// NOTE: We remove the entries one-by-one so the per-entry
+	// notifications will be delivered.
+	//
+	// mapFlowId.clear();
+	Set<Long> keySet = mapFlowId.keySet();
+	for (Long key : keySet) {
+	    mapFlowId.removeAsync(key);
+	}
+    }
+
+    /**
      * Get all Topology Elements that are currently in the datagrid.
      *
      * @return all Topology Elements that are currently in the datagrid.
diff --git a/src/main/java/net/onrc/onos/datagrid/IDatagridService.java b/src/main/java/net/onrc/onos/datagrid/IDatagridService.java
index 0f03d77..d4e7b00 100644
--- a/src/main/java/net/onrc/onos/datagrid/IDatagridService.java
+++ b/src/main/java/net/onrc/onos/datagrid/IDatagridService.java
@@ -134,6 +134,39 @@
     void notificationSendAllFlowEntriesRemoved();
 
     /**
+     * Get all Flow IDs that are currently in the datagrid.
+     *
+     * @return all Flow IDs that are currently in the datagrid.
+     */
+    Collection<FlowId> getAllFlowIds();
+
+    /**
+     * Send a notification that a FlowId is added.
+     *
+     * @param flowId the FlowId that is added.
+     */
+    void notificationSendFlowIdAdded(FlowId flowId);
+
+    /**
+     * Send a notification that a FlowId is removed.
+     *
+     * @param flowId the FlowId that is removed.
+     */
+    void notificationSendFlowIdRemoved(FlowId flowId);
+
+    /**
+     * Send a notification that a FlowId is updated.
+     *
+     * @param flowId the FlowId that is updated.
+     */
+    void notificationSendFlowIdUpdated(FlowId flowId);
+
+    /**
+     * Send a notification that all Flow IDs are removed.
+     */
+    void notificationSendAllFlowIdsRemoved();
+
+    /**
      * Get all Topology Elements that are currently in the datagrid.
      *
      * @return all Topology Elements that are currently in the datagrid.
diff --git a/src/main/java/net/onrc/onos/ofcontroller/flowmanager/FlowEventHandler.java b/src/main/java/net/onrc/onos/ofcontroller/flowmanager/FlowEventHandler.java
index 8b1f7c0..0728f9a 100644
--- a/src/main/java/net/onrc/onos/ofcontroller/flowmanager/FlowEventHandler.java
+++ b/src/main/java/net/onrc/onos/ofcontroller/flowmanager/FlowEventHandler.java
@@ -63,6 +63,8 @@
 	new LinkedList<EventEntry<FlowPath>>();
     private List<EventEntry<FlowEntry>> flowEntryEvents =
 	new LinkedList<EventEntry<FlowEntry>>();
+    private List<EventEntry<FlowId>> flowIdEvents =
+	new LinkedList<EventEntry<FlowId>>();
 
     // All internally computed Flow Paths
     private Map<Long, FlowPath> allFlowPaths = new HashMap<Long, FlowPath>();
@@ -137,6 +139,16 @@
 	    flowEntryEvents.add(eventEntry);
 	}
 
+	//
+	// Obtain the initial FlowId state
+	//
+	Collection<FlowId> flowIds = datagridService.getAllFlowIds();
+	for (FlowId flowId : flowIds) {
+	    EventEntry<FlowId> eventEntry =
+		new EventEntry<FlowId>(EventEntry.Type.ENTRY_ADD, flowId);
+	    flowIdEvents.add(eventEntry);
+	}
+
 	// Process the initial events (if any)
 	synchronized (allFlowPaths) {
 	    processEvents();
@@ -166,6 +178,7 @@
 		//  - EventEntry<TopologyElement>
 		//  - EventEntry<FlowPath>
 		//  - EventEntry<FlowEntry>
+		//  - EventEntry<FlowId>
 		//
 		for (EventEntry<?> event : collection) {
 		    // Topology event
@@ -191,6 +204,14 @@
 			flowEntryEvents.add(flowEntryEventEntry);
 			continue;
 		    }
+
+		    // FlowId event
+		    if (event.eventData() instanceof FlowId) {
+			EventEntry<FlowId> flowIdEventEntry =
+			    (EventEntry<FlowId>)event;
+			flowIdEvents.add(flowIdEventEntry);
+			continue;
+		    }
 		}
 		collection.clear();
 
@@ -215,6 +236,7 @@
 	    return;		// Nothing to do
 	}
 
+	processFlowIdEvents();
 	processFlowPathEvents();
 	processTopologyEvents();
 	processUnmatchedFlowEntryAdd();
@@ -254,6 +276,7 @@
 	topologyEvents.clear();
 	flowPathEvents.clear();
 	flowEntryEvents.clear();
+	flowIdEvents.clear();
 	//
 	shouldRecomputeFlowPaths.clear();
 	modifiedFlowPaths.clear();
@@ -346,6 +369,41 @@
     }
 
     /**
+     * Process the Flow ID events.
+     */
+    private void processFlowIdEvents() {
+	//
+	// Process all Flow ID events and update the appropriate state
+	//
+	for (EventEntry<FlowId> eventEntry : flowIdEvents) {
+	    FlowId flowId = eventEntry.eventData();
+
+	    log.debug("Flow ID Event: {} {}", eventEntry.eventType(), flowId);
+
+	    switch (eventEntry.eventType()) {
+	    case ENTRY_ADD: {
+		//
+		// Add a new Flow ID
+		//
+		// TODO: Implement it!
+
+		break;
+	    }
+
+	    case ENTRY_REMOVE: {
+		//
+		// Remove an existing Flow ID.
+		//
+		// TODO: Implement it!
+
+		break;
+	    }
+	    }
+	}
+    }
+
+
+    /**
      * Process the Flow Path events.
      */
     private void processFlowPathEvents() {
@@ -972,6 +1030,43 @@
     }
 
     /**
+     * Receive a notification that a FlowId is added.
+     *
+     * @param flowId the FlowId that is added.
+     */
+    @Override
+    public void notificationRecvFlowIdAdded(FlowId flowId) {
+	EventEntry<FlowId> eventEntry =
+	    new EventEntry<FlowId>(EventEntry.Type.ENTRY_ADD, flowId);
+	networkEvents.add(eventEntry);
+    }
+
+    /**
+     * Receive a notification that a FlowId is removed.
+     *
+     * @param flowId the FlowId that is removed.
+     */
+    @Override
+    public void notificationRecvFlowIdRemoved(FlowId flowId) {
+	EventEntry<FlowId> eventEntry =
+	    new EventEntry<FlowId>(EventEntry.Type.ENTRY_REMOVE, flowId);
+	networkEvents.add(eventEntry);
+    }
+
+    /**
+     * Receive a notification that a FlowId is updated.
+     *
+     * @param flowId the FlowId that is updated.
+     */
+    @Override
+    public void notificationRecvFlowIdUpdated(FlowId flowId) {
+	// NOTE: The ADD and UPDATE events are processed in same way
+	EventEntry<FlowId> eventEntry =
+	    new EventEntry<FlowId>(EventEntry.Type.ENTRY_ADD, flowId);
+	networkEvents.add(eventEntry);
+    }
+
+    /**
      * Receive a notification that a Topology Element is added.
      *
      * @param topologyElement the Topology Element that is added.
diff --git a/src/main/java/net/onrc/onos/ofcontroller/flowmanager/IFlowEventHandlerService.java b/src/main/java/net/onrc/onos/ofcontroller/flowmanager/IFlowEventHandlerService.java
index 78562e1..62edf70 100644
--- a/src/main/java/net/onrc/onos/ofcontroller/flowmanager/IFlowEventHandlerService.java
+++ b/src/main/java/net/onrc/onos/ofcontroller/flowmanager/IFlowEventHandlerService.java
@@ -2,6 +2,7 @@
 
 import net.onrc.onos.ofcontroller.topology.TopologyElement;
 import net.onrc.onos.ofcontroller.util.FlowEntry;
+import net.onrc.onos.ofcontroller.util.FlowId;
 import net.onrc.onos.ofcontroller.util.FlowPath;
 
 /**
@@ -51,6 +52,27 @@
     void notificationRecvFlowEntryUpdated(FlowEntry flowEntry);
 
     /**
+     * Receive a notification that a FlowId is added.
+     *
+     * @param flowId the FlowId that is added.
+     */
+    void notificationRecvFlowIdAdded(FlowId flowId);
+
+    /**
+     * Receive a notification that a FlowId is removed.
+     *
+     * @param flowId the FlowId that is removed.
+     */
+    void notificationRecvFlowIdRemoved(FlowId flowId);
+
+    /**
+     * Receive a notification that a FlowId is updated.
+     *
+     * @param flowId the FlowId that is updated.
+     */
+    void notificationRecvFlowIdUpdated(FlowId flowId);
+
+    /**
      * Receive a notification that a Topology Element is added.
      *
      * @param topologyElement the Topology Element that is added.