Updated the FlowId Hazelcast Notification Channel so the payload
is the Source Switch DPID. Thus, the receiver of the notification
can easily check which are the Flows it should fetch from the database.
diff --git a/src/main/java/net/onrc/onos/ofcontroller/flowmanager/FlowEventHandler.java b/src/main/java/net/onrc/onos/ofcontroller/flowmanager/FlowEventHandler.java
index 34958d8..fa650f6 100644
--- a/src/main/java/net/onrc/onos/ofcontroller/flowmanager/FlowEventHandler.java
+++ b/src/main/java/net/onrc/onos/ofcontroller/flowmanager/FlowEventHandler.java
@@ -73,8 +73,8 @@
 	new LinkedList<EventEntry<FlowPath>>();
     private List<EventEntry<FlowEntry>> flowEntryEvents =
 	new LinkedList<EventEntry<FlowEntry>>();
-    private List<EventEntry<FlowId>> flowIdEvents =
-	new LinkedList<EventEntry<FlowId>>();
+    private List<EventEntry<Pair<FlowId, Dpid>>> flowIdEvents =
+	new LinkedList<EventEntry<Pair<FlowId, Dpid>>>();
     private List<EventEntry<Pair<FlowEntryId, Dpid>>> flowEntryIdEvents =
 	new LinkedList<EventEntry<Pair<FlowEntryId, Dpid>>>();
 
@@ -156,10 +156,11 @@
 	//
 	// Obtain the initial FlowId state
 	//
-	Collection<FlowId> flowIds = datagridService.getAllFlowIds();
-	for (FlowId flowId : flowIds) {
-	    EventEntry<FlowId> eventEntry =
-		new EventEntry<FlowId>(EventEntry.Type.ENTRY_ADD, flowId);
+	Collection<Pair<FlowId, Dpid>> flowIds =
+	    datagridService.getAllFlowIds();
+	for (Pair<FlowId, Dpid> pair : flowIds) {
+	    EventEntry<Pair<FlowId, Dpid>> eventEntry =
+		new EventEntry<Pair<FlowId, Dpid>>(EventEntry.Type.ENTRY_ADD, pair);
 	    flowIdEvents.add(eventEntry);
 	}
 
@@ -203,7 +204,7 @@
 		//  - EventEntry<TopologyElement>
 		//  - EventEntry<FlowPath>
 		//  - EventEntry<FlowEntry>
-		//  - EventEntry<FlowId>
+		//  - EventEntry<Pair<FlowId, Dpid>>
 		//  - EventEntry<Pair<FlowEntryId, Dpid>>
 		//
 		for (EventEntry<?> event : collection) {
@@ -233,19 +234,23 @@
 		    }
 
 		    // FlowId event
-		    if (event.eventData() instanceof FlowId) {
-			EventEntry<FlowId> flowIdEventEntry =
-			    (EventEntry<FlowId>)event;
+		    if (event.eventData() instanceof Pair) {
+			EventEntry<Pair<FlowId, Dpid>> flowIdEventEntry =
+			    (EventEntry<Pair<FlowId, Dpid>>)event;
 			flowIdEvents.add(flowIdEventEntry);
 			continue;
 		    }
 		    // FlowEntryId event
+		    // TODO: Fix the code below if we need again to handle
+		    // the FlowEntryId events
+		    /*
 		    if (event.eventData() instanceof Pair) {
 			EventEntry<Pair<FlowEntryId, Dpid>> flowEntryIdEventEntry =
 			    (EventEntry<Pair<FlowEntryId, Dpid>>)event;
 			flowEntryIdEvents.add(flowEntryIdEventEntry);
 			continue;
 		    }
+		    */
 		}
 		collection.clear();
 
@@ -496,10 +501,20 @@
 	//
 	// Process all Flow Id events and update the appropriate state
 	//
-	for (EventEntry<FlowId> eventEntry : flowIdEvents) {
-	    FlowId flowId = eventEntry.eventData();
+	for (EventEntry<Pair<FlowId, Dpid>> eventEntry : flowIdEvents) {
+	    Pair<FlowId, Dpid> pair = eventEntry.eventData();
+	    FlowId flowId = pair.first;
+	    Dpid dpid = pair.second;
 
-	    log.debug("Flow ID Event: {} {}", eventEntry.eventType(), flowId);
+	    log.debug("Flow ID Event: {} {} {}", eventEntry.eventType(),
+		      flowId, dpid);
+
+	    //
+	    // Ignore Flows if the Source Switch is not controlled by this
+	    // instance.
+	    //
+	    if (mySwitches.get(dpid.value()) == null)
+		continue;
 
 	    switch (eventEntry.eventType()) {
 	    case ENTRY_ADD: {
@@ -1308,11 +1323,14 @@
      * Receive a notification that a FlowId is added.
      *
      * @param flowId the FlowId that is added.
+     * @param dpid the Source Switch Dpid for the corresponding Flow.
      */
     @Override
-    public void notificationRecvFlowIdAdded(FlowId flowId) {
-	EventEntry<FlowId> eventEntry =
-	    new EventEntry<FlowId>(EventEntry.Type.ENTRY_ADD, flowId);
+    public void notificationRecvFlowIdAdded(FlowId flowId, Dpid dpid) {
+	Pair flowIdPair = new Pair(flowId, dpid);
+
+	EventEntry<Pair<FlowId, Dpid>> eventEntry =
+	    new EventEntry<Pair<FlowId, Dpid>>(EventEntry.Type.ENTRY_ADD, flowIdPair);
 	networkEvents.add(eventEntry);
     }
 
@@ -1320,11 +1338,14 @@
      * Receive a notification that a FlowId is removed.
      *
      * @param flowId the FlowId that is removed.
+     * @param dpid the Source Switch Dpid for the corresponding Flow.
      */
     @Override
-    public void notificationRecvFlowIdRemoved(FlowId flowId) {
-	EventEntry<FlowId> eventEntry =
-	    new EventEntry<FlowId>(EventEntry.Type.ENTRY_REMOVE, flowId);
+    public void notificationRecvFlowIdRemoved(FlowId flowId, Dpid dpid) {
+	Pair flowIdPair = new Pair(flowId, dpid);
+
+	EventEntry<Pair<FlowId, Dpid>> eventEntry =
+	    new EventEntry<Pair<FlowId, Dpid>>(EventEntry.Type.ENTRY_REMOVE, flowIdPair);
 	networkEvents.add(eventEntry);
     }
 
@@ -1332,12 +1353,15 @@
      * Receive a notification that a FlowId is updated.
      *
      * @param flowId the FlowId that is updated.
+     * @param dpid the Source Switch Dpid for the corresponding Flow.
      */
     @Override
-    public void notificationRecvFlowIdUpdated(FlowId flowId) {
+    public void notificationRecvFlowIdUpdated(FlowId flowId, Dpid dpid) {
+	Pair flowIdPair = new Pair(flowId, dpid);
+
 	// NOTE: The ADD and UPDATE events are processed in same way
-	EventEntry<FlowId> eventEntry =
-	    new EventEntry<FlowId>(EventEntry.Type.ENTRY_ADD, flowId);
+	EventEntry<Pair<FlowId, Dpid>> eventEntry =
+	    new EventEntry<Pair<FlowId, Dpid>>(EventEntry.Type.ENTRY_ADD, flowIdPair);
 	networkEvents.add(eventEntry);
     }