Merge master branch into syncdev

Conflicts:
	src/main/java/net/onrc/onos/ofcontroller/flowmanager/FlowManager.java
diff --git a/src/main/java/net/onrc/onos/ofcontroller/flowmanager/FlowManager.java b/src/main/java/net/onrc/onos/ofcontroller/flowmanager/FlowManager.java
index 1ef4431..ab4be1a 100644
--- a/src/main/java/net/onrc/onos/ofcontroller/flowmanager/FlowManager.java
+++ b/src/main/java/net/onrc/onos/ofcontroller/flowmanager/FlowManager.java
@@ -18,6 +18,7 @@
 import net.floodlightcontroller.core.module.IFloodlightModule;
 import net.floodlightcontroller.core.module.IFloodlightService;
 import net.floodlightcontroller.restserver.IRestApiService;
+import net.floodlightcontroller.util.OFMessageDamper;
 import net.onrc.onos.datagrid.IDatagridService;
 import net.onrc.onos.graph.GraphDBOperation;
 import net.onrc.onos.ofcontroller.core.INetMapStorage;
@@ -26,33 +27,49 @@
 import net.onrc.onos.ofcontroller.floodlightlistener.INetworkGraphService;
 import net.onrc.onos.ofcontroller.flowmanager.web.FlowWebRoutable;
 import net.onrc.onos.ofcontroller.flowprogrammer.FlowPusher;
-import net.onrc.onos.ofcontroller.flowprogrammer.IFlowPusherService;
 import net.onrc.onos.ofcontroller.topology.ITopologyNetService;
 import net.onrc.onos.ofcontroller.topology.Topology;
 import net.onrc.onos.ofcontroller.topology.TopologyElement;
 import net.onrc.onos.ofcontroller.util.*;
 
-import org.openflow.protocol.OFMessage;
+import org.openflow.protocol.OFType;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 /**
  * Flow Manager class for handling the network flows.
  */
-public class FlowManager implements IFloodlightModule, IFlowService, INetMapStorage,
-	IFlowPusherService {
+public class FlowManager implements IFloodlightModule, IFlowService, INetMapStorage {
 
-    protected GraphDBOperation dbHandler;
+    //
+    // TODO: A temporary variable to switch between the poll-based and
+    // notification mechanism for the Flow Manager.
+    //
+    private final static boolean enableNotifications = false;
+
+    protected GraphDBOperation dbHandlerApi;
+    protected GraphDBOperation dbHandlerInner;
 
     protected volatile IFloodlightProviderService floodlightProvider;
     protected volatile ITopologyNetService topologyNetService;
     protected volatile IDatagridService datagridService;
     protected IRestApiService restApi;
     protected FloodlightModuleContext context;
-    protected PathComputation pathComputation;
+    protected FlowEventHandler flowEventHandler;
 
     protected FlowPusher pusher;
+    private static final int NUM_PUSHER_THREAD = 1;
     
+//	LEGACY
+//    protected OFMessageDamper messageDamper;
+//
+//    //
+//    // TODO: Values copied from elsewhere (class LearningSwitch).
+//    // The local copy should go away!
+//    //
+//    protected static final int OFMESSAGE_DAMPER_CAPACITY = 50000; // TODO: find sweet spot
+//    protected static final int OFMESSAGE_DAMPER_TIMEOUT = 250;	// ms
+
     // Flow Entry ID generation state
     private static Random randomGenerator = new Random();
     private static int nextFlowEntryIdPrefix = 0;
@@ -76,7 +93,7 @@
 		    runImpl();
 		} catch (Exception e) {
 		    log.debug("Exception processing All Flow Entries from the Network MAP: ", e);
-		    dbHandler.rollback();
+		    dbHandlerInner.rollback();
 		    return;
 		}
 	    }
@@ -107,10 +124,8 @@
 		// switches.
 		//
 		Iterable<IFlowEntry> allFlowEntries =
-		    dbHandler.getAllSwitchNotUpdatedFlowEntries();
+		    dbHandlerInner.getAllSwitchNotUpdatedFlowEntries();
 		for (IFlowEntry flowEntryObj : allFlowEntries) {
-			log.debug("flowEntryobj : {}", flowEntryObj);
-			
 		    counterAllFlowEntries++;
 
 		    String dpidStr = flowEntryObj.getSwitchDpid();
@@ -122,7 +137,7 @@
 			continue;	// Ignore the entry: not my switch
 
 		    IFlowPath flowObj =
-			dbHandler.getFlowPathByFlowEntry(flowEntryObj);
+			dbHandlerInner.getFlowPathByFlowEntry(flowEntryObj);
 		    if (flowObj == null)
 			continue;		// Should NOT happen
 		    if (flowObj.getFlowId() == null)
@@ -145,15 +160,13 @@
 		    }
 		    counterMyNotUpdatedFlowEntries++;
 		}
-		
-		log.debug("addFlowEntries : {}", addFlowEntries);
 
 		//
 		// Process the Flow Entries that need to be added
 		//
 		for (IFlowEntry flowEntryObj : addFlowEntries) {
 		    IFlowPath flowObj =
-			dbHandler.getFlowPathByFlowEntry(flowEntryObj);
+			dbHandlerInner.getFlowPathByFlowEntry(flowEntryObj);
 		    if (flowObj == null)
 			continue;		// Should NOT happen
 		    if (flowObj.getFlowId() == null)
@@ -177,16 +190,16 @@
 		while (! deleteFlowEntries.isEmpty()) {
 		    IFlowEntry flowEntryObj = deleteFlowEntries.poll();
 		    IFlowPath flowObj =
-			dbHandler.getFlowPathByFlowEntry(flowEntryObj);
+			dbHandlerInner.getFlowPathByFlowEntry(flowEntryObj);
 		    if (flowObj == null) {
 			log.debug("Did not find FlowPath to be deleted");
 			continue;
 		    }
 		    flowObj.removeFlowEntry(flowEntryObj);
-		    dbHandler.removeFlowEntry(flowEntryObj);
+		    dbHandlerInner.removeFlowEntry(flowEntryObj);
 		}
 
-		dbHandler.commit();
+		dbHandlerInner.commit();
 
 		long estimatedTime = System.nanoTime() - startTime;
 		double rate = 0.0;
@@ -211,7 +224,7 @@
 		    runImpl();
 		} catch (Exception e) {
 		    log.debug("Exception processing All Flows from the Network MAP: ", e);
-		    dbHandler.rollback();
+		    dbHandlerInner.rollback();
 		    return;
 		}
 	    }
@@ -238,7 +251,7 @@
 		// Flow Paths this controller is responsible for.
 		//
 		Topology topology = topologyNetService.newDatabaseTopology();
-		Iterable<IFlowPath> allFlowPaths = dbHandler.getAllFlowPaths();
+		Iterable<IFlowPath> allFlowPaths = dbHandlerInner.getAllFlowPaths();
 		for (IFlowPath flowPathObj : allFlowPaths) {
 		    counterAllFlowPaths++;
 		    if (flowPathObj == null)
@@ -340,12 +353,12 @@
 		//
 		while (! deleteFlows.isEmpty()) {
 		    IFlowPath flowPathObj = deleteFlows.poll();
-		    dbHandler.removeFlowPath(flowPathObj);
+		    dbHandlerInner.removeFlowPath(flowPathObj);
 		}
 
 		topologyNetService.dropTopology(topology);
 
-		dbHandler.commit();
+		dbHandlerInner.commit();
 
 		long estimatedTime = System.nanoTime() - startTime;
 		double rate = 0.0;
@@ -368,7 +381,8 @@
      */
     @Override
     public void init(String conf) {
-    	dbHandler = new GraphDBOperation(conf);
+    	dbHandlerApi = new GraphDBOperation(conf);
+    	dbHandlerInner = new GraphDBOperation(conf);
     }
 
     /**
@@ -383,9 +397,9 @@
      */
     @Override
     public void close() {
-	datagridService.deregisterPathComputationService(pathComputation);
-    	dbHandler.close();
-    	pusher.stop();
+	datagridService.deregisterFlowEventHandlerService(flowEventHandler);
+    	dbHandlerApi.close();
+    	dbHandlerInner.close();
     }
 
     /**
@@ -447,10 +461,14 @@
 	topologyNetService = context.getServiceImpl(ITopologyNetService.class);
 	datagridService = context.getServiceImpl(IDatagridService.class);
 	restApi = context.getServiceImpl(IRestApiService.class);
-	
-	pusher = new FlowPusher();
+
+//	LEGACY
+//	messageDamper = new OFMessageDamper(OFMESSAGE_DAMPER_CAPACITY,
+//					    EnumSet.of(OFType.FLOW_MOD),
+//					    OFMESSAGE_DAMPER_TIMEOUT);
+
+	pusher = new FlowPusher(NUM_PUSHER_THREAD);
 	pusher.init(null, floodlightProvider.getOFMessageFactory(), null);
-	
 	this.init("");
 
 	mapReaderScheduler = Executors.newScheduledThreadPool(1);
@@ -493,20 +511,22 @@
 	nextFlowEntryIdPrefix = randomGenerator.nextInt();
 
 	pusher.start();
-
+	
 	//
-	// Create the Path Computation thread and register it with the
+	// Create the Flow Event Handler thread and register it with the
 	// Datagrid Service
 	//
-	pathComputation = new PathComputation(this, datagridService);
-	datagridService.registerPathComputationService(pathComputation);
+	flowEventHandler = new FlowEventHandler(this, datagridService);
+	datagridService.registerFlowEventHandlerService(flowEventHandler);
 
 	// Schedule the threads and periodic tasks
-	pathComputation.start();
-	mapReaderScheduler.scheduleAtFixedRate(
+	flowEventHandler.start();
+	if (! enableNotifications) {
+	    mapReaderScheduler.scheduleAtFixedRate(
 			mapReader, 3, 3, TimeUnit.SECONDS);
-	shortestPathReconcileScheduler.scheduleAtFixedRate(
+	    shortestPathReconcileScheduler.scheduleAtFixedRate(
 			shortestPathReconcile, 3, 3, TimeUnit.SECONDS);
+	}
     }
 
     /**
@@ -519,7 +539,7 @@
     @Override
     public boolean addFlow(FlowPath flowPath, FlowId flowId) {
 	//
-	// NOTE: We need to explicitly initialize the Flow Entry Switch State,
+	// NOTE: We need to explicitly initialize some of the state,
 	// in case the application didn't do it.
 	//
 	for (FlowEntry flowEntry : flowPath.flowEntries()) {
@@ -527,9 +547,11 @@
 		FlowEntrySwitchState.FE_SWITCH_UNKNOWN) {
 		flowEntry.setFlowEntrySwitchState(FlowEntrySwitchState.FE_SWITCH_NOT_UPDATED);
 	    }
+	    if (! flowEntry.isValidFlowId())
+		flowEntry.setFlowId(new FlowId(flowPath.flowId().value()));
 	}
 
-	if (FlowDatabaseOperation.addFlow(this, dbHandler, flowPath, flowId)) {
+	if (FlowDatabaseOperation.addFlow(this, dbHandlerApi, flowPath, flowId)) {
 	    datagridService.notificationSendFlowAdded(flowPath);
 	    return true;
 	}
@@ -544,8 +566,20 @@
      * @return the added Flow Entry object on success, otherwise null.
      */
     private IFlowEntry addFlowEntry(IFlowPath flowObj, FlowEntry flowEntry) {
-	return FlowDatabaseOperation.addFlowEntry(this, dbHandler, flowObj,
-						  flowEntry);
+	return FlowDatabaseOperation.addFlowEntry(this, dbHandlerInner,
+						  flowObj, flowEntry);
+    }
+
+    /**
+     * Delete a flow entry from the Network MAP.
+     *
+     * @param flowObj the corresponding Flow Path object for the Flow Entry.
+     * @param flowEntry the Flow Entry to delete.
+     * @return true on success, otherwise false.
+     */
+    private boolean deleteFlowEntry(IFlowPath flowObj, FlowEntry flowEntry) {
+	return FlowDatabaseOperation.deleteFlowEntry(dbHandlerInner,
+						     flowObj, flowEntry);
     }
 
     /**
@@ -555,7 +589,7 @@
      */
     @Override
     public boolean deleteAllFlows() {
-	if (FlowDatabaseOperation.deleteAllFlows(dbHandler)) {
+	if (FlowDatabaseOperation.deleteAllFlows(dbHandlerApi)) {
 	    datagridService.notificationSendAllFlowsRemoved();
 	    return true;
 	}
@@ -570,7 +604,7 @@
      */
     @Override
     public boolean deleteFlow(FlowId flowId) {
-	if (FlowDatabaseOperation.deleteFlow(dbHandler, flowId)) {
+	if (FlowDatabaseOperation.deleteFlow(dbHandlerApi, flowId)) {
 	    datagridService.notificationSendFlowRemoved(flowId);
 	    return true;
 	}
@@ -584,7 +618,7 @@
      */
     @Override
     public boolean clearAllFlows() {
-	if (FlowDatabaseOperation.clearAllFlows(dbHandler)) {
+	if (FlowDatabaseOperation.clearAllFlows(dbHandlerApi)) {
 	    datagridService.notificationSendAllFlowsRemoved();
 	    return true;
 	}
@@ -599,7 +633,7 @@
      */
     @Override
     public boolean clearFlow(FlowId flowId) {
-	if (FlowDatabaseOperation.clearFlow(dbHandler, flowId)) {
+	if (FlowDatabaseOperation.clearFlow(dbHandlerApi, flowId)) {
 	    datagridService.notificationSendFlowRemoved(flowId);
 	    return true;
 	}
@@ -614,7 +648,7 @@
      */
     @Override
     public FlowPath getFlow(FlowId flowId) {
-	return FlowDatabaseOperation.getFlow(dbHandler, flowId);
+	return FlowDatabaseOperation.getFlow(dbHandlerApi, flowId);
     }
 
     /**
@@ -624,7 +658,7 @@
      */
     @Override
     public ArrayList<FlowPath> getAllFlows() {
-	return FlowDatabaseOperation.getAllFlows(dbHandler);
+	return FlowDatabaseOperation.getAllFlows(dbHandlerApi);
     }
 
     /**
@@ -638,7 +672,7 @@
     @Override
     public ArrayList<FlowPath> getAllFlows(CallerId installerId,
 					   DataPathEndpoints dataPathEndpoints) {
-	return FlowDatabaseOperation.getAllFlows(dbHandler, installerId,
+	return FlowDatabaseOperation.getAllFlows(dbHandlerApi, installerId,
 						 dataPathEndpoints);
     }
 
@@ -650,7 +684,8 @@
      */
     @Override
     public ArrayList<FlowPath> getAllFlows(DataPathEndpoints dataPathEndpoints) {
-	return FlowDatabaseOperation.getAllFlows(dbHandler, dataPathEndpoints);
+	return FlowDatabaseOperation.getAllFlows(dbHandlerApi,
+						 dataPathEndpoints);
     }
 
     /**
@@ -663,7 +698,7 @@
     @Override
     public ArrayList<IFlowPath> getAllFlowsSummary(FlowId flowId,
 						   int maxFlows) {
-	return FlowDatabaseOperation.getAllFlowsSummary(dbHandler, flowId,
+	return FlowDatabaseOperation.getAllFlowsSummary(dbHandlerApi, flowId,
 							maxFlows);
     }
     
@@ -673,7 +708,7 @@
      * @return all Flows information, without the associated Flow Entries.
      */
     public ArrayList<IFlowPath> getAllFlowsWithoutFlowEntries() {
-	return FlowDatabaseOperation.getAllFlowsWithoutFlowEntries(dbHandler);
+	return FlowDatabaseOperation.getAllFlowsWithoutFlowEntries(dbHandlerApi);
     }
 
     /**
@@ -700,6 +735,24 @@
     }
 
     /**
+     * Get the collection of my switches.
+     *
+     * @return the collection of my switches.
+     */
+    public Map<Long, IOFSwitch> getMySwitches() {
+	return floodlightProvider.getSwitches();
+    }
+
+    /**
+     * Get the network topology.
+     *
+     * @return the network topology.
+     */
+    public Topology getTopology() {
+	return flowEventHandler.getTopology();
+    }
+
+    /**
      * Reconcile a flow.
      *
      * @param flowObj the flow that needs to be reconciliated.
@@ -707,6 +760,7 @@
      * @return true on success, otherwise false.
      */
     private boolean reconcileFlow(IFlowPath flowObj, DataPath newDataPath) {
+	String flowIdStr = flowObj.getFlowId();
 
 	//
 	// Set the incoming port matching and the outgoing port output
@@ -714,6 +768,8 @@
 	//
 	int idx = 0;
 	for (FlowEntry flowEntry : newDataPath.flowEntries()) {
+	    flowEntry.setFlowId(new FlowId(flowIdStr));
+
 	    // Mark the Flow Entry as not updated in the switch
 	    flowEntry.setFlowEntrySwitchState(FlowEntrySwitchState.FE_SWITCH_NOT_UPDATED);
 	    // Set the incoming port matching
@@ -788,6 +844,11 @@
     private boolean installFlowEntry(IOFSwitch mySwitch, IFlowPath flowObj,
 				    IFlowEntry flowEntryObj) {
     	return pusher.add(mySwitch, flowObj, flowEntryObj);
+    	
+//    	LEGACY
+//	return FlowSwitchOperation.installFlowEntry(
+//		floodlightProvider.getOFMessageFactory(),
+//		messageDamper, mySwitch, flowObj, flowEntryObj);
     }
 
     /**
@@ -801,6 +862,11 @@
     private boolean installFlowEntry(IOFSwitch mySwitch, FlowPath flowPath,
 				    FlowEntry flowEntry) {
     	return pusher.add(mySwitch, flowPath, flowEntry);
+
+//    	LEGACY
+//	return FlowSwitchOperation.installFlowEntry(
+//		floodlightProvider.getOFMessageFactory(),
+//		messageDamper, mySwitch, flowPath, flowEntry);
     }
 
     /**
@@ -821,130 +887,217 @@
     }
 
     /**
-     * Push the modified Flow Entries of a collection of Flow Paths.
-     * Only the Flow Entries to switches controlled by this instance
+     * Push modified Flow Entries to switches.
+     *
+     * NOTE: Only the Flow Entries to switches controlled by this instance
      * are pushed.
      *
-     * NOTE: Currently, we write to both the Network MAP and the switches.
-     *
-     * @param modifiedFlowPaths the collection of Flow Paths with the modified
-     * Flow Entries.
+     * @param modifiedFlowEntries the collection of modified Flow Entries.
      */
-    public void pushModifiedFlowEntries(Collection<FlowPath> modifiedFlowPaths) {
-
+    public void pushModifiedFlowEntriesToSwitches(
+			Collection<FlowPathEntryPair> modifiedFlowEntries) {
 	// TODO: For now, the pushing of Flow Entries is disabled
-	if (true)
+	if (! enableNotifications)
 	    return;
 
-	Map<Long, IOFSwitch> mySwitches = floodlightProvider.getSwitches();
+	if (modifiedFlowEntries.isEmpty())
+	    return;
 
-	for (FlowPath flowPath : modifiedFlowPaths) {
-	    IFlowPath flowObj = dbHandler.searchFlowPath(flowPath.flowId());
-	    if (flowObj == null) {
-		String logMsg = "Cannot find Network MAP entry for Flow Path " +
-		    flowPath.flowId();
+	Map<Long, IOFSwitch> mySwitches = getMySwitches();
+
+	for (FlowPathEntryPair flowPair : modifiedFlowEntries) {
+	    FlowPath flowPath = flowPair.flowPath;
+	    FlowEntry flowEntry = flowPair.flowEntry;
+
+	    IOFSwitch mySwitch = mySwitches.get(flowEntry.dpid().value());
+	    if (mySwitch == null)
+		continue;
+
+	    log.debug("Pushing Flow Entry To Switch: {}", flowEntry.toString());
+
+	    //
+	    // Install the Flow Entry into the switch
+	    //
+	    if (! installFlowEntry(mySwitch, flowPath, flowEntry)) {
+		String logMsg = "Cannot install Flow Entry " +
+		    flowEntry.flowEntryId() +
+		    " from Flow Path " + flowPath.flowId() +
+		    " on switch " + flowEntry.dpid();
 		log.error(logMsg);
 		continue;
 	    }
 
-	    for (FlowEntry flowEntry : flowPath.flowEntries()) {
-		if (flowEntry.flowEntrySwitchState() !=
-		    FlowEntrySwitchState.FE_SWITCH_NOT_UPDATED) {
-		    continue;		// No need to update the entry
-		}
+	    //
+	    // NOTE: Here we assume that the switch has been
+	    // successfully updated.
+	    //
+	    flowEntry.setFlowEntrySwitchState(FlowEntrySwitchState.FE_SWITCH_UPDATED);
+	}
+    }
 
-		IOFSwitch mySwitch = mySwitches.get(flowEntry.dpid().value());
+    /**
+     * Push modified Flow Entries to the datagrid.
+     *
+     * @param modifiedFlowEntries the collection of modified Flow Entries.
+     */
+    public void pushModifiedFlowEntriesToDatagrid(
+			Collection<FlowPathEntryPair> modifiedFlowEntries) {
+	// TODO: For now, the pushing of Flow Entries is disabled
+	if (! enableNotifications)
+	    return;
+
+	if (modifiedFlowEntries.isEmpty())
+	    return;
+
+	Map<Long, IOFSwitch> mySwitches = getMySwitches();
+
+	for (FlowPathEntryPair flowPair : modifiedFlowEntries) {
+	    FlowEntry flowEntry = flowPair.flowEntry;
+
+	    IOFSwitch mySwitch = mySwitches.get(flowEntry.dpid().value());
+
+	    //
+	    // TODO: For now Flow Entries are removed by all instances,
+	    // even if this Flow Entry is not for our switches.
+	    //
+	    // This is needed to handle the case a switch going down:
+	    // it has no Master controller instance, hence no
+	    // controller instance will cleanup its flow entries.
+	    // This is sub-optimal: we need to elect a controller
+	    // instance to handle the cleanup of such orphaned flow
+	    // entries.
+	    //
+	    if (mySwitch == null) {
+		if (flowEntry.flowEntryUserState() !=
+		    FlowEntryUserState.FE_USER_DELETE) {
+		    continue;
+		}
+		if (! flowEntry.isValidFlowEntryId())
+		    continue;
+	    }
+
+	    log.debug("Pushing Flow Entry To Datagrid: {}", flowEntry.toString());
+	    //
+	    // Write the Flow Entry to the Datagrid
+	    //
+	    switch (flowEntry.flowEntryUserState()) {
+	    case FE_USER_ADD:
 		if (mySwitch == null)
-		    continue;		// Ignore the entry: not my switch
+		    break;	// Install only flow entries for my switches
+		datagridService.notificationSendFlowEntryAdded(flowEntry);
+		break;
+	    case FE_USER_MODIFY:
+		if (mySwitch == null)
+		    break;	// Install only flow entries for my switches
+		datagridService.notificationSendFlowEntryUpdated(flowEntry);
+		break;
+	    case FE_USER_DELETE:
+		datagridService.notificationSendFlowEntryRemoved(flowEntry.flowEntryId());
+		break;
+	    }
+	}
+    }
 
-		//
-		// Assign the FlowEntry ID if needed
-		//
-		if ((flowEntry.flowEntryId() == null) ||
-		    (flowEntry.flowEntryId().value() == 0)) {
-		    long id = getNextFlowEntryId();
-		    flowEntry.setFlowEntryId(new FlowEntryId(id));
-		}
+    /**
+     * Push Flow Entries to the Network MAP.
+     *
+     * NOTE: The Flow Entries are pushed only on the instance responsible
+     * for the first switch. This is to avoid database errors when multiple
+     * instances are writing Flow Entries for the same Flow Path.
+     *
+     * @param modifiedFlowEntries the collection of Flow Entries to push.
+     */
+    public void pushModifiedFlowEntriesToDatabase(
+		Collection<FlowPathEntryPair> modifiedFlowEntries) {
+	// TODO: For now, the pushing of Flow Entries is disabled
+	if (! enableNotifications)
+	    return;
 
-		//
-		// Install the Flow Entry into the switch
-		//
-		if (! installFlowEntry(mySwitch, flowPath, flowEntry)) {
-		    String logMsg = "Cannot install Flow Entry " +
-			flowEntry.flowEntryId() +
-			" from Flow Path " + flowPath.flowId() +
-			" on switch " + flowEntry.dpid();
-		    log.error(logMsg);
-		    continue;
-		}
+	if (modifiedFlowEntries.isEmpty())
+	    return;
 
-		//
-		// NOTE: Here we assume that the switch has been successfully
-		// updated.
-		//
-		flowEntry.setFlowEntrySwitchState(FlowEntrySwitchState.FE_SWITCH_UPDATED);
+	Map<Long, IOFSwitch> mySwitches = getMySwitches();
 
-		//
-		// Write the Flow Entry to the Network Map
-		//
+	for (FlowPathEntryPair flowPair : modifiedFlowEntries) {
+	    FlowPath flowPath = flowPair.flowPath;
+	    FlowEntry flowEntry = flowPair.flowEntry;
+
+	    if (! flowEntry.isValidFlowEntryId())
+		continue;
+
+	    //
+	    // Push the changes only on the instance responsible for the
+	    // first switch.
+	    //
+	    Dpid srcDpid = flowPath.dataPath().srcPort().dpid();
+	    IOFSwitch mySrcSwitch = mySwitches.get(srcDpid.value());
+	    if (mySrcSwitch == null)
+		continue;
+
+	    log.debug("Pushing Flow Entry To Database: {}", flowEntry.toString());
+	    //
+	    // Write the Flow Entry to the Network Map
+	    //
+	    // NOTE: We try a number of times, in case somehow some other
+	    // instances are writing at the same time.
+	    // Apparently, if other instances are writing at the same time
+	    // this will trigger an error.
+	    //
+	    for (int i = 0; i < 6; i++) {
 		try {
-		    if (addFlowEntry(flowObj, flowEntry) == null) {
-			String logMsg = "Cannot write to Network MAP Flow Entry " +
-			    flowEntry.flowEntryId() +
-			    " from Flow Path " + flowPath.flowId() +
-			    " on switch " + flowEntry.dpid();
+		    //
+		    // Find the Flow Path in the Network MAP.
+		    //
+		    // NOTE: The Flow Path might not be found if the Flow was
+		    // just removed by some other controller instance.
+		    //
+		    IFlowPath flowObj =
+			dbHandlerInner.searchFlowPath(flowEntry.flowId());
+		    if (flowObj == null) {
+			String logMsg = "Cannot find Network MAP entry for Flow Path " + flowEntry.flowId();
 			log.error(logMsg);
-			continue;
+			break;
 		    }
+
+		    // Write the Flow Entry
+		    switch (flowEntry.flowEntryUserState()) {
+		    case FE_USER_ADD:
+			// FALLTHROUGH
+		    case FE_USER_MODIFY:
+			if (addFlowEntry(flowObj, flowEntry) == null) {
+			    String logMsg = "Cannot write to Network MAP Flow Entry " +
+				flowEntry.flowEntryId() +
+				" from Flow Path " + flowEntry.flowId() +
+				" on switch " + flowEntry.dpid();
+			    log.error(logMsg);
+			}
+			break;
+		    case FE_USER_DELETE:
+			if (deleteFlowEntry(flowObj, flowEntry) == false) {
+			    String logMsg = "Cannot remove from Network MAP Flow Entry " +
+				flowEntry.flowEntryId() +
+				" from Flow Path " + flowEntry.flowId() +
+				" on switch " + flowEntry.dpid();
+			    log.error(logMsg);
+			}
+			break;
+		    }
+
+		    // Commit to the database
+		    dbHandlerInner.commit();
+		    break;	// Success
+
 		} catch (Exception e) {
-		    String logMsg = "Exception writing Flow Entry to Network MAP";
-		    log.debug(logMsg);
-		    dbHandler.rollback();
-		    continue;
+		    log.debug("Exception writing Flow Entry to Network MAP: ", e);
+		    dbHandlerInner.rollback();
+		    // Wait a bit (random value [1ms, 20ms] and try again
+		    int delay = 1 + randomGenerator.nextInt() % 20;
+		    try {
+			Thread.sleep(delay);
+		    } catch (Exception e0) {
+		    }
 		}
 	    }
 	}
-
-	dbHandler.commit();
     }
-
-	@Override
-	public void addMessage(long dpid, OFMessage msg) {
-		IOFSwitch sw = floodlightProvider.getSwitches().get(dpid);
-		if (sw == null) {
-			return;
-		}
-		
-		pusher.add(sw, msg);
-	}
-
-	@Override
-	public boolean suspend(long dpid) {
-		IOFSwitch sw = floodlightProvider.getSwitches().get(dpid);
-		if (sw == null) {
-			return false;
-		}
-		
-		return pusher.suspend(sw);
-	}
-
-	@Override
-	public boolean resume(long dpid) {
-		IOFSwitch sw = floodlightProvider.getSwitches().get(dpid);
-		if (sw == null) {
-			return false;
-		}
-		
-		return pusher.resume(sw);
-	}
-
-	@Override
-	public boolean isSuspended(long dpid) {
-		IOFSwitch sw = floodlightProvider.getSwitches().get(dpid);
-		if (sw == null) {
-			return false;
-		}
-		
-		return pusher.isSuspended(sw);
-	}
 }