Modify the logic for writing the Flow Entries to the database:
For each flow, only the controller for the first-hop switch is
writing the Flow Entries to the database.

Apparently, if multiple controllers try to write its own
Flow Entry for the same Flow Path to the database, this
triggers a Titan exception.
diff --git a/src/main/java/net/onrc/onos/ofcontroller/flowmanager/FlowEventHandler.java b/src/main/java/net/onrc/onos/ofcontroller/flowmanager/FlowEventHandler.java
index f3e47f4..0e9887a 100644
--- a/src/main/java/net/onrc/onos/ofcontroller/flowmanager/FlowEventHandler.java
+++ b/src/main/java/net/onrc/onos/ofcontroller/flowmanager/FlowEventHandler.java
@@ -32,6 +32,19 @@
 import org.slf4j.LoggerFactory;
 
 /**
+ * A class for storing a pair of Flow Path and a Flow Entry.
+ */
+class FlowPathEntryPair {
+    protected FlowPath flowPath;
+    protected FlowEntry flowEntry;
+
+    protected FlowPathEntryPair(FlowPath flowPath, FlowEntry flowEntry) {
+	this.flowPath = flowPath;
+	this.flowEntry = flowEntry;
+    }
+}
+
+/**
  * Class for FlowPath Maintenance.
  * This class listens for FlowEvents to:
  * - Maintain a local cache of the Network Topology.
@@ -46,8 +59,8 @@
     private IDatagridService datagridService;	// The Datagrid Service to use
     private Topology topology;			// The network topology
     private Map<Long, FlowPath> allFlowPaths = new HashMap<Long, FlowPath>();
-    private List<FlowEntry> unmatchedFlowEntryUpdates =
-	new LinkedList<FlowEntry>();
+    private Map<Long, FlowEntry> unmatchedFlowEntryAdd =
+	new HashMap<Long, FlowEntry>();
 
     // The queue with Flow Path and Topology Element updates
     private BlockingQueue<EventEntry<?>> networkEvents =
@@ -66,10 +79,15 @@
     //  - The new Flow Paths
     //  - The Flow Paths that should be recomputed
     //  - The Flow Paths with modified Flow Entries
+    //  - The Flow Entries that were updated
     //
     private List<FlowPath> newFlowPaths = new LinkedList<FlowPath>();
     private List<FlowPath> recomputeFlowPaths = new LinkedList<FlowPath>();
     private List<FlowPath> modifiedFlowPaths = new LinkedList<FlowPath>();
+    private List<FlowPathEntryPair> updatedFlowEntries =
+	new LinkedList<FlowPathEntryPair>();
+    private List<FlowPathEntryPair> unmatchedDeleteFlowEntries =
+	new LinkedList<FlowPathEntryPair>();
 
 
     /**
@@ -181,7 +199,7 @@
      * Process the events (if any)
      */
     private void processEvents() {
-	List<FlowEntry> modifiedFlowEntries;
+	List<FlowPathEntryPair> modifiedFlowEntries;
 
 	if (topologyEvents.isEmpty() && flowPathEvents.isEmpty() &&
 	    flowEntryEvents.isEmpty()) {
@@ -214,9 +232,11 @@
 	//
 	// Push the modified Flow Entries to switches, datagrid and database
 	//
-	flowManager.pushModifiedFlowEntriesToSwitches(modifiedFlowPaths);
+	flowManager.pushModifiedFlowEntriesToSwitches(modifiedFlowEntries);
 	flowManager.pushModifiedFlowEntriesToDatagrid(modifiedFlowEntries);
 	flowManager.pushModifiedFlowEntriesToDatabase(modifiedFlowEntries);
+	flowManager.pushModifiedFlowEntriesToDatabase(updatedFlowEntries);
+	flowManager.pushModifiedFlowEntriesToDatabase(unmatchedDeleteFlowEntries);
 
 	//
 	// Remove Flow Entries that were deleted
@@ -232,21 +252,26 @@
 	newFlowPaths.clear();
 	recomputeFlowPaths.clear();
 	modifiedFlowPaths.clear();
+	updatedFlowEntries.clear();
+	unmatchedDeleteFlowEntries.clear();
     }
 
     /**
      * Extract the modified Flow Entries.
      */
-    private List<FlowEntry> extractModifiedFlowEntries(
-				List<FlowPath> modifiedFlowPaths) {
-	List<FlowEntry> modifiedFlowEntries = new LinkedList<FlowEntry>();
+    private List<FlowPathEntryPair> extractModifiedFlowEntries(
+					List<FlowPath> modifiedFlowPaths) {
+	List<FlowPathEntryPair> modifiedFlowEntries =
+	    new LinkedList<FlowPathEntryPair>();
 
 	// Extract only the modified Flow Entries
 	for (FlowPath flowPath : modifiedFlowPaths) {
 	    for (FlowEntry flowEntry : flowPath.flowEntries()) {
 		if (flowEntry.flowEntrySwitchState() ==
 		    FlowEntrySwitchState.FE_SWITCH_NOT_UPDATED) {
-		    modifiedFlowEntries.add(flowEntry);
+		    FlowPathEntryPair flowPair =
+			new FlowPathEntryPair(flowPath, flowEntry);
+		    modifiedFlowEntries.add(flowPair);
 		}
 	    }
 	}
@@ -256,7 +281,7 @@
     /**
      * Assign the Flow Entry ID as needed.
      */
-    private void assignFlowEntryId(List<FlowEntry> modifiedFlowEntries) {
+    private void assignFlowEntryId(List<FlowPathEntryPair> modifiedFlowEntries) {
 	if (modifiedFlowEntries.isEmpty())
 	    return;
 
@@ -265,7 +290,8 @@
 	//
 	// Assign the Flow Entry ID only for Flow Entries for my switches
 	//
-	for (FlowEntry flowEntry : modifiedFlowEntries) {
+	for (FlowPathEntryPair flowPair : modifiedFlowEntries) {
+	    FlowEntry flowEntry = flowPair.flowEntry;
 	    // Update the Flow Entries only for my switches
 	    IOFSwitch mySwitch = mySwitches.get(flowEntry.dpid().value());
 	    if (mySwitch == null)
@@ -388,17 +414,29 @@
      * Process the Flow Entry events.
      */
     private void processFlowEntryEvents() {
+	FlowPathEntryPair flowPair;
+	FlowPath flowPath;
+	FlowEntry updatedFlowEntry;
+
 	//
 	// Update Flow Entries with previously unmatched Flow Entry updates
 	//
-	if ((! flowPathEvents.isEmpty()) &&
-	    (! unmatchedFlowEntryUpdates.isEmpty())) {
-	    List<FlowEntry> remainingUpdates = new LinkedList<FlowEntry>();
-	    for (FlowEntry flowEntry : unmatchedFlowEntryUpdates) {
-		if (! updateFlowEntry(flowEntry))
-		    remainingUpdates.add(flowEntry);
+	if (! unmatchedFlowEntryAdd.isEmpty()) {
+	    Map<Long, FlowEntry> remainingUpdates = new HashMap<Long, FlowEntry>();
+	    for (FlowEntry flowEntry : unmatchedFlowEntryAdd.values()) {
+		flowPath = allFlowPaths.get(flowEntry.flowId().value());
+		if (flowPath == null)
+		    continue;
+		updatedFlowEntry = updateFlowEntryAdd(flowPath, flowEntry);
+		if (updatedFlowEntry == null) {
+		    remainingUpdates.put(flowEntry.flowEntryId().value(),
+					 flowEntry);
+		    continue;
+		}
+		flowPair = new FlowPathEntryPair(flowPath, updatedFlowEntry);
+		updatedFlowEntries.add(flowPair);
 	    }
-	    unmatchedFlowEntryUpdates = remainingUpdates;
+	    unmatchedFlowEntryAdd = remainingUpdates;
 	}
 
 	//
@@ -406,58 +444,78 @@
 	//
 	for (EventEntry<FlowEntry> eventEntry : flowEntryEvents) {
 	    FlowEntry flowEntry = eventEntry.eventData();
+
+	    log.debug("Flow Entry Event: {} {}", eventEntry.eventType(),
+		      flowEntry.toString());
+
+	    if ((! flowEntry.isValidFlowId()) ||
+		(! flowEntry.isValidFlowEntryId())) {
+		continue;
+	    }
+
 	    switch (eventEntry.eventType()) {
 	    case ENTRY_ADD:
-		//
-		// Find the corresponding Flow Entry and update it.
-		// If not found, then keep it in a local cache for
-		// later matching.
-		//
-		if (! updateFlowEntry(flowEntry))
-		    unmatchedFlowEntryUpdates.add(flowEntry);
+		flowPath = allFlowPaths.get(flowEntry.flowId().value());
+		if (flowPath == null) {
+		    // Flow Path not found: keep the entry for later matching
+		    unmatchedFlowEntryAdd.put(flowEntry.flowEntryId().value(),
+					      flowEntry);
+		    break;
+		}
+		updatedFlowEntry = updateFlowEntryAdd(flowPath, flowEntry);
+		if (updatedFlowEntry == null) {
+		    // Flow Entry not found: keep the entry for later matching
+		    unmatchedFlowEntryAdd.put(flowEntry.flowEntryId().value(),
+					      flowEntry);
+		    break;
+		}
+		// Add the updated entry to the list of updated Flow Entries
+		flowPair = new FlowPathEntryPair(flowPath, updatedFlowEntry);
+		updatedFlowEntries.add(flowPair);
 		break;
+
 	    case ENTRY_REMOVE:
-		//
-		// NOTE: For now we remove the Flow Entries based on
-		// local decisions, so no need to remove them because of an
-		// external event.
-		//
+		flowEntry.setFlowEntryUserState(FlowEntryUserState.FE_USER_DELETE);
+		if (unmatchedFlowEntryAdd.remove(flowEntry.flowEntryId().value()) != null) {
+		    continue;		// Match found
+		}
+						 
+		flowPath = allFlowPaths.get(flowEntry.flowId().value());
+		if (flowPath == null) {
+		    // Flow Path not found: ignore the update
+		    break;
+		}
+		updatedFlowEntry = updateFlowEntryRemove(flowPath, flowEntry);
+		if (updatedFlowEntry == null) {
+		    // Flow Entry not found: add to list of deleted entries
+		    flowPair = new FlowPathEntryPair(flowPath, flowEntry);
+		    unmatchedDeleteFlowEntries.add(flowPair);
+		    break;
+		}
+		flowPair = new FlowPathEntryPair(flowPath, updatedFlowEntry);
+		updatedFlowEntries.add(flowPair);
 		break;
 	    }
 	}
     }
 
     /**
-     * Update a Flow Entry because of an external event.
+     * Update a Flow Entry because of an external ENTRY_ADD event.
      *
+     * @param flowPath the FlowPath for the Flow Entry to update.
      * @param flowEntry the FlowEntry with the new state.
-     * @return true if the Flow Entry was found and updated, otherwise false.
+     * @return the updated Flow Entry if found, otherwise null.
      */
-    private boolean updateFlowEntry(FlowEntry flowEntry) {
-	if ((! flowEntry.isValidFlowId()) ||
-	    (! flowEntry.isValidFlowEntryId())) {
-	    //
-	    // Ignore events for Flow Entries with invalid Flow ID or
-	    // Flow Entry ID.
-	    // This shouldn't happen.
-	    //
-	    return true;
-	}
-
-	FlowPath flowPath = allFlowPaths.get(flowEntry.flowId().value());
-	if (flowPath == null)
-	    return false;
-
+    private FlowEntry updateFlowEntryAdd(FlowPath flowPath,
+					 FlowEntry flowEntry) {
 	//
-	// Iterate over all Flow Entries and find a match based on the DPID
+	// Iterate over all Flow Entries and find a match.
 	//
 	for (FlowEntry localFlowEntry : flowPath.flowEntries()) {
-	    if (localFlowEntry.dpid().value() != flowEntry.dpid().value())
+	    if (! TopologyManager.isSameFlowEntryDataPath(localFlowEntry,
+							  flowEntry)) {
 		continue;
-	    //
-	    // TODO: We might want to check the FlowEntryMatch and
-	    // FlowEntryActions to double-check it is the same Flow Entry
-	    //
+	    }
 
 	    //
 	    // Local Flow Entry match found
@@ -467,9 +525,9 @@
 		    flowEntry.flowEntryId().value()) {
 		    //
 		    // Find a local Flow Entry, but the Flow Entry ID doesn't
-		    // match. Ignore the event.
+		    // match. Keep looking.
 		    //
-		    return true;
+		    continue;
 		}
 	    } else {
 		// Update the Flow Entry ID
@@ -480,13 +538,44 @@
 
 	    //
 	    // Update the local Flow Entry.
-	    // For now we update only the Flow Entry Switch State
 	    //
+	    localFlowEntry.setFlowEntryUserState(flowEntry.flowEntryUserState());
 	    localFlowEntry.setFlowEntrySwitchState(flowEntry.flowEntrySwitchState());
-	    return true;
+	    return localFlowEntry;
 	}
 
-	return false;		// Entry not found
+	return null;		// Entry not found
+    }
+
+    /**
+     * Update a Flow Entry because of an external ENTRY_REMOVE event.
+     *
+     * @param flowPath the FlowPath for the Flow Entry to update.
+     * @param flowEntry the FlowEntry with the new state.
+     * @return the updated Flow Entry if found, otherwise null.
+     */
+    private FlowEntry updateFlowEntryRemove(FlowPath flowPath,
+					    FlowEntry flowEntry) {
+	//
+	// Iterate over all Flow Entries and find a match based on
+	// the Flow Entry ID.
+	//
+	for (FlowEntry localFlowEntry : flowPath.flowEntries()) {
+	    if (! localFlowEntry.isValidFlowEntryId())
+		continue;
+	    if (localFlowEntry.flowEntryId().value() !=
+		flowEntry.flowEntryId().value()) {
+		    continue;
+	    }
+	    //
+	    // Update the local Flow Entry.
+	    //
+	    localFlowEntry.setFlowEntryUserState(flowEntry.flowEntryUserState());
+	    localFlowEntry.setFlowEntrySwitchState(flowEntry.flowEntrySwitchState());
+	    return localFlowEntry;
+	}
+
+	return null;		// Entry not found
     }
 
     /**
diff --git a/src/main/java/net/onrc/onos/ofcontroller/flowmanager/FlowManager.java b/src/main/java/net/onrc/onos/ofcontroller/flowmanager/FlowManager.java
index 415e281..b1476b7 100644
--- a/src/main/java/net/onrc/onos/ofcontroller/flowmanager/FlowManager.java
+++ b/src/main/java/net/onrc/onos/ofcontroller/flowmanager/FlowManager.java
@@ -869,61 +869,10 @@
      * NOTE: Only the Flow Entries to switches controlled by this instance
      * are pushed.
      *
-     * @param modifiedFlowPaths the collection of Flow Paths with the modified
-     * Flow Entries.
-     */
-    public void pushModifiedFlowEntriesToSwitches(
-			Collection<FlowPath> modifiedFlowPaths) {
-	// TODO: For now, the pushing of Flow Entries is disabled
-	if (true)
-	    return;
-
-	if (modifiedFlowPaths.isEmpty())
-	    return;
-
-	Map<Long, IOFSwitch> mySwitches = getMySwitches();
-
-	for (FlowPath flowPath : modifiedFlowPaths) {
-	    for (FlowEntry flowEntry : flowPath.flowEntries()) {
-		log.debug("Updating Flow Entry: {}", flowEntry.toString());
-
-		if (flowEntry.flowEntrySwitchState() !=
-		    FlowEntrySwitchState.FE_SWITCH_NOT_UPDATED) {
-		    continue;		// No need to update the entry
-		}
-
-		IOFSwitch mySwitch = mySwitches.get(flowEntry.dpid().value());
-		if (mySwitch == null)
-		    continue;
-
-		//
-		// Install the Flow Entry into the switch
-		//
-		if (! installFlowEntry(mySwitch, flowPath, flowEntry)) {
-		    String logMsg = "Cannot install Flow Entry " +
-			flowEntry.flowEntryId() +
-			" from Flow Path " + flowPath.flowId() +
-			" on switch " + flowEntry.dpid();
-		    log.error(logMsg);
-		    continue;
-		}
-
-		//
-		// NOTE: Here we assume that the switch has been
-		// successfully updated.
-		//
-		flowEntry.setFlowEntrySwitchState(FlowEntrySwitchState.FE_SWITCH_UPDATED);
-	    }
-	}
-    }
-
-    /**
-     * Push modified Flow Entries to the datagrid.
-     *
      * @param modifiedFlowEntries the collection of modified Flow Entries.
      */
-    public void pushModifiedFlowEntriesToDatagrid(
-			Collection<FlowEntry> modifiedFlowEntries) {
+    public void pushModifiedFlowEntriesToSwitches(
+			Collection<FlowPathEntryPair> modifiedFlowEntries) {
 	// TODO: For now, the pushing of Flow Entries is disabled
 	if (true)
 	    return;
@@ -933,7 +882,55 @@
 
 	Map<Long, IOFSwitch> mySwitches = getMySwitches();
 
-	for (FlowEntry flowEntry : modifiedFlowEntries) {
+	for (FlowPathEntryPair flowPair : modifiedFlowEntries) {
+	    FlowPath flowPath = flowPair.flowPath;
+	    FlowEntry flowEntry = flowPair.flowEntry;
+
+	    IOFSwitch mySwitch = mySwitches.get(flowEntry.dpid().value());
+	    if (mySwitch == null)
+		continue;
+
+	    log.debug("Pushing Flow Entry To Switch: {}", flowEntry.toString());
+
+	    //
+	    // Install the Flow Entry into the switch
+	    //
+	    if (! installFlowEntry(mySwitch, flowPath, flowEntry)) {
+		String logMsg = "Cannot install Flow Entry " +
+		    flowEntry.flowEntryId() +
+		    " from Flow Path " + flowPath.flowId() +
+		    " on switch " + flowEntry.dpid();
+		log.error(logMsg);
+		continue;
+	    }
+
+	    //
+	    // NOTE: Here we assume that the switch has been
+	    // successfully updated.
+	    //
+	    flowEntry.setFlowEntrySwitchState(FlowEntrySwitchState.FE_SWITCH_UPDATED);
+	}
+    }
+
+    /**
+     * Push modified Flow Entries to the datagrid.
+     *
+     * @param modifiedFlowEntries the collection of modified Flow Entries.
+     */
+    public void pushModifiedFlowEntriesToDatagrid(
+			Collection<FlowPathEntryPair> modifiedFlowEntries) {
+	// TODO: For now, the pushing of Flow Entries is disabled
+	if (true)
+	    return;
+
+	if (modifiedFlowEntries.isEmpty())
+	    return;
+
+	Map<Long, IOFSwitch> mySwitches = getMySwitches();
+
+	for (FlowPathEntryPair flowPair : modifiedFlowEntries) {
+	    FlowEntry flowEntry = flowPair.flowEntry;
+
 	    IOFSwitch mySwitch = mySwitches.get(flowEntry.dpid().value());
 
 	    //
@@ -956,6 +953,7 @@
 		    continue;
 	    }
 
+	    log.debug("Pushing Flow Entry To Datagrid: {}", flowEntry.toString());
 	    //
 	    // Write the Flow Entry to the Datagrid
 	    //
@@ -980,10 +978,14 @@
     /**
      * Push Flow Entries to the Network MAP.
      *
+     * NOTE: The Flow Entries are pushed only on the instance responsible
+     * for the first switch. This is to avoid database errors when multiple
+     * instances are writing Flow Entries for the same Flow Path.
+     *
      * @param modifiedFlowEntries the collection of Flow Entries to push.
      */
     public void pushModifiedFlowEntriesToDatabase(
-		Collection<FlowEntry> modifiedFlowEntries) {
+		Collection<FlowPathEntryPair> modifiedFlowEntries) {
 	// TODO: For now, the pushing of Flow Entries is disabled
 	if (true)
 	    return;
@@ -993,40 +995,30 @@
 
 	Map<Long, IOFSwitch> mySwitches = getMySwitches();
 
-	for (FlowEntry flowEntry : modifiedFlowEntries) {
-	    if (! flowEntry.isValidFlowId()) {
-		// Shouldn't happen
-		log.error("Cannot push Flow Entry to database: invalid Flow ID: {}", flowEntry.toString());
+	for (FlowPathEntryPair flowPair : modifiedFlowEntries) {
+	    FlowPath flowPath = flowPair.flowPath;
+	    FlowEntry flowEntry = flowPair.flowEntry;
+
+	    if (! flowEntry.isValidFlowEntryId())
 		continue;
-	    }
-
-	    IOFSwitch mySwitch = mySwitches.get(flowEntry.dpid().value());
 
 	    //
-	    // TODO: For now Flow Entries are removed by all instances,
-	    // even if this Flow Entry is not for our switches.
+	    // Push the changes only on the instance responsible for the
+	    // first switch.
 	    //
-	    // This is needed to handle the case a switch going down:
-	    // it has no Master controller instance, hence no
-	    // controller instance will cleanup its flow entries.
-	    // This is sub-optimal: we need to elect a controller
-	    // instance to handle the cleanup of such orphaned flow
-	    // entries.
-	    //
-	    if (mySwitch == null) {
-		if (flowEntry.flowEntryUserState() !=
-		    FlowEntryUserState.FE_USER_DELETE) {
-		    continue;
-		}
-		if (! flowEntry.isValidFlowEntryId())
-		    continue;
-	    }
+	    Dpid srcDpid = flowPath.dataPath().srcPort().dpid();
+	    IOFSwitch mySrcSwitch = mySwitches.get(srcDpid.value());
+	    if (mySrcSwitch == null)
+		continue;
 
+	    log.debug("Pushing Flow Entry To Database: {}", flowEntry.toString());
 	    //
 	    // Write the Flow Entry to the Network Map
 	    //
-	    // NOTE: We try a number of times; apparently, if other instances
-	    // are writing at the same time this will trigger an error.
+	    // NOTE: We try a number of times, in case somehow some other
+	    // instances are writing at the same time.
+	    // Apparently, if other instances are writing at the same time
+	    // this will trigger an error.
 	    //
 	    for (int i = 0; i < 6; i++) {
 		try {