Updated the FlowId Hazelcast Notification Channel so the payload
is the Source Switch DPID. Thus, the receiver of the notification
can easily check which are the Flows it should fetch from the database.
diff --git a/src/main/java/net/onrc/onos/datagrid/HazelcastDatagrid.java b/src/main/java/net/onrc/onos/datagrid/HazelcastDatagrid.java
index 632cc38..c195f82 100644
--- a/src/main/java/net/onrc/onos/datagrid/HazelcastDatagrid.java
+++ b/src/main/java/net/onrc/onos/datagrid/HazelcastDatagrid.java
@@ -256,7 +256,7 @@
      *
      * The datagrid map is:
      *  - Key : FlowId (Long)
-     *  - Value : Serialized FlowId (byte[])
+     *  - Value : Serialized Switch Dpid (byte[])
      */
     class MapFlowIdListener implements EntryListener<Long, byte[]> {
 	/**
@@ -265,6 +265,9 @@
 	 * @param event the notification event for the entry.
 	 */
 	public void entryAdded(EntryEvent<Long, byte[]> event) {
+	    Long keyLong = event.getKey();
+	    FlowId flowId = new FlowId(keyLong);
+
 	    byte[] valueBytes = event.getValue();
 
 	    //
@@ -272,9 +275,9 @@
 	    //
 	    Kryo kryo = kryoFactory.newKryo();
 	    Input input = new Input(valueBytes);
-	    FlowId flowId = kryo.readObject(input, FlowId.class);
+	    Dpid dpid = kryo.readObject(input, Dpid.class);
 	    kryoFactory.deleteKryo(kryo);
-	    flowEventHandlerService.notificationRecvFlowIdAdded(flowId);
+	    flowEventHandlerService.notificationRecvFlowIdAdded(flowId, dpid);
 	}
 
 	/**
@@ -283,6 +286,9 @@
 	 * @param event the notification event for the entry.
 	 */
 	public void entryRemoved(EntryEvent<Long, byte[]> event) {
+	    Long keyLong = event.getKey();
+	    FlowId flowId = new FlowId(keyLong);
+
 	    byte[] valueBytes = event.getValue();
 
 	    //
@@ -290,9 +296,9 @@
 	    //
 	    Kryo kryo = kryoFactory.newKryo();
 	    Input input = new Input(valueBytes);
-	    FlowId flowId = kryo.readObject(input, FlowId.class);
+	    Dpid dpid = kryo.readObject(input, Dpid.class);
 	    kryoFactory.deleteKryo(kryo);
-	    flowEventHandlerService.notificationRecvFlowIdRemoved(flowId);
+	    flowEventHandlerService.notificationRecvFlowIdRemoved(flowId, dpid);
 	}
 
 	/**
@@ -301,6 +307,9 @@
 	 * @param event the notification event for the entry.
 	 */
 	public void entryUpdated(EntryEvent<Long, byte[]> event) {
+	    Long keyLong = event.getKey();
+	    FlowId flowId = new FlowId(keyLong);
+
 	    byte[] valueBytes = event.getValue();
 
 	    //
@@ -308,9 +317,9 @@
 	    //
 	    Kryo kryo = kryoFactory.newKryo();
 	    Input input = new Input(valueBytes);
-	    FlowId flowId = kryo.readObject(input, FlowId.class);
+	    Dpid dpid = kryo.readObject(input, Dpid.class);
 	    kryoFactory.deleteKryo(kryo);
-	    flowEventHandlerService.notificationRecvFlowIdUpdated(flowId);
+	    flowEventHandlerService.notificationRecvFlowIdUpdated(flowId, dpid);
 	}
 
 	/**
@@ -997,21 +1006,28 @@
      * @return all Flow IDs that are currently in the datagrid.
      */
     @Override
-    public Collection<FlowId> getAllFlowIds() {
-	Collection<FlowId> allFlowIds = new LinkedList<FlowId>();
+	public Collection<Pair<FlowId, Dpid>> getAllFlowIds() {
+	Collection<Pair<FlowId, Dpid>> allFlowIds =
+	    new LinkedList<Pair<FlowId, Dpid>>();
 
 	//
 	// Get all current entries
 	//
-	Collection<byte[]> values = mapFlowId.values();
 	Kryo kryo = kryoFactory.newKryo();
-	for (byte[] valueBytes : values) {
+	for (Map.Entry<Long, byte[]> entry : mapFlowId.entrySet()) {
+	    Long key = entry.getKey();
+	    byte[] valueBytes = entry.getValue();
+
+	    FlowId flowId = new FlowId(key);
+
 	    //
 	    // Decode the value
 	    //
 	    Input input = new Input(valueBytes);
-	    FlowId flowId = kryo.readObject(input, FlowId.class);
-	    allFlowIds.add(flowId);
+	    Dpid dpid = kryo.readObject(input, Dpid.class);
+
+	    Pair<FlowId, Dpid> pair = new Pair(flowId, dpid);
+	    allFlowIds.add(pair);
 	}
 	kryoFactory.deleteKryo(kryo);
 
@@ -1056,23 +1072,24 @@
      * Send a notification that a FlowId is added.
      *
      * @param flowId the FlowId that is added.
+     * @param dpid the Source Switch Dpid.
      */
     @Override
-    public void notificationSendFlowIdAdded(FlowId flowId) {
+    public void notificationSendFlowIdAdded(FlowId flowId, Dpid dpid) {
 	//
 	// Encode the value
 	//
 	byte[] buffer = new byte[MAX_BUFFER_SIZE];
 	Kryo kryo = kryoFactory.newKryo();
 	Output output = new Output(buffer, -1);
-	kryo.writeObject(output, flowId);
+	kryo.writeObject(output, dpid);
 	byte[] valueBytes = output.toBytes();
 	kryoFactory.deleteKryo(kryo);
 
 	//
 	// Put the entry:
 	//  - Key : FlowId (Long)
-	//  - Value : Serialized FlowId (byte[])
+	//  - Value : Serialized Switch Dpid (byte[])
 	//
 	mapFlowId.putAsync(flowId.value(), valueBytes);
     }
@@ -1087,7 +1104,7 @@
 	//
 	// Remove the entry:
 	//  - Key : FlowId (Long)
-	//  - Value : Serialized FlowId (byte[])
+	//  - Value : Serialized Switch Dpid (byte[])
 	//
 	mapFlowId.removeAsync(flowId.value());
     }
@@ -1096,11 +1113,12 @@
      * Send a notification that a FlowId is updated.
      *
      * @param flowId the FlowId that is updated.
+     * @param dpid the Source Switch Dpid.
      */
     @Override
-    public void notificationSendFlowIdUpdated(FlowId flowId) {
+    public void notificationSendFlowIdUpdated(FlowId flowId, Dpid dpid) {
 	// NOTE: Adding an entry with an existing key automatically updates it
-	notificationSendFlowIdAdded(flowId);
+	notificationSendFlowIdAdded(flowId, dpid);
     }
 
     /**
@@ -1157,7 +1175,7 @@
 	//
 	// Remove the entry:
 	//  - Key : FlowEntryId (Long)
-	//  - Value : Serialized Dpid (byte[])
+	//  - Value : Serialized Switch Dpid (byte[])
 	//
 	mapFlowEntryId.removeAsync(flowEntryId.value());
     }
diff --git a/src/main/java/net/onrc/onos/datagrid/IDatagridService.java b/src/main/java/net/onrc/onos/datagrid/IDatagridService.java
index cfc10bc..a855798 100644
--- a/src/main/java/net/onrc/onos/datagrid/IDatagridService.java
+++ b/src/main/java/net/onrc/onos/datagrid/IDatagridService.java
@@ -140,14 +140,15 @@
      *
      * @return all Flow IDs that ae currently in the datagrid.
      */
-    Collection<FlowId> getAllFlowIds();
+    Collection<Pair<FlowId, Dpid>> getAllFlowIds();
 
     /**
      * Send a notification that a FlowId is added.
      *
      * @param flowId the FlowId that is added.
+     * @param dpid the Source Switch Dpid.
      */
-    void notificationSendFlowIdAdded(FlowId flowId);
+    void notificationSendFlowIdAdded(FlowId flowId, Dpid dpid);
 
     /**
      * Send a notification that a FlowId is removed.
@@ -160,8 +161,9 @@
      * Send a notification that a FlowId is updated.
      *
      * @param flowId the FlowId that is updated.
+     * @param dpid the Source Switch Dpid.
      */
-    void notificationSendFlowIdUpdated(FlowId flowId);
+    void notificationSendFlowIdUpdated(FlowId flowId, Dpid dpid);
 
     /**
      * Send a notification that all Flow IDs are removed.
diff --git a/src/main/java/net/onrc/onos/ofcontroller/flowmanager/FlowEventHandler.java b/src/main/java/net/onrc/onos/ofcontroller/flowmanager/FlowEventHandler.java
index 34958d8..fa650f6 100644
--- a/src/main/java/net/onrc/onos/ofcontroller/flowmanager/FlowEventHandler.java
+++ b/src/main/java/net/onrc/onos/ofcontroller/flowmanager/FlowEventHandler.java
@@ -73,8 +73,8 @@
 	new LinkedList<EventEntry<FlowPath>>();
     private List<EventEntry<FlowEntry>> flowEntryEvents =
 	new LinkedList<EventEntry<FlowEntry>>();
-    private List<EventEntry<FlowId>> flowIdEvents =
-	new LinkedList<EventEntry<FlowId>>();
+    private List<EventEntry<Pair<FlowId, Dpid>>> flowIdEvents =
+	new LinkedList<EventEntry<Pair<FlowId, Dpid>>>();
     private List<EventEntry<Pair<FlowEntryId, Dpid>>> flowEntryIdEvents =
 	new LinkedList<EventEntry<Pair<FlowEntryId, Dpid>>>();
 
@@ -156,10 +156,11 @@
 	//
 	// Obtain the initial FlowId state
 	//
-	Collection<FlowId> flowIds = datagridService.getAllFlowIds();
-	for (FlowId flowId : flowIds) {
-	    EventEntry<FlowId> eventEntry =
-		new EventEntry<FlowId>(EventEntry.Type.ENTRY_ADD, flowId);
+	Collection<Pair<FlowId, Dpid>> flowIds =
+	    datagridService.getAllFlowIds();
+	for (Pair<FlowId, Dpid> pair : flowIds) {
+	    EventEntry<Pair<FlowId, Dpid>> eventEntry =
+		new EventEntry<Pair<FlowId, Dpid>>(EventEntry.Type.ENTRY_ADD, pair);
 	    flowIdEvents.add(eventEntry);
 	}
 
@@ -203,7 +204,7 @@
 		//  - EventEntry<TopologyElement>
 		//  - EventEntry<FlowPath>
 		//  - EventEntry<FlowEntry>
-		//  - EventEntry<FlowId>
+		//  - EventEntry<Pair<FlowId, Dpid>>
 		//  - EventEntry<Pair<FlowEntryId, Dpid>>
 		//
 		for (EventEntry<?> event : collection) {
@@ -233,19 +234,23 @@
 		    }
 
 		    // FlowId event
-		    if (event.eventData() instanceof FlowId) {
-			EventEntry<FlowId> flowIdEventEntry =
-			    (EventEntry<FlowId>)event;
+		    if (event.eventData() instanceof Pair) {
+			EventEntry<Pair<FlowId, Dpid>> flowIdEventEntry =
+			    (EventEntry<Pair<FlowId, Dpid>>)event;
 			flowIdEvents.add(flowIdEventEntry);
 			continue;
 		    }
 		    // FlowEntryId event
+		    // TODO: Fix the code below if we need again to handle
+		    // the FlowEntryId events
+		    /*
 		    if (event.eventData() instanceof Pair) {
 			EventEntry<Pair<FlowEntryId, Dpid>> flowEntryIdEventEntry =
 			    (EventEntry<Pair<FlowEntryId, Dpid>>)event;
 			flowEntryIdEvents.add(flowEntryIdEventEntry);
 			continue;
 		    }
+		    */
 		}
 		collection.clear();
 
@@ -496,10 +501,20 @@
 	//
 	// Process all Flow Id events and update the appropriate state
 	//
-	for (EventEntry<FlowId> eventEntry : flowIdEvents) {
-	    FlowId flowId = eventEntry.eventData();
+	for (EventEntry<Pair<FlowId, Dpid>> eventEntry : flowIdEvents) {
+	    Pair<FlowId, Dpid> pair = eventEntry.eventData();
+	    FlowId flowId = pair.first;
+	    Dpid dpid = pair.second;
 
-	    log.debug("Flow ID Event: {} {}", eventEntry.eventType(), flowId);
+	    log.debug("Flow ID Event: {} {} {}", eventEntry.eventType(),
+		      flowId, dpid);
+
+	    //
+	    // Ignore Flows if the Source Switch is not controlled by this
+	    // instance.
+	    //
+	    if (mySwitches.get(dpid.value()) == null)
+		continue;
 
 	    switch (eventEntry.eventType()) {
 	    case ENTRY_ADD: {
@@ -1308,11 +1323,14 @@
      * Receive a notification that a FlowId is added.
      *
      * @param flowId the FlowId that is added.
+     * @param dpid the Source Switch Dpid for the corresponding Flow.
      */
     @Override
-    public void notificationRecvFlowIdAdded(FlowId flowId) {
-	EventEntry<FlowId> eventEntry =
-	    new EventEntry<FlowId>(EventEntry.Type.ENTRY_ADD, flowId);
+    public void notificationRecvFlowIdAdded(FlowId flowId, Dpid dpid) {
+	Pair flowIdPair = new Pair(flowId, dpid);
+
+	EventEntry<Pair<FlowId, Dpid>> eventEntry =
+	    new EventEntry<Pair<FlowId, Dpid>>(EventEntry.Type.ENTRY_ADD, flowIdPair);
 	networkEvents.add(eventEntry);
     }
 
@@ -1320,11 +1338,14 @@
      * Receive a notification that a FlowId is removed.
      *
      * @param flowId the FlowId that is removed.
+     * @param dpid the Source Switch Dpid for the corresponding Flow.
      */
     @Override
-    public void notificationRecvFlowIdRemoved(FlowId flowId) {
-	EventEntry<FlowId> eventEntry =
-	    new EventEntry<FlowId>(EventEntry.Type.ENTRY_REMOVE, flowId);
+    public void notificationRecvFlowIdRemoved(FlowId flowId, Dpid dpid) {
+	Pair flowIdPair = new Pair(flowId, dpid);
+
+	EventEntry<Pair<FlowId, Dpid>> eventEntry =
+	    new EventEntry<Pair<FlowId, Dpid>>(EventEntry.Type.ENTRY_REMOVE, flowIdPair);
 	networkEvents.add(eventEntry);
     }
 
@@ -1332,12 +1353,15 @@
      * Receive a notification that a FlowId is updated.
      *
      * @param flowId the FlowId that is updated.
+     * @param dpid the Source Switch Dpid for the corresponding Flow.
      */
     @Override
-    public void notificationRecvFlowIdUpdated(FlowId flowId) {
+    public void notificationRecvFlowIdUpdated(FlowId flowId, Dpid dpid) {
+	Pair flowIdPair = new Pair(flowId, dpid);
+
 	// NOTE: The ADD and UPDATE events are processed in same way
-	EventEntry<FlowId> eventEntry =
-	    new EventEntry<FlowId>(EventEntry.Type.ENTRY_ADD, flowId);
+	EventEntry<Pair<FlowId, Dpid>> eventEntry =
+	    new EventEntry<Pair<FlowId, Dpid>>(EventEntry.Type.ENTRY_ADD, flowIdPair);
 	networkEvents.add(eventEntry);
     }
 
diff --git a/src/main/java/net/onrc/onos/ofcontroller/flowmanager/FlowManager.java b/src/main/java/net/onrc/onos/ofcontroller/flowmanager/FlowManager.java
index 33d5008..9dfe999 100644
--- a/src/main/java/net/onrc/onos/ofcontroller/flowmanager/FlowManager.java
+++ b/src/main/java/net/onrc/onos/ofcontroller/flowmanager/FlowManager.java
@@ -269,7 +269,8 @@
 
 	if (FlowDatabaseOperation.addFlow(dbHandlerApi, flowPath)) {
 	    if (enableOnrc2014MeasurementsFlows) {
-		datagridService.notificationSendFlowIdAdded(flowPath.flowId());
+		datagridService.notificationSendFlowIdAdded(flowPath.flowId(),
+							    flowPath.dataPath().srcPort().dpid());
 	    } else {
 		datagridService.notificationSendFlowAdded(flowPath);
 	    }
diff --git a/src/main/java/net/onrc/onos/ofcontroller/flowmanager/IFlowEventHandlerService.java b/src/main/java/net/onrc/onos/ofcontroller/flowmanager/IFlowEventHandlerService.java
index 04f92bc..a44a898 100644
--- a/src/main/java/net/onrc/onos/ofcontroller/flowmanager/IFlowEventHandlerService.java
+++ b/src/main/java/net/onrc/onos/ofcontroller/flowmanager/IFlowEventHandlerService.java
@@ -57,22 +57,25 @@
      * Receive a notification that a FlowId is added.
      *
      * @param flowId the FlowId that is added.
+     * @param dpid the Source Switch Dpid for the corresponding Flow.
      */
-    void notificationRecvFlowIdAdded(FlowId flowId);
+    void notificationRecvFlowIdAdded(FlowId flowId, Dpid dpid);
 
     /**
      * Receive a notification that a FlowId is removed.
      *
      * @param flowId the FlowId that is removed.
+     * @param dpid the Source Switch Dpid for the corresponding Flow.
      */
-    void notificationRecvFlowIdRemoved(FlowId flowId);
+    void notificationRecvFlowIdRemoved(FlowId flowId, Dpid dpid);
 
     /**
      * Receive a notification that a FlowId is updated.
      *
      * @param flowId the FlowId that is updated.
+     * @param dpid the Source Switch Dpid for the corresponding Flow.
      */
-    void notificationRecvFlowIdUpdated(FlowId flowId);
+    void notificationRecvFlowIdUpdated(FlowId flowId, Dpid dpid);
 
     /**
      * Receive a notification that a FlowEntryId is added.