Updated the FlowId Hazelcast Notification Channel so the payload
is the Source Switch DPID. Thus, the receiver of the notification
can easily check which are the Flows it should fetch from the database.
diff --git a/src/main/java/net/onrc/onos/datagrid/HazelcastDatagrid.java b/src/main/java/net/onrc/onos/datagrid/HazelcastDatagrid.java
index 632cc38..c195f82 100644
--- a/src/main/java/net/onrc/onos/datagrid/HazelcastDatagrid.java
+++ b/src/main/java/net/onrc/onos/datagrid/HazelcastDatagrid.java
@@ -256,7 +256,7 @@
      *
      * The datagrid map is:
      *  - Key : FlowId (Long)
-     *  - Value : Serialized FlowId (byte[])
+     *  - Value : Serialized Switch Dpid (byte[])
      */
     class MapFlowIdListener implements EntryListener<Long, byte[]> {
 	/**
@@ -265,6 +265,9 @@
 	 * @param event the notification event for the entry.
 	 */
 	public void entryAdded(EntryEvent<Long, byte[]> event) {
+	    Long keyLong = event.getKey();
+	    FlowId flowId = new FlowId(keyLong);
+
 	    byte[] valueBytes = event.getValue();
 
 	    //
@@ -272,9 +275,9 @@
 	    //
 	    Kryo kryo = kryoFactory.newKryo();
 	    Input input = new Input(valueBytes);
-	    FlowId flowId = kryo.readObject(input, FlowId.class);
+	    Dpid dpid = kryo.readObject(input, Dpid.class);
 	    kryoFactory.deleteKryo(kryo);
-	    flowEventHandlerService.notificationRecvFlowIdAdded(flowId);
+	    flowEventHandlerService.notificationRecvFlowIdAdded(flowId, dpid);
 	}
 
 	/**
@@ -283,6 +286,9 @@
 	 * @param event the notification event for the entry.
 	 */
 	public void entryRemoved(EntryEvent<Long, byte[]> event) {
+	    Long keyLong = event.getKey();
+	    FlowId flowId = new FlowId(keyLong);
+
 	    byte[] valueBytes = event.getValue();
 
 	    //
@@ -290,9 +296,9 @@
 	    //
 	    Kryo kryo = kryoFactory.newKryo();
 	    Input input = new Input(valueBytes);
-	    FlowId flowId = kryo.readObject(input, FlowId.class);
+	    Dpid dpid = kryo.readObject(input, Dpid.class);
 	    kryoFactory.deleteKryo(kryo);
-	    flowEventHandlerService.notificationRecvFlowIdRemoved(flowId);
+	    flowEventHandlerService.notificationRecvFlowIdRemoved(flowId, dpid);
 	}
 
 	/**
@@ -301,6 +307,9 @@
 	 * @param event the notification event for the entry.
 	 */
 	public void entryUpdated(EntryEvent<Long, byte[]> event) {
+	    Long keyLong = event.getKey();
+	    FlowId flowId = new FlowId(keyLong);
+
 	    byte[] valueBytes = event.getValue();
 
 	    //
@@ -308,9 +317,9 @@
 	    //
 	    Kryo kryo = kryoFactory.newKryo();
 	    Input input = new Input(valueBytes);
-	    FlowId flowId = kryo.readObject(input, FlowId.class);
+	    Dpid dpid = kryo.readObject(input, Dpid.class);
 	    kryoFactory.deleteKryo(kryo);
-	    flowEventHandlerService.notificationRecvFlowIdUpdated(flowId);
+	    flowEventHandlerService.notificationRecvFlowIdUpdated(flowId, dpid);
 	}
 
 	/**
@@ -997,21 +1006,28 @@
      * @return all Flow IDs that are currently in the datagrid.
      */
     @Override
-    public Collection<FlowId> getAllFlowIds() {
-	Collection<FlowId> allFlowIds = new LinkedList<FlowId>();
+	public Collection<Pair<FlowId, Dpid>> getAllFlowIds() {
+	Collection<Pair<FlowId, Dpid>> allFlowIds =
+	    new LinkedList<Pair<FlowId, Dpid>>();
 
 	//
 	// Get all current entries
 	//
-	Collection<byte[]> values = mapFlowId.values();
 	Kryo kryo = kryoFactory.newKryo();
-	for (byte[] valueBytes : values) {
+	for (Map.Entry<Long, byte[]> entry : mapFlowId.entrySet()) {
+	    Long key = entry.getKey();
+	    byte[] valueBytes = entry.getValue();
+
+	    FlowId flowId = new FlowId(key);
+
 	    //
 	    // Decode the value
 	    //
 	    Input input = new Input(valueBytes);
-	    FlowId flowId = kryo.readObject(input, FlowId.class);
-	    allFlowIds.add(flowId);
+	    Dpid dpid = kryo.readObject(input, Dpid.class);
+
+	    Pair<FlowId, Dpid> pair = new Pair(flowId, dpid);
+	    allFlowIds.add(pair);
 	}
 	kryoFactory.deleteKryo(kryo);
 
@@ -1056,23 +1072,24 @@
      * Send a notification that a FlowId is added.
      *
      * @param flowId the FlowId that is added.
+     * @param dpid the Source Switch Dpid.
      */
     @Override
-    public void notificationSendFlowIdAdded(FlowId flowId) {
+    public void notificationSendFlowIdAdded(FlowId flowId, Dpid dpid) {
 	//
 	// Encode the value
 	//
 	byte[] buffer = new byte[MAX_BUFFER_SIZE];
 	Kryo kryo = kryoFactory.newKryo();
 	Output output = new Output(buffer, -1);
-	kryo.writeObject(output, flowId);
+	kryo.writeObject(output, dpid);
 	byte[] valueBytes = output.toBytes();
 	kryoFactory.deleteKryo(kryo);
 
 	//
 	// Put the entry:
 	//  - Key : FlowId (Long)
-	//  - Value : Serialized FlowId (byte[])
+	//  - Value : Serialized Switch Dpid (byte[])
 	//
 	mapFlowId.putAsync(flowId.value(), valueBytes);
     }
@@ -1087,7 +1104,7 @@
 	//
 	// Remove the entry:
 	//  - Key : FlowId (Long)
-	//  - Value : Serialized FlowId (byte[])
+	//  - Value : Serialized Switch Dpid (byte[])
 	//
 	mapFlowId.removeAsync(flowId.value());
     }
@@ -1096,11 +1113,12 @@
      * Send a notification that a FlowId is updated.
      *
      * @param flowId the FlowId that is updated.
+     * @param dpid the Source Switch Dpid.
      */
     @Override
-    public void notificationSendFlowIdUpdated(FlowId flowId) {
+    public void notificationSendFlowIdUpdated(FlowId flowId, Dpid dpid) {
 	// NOTE: Adding an entry with an existing key automatically updates it
-	notificationSendFlowIdAdded(flowId);
+	notificationSendFlowIdAdded(flowId, dpid);
     }
 
     /**
@@ -1157,7 +1175,7 @@
 	//
 	// Remove the entry:
 	//  - Key : FlowEntryId (Long)
-	//  - Value : Serialized Dpid (byte[])
+	//  - Value : Serialized Switch Dpid (byte[])
 	//
 	mapFlowEntryId.removeAsync(flowEntryId.value());
     }