Fixes toward Bug #310 and Bug #318 (and possibly Bug #328).

FlowManager refactoring to simplify and speed up the code and
eliminate Titan DB access bugs.

Add new method reconcileFlow() that reconciles a single flow
properly by avoiding a number of bugs in the older
implementation in method reconcileFlows().
One of the fixes is avoiding inappropriate closing of the Titan DB
transaction.

With Masa's CHO tests for link failure on DEV-Z cluster,
there was only a single allping failure in almost 4 hours.
The root cause for that ping failure didn't seem to be flow-related.
With the older implementation, there was an allping failure
every 10-15 minutes.
diff --git a/src/main/java/net/floodlightcontroller/flowcache/FlowManager.java b/src/main/java/net/floodlightcontroller/flowcache/FlowManager.java
index 2b9bf68..b150ced 100644
--- a/src/main/java/net/floodlightcontroller/flowcache/FlowManager.java
+++ b/src/main/java/net/floodlightcontroller/flowcache/FlowManager.java
@@ -112,8 +112,8 @@
 		}
 		Map<Long, IOFSwitch> mySwitches =
 		    floodlightProvider.getSwitches();
-		Map<Long, IFlowEntry> myFlowEntries =
-		    new TreeMap<Long, IFlowEntry>();
+		LinkedList<IFlowEntry> addFlowEntries =
+		    new LinkedList<IFlowEntry>();
 		LinkedList<IFlowEntry> deleteFlowEntries =
 		    new LinkedList<IFlowEntry>();
 
@@ -122,6 +122,7 @@
 		// Fetch all Flow Entries and select only my Flow Entries
 		// that need to be updated into the switches.
 		//
+		boolean processed_measurement_flow = false;
 		Iterable<IFlowEntry> allFlowEntries =
 		    conn.utils().getAllFlowEntries(conn);
 		for (IFlowEntry flowEntryObj : allFlowEntries) {
@@ -147,29 +148,26 @@
 		    if (mySwitch == null)
 			continue;	// Ignore the entry: not my switch
 
-		    myFlowEntries.put(flowEntryId.value(), flowEntryObj);
+		    IFlowPath flowObj =
+			conn.utils().getFlowPathByFlowEntry(conn, flowEntryObj);
+		    if (flowObj == null)
+			continue;		// Should NOT happen
+		    if (flowObj.getFlowId() == null)
+			continue;		// Invalid entry
+
+		    //
+		    // NOTE: For now we process the DELETE before the ADD
+		    // to cover the more common scenario.
+		    // TODO: This is error prone and needs to be fixed!
+		    //
 		    if (userState.equals("FE_USER_DELETE")) {
 			// An entry that needs to be deleted.
 			deleteFlowEntries.add(flowEntryObj);
+			installFlowEntry(mySwitch, flowObj, flowEntryObj);
+		    } else {
+			addFlowEntries.add(flowEntryObj);
 		    }
-		}
-
-		log.debug("MEASUREMENT: Found {} My Flow Entries NOT_UPDATED",
-			  myFlowEntries.size());
-
-		//
-		// Process my Flow Entries in the Flow Entry ID order
-		//
-		boolean processed_measurement_flow = false;
-		for (Map.Entry<Long, IFlowEntry> entry : myFlowEntries.entrySet()) {
 		    counterMyNotUpdatedFlowEntries++;
-		    IFlowEntry flowEntryObj = entry.getValue();
-		    IFlowPath flowObj =
-			conn.utils().getFlowPathByFlowEntry(conn,
-							    flowEntryObj);
-		    if (flowObj == null)
-			continue;		// Should NOT happen
-
 		    // Code for measurement purpose
 		    // TODO: Commented-out for now
 		    /*
@@ -179,6 +177,24 @@
 			}
 		    }
 		    */
+		}
+
+		/*
+		log.debug("MEASUREMENT: Found {} My Flow Entries NOT_UPDATED",
+		    addFlowEntries.size() + deleteFlowEntries.size());
+		*/
+
+		//
+		// Process the Flow Entries that need to be added
+		//
+		for (IFlowEntry flowEntryObj : addFlowEntries) {
+		    IFlowPath flowObj =
+			conn.utils().getFlowPathByFlowEntry(conn,
+							    flowEntryObj);
+		    if (flowObj == null)
+			continue;		// Should NOT happen
+		    if (flowObj.getFlowId() == null)
+			continue;		// Invalid entry
 
 		    Dpid dpid = new Dpid(flowEntryObj.getSwitchDpid());
 		    IOFSwitch mySwitch = mySwitches.get(dpid.value());
@@ -187,8 +203,10 @@
 		    installFlowEntry(mySwitch, flowObj, flowEntryObj);
 		}
 
+		/*
 		log.debug("MEASUREMENT: Found {} Flow Entries to delete",
 			  deleteFlowEntries.size());
+		*/
 
 		//
 		// Delete all entries marked for deletion from the
@@ -217,7 +235,6 @@
 		//
 		topoRouteService.prepareShortestPathTopo();
 		Iterable<IFlowPath> allFlowPaths = conn.utils().getAllFlowPaths(conn);
-		HashSet<IFlowPath> flowObjSet = new HashSet<IFlowPath>();
 		for (IFlowPath flowPathObj : allFlowPaths) {
 		    counterAllFlowPaths++;
 		    if (flowPathObj == null)
@@ -290,14 +307,10 @@
 
 		    log.debug("RECONCILE: Need to Reconcile Shortest Path for FlowID {}",
 			      flowId.toString());
-		    flowObjSet.add(flowPathObj);
+		    reconcileFlow(flowPathObj, dataPath);
 		}
-		log.debug("MEASUREMENT: Found {} Flows to reconcile",
-			  flowObjSet.size());
-		reconcileFlows(flowObjSet);
 		topoRouteService.dropShortestPathTopo();
 
-
 		conn.endTx(Transaction.COMMIT);
 
 		if (processed_measurement_flow) {
@@ -511,7 +524,7 @@
 	// flowPath.dataPath().flowEntries()
 	//
 	for (FlowEntry flowEntry : flowPath.dataPath().flowEntries()) {
-	    if (addFlowEntry(flowObj, flowEntry) != true) {
+	    if (addFlowEntry(flowObj, flowEntry) == null) {
 		conn.endTx(Transaction.ROLLBACK);
 		return false;
 	    }
@@ -531,9 +544,9 @@
      *
      * @param flowObj the corresponding Flow Path object for the Flow Entry.
      * @param flowEntry the Flow Entry to install.
-     * @return true on success, otherwise false.
+     * @return the added Flow Entry object on success, otherwise null.
      */
-    private boolean addFlowEntry(IFlowPath flowObj, FlowEntry flowEntry) {
+    private IFlowEntry addFlowEntry(IFlowPath flowObj, FlowEntry flowEntry) {
 	// Flow edges
 	//   HeadFE (TODO)
 
@@ -562,12 +575,12 @@
 	} catch (Exception e) {
 	    log.error(":addFlow FlowEntryId:{} failed",
 		      flowEntry.flowEntryId().toString());
-	    return false;
+	    return null;
 	}
 	if (flowEntryObj == null) {
 	    log.error(":addFlow FlowEntryId:{} failed: FlowEntry object not created",
 		      flowEntry.flowEntryId().toString());
-	    return false;
+	    return null;
 	}
 
 	//
@@ -652,7 +665,7 @@
 	    flowEntryObj.setFlow(flowObj);
 	}
 
-	return true;
+	return flowEntryObj;
     }
 
     /**
@@ -1246,6 +1259,118 @@
     }
 
     /**
+     * Reconcile a single flow: replace its Flow Entries with a new data path.
+     *
+     * @param flowObj the flow that needs to be reconciled.
+     * @param newDataPath the new data path to use.
+     * @return true (the current implementation has no other return value).
+     */
+    public boolean reconcileFlow(IFlowPath flowObj, DataPath newDataPath) {
+	Map<Long, IOFSwitch> mySwitches = floodlightProvider.getSwitches();
+
+	//
+	// Set the incoming port matching and the outgoing port output
+	// actions for each flow entry.
+	//
+	for (FlowEntry flowEntry : newDataPath.flowEntries()) {
+	    // Set the incoming port matching
+	    FlowEntryMatch flowEntryMatch = new FlowEntryMatch();
+	    flowEntry.setFlowEntryMatch(flowEntryMatch);
+	    flowEntryMatch.enableInPort(flowEntry.inPort());
+
+	    // Set the outgoing port output action
+	    ArrayList<FlowEntryAction> flowEntryActions = flowEntry.flowEntryActions();
+	    if (flowEntryActions == null) {
+		flowEntryActions = new ArrayList<FlowEntryAction>();
+		flowEntry.setFlowEntryActions(flowEntryActions);
+	    }
+	    FlowEntryAction flowEntryAction = new FlowEntryAction();
+	    flowEntryAction.setActionOutput(flowEntry.outPort());
+	    flowEntryActions.add(flowEntryAction);
+	}
+
+	//
+	// Remove the old Flow Entries, and add the new Flow Entries
+	//
+
+	//
+	// Mark all old Flow Entries as deleted; remove only those on my switches
+	//
+	Iterable<IFlowEntry> flowEntries = flowObj.getFlowEntries();
+	LinkedList<IFlowEntry> deleteFlowEntries = new LinkedList<IFlowEntry>();
+	for (IFlowEntry flowEntryObj : flowEntries) {
+	    String dpidStr = flowEntryObj.getSwitchDpid();
+	    if (dpidStr == null)
+		continue;
+	    Dpid dpid = new Dpid(dpidStr);
+	    IOFSwitch mySwitch = mySwitches.get(dpid.value());
+
+	    flowEntryObj.setUserState("FE_USER_DELETE");
+	    if (mySwitch == null) {
+		//
+		// Not my switch. Mark it for deletion in the Network MAP
+		//
+		flowEntryObj.setSwitchState("FE_SWITCH_NOT_UPDATED");
+		continue;
+	    }
+
+	    deleteFlowEntries.add(flowEntryObj);
+
+	    //
+	    // Delete the flow entry from the switch
+	    //
+	    // flowEntryObj.setSwitchState("FE_SWITCH_NOT_UPDATED");
+	    installFlowEntry(mySwitch, flowObj, flowEntryObj);
+	    // flowEntryObj.setSwitchState("FE_SWITCH_UPDATED");
+	}
+	for (IFlowEntry flowEntryObj : deleteFlowEntries) {
+	    flowObj.removeFlowEntry(flowEntryObj);
+	    conn.utils().removeFlowEntry(conn, flowEntryObj);
+	}
+
+	//
+	// Install the new shortest path into the Network MAP and the switches.
+	//
+	for (FlowEntry flowEntry : newDataPath.flowEntries()) {
+	    flowEntry.setFlowEntryUserState(FlowEntryUserState.FE_USER_ADD);
+	    IFlowEntry flowEntryObj = addFlowEntry(flowObj, flowEntry);
+	    if (flowEntryObj == null) {
+		//
+		// TODO: Remove the "new Object[]" wrapper in the statement
+		// below after the SLF4J logger is upgraded to
+		// Version 1.7.5
+		//
+		log.error("Cannot add Flow Entry to switch {} for Path Flow from {} to {} : Flow Entry not in the Network MAP",
+			  new Object[] {
+			      flowEntry.dpid(),
+			      newDataPath.srcPort(),
+			      newDataPath.dstPort()
+			  });
+		continue;
+	    }
+
+	    IOFSwitch mySwitch = mySwitches.get(flowEntry.dpid().value());
+	    if (mySwitch == null) {
+		// Not my switch: just add to the Network MAP
+		continue;
+	    }
+
+	    // Install the Flow Entry into the switch
+	    if (installFlowEntry(mySwitch, flowObj, flowEntryObj)) {
+		flowEntryObj.setSwitchState("FE_SWITCH_UPDATED");
+	    }
+	}
+
+	//
+	// Set the Data Path Summary
+	//
+	String dataPathSummaryStr = newDataPath.dataPathSummary();
+	flowObj.setDataPathSummary(dataPathSummaryStr);
+
+	return true;
+    }
+
+    /**
      * Reconcile all flows in a set.
      *
      * @param flowObjSet the set of flows that need to be reconciliated.
@@ -1362,9 +1487,13 @@
      */
     public boolean installFlowEntry(IOFSwitch mySwitch, IFlowPath flowObj,
 				    IFlowEntry flowEntryObj) {
-	FlowEntryId flowEntryId =
-	    new FlowEntryId(flowEntryObj.getFlowEntryId());
+	String flowEntryIdStr = flowEntryObj.getFlowEntryId();
+	if (flowEntryIdStr == null)
+	    return false;
+	FlowEntryId flowEntryId = new FlowEntryId(flowEntryIdStr);
 	String userState = flowEntryObj.getUserState();
+	if (userState == null)
+	    return false;
 
 	//
 	// Create the Open Flow Flow Modification Entry to push
@@ -1487,6 +1616,13 @@
 	//
 	// Write the message to the switch
 	//
+	log.debug("MEASUREMENT: Installing flow entry " + userState +
+		  " into switch DPID: " +
+		  mySwitch.getStringId() +
+		  " flowEntryId: " + flowEntryId.toString() +
+		  " srcMac: " + matchSrcMac + " dstMac: " + matchDstMac +
+		  " inPort: " + matchInPort + " outPort: " + actionOutputPort
+		  );
 	try {
 	    messageDamper.write(mySwitch, fm, null);
 	    mySwitch.flush();