Merge pull request #449 from jonohart/master

Beginnings of the multi-instance ARP module
diff --git a/src/main/java/net/onrc/onos/datagrid/HazelcastDatagrid.java b/src/main/java/net/onrc/onos/datagrid/HazelcastDatagrid.java
index d04e50a..d9fb7c3 100644
--- a/src/main/java/net/onrc/onos/datagrid/HazelcastDatagrid.java
+++ b/src/main/java/net/onrc/onos/datagrid/HazelcastDatagrid.java
@@ -82,7 +82,7 @@
      *
      * The datagrid map is:
      *  - Key : Flow ID (Long)
-     *  - Value : Serialized Flow (byte[])
+     *  - Value : Serialized FlowPath (byte[])
      */
     class MapFlowListener implements EntryListener<Long, byte[]> {
 	/**
diff --git a/src/main/java/net/onrc/onos/ofcontroller/core/ILinkStorage.java b/src/main/java/net/onrc/onos/ofcontroller/core/ILinkStorage.java
index c8312d4..8889092 100644
--- a/src/main/java/net/onrc/onos/ofcontroller/core/ILinkStorage.java
+++ b/src/main/java/net/onrc/onos/ofcontroller/core/ILinkStorage.java
@@ -44,6 +44,14 @@
 	 */
 	public List<Link> getLinks(Long dpid, short port);
 
+	/**
+	 * Get list of all reverse links connected to the port specified by given DPID and port number.
+	 * @param dpid DPID of desired port.
+	 * @param port Port number of desired port.
+	 * @return List of reverse links. Empty list if no port was found.
+	 */
+	public List<Link> getReverseLinks(Long dpid, short port);
+
 	public List<Link> getLinks(String dpid);
 
 	/**
diff --git a/src/main/java/net/onrc/onos/ofcontroller/core/ISwitchStorage.java b/src/main/java/net/onrc/onos/ofcontroller/core/ISwitchStorage.java
index b7825f9..1c243c0 100644
--- a/src/main/java/net/onrc/onos/ofcontroller/core/ISwitchStorage.java
+++ b/src/main/java/net/onrc/onos/ofcontroller/core/ISwitchStorage.java
@@ -1,5 +1,7 @@
 package net.onrc.onos.ofcontroller.core;
 
+import java.util.List;
+
 import net.floodlightcontroller.core.IOFSwitch;
 
 import org.openflow.protocol.OFPhysicalPort;
@@ -43,4 +45,12 @@
 	 * Delete a port on a switch by num
 	 */
 	public boolean deletePort(String dpid, short port);
+
+	/**
+	 * Get list of all ports on the switch specified by given DPID.
+	 *
+	 * @param dpid DPID of desired switch.
+	 * @return List of port IDs. Empty list if no port was found.
+	 */
+	public List<Short> getPorts(String dpid);
 }
diff --git a/src/main/java/net/onrc/onos/ofcontroller/core/internal/LinkStorageImpl.java b/src/main/java/net/onrc/onos/ofcontroller/core/internal/LinkStorageImpl.java
index 09e87ca..7a3d43e 100644
--- a/src/main/java/net/onrc/onos/ofcontroller/core/internal/LinkStorageImpl.java
+++ b/src/main/java/net/onrc/onos/ofcontroller/core/internal/LinkStorageImpl.java
@@ -131,6 +131,7 @@
 				op.rollback();
 			}
 		} catch (Exception e) {
+			op.rollback();
 			e.printStackTrace();
 			log.error("LinkStorageImpl:addLink link:{} linfo:{} failed", link, linfo);
 		}
@@ -157,6 +158,7 @@
 			op.commit();
 			success = true;
 		} catch (Exception e) {
+			op.rollback();
 			e.printStackTrace();
 			log.error("LinkStorageImpl:addLinks link:s{} failed", links);
 		}
@@ -227,24 +229,54 @@
 	 */
 	@Override
 	public List<Link> getLinks(Long dpid, short port) {
-    	List<Link> links = new ArrayList<Link>();
+	    List<Link> links = new ArrayList<Link>();
+
+	    IPortObject srcPort = op.searchPort(HexString.toHexString(dpid), port);
+	    if (srcPort == null)
+		return links;
+	    ISwitchObject srcSw = srcPort.getSwitch();
+	    if (srcSw == null)
+		return links;
     	
-    	IPortObject srcPort = op.searchPort(HexString.toHexString(dpid), port);
-    	ISwitchObject srcSw = srcPort.getSwitch();
+	    for(IPortObject dstPort : srcPort.getLinkedPorts()) {
+		ISwitchObject dstSw = dstPort.getSwitch();
+		if (dstSw != null) {
+		    Link link = new Link(dpid, port,
+					 HexString.toLong(dstSw.getDPID()),
+					 dstPort.getNumber());
+		    links.add(link);
+		}
+	    }
+	    return links;
+	}
+
+	/**
+	 * Get list of all reverse links connected to the port specified by given DPID and port number.
+	 * @param dpid DPID of desired port.
+	 * @param port Port number of desired port.
+	 * @return List of reverse links. Empty list if no port was found.
+	 */
+	@Override
+	public List<Link> getReverseLinks(Long dpid, short port) {
+	    List<Link> links = new ArrayList<Link>();
     	
-    	if(srcSw != null && srcPort != null) {
-        	for(IPortObject dstPort : srcPort.getLinkedPorts()) {
-        		ISwitchObject dstSw = dstPort.getSwitch();
-        		Link link = new Link(HexString.toLong(srcSw.getDPID()),
-        				srcPort.getNumber(),
-        				HexString.toLong(dstSw.getDPID()),
-        				dstPort.getNumber());
-    		
-        		links.add(link);
-        	}
-    	}
+	    IPortObject srcPort = op.searchPort(HexString.toHexString(dpid), port);
+	    if (srcPort == null)
+		return links;
+	    ISwitchObject srcSw = srcPort.getSwitch();
+	    if (srcSw == null)
+		return links;
     	
-     	return links;
+	    for(IPortObject dstPort : srcPort.getReverseLinkedPorts()) {
+		ISwitchObject dstSw = dstPort.getSwitch();
+		if (dstSw != null) {
+		    Link link = new Link(HexString.toLong(dstSw.getDPID()),
+					 dstPort.getNumber(),
+					 dpid, port);
+		    links.add(link);
+		}
+	    }
+	    return links;
 	}
 	
 	/**
@@ -292,10 +324,10 @@
 				for(IPortObject dstPort : srcPort.getLinkedPorts()) {
 					ISwitchObject dstSw = dstPort.getSwitch();
 					if(dstSw != null) {
-		        		Link link = new Link(HexString.toLong(srcSw.getDPID()),
+					    Link link = new Link(HexString.toLong(dpid),
 		        				srcPort.getNumber(),
 		        				HexString.toLong(dstSw.getDPID()),
-		        				dstPort.getNumber());
+							dstPort.getNumber());
 		        		links.add(link);
 					}
 				}
@@ -326,7 +358,7 @@
 							HexString.toLong(dstSw.getDPID()),
 							dstPort.getNumber(),
 					
-							HexString.toLong(srcSw.getDPID()),
+							HexString.toLong(dpid),
 							srcPort.getNumber());
 		        		links.add(link);
 					}
diff --git a/src/main/java/net/onrc/onos/ofcontroller/core/internal/SwitchStorageImpl.java b/src/main/java/net/onrc/onos/ofcontroller/core/internal/SwitchStorageImpl.java
index c01a3cb..7f0c259 100644
--- a/src/main/java/net/onrc/onos/ofcontroller/core/internal/SwitchStorageImpl.java
+++ b/src/main/java/net/onrc/onos/ofcontroller/core/internal/SwitchStorageImpl.java
@@ -1,5 +1,8 @@
 package net.onrc.onos.ofcontroller.core.internal;
 
+import java.util.ArrayList;
+import java.util.List;
+
 import net.floodlightcontroller.core.IOFSwitch;
 import net.onrc.onos.graph.GraphDBConnection;
 import net.onrc.onos.graph.GraphDBOperation;
@@ -229,8 +232,8 @@
 	        	log.info("SwitchStorage:updatePort dpid:{} port:{}", dpid, portNum);
 	        	if (p != null) {
 	        		setPortStateImpl(p, state, desc);
+				op.commit();
 	        	}
-	        	op.commit();
         		success = true;
 	        } else {
 	    		log.error("SwitchStorage:updatePort dpid:{} port:{} : failed switch does not exist", dpid, portNum);
@@ -316,6 +319,26 @@
 		return success;
 	}
 
+	/**
+	 * Get list of all ports on the switch specified by given DPID.
+	 *
+	 * @param dpid DPID of desired switch.
+	 * @return List of port IDs. Empty list if no port was found.
+	 */
+	@Override
+	public List<Short> getPorts(String dpid) {
+	    List<Short> ports = new ArrayList<Short>();
+
+	    ISwitchObject srcSw = op.searchSwitch(dpid);
+	    if (srcSw != null) {
+		for (IPortObject srcPort : srcSw.getPorts()) {
+		    ports.add(srcPort.getNumber());
+		}
+	    }
+
+	    return ports;
+	}
+
 	private ISwitchObject addSwitchImpl(String dpid) {
 		if (dpid != null) {
 			ISwitchObject sw = op.newSwitch(dpid);
diff --git a/src/main/java/net/onrc/onos/ofcontroller/floodlightlistener/NetworkGraphPublisher.java b/src/main/java/net/onrc/onos/ofcontroller/floodlightlistener/NetworkGraphPublisher.java
index 26292d9..648f6f1 100644
--- a/src/main/java/net/onrc/onos/ofcontroller/floodlightlistener/NetworkGraphPublisher.java
+++ b/src/main/java/net/onrc/onos/ofcontroller/floodlightlistener/NetworkGraphPublisher.java
@@ -100,6 +100,14 @@
 			if (hasControl) {
 				log.debug("got control to set inactive sw {}", HexString.toHexString(dpid));
 				try {
+					// Get the affected ports
+					List<Short> ports = swStore.getPorts(HexString.toHexString(dpid));
+					// Get the affected links
+					List<Link> links = linkStore.getLinks(HexString.toHexString(dpid));
+					// Get the affected reverse links
+					List<Link> reverseLinks = linkStore.getReverseLinks(HexString.toHexString(dpid));
+					links.addAll(reverseLinks);
+
 					if (swStore.updateSwitch(HexString.toHexString(dpid), SwitchState.INACTIVE, DM_OPERATION.UPDATE)) {
 					    registryService.releaseControl(dpid);
 					    
@@ -113,6 +121,21 @@
 						new TopologyElement(dpid);
 					    datagridService.notificationSendTopologyElementRemoved(topologyElement);
 
+					    // Publish: remove the affected ports
+					    for (Short port : ports) {
+						TopologyElement topologyElementPort =
+						    new TopologyElement(dpid, port);
+						datagridService.notificationSendTopologyElementRemoved(topologyElementPort);
+					    }
+					    // Publish: remove the affected links
+					    for (Link link : links) {
+						TopologyElement topologyElementLink =
+						    new TopologyElement(link.getSrc(),
+									link.getSrcPort(),
+									link.getDst(),
+									link.getDstPort());
+						datagridService.notificationSendTopologyElementRemoved(topologyElementLink);
+					    }
 					}
 				} catch (Exception e) {
 	                log.error("Error in SwitchCleanup:controlChanged ", e);
@@ -214,7 +237,8 @@
 			    TopologyElement topologyElement =
 				new TopologyElement(sw.getId());
 			    datagridService.notificationSendTopologyElementAdded(topologyElement);
-			    // Add the ports
+
+			    // Publish: add the ports
 			    // TODO: Add only ports that are UP?
 			    for (OFPhysicalPort port : sw.getPorts()) {
 				TopologyElement topologyElementPort =
@@ -228,6 +252,8 @@
 			    // Add all reverse links as well
 			    List<Link> reverseLinks = linkStore.getReverseLinks(HexString.toHexString(sw.getId()));
 			    links.addAll(reverseLinks);
+
+			    // Publish: add the links
 			    for (Link link : links) {
 				TopologyElement topologyElementLink =
 				    new TopologyElement(link.getSrc(),
@@ -243,11 +269,35 @@
 	@Override
 	public void removedSwitch(IOFSwitch sw) {
 		if (registryService.hasControl(sw.getId())) {
+			// Get the affected ports
+			List<Short> ports = swStore.getPorts(HexString.toHexString(sw.getId()));
+			// Get the affected links
+			List<Link> links = linkStore.getLinks(HexString.toHexString(sw.getId()));
+			// Get the affected reverse links
+			List<Link> reverseLinks = linkStore.getReverseLinks(HexString.toHexString(sw.getId()));
+			links.addAll(reverseLinks);
+
 			if (swStore.deleteSwitch(sw.getStringId())) {
 			    // TODO publish DELETE_SWITCH event here
 			    TopologyElement topologyElement =
 				new TopologyElement(sw.getId());
 			    datagridService.notificationSendTopologyElementRemoved(topologyElement);
+
+			    // Publish: remove the affected ports
+			    for (Short port : ports) {
+				TopologyElement topologyElementPort =
+				    new TopologyElement(sw.getId(), port);
+				datagridService.notificationSendTopologyElementRemoved(topologyElementPort);
+			    }
+			    // Publish: remove the affected links
+			    for (Link link : links) {
+				TopologyElement topologyElementLink =
+				    new TopologyElement(link.getSrc(),
+							link.getSrcPort(),
+							link.getDst(),
+							link.getDstPort());
+				datagridService.notificationSendTopologyElementRemoved(topologyElementLink);
+			    }
 			}
 		}
 	}
@@ -265,16 +315,48 @@
 		    TopologyElement topologyElement =
 			new TopologyElement(switchId, port.getPortNumber());
 		    datagridService.notificationSendTopologyElementAdded(topologyElement);
+
+		    // Add all links that might be connected already
+		    List<Link> links = linkStore.getLinks(switchId, port.getPortNumber());
+		    // Add all reverse links as well
+		    List<Link> reverseLinks = linkStore.getReverseLinks(switchId, port.getPortNumber());
+		    links.addAll(reverseLinks);
+
+		    // Publish: add the links
+		    for (Link link : links) {
+			TopologyElement topologyElementLink =
+			    new TopologyElement(link.getSrc(),
+						link.getSrcPort(),
+						link.getDst(),
+						link.getDstPort());
+			datagridService.notificationSendTopologyElementAdded(topologyElementLink);
+		    }
 		}
 	}
 
 	@Override
 	public void switchPortRemoved(Long switchId, OFPhysicalPort port) {
+		// Remove all links that might be connected already
+		List<Link> links = linkStore.getLinks(switchId, port.getPortNumber());
+		// Remove all reverse links as well
+		List<Link> reverseLinks = linkStore.getReverseLinks(switchId, port.getPortNumber());
+		links.addAll(reverseLinks);
+
 		if (swStore.deletePort(HexString.toHexString(switchId), port.getPortNumber())) {
 		    // TODO publish DELETE_PORT event here
 		    TopologyElement topologyElement =
 			new TopologyElement(switchId, port.getPortNumber());
 		    datagridService.notificationSendTopologyElementRemoved(topologyElement);
+
+		    // Publish: remove the links
+		    for (Link link : links) {
+			TopologyElement topologyElementLink =
+			    new TopologyElement(link.getSrc(),
+						link.getSrcPort(),
+						link.getDst(),
+						link.getDstPort());
+			datagridService.notificationSendTopologyElementRemoved(topologyElementLink);
+		    }
 		}
 	}
 
diff --git a/src/main/java/net/onrc/onos/ofcontroller/flowmanager/FlowEventHandler.java b/src/main/java/net/onrc/onos/ofcontroller/flowmanager/FlowEventHandler.java
index cb1e678..0e9887a 100644
--- a/src/main/java/net/onrc/onos/ofcontroller/flowmanager/FlowEventHandler.java
+++ b/src/main/java/net/onrc/onos/ofcontroller/flowmanager/FlowEventHandler.java
@@ -10,6 +10,7 @@
 import java.util.concurrent.BlockingQueue;
 import java.util.concurrent.LinkedBlockingQueue;
 
+import net.floodlightcontroller.core.IOFSwitch;
 import net.onrc.onos.datagrid.IDatagridService;
 import net.onrc.onos.ofcontroller.topology.Topology;
 import net.onrc.onos.ofcontroller.topology.TopologyElement;
@@ -31,6 +32,19 @@
 import org.slf4j.LoggerFactory;
 
 /**
+ * A class for storing a pair of Flow Path and a Flow Entry.
+ */
+class FlowPathEntryPair {
+    protected FlowPath flowPath;
+    protected FlowEntry flowEntry;
+
+    protected FlowPathEntryPair(FlowPath flowPath, FlowEntry flowEntry) {
+	this.flowPath = flowPath;
+	this.flowEntry = flowEntry;
+    }
+}
+
+/**
  * Class for FlowPath Maintenance.
  * This class listens for FlowEvents to:
  * - Maintain a local cache of the Network Topology.
@@ -45,8 +59,8 @@
     private IDatagridService datagridService;	// The Datagrid Service to use
     private Topology topology;			// The network topology
     private Map<Long, FlowPath> allFlowPaths = new HashMap<Long, FlowPath>();
-    private List<FlowEntry> unmatchedFlowEntryUpdates =
-	new LinkedList<FlowEntry>();
+    private Map<Long, FlowEntry> unmatchedFlowEntryAdd =
+	new HashMap<Long, FlowEntry>();
 
     // The queue with Flow Path and Topology Element updates
     private BlockingQueue<EventEntry<?>> networkEvents =
@@ -60,6 +74,22 @@
     private List<EventEntry<FlowEntry>> flowEntryEvents =
 	new LinkedList<EventEntry<FlowEntry>>();
 
+    //
+    // Transient state for processing the Flow Paths:
+    //  - The new Flow Paths
+    //  - The Flow Paths that should be recomputed
+    //  - The Flow Paths with modified Flow Entries
+    //  - The Flow Entries that were updated
+    //
+    private List<FlowPath> newFlowPaths = new LinkedList<FlowPath>();
+    private List<FlowPath> recomputeFlowPaths = new LinkedList<FlowPath>();
+    private List<FlowPath> modifiedFlowPaths = new LinkedList<FlowPath>();
+    private List<FlowPathEntryPair> updatedFlowEntries =
+	new LinkedList<FlowPathEntryPair>();
+    private List<FlowPathEntryPair> unmatchedDeleteFlowEntries =
+	new LinkedList<FlowPathEntryPair>();
+
+
     /**
      * Constructor for a given Flow Manager and Datagrid Service.
      *
@@ -81,10 +111,9 @@
     protected Topology getTopology() { return this.topology; }
 
     /**
-     * Run the thread.
+     * Startup processing.
      */
-    @Override
-    public void run() {
+    private void startup() {
 	//
 	// Obtain the initial Topology state
 	//
@@ -116,6 +145,14 @@
 
 	// Process the initial events (if any)
 	processEvents();
+    }
+
+    /**
+     * Run the thread.
+     */
+    @Override
+    public void run() {
+	startup();
 
 	//
 	// The main loop
@@ -162,17 +199,116 @@
      * Process the events (if any)
      */
     private void processEvents() {
-	List<FlowPath> newFlowPaths = new LinkedList<FlowPath>();
-	List<FlowPath> recomputeFlowPaths = new LinkedList<FlowPath>();
-	List<FlowPath> modifiedFlowPaths = new LinkedList<FlowPath>();
+	List<FlowPathEntryPair> modifiedFlowEntries;
 
 	if (topologyEvents.isEmpty() && flowPathEvents.isEmpty() &&
 	    flowEntryEvents.isEmpty()) {
 	    return;		// Nothing to do
 	}
 
+	processFlowPathEvents();
+	processTopologyEvents();
 	//
-	// Process the Flow Path events
+	// Add all new Flows: should be done after processing the Flow Path
+	// and Topology events.
+	//
+	for (FlowPath flowPath : newFlowPaths) {
+	    allFlowPaths.put(flowPath.flowId().value(), flowPath);
+	}
+
+	processFlowEntryEvents();
+
+	// Recompute all affected Flow Paths and keep only the modified
+	for (FlowPath flowPath : recomputeFlowPaths) {
+	    if (recomputeFlowPath(flowPath))
+		modifiedFlowPaths.add(flowPath);
+	}
+
+	modifiedFlowEntries = extractModifiedFlowEntries(modifiedFlowPaths);
+
+	// Assign missing Flow Entry IDs
+	assignFlowEntryId(modifiedFlowEntries);
+
+	//
+	// Push the modified Flow Entries to switches, datagrid and database
+	//
+	flowManager.pushModifiedFlowEntriesToSwitches(modifiedFlowEntries);
+	flowManager.pushModifiedFlowEntriesToDatagrid(modifiedFlowEntries);
+	flowManager.pushModifiedFlowEntriesToDatabase(modifiedFlowEntries);
+	flowManager.pushModifiedFlowEntriesToDatabase(updatedFlowEntries);
+	flowManager.pushModifiedFlowEntriesToDatabase(unmatchedDeleteFlowEntries);
+
+	//
+	// Remove Flow Entries that were deleted
+	//
+	for (FlowPath flowPath : modifiedFlowPaths)
+	    flowPath.dataPath().removeDeletedFlowEntries();
+
+	// Cleanup
+	topologyEvents.clear();
+	flowPathEvents.clear();
+	flowEntryEvents.clear();
+	//
+	newFlowPaths.clear();
+	recomputeFlowPaths.clear();
+	modifiedFlowPaths.clear();
+	updatedFlowEntries.clear();
+	unmatchedDeleteFlowEntries.clear();
+    }
+
+    /**
+     * Extract the modified Flow Entries.
+     */
+    private List<FlowPathEntryPair> extractModifiedFlowEntries(
+					List<FlowPath> modifiedFlowPaths) {
+	List<FlowPathEntryPair> modifiedFlowEntries =
+	    new LinkedList<FlowPathEntryPair>();
+
+	// Extract only the modified Flow Entries
+	for (FlowPath flowPath : modifiedFlowPaths) {
+	    for (FlowEntry flowEntry : flowPath.flowEntries()) {
+		if (flowEntry.flowEntrySwitchState() ==
+		    FlowEntrySwitchState.FE_SWITCH_NOT_UPDATED) {
+		    FlowPathEntryPair flowPair =
+			new FlowPathEntryPair(flowPath, flowEntry);
+		    modifiedFlowEntries.add(flowPair);
+		}
+	    }
+	}
+	return modifiedFlowEntries;
+    }
+
+    /**
+     * Assign the Flow Entry ID as needed.
+     */
+    private void assignFlowEntryId(List<FlowPathEntryPair> modifiedFlowEntries) {
+	if (modifiedFlowEntries.isEmpty())
+	    return;
+
+	Map<Long, IOFSwitch> mySwitches = flowManager.getMySwitches();
+
+	//
+	// Assign the Flow Entry ID only for Flow Entries for my switches
+	//
+	for (FlowPathEntryPair flowPair : modifiedFlowEntries) {
+	    FlowEntry flowEntry = flowPair.flowEntry;
+	    // Update the Flow Entries only for my switches
+	    IOFSwitch mySwitch = mySwitches.get(flowEntry.dpid().value());
+	    if (mySwitch == null)
+		continue;
+	    if (! flowEntry.isValidFlowEntryId()) {
+		long id = flowManager.getNextFlowEntryId();
+		flowEntry.setFlowEntryId(new FlowEntryId(id));
+	    }
+	}
+    }
+
+    /**
+     * Process the Flow Path events.
+     */
+    private void processFlowPathEvents() {
+	//
+	// Process all Flow Path events and update the appropriate state
 	//
 	for (EventEntry<FlowPath> eventEntry : flowPathEvents) {
 	    FlowPath flowPath = eventEntry.eventData();
@@ -243,9 +379,14 @@
 	    }
 	    }
 	}
+    }
 
+    /**
+     * Process the Topology events.
+     */
+    private void processTopologyEvents() {
 	//
-	// Process the topology events
+	// Process all Topology events and update the appropriate state
 	//
 	boolean isTopologyModified = false;
 	for (EventEntry<TopologyElement> eventEntry : topologyEvents) {
@@ -267,97 +408,114 @@
 	    // TODO: For now, if the topology changes, we recompute all Flows
 	    recomputeFlowPaths.addAll(allFlowPaths.values());
 	}
-
-	// Add all new Flows
-	for (FlowPath flowPath : newFlowPaths) {
-	    allFlowPaths.put(flowPath.flowId().value(), flowPath);
-	}
-
-	// Recompute all affected Flow Paths and keep only the modified
-	for (FlowPath flowPath : recomputeFlowPaths) {
-	    if (recomputeFlowPath(flowPath))
-		modifiedFlowPaths.add(flowPath);
-	}
-
-	//
-	// Process previously unmatched Flow Entry updates
-	//
-	if ((! flowPathEvents.isEmpty()) && (! unmatchedFlowEntryUpdates.isEmpty())) {
-	    List<FlowEntry> remainingUpdates = new LinkedList<FlowEntry>();
-	    for (FlowEntry flowEntry : unmatchedFlowEntryUpdates) {
-		if (! updateFlowEntry(flowEntry))
-		    remainingUpdates.add(flowEntry);
-	    }
-	    unmatchedFlowEntryUpdates = remainingUpdates;
-	}
-
-	//
-	// Process the Flow Entry events
-	//
-	for (EventEntry<FlowEntry> eventEntry : flowEntryEvents) {
-	    FlowEntry flowEntry = eventEntry.eventData();
-	    switch (eventEntry.eventType()) {
-	    case ENTRY_ADD:
-		//
-		// Find the corresponding Flow Entry and update it.
-		// If not found, then keep it in a local cache for
-		// later matching.
-		//
-		if (! updateFlowEntry(flowEntry))
-		    unmatchedFlowEntryUpdates.add(flowEntry);
-		break;
-	    case ENTRY_REMOVE:
-		//
-		// NOTE: For now we remove the Flow Entries based on
-		// local decisions, so no need to remove them because of an
-		// external event.
-		//
-		break;
-	    }
-	}
-
-	//
-	// Push the Flow Entries that have been modified
-	//
-	flowManager.pushModifiedFlowEntries(modifiedFlowPaths);
-
-	// Cleanup
-	topologyEvents.clear();
-	flowPathEvents.clear();
-	flowEntryEvents.clear();
     }
 
     /**
-     * Update a Flow Entry because of an external event.
-     *
-     * @param flowEntry the FlowEntry with the new state.
-     * @return true if the Flow Entry was found and updated, otherwise false.
+     * Process the Flow Entry events.
      */
-    private boolean updateFlowEntry(FlowEntry flowEntry) {
-	if ((! flowEntry.isValidFlowId()) ||
-	    (! flowEntry.isValidFlowEntryId())) {
-	    //
-	    // Ignore events for Flow Entries with invalid Flow ID or
-	    // Flow Entry ID.
-	    // This shouldn't happen.
-	    //
-	    return true;
+    private void processFlowEntryEvents() {
+	FlowPathEntryPair flowPair;
+	FlowPath flowPath;
+	FlowEntry updatedFlowEntry;
+
+	//
+	// Update Flow Entries with previously unmatched Flow Entry updates
+	//
+	if (! unmatchedFlowEntryAdd.isEmpty()) {
+	    Map<Long, FlowEntry> remainingUpdates = new HashMap<Long, FlowEntry>();
+	    for (FlowEntry flowEntry : unmatchedFlowEntryAdd.values()) {
+		flowPath = allFlowPaths.get(flowEntry.flowId().value());
+		if (flowPath == null)
+		    continue;
+		updatedFlowEntry = updateFlowEntryAdd(flowPath, flowEntry);
+		if (updatedFlowEntry == null) {
+		    remainingUpdates.put(flowEntry.flowEntryId().value(),
+					 flowEntry);
+		    continue;
+		}
+		flowPair = new FlowPathEntryPair(flowPath, updatedFlowEntry);
+		updatedFlowEntries.add(flowPair);
+	    }
+	    unmatchedFlowEntryAdd = remainingUpdates;
 	}
 
-	FlowPath flowPath = allFlowPaths.get(flowEntry.flowId().value());
-	if (flowPath == null)
-	    return false;
-
 	//
-	// Iterate over all Flow Entries and find a match based on the DPID
+	// Process all Flow Entry events and update the appropriate state
+	//
+	for (EventEntry<FlowEntry> eventEntry : flowEntryEvents) {
+	    FlowEntry flowEntry = eventEntry.eventData();
+
+	    log.debug("Flow Entry Event: {} {}", eventEntry.eventType(),
+		      flowEntry.toString());
+
+	    if ((! flowEntry.isValidFlowId()) ||
+		(! flowEntry.isValidFlowEntryId())) {
+		continue;
+	    }
+
+	    switch (eventEntry.eventType()) {
+	    case ENTRY_ADD:
+		flowPath = allFlowPaths.get(flowEntry.flowId().value());
+		if (flowPath == null) {
+		    // Flow Path not found: keep the entry for later matching
+		    unmatchedFlowEntryAdd.put(flowEntry.flowEntryId().value(),
+					      flowEntry);
+		    break;
+		}
+		updatedFlowEntry = updateFlowEntryAdd(flowPath, flowEntry);
+		if (updatedFlowEntry == null) {
+		    // Flow Entry not found: keep the entry for later matching
+		    unmatchedFlowEntryAdd.put(flowEntry.flowEntryId().value(),
+					      flowEntry);
+		    break;
+		}
+		// Add the updated entry to the list of updated Flow Entries
+		flowPair = new FlowPathEntryPair(flowPath, updatedFlowEntry);
+		updatedFlowEntries.add(flowPair);
+		break;
+
+	    case ENTRY_REMOVE:
+		flowEntry.setFlowEntryUserState(FlowEntryUserState.FE_USER_DELETE);
+		if (unmatchedFlowEntryAdd.remove(flowEntry.flowEntryId().value()) != null) {
+		    continue;		// Match found
+		}
+						 
+		flowPath = allFlowPaths.get(flowEntry.flowId().value());
+		if (flowPath == null) {
+		    // Flow Path not found: ignore the update
+		    break;
+		}
+		updatedFlowEntry = updateFlowEntryRemove(flowPath, flowEntry);
+		if (updatedFlowEntry == null) {
+		    // Flow Entry not found: add to list of deleted entries
+		    flowPair = new FlowPathEntryPair(flowPath, flowEntry);
+		    unmatchedDeleteFlowEntries.add(flowPair);
+		    break;
+		}
+		flowPair = new FlowPathEntryPair(flowPath, updatedFlowEntry);
+		updatedFlowEntries.add(flowPair);
+		break;
+	    }
+	}
+    }
+
+    /**
+     * Update a Flow Entry because of an external ENTRY_ADD event.
+     *
+     * @param flowPath the FlowPath for the Flow Entry to update.
+     * @param flowEntry the FlowEntry with the new state.
+     * @return the updated Flow Entry if found, otherwise null.
+     */
+    private FlowEntry updateFlowEntryAdd(FlowPath flowPath,
+					 FlowEntry flowEntry) {
+	//
+	// Iterate over all Flow Entries and find a match.
 	//
 	for (FlowEntry localFlowEntry : flowPath.flowEntries()) {
-	    if (localFlowEntry.dpid().value() != flowEntry.dpid().value())
+	    if (! TopologyManager.isSameFlowEntryDataPath(localFlowEntry,
+							  flowEntry)) {
 		continue;
-	    //
-	    // TODO: We might want to check the FlowEntryMatch and
-	    // FlowEntryActions to double-check it is the same Flow Entry
-	    //
+	    }
 
 	    //
 	    // Local Flow Entry match found
@@ -367,9 +525,9 @@
 		    flowEntry.flowEntryId().value()) {
 		    //
 		    // Find a local Flow Entry, but the Flow Entry ID doesn't
-		    // match. Ignore the event.
+		    // match. Keep looking.
 		    //
-		    return true;
+		    continue;
 		}
 	    } else {
 		// Update the Flow Entry ID
@@ -380,13 +538,44 @@
 
 	    //
 	    // Update the local Flow Entry.
-	    // For now we update only the Flow Entry Switch State
 	    //
+	    localFlowEntry.setFlowEntryUserState(flowEntry.flowEntryUserState());
 	    localFlowEntry.setFlowEntrySwitchState(flowEntry.flowEntrySwitchState());
-	    return true;
+	    return localFlowEntry;
 	}
 
-	return false;		// Entry not found
+	return null;		// Entry not found
+    }
+
+    /**
+     * Update a Flow Entry because of an external ENTRY_REMOVE event.
+     *
+     * @param flowPath the FlowPath for the Flow Entry to update.
+     * @param flowEntry the FlowEntry with the new state.
+     * @return the updated Flow Entry if found, otherwise null.
+     */
+    private FlowEntry updateFlowEntryRemove(FlowPath flowPath,
+					    FlowEntry flowEntry) {
+	//
+	// Iterate over all Flow Entries and find a match based on
+	// the Flow Entry ID.
+	//
+	for (FlowEntry localFlowEntry : flowPath.flowEntries()) {
+	    if (! localFlowEntry.isValidFlowEntryId())
+		continue;
+	    if (localFlowEntry.flowEntryId().value() !=
+		flowEntry.flowEntryId().value()) {
+		    continue;
+	    }
+	    //
+	    // Update the local Flow Entry.
+	    //
+	    localFlowEntry.setFlowEntryUserState(flowEntry.flowEntryUserState());
+	    localFlowEntry.setFlowEntrySwitchState(flowEntry.flowEntrySwitchState());
+	    return localFlowEntry;
+	}
+
+	return null;		// Entry not found
     }
 
     /**
diff --git a/src/main/java/net/onrc/onos/ofcontroller/flowmanager/FlowManager.java b/src/main/java/net/onrc/onos/ofcontroller/flowmanager/FlowManager.java
index c22b916..e239ae9 100644
--- a/src/main/java/net/onrc/onos/ofcontroller/flowmanager/FlowManager.java
+++ b/src/main/java/net/onrc/onos/ofcontroller/flowmanager/FlowManager.java
@@ -41,7 +41,14 @@
  */
 public class FlowManager implements IFloodlightModule, IFlowService, INetMapStorage {
 
-    protected GraphDBOperation dbHandler;
+    //
+    // TODO: A temporary variable to switch between the poll-based and
+    // notification mechanism for the Flow Manager.
+    //
+    private final static boolean enableNotifications = false;
+
+    protected GraphDBOperation dbHandlerApi;
+    protected GraphDBOperation dbHandlerInner;
 
     protected volatile IFloodlightProviderService floodlightProvider;
     protected volatile ITopologyNetService topologyNetService;
@@ -82,7 +89,7 @@
 		    runImpl();
 		} catch (Exception e) {
 		    log.debug("Exception processing All Flow Entries from the Network MAP: ", e);
-		    dbHandler.rollback();
+		    dbHandlerInner.rollback();
 		    return;
 		}
 	    }
@@ -113,7 +120,7 @@
 		// switches.
 		//
 		Iterable<IFlowEntry> allFlowEntries =
-		    dbHandler.getAllSwitchNotUpdatedFlowEntries();
+		    dbHandlerInner.getAllSwitchNotUpdatedFlowEntries();
 		for (IFlowEntry flowEntryObj : allFlowEntries) {
 		    counterAllFlowEntries++;
 
@@ -126,7 +133,7 @@
 			continue;	// Ignore the entry: not my switch
 
 		    IFlowPath flowObj =
-			dbHandler.getFlowPathByFlowEntry(flowEntryObj);
+			dbHandlerInner.getFlowPathByFlowEntry(flowEntryObj);
 		    if (flowObj == null)
 			continue;		// Should NOT happen
 		    if (flowObj.getFlowId() == null)
@@ -155,7 +162,7 @@
 		//
 		for (IFlowEntry flowEntryObj : addFlowEntries) {
 		    IFlowPath flowObj =
-			dbHandler.getFlowPathByFlowEntry(flowEntryObj);
+			dbHandlerInner.getFlowPathByFlowEntry(flowEntryObj);
 		    if (flowObj == null)
 			continue;		// Should NOT happen
 		    if (flowObj.getFlowId() == null)
@@ -179,16 +186,16 @@
 		while (! deleteFlowEntries.isEmpty()) {
 		    IFlowEntry flowEntryObj = deleteFlowEntries.poll();
 		    IFlowPath flowObj =
-			dbHandler.getFlowPathByFlowEntry(flowEntryObj);
+			dbHandlerInner.getFlowPathByFlowEntry(flowEntryObj);
 		    if (flowObj == null) {
 			log.debug("Did not find FlowPath to be deleted");
 			continue;
 		    }
 		    flowObj.removeFlowEntry(flowEntryObj);
-		    dbHandler.removeFlowEntry(flowEntryObj);
+		    dbHandlerInner.removeFlowEntry(flowEntryObj);
 		}
 
-		dbHandler.commit();
+		dbHandlerInner.commit();
 
 		long estimatedTime = System.nanoTime() - startTime;
 		double rate = 0.0;
@@ -213,7 +220,7 @@
 		    runImpl();
 		} catch (Exception e) {
 		    log.debug("Exception processing All Flows from the Network MAP: ", e);
-		    dbHandler.rollback();
+		    dbHandlerInner.rollback();
 		    return;
 		}
 	    }
@@ -240,7 +247,7 @@
 		// Flow Paths this controller is responsible for.
 		//
 		Topology topology = topologyNetService.newDatabaseTopology();
-		Iterable<IFlowPath> allFlowPaths = dbHandler.getAllFlowPaths();
+		Iterable<IFlowPath> allFlowPaths = dbHandlerInner.getAllFlowPaths();
 		for (IFlowPath flowPathObj : allFlowPaths) {
 		    counterAllFlowPaths++;
 		    if (flowPathObj == null)
@@ -342,12 +349,12 @@
 		//
 		while (! deleteFlows.isEmpty()) {
 		    IFlowPath flowPathObj = deleteFlows.poll();
-		    dbHandler.removeFlowPath(flowPathObj);
+		    dbHandlerInner.removeFlowPath(flowPathObj);
 		}
 
 		topologyNetService.dropTopology(topology);
 
-		dbHandler.commit();
+		dbHandlerInner.commit();
 
 		long estimatedTime = System.nanoTime() - startTime;
 		double rate = 0.0;
@@ -370,7 +377,8 @@
      */
     @Override
     public void init(String conf) {
-    	dbHandler = new GraphDBOperation(conf);
+    	dbHandlerApi = new GraphDBOperation(conf);
+    	dbHandlerInner = new GraphDBOperation(conf);
     }
 
     /**
@@ -386,7 +394,8 @@
     @Override
     public void close() {
 	datagridService.deregisterFlowEventHandlerService(flowEventHandler);
-    	dbHandler.close();
+    	dbHandlerApi.close();
+    	dbHandlerInner.close();
     }
 
     /**
@@ -503,10 +512,12 @@
 
 	// Schedule the threads and periodic tasks
 	flowEventHandler.start();
-	mapReaderScheduler.scheduleAtFixedRate(
+	if (! enableNotifications) {
+	    mapReaderScheduler.scheduleAtFixedRate(
 			mapReader, 3, 3, TimeUnit.SECONDS);
-	shortestPathReconcileScheduler.scheduleAtFixedRate(
+	    shortestPathReconcileScheduler.scheduleAtFixedRate(
 			shortestPathReconcile, 3, 3, TimeUnit.SECONDS);
+	}
     }
 
     /**
@@ -531,7 +542,7 @@
 		flowEntry.setFlowId(new FlowId(flowPath.flowId().value()));
 	}
 
-	if (FlowDatabaseOperation.addFlow(this, dbHandler, flowPath, flowId)) {
+	if (FlowDatabaseOperation.addFlow(this, dbHandlerApi, flowPath, flowId)) {
 	    datagridService.notificationSendFlowAdded(flowPath);
 	    return true;
 	}
@@ -546,8 +557,8 @@
      * @return the added Flow Entry object on success, otherwise null.
      */
     private IFlowEntry addFlowEntry(IFlowPath flowObj, FlowEntry flowEntry) {
-	return FlowDatabaseOperation.addFlowEntry(this, dbHandler, flowObj,
-						  flowEntry);
+	return FlowDatabaseOperation.addFlowEntry(this, dbHandlerInner,
+						  flowObj, flowEntry);
     }
 
     /**
@@ -558,8 +569,8 @@
      * @return true on success, otherwise false.
      */
     private boolean deleteFlowEntry(IFlowPath flowObj, FlowEntry flowEntry) {
-	return FlowDatabaseOperation.deleteFlowEntry(dbHandler, flowObj,
-						     flowEntry);
+	return FlowDatabaseOperation.deleteFlowEntry(dbHandlerInner,
+						     flowObj, flowEntry);
     }
 
     /**
@@ -569,7 +580,7 @@
      */
     @Override
     public boolean deleteAllFlows() {
-	if (FlowDatabaseOperation.deleteAllFlows(dbHandler)) {
+	if (FlowDatabaseOperation.deleteAllFlows(dbHandlerApi)) {
 	    datagridService.notificationSendAllFlowsRemoved();
 	    return true;
 	}
@@ -584,7 +595,7 @@
      */
     @Override
     public boolean deleteFlow(FlowId flowId) {
-	if (FlowDatabaseOperation.deleteFlow(dbHandler, flowId)) {
+	if (FlowDatabaseOperation.deleteFlow(dbHandlerApi, flowId)) {
 	    datagridService.notificationSendFlowRemoved(flowId);
 	    return true;
 	}
@@ -598,7 +609,7 @@
      */
     @Override
     public boolean clearAllFlows() {
-	if (FlowDatabaseOperation.clearAllFlows(dbHandler)) {
+	if (FlowDatabaseOperation.clearAllFlows(dbHandlerApi)) {
 	    datagridService.notificationSendAllFlowsRemoved();
 	    return true;
 	}
@@ -613,7 +624,7 @@
      */
     @Override
     public boolean clearFlow(FlowId flowId) {
-	if (FlowDatabaseOperation.clearFlow(dbHandler, flowId)) {
+	if (FlowDatabaseOperation.clearFlow(dbHandlerApi, flowId)) {
 	    datagridService.notificationSendFlowRemoved(flowId);
 	    return true;
 	}
@@ -628,7 +639,7 @@
      */
     @Override
     public FlowPath getFlow(FlowId flowId) {
-	return FlowDatabaseOperation.getFlow(dbHandler, flowId);
+	return FlowDatabaseOperation.getFlow(dbHandlerApi, flowId);
     }
 
     /**
@@ -638,7 +649,7 @@
      */
     @Override
     public ArrayList<FlowPath> getAllFlows() {
-	return FlowDatabaseOperation.getAllFlows(dbHandler);
+	return FlowDatabaseOperation.getAllFlows(dbHandlerApi);
     }
 
     /**
@@ -652,7 +663,7 @@
     @Override
     public ArrayList<FlowPath> getAllFlows(CallerId installerId,
 					   DataPathEndpoints dataPathEndpoints) {
-	return FlowDatabaseOperation.getAllFlows(dbHandler, installerId,
+	return FlowDatabaseOperation.getAllFlows(dbHandlerApi, installerId,
 						 dataPathEndpoints);
     }
 
@@ -664,7 +675,8 @@
      */
     @Override
     public ArrayList<FlowPath> getAllFlows(DataPathEndpoints dataPathEndpoints) {
-	return FlowDatabaseOperation.getAllFlows(dbHandler, dataPathEndpoints);
+	return FlowDatabaseOperation.getAllFlows(dbHandlerApi,
+						 dataPathEndpoints);
     }
 
     /**
@@ -677,7 +689,7 @@
     @Override
     public ArrayList<IFlowPath> getAllFlowsSummary(FlowId flowId,
 						   int maxFlows) {
-	return FlowDatabaseOperation.getAllFlowsSummary(dbHandler, flowId,
+	return FlowDatabaseOperation.getAllFlowsSummary(dbHandlerApi, flowId,
 							maxFlows);
     }
     
@@ -687,7 +699,7 @@
      * @return all Flows information, without the associated Flow Entries.
      */
     public ArrayList<IFlowPath> getAllFlowsWithoutFlowEntries() {
-	return FlowDatabaseOperation.getAllFlowsWithoutFlowEntries(dbHandler);
+	return FlowDatabaseOperation.getAllFlowsWithoutFlowEntries(dbHandlerApi);
     }
 
     /**
@@ -714,6 +726,15 @@
     }
 
     /**
+     * Get the collection of my switches.
+     *
+     * @return the collection of my switches.
+     */
+    public Map<Long, IOFSwitch> getMySwitches() {
+	return floodlightProvider.getSwitches();
+    }
+
+    /**
      * Get the network topology.
      *
      * @return the network topology.
@@ -851,123 +872,179 @@
     }
 
     /**
-     * Push the modified Flow Entries of a collection of Flow Paths.
-     * Only the Flow Entries to switches controlled by this instance
+     * Push modified Flow Entries to switches.
+     *
+     * NOTE: Only the Flow Entries to switches controlled by this instance
      * are pushed.
      *
-     * NOTE: Currently, we write to both the Network MAP and the switches.
-     *
-     * @param modifiedFlowPaths the collection of Flow Paths with the modified
-     * Flow Entries.
+     * @param modifiedFlowEntries the collection of modified Flow Entries.
      */
-    public void pushModifiedFlowEntries(Collection<FlowPath> modifiedFlowPaths) {
+    public void pushModifiedFlowEntriesToSwitches(
+			Collection<FlowPathEntryPair> modifiedFlowEntries) {
 	// TODO: For now, the pushing of Flow Entries is disabled
-	if (true)
+	if (! enableNotifications)
 	    return;
 
-	Map<Long, IOFSwitch> mySwitches = floodlightProvider.getSwitches();
+	if (modifiedFlowEntries.isEmpty())
+	    return;
 
-	for (FlowPath flowPath : modifiedFlowPaths) {
+	Map<Long, IOFSwitch> mySwitches = getMySwitches();
+
+	for (FlowPathEntryPair flowPair : modifiedFlowEntries) {
+	    FlowPath flowPath = flowPair.flowPath;
+	    FlowEntry flowEntry = flowPair.flowEntry;
+
+	    IOFSwitch mySwitch = mySwitches.get(flowEntry.dpid().value());
+	    if (mySwitch == null)
+		continue;
+
+	    log.debug("Pushing Flow Entry To Switch: {}", flowEntry.toString());
+
 	    //
-	    // Find the Flow Path in the Network MAP.
-	    // NOTE: The Flow Path might not be found if the Flow was just
-	    // removed by some other controller instance.
+	    // Install the Flow Entry into the switch
 	    //
-	    IFlowPath flowObj = dbHandler.searchFlowPath(flowPath.flowId());
+	    if (! installFlowEntry(mySwitch, flowPath, flowEntry)) {
+		String logMsg = "Cannot install Flow Entry " +
+		    flowEntry.flowEntryId() +
+		    " from Flow Path " + flowPath.flowId() +
+		    " on switch " + flowEntry.dpid();
+		log.error(logMsg);
+		continue;
+	    }
 
-	    boolean isFlowEntryDeleted = false;
-	    for (FlowEntry flowEntry : flowPath.flowEntries()) {
-		log.debug("Updating Flow Entry: {}", flowEntry.toString());
+	    //
+	    // NOTE: Here we assume that the switch has been
+	    // successfully updated.
+	    //
+	    flowEntry.setFlowEntrySwitchState(FlowEntrySwitchState.FE_SWITCH_UPDATED);
+	}
+    }
 
-		if (flowEntry.flowEntrySwitchState() !=
-		    FlowEntrySwitchState.FE_SWITCH_NOT_UPDATED) {
-		    continue;		// No need to update the entry
-		}
-		if (flowEntry.flowEntryUserState() ==
+    /**
+     * Push modified Flow Entries to the datagrid.
+     *
+     * @param modifiedFlowEntries the collection of modified Flow Entries.
+     */
+    public void pushModifiedFlowEntriesToDatagrid(
+			Collection<FlowPathEntryPair> modifiedFlowEntries) {
+	// TODO: For now, the pushing of Flow Entries is disabled
+	if (! enableNotifications)
+	    return;
+
+	if (modifiedFlowEntries.isEmpty())
+	    return;
+
+	Map<Long, IOFSwitch> mySwitches = getMySwitches();
+
+	for (FlowPathEntryPair flowPair : modifiedFlowEntries) {
+	    FlowEntry flowEntry = flowPair.flowEntry;
+
+	    IOFSwitch mySwitch = mySwitches.get(flowEntry.dpid().value());
+
+	    //
+	    // TODO: For now Flow Entries are removed by all instances,
+	    // even if this Flow Entry is not for our switches.
+	    //
+	    // This is needed to handle the case a switch going down:
+	    // it has no Master controller instance, hence no
+	    // controller instance will cleanup its flow entries.
+	    // This is sub-optimal: we need to elect a controller
+	    // instance to handle the cleanup of such orphaned flow
+	    // entries.
+	    //
+	    if (mySwitch == null) {
+		if (flowEntry.flowEntryUserState() !=
 		    FlowEntryUserState.FE_USER_DELETE) {
-		    isFlowEntryDeleted = true;
-		}
-
-		//
-		// Install the Flow Entries into my switches
-		//
-		IOFSwitch mySwitch = mySwitches.get(flowEntry.dpid().value());
-		if (mySwitch != null) {
-		    //
-		    // Assign the FlowEntry ID if needed
-		    //
-		    if (! flowEntry.isValidFlowEntryId()) {
-			long id = getNextFlowEntryId();
-			flowEntry.setFlowEntryId(new FlowEntryId(id));
-		    }
-
-		    //
-		    // Install the Flow Entry into the switch
-		    //
-		    if (! installFlowEntry(mySwitch, flowPath, flowEntry)) {
-			String logMsg = "Cannot install Flow Entry " +
-			    flowEntry.flowEntryId() +
-			    " from Flow Path " + flowPath.flowId() +
-			    " on switch " + flowEntry.dpid();
-			log.error(logMsg);
-			continue;
-		    }
-
-		    //
-		    // NOTE: Here we assume that the switch has been
-		    // successfully updated.
-		    //
-		    flowEntry.setFlowEntrySwitchState(FlowEntrySwitchState.FE_SWITCH_UPDATED);
-		}
-
-		//
-		// TODO: For now Flow Entries are removed from the Datagrid
-		// and from the Network Map by all instances, even if this
-		// Flow Entry is not for our switches.
-		//
-		// This is needed to handle the case a switch going down:
-		// it has no Master controller instance, hence no
-		// controller instance will cleanup its flow entries.
-		// This is sub-optimal: we need to elect a controller
-		// instance to handle the cleanup of such orphaned flow
-		// entries.
-		//
-
-		//
-		// Write the Flow Entry to the Datagrid
-		//
-		switch (flowEntry.flowEntryUserState()) {
-		case FE_USER_ADD:
-		    if (mySwitch == null)
-			break;	// Install only flow entries for my switches
-		    datagridService.notificationSendFlowEntryAdded(flowEntry);
-		    break;
-		case FE_USER_MODIFY:
-		    if (mySwitch == null)
-			break;	// Install only flow entries for my switches
-		    datagridService.notificationSendFlowEntryUpdated(flowEntry);
-		    break;
-		case FE_USER_DELETE:
-		    datagridService.notificationSendFlowEntryRemoved(flowEntry.flowEntryId());
-		    break;
-		}
-
-		//
-		// Write the Flow Entry to the Network Map
-		//
-		if (mySwitch == null) {
-		    if (flowEntry.flowEntryUserState() !=
-			FlowEntryUserState.FE_USER_DELETE) {
-			continue;
-		    }
-		    if (! flowEntry.isValidFlowEntryId())
-			continue;
-		}
-		if (flowObj == null) {
-		    String logMsg = "Cannot find Network MAP entry for Flow Path " + flowPath.flowId();
 		    continue;
 		}
+		if (! flowEntry.isValidFlowEntryId())
+		    continue;
+	    }
+
+	    log.debug("Pushing Flow Entry To Datagrid: {}", flowEntry.toString());
+	    //
+	    // Write the Flow Entry to the Datagrid
+	    //
+	    switch (flowEntry.flowEntryUserState()) {
+	    case FE_USER_ADD:
+		if (mySwitch == null)
+		    break;	// Install only flow entries for my switches
+		datagridService.notificationSendFlowEntryAdded(flowEntry);
+		break;
+	    case FE_USER_MODIFY:
+		if (mySwitch == null)
+		    break;	// Install only flow entries for my switches
+		datagridService.notificationSendFlowEntryUpdated(flowEntry);
+		break;
+	    case FE_USER_DELETE:
+		datagridService.notificationSendFlowEntryRemoved(flowEntry.flowEntryId());
+		break;
+	    }
+	}
+    }
+
+    /**
+     * Push Flow Entries to the Network MAP.
+     *
+     * NOTE: The Flow Entries are pushed only on the instance responsible
+     * for the first switch. This is to avoid database errors when multiple
+     * instances are writing Flow Entries for the same Flow Path.
+     *
+     * @param modifiedFlowEntries the collection of Flow Entries to push.
+     */
+    public void pushModifiedFlowEntriesToDatabase(
+		Collection<FlowPathEntryPair> modifiedFlowEntries) {
+	// TODO: For now, the pushing of Flow Entries is disabled
+	if (! enableNotifications)
+	    return;
+
+	if (modifiedFlowEntries.isEmpty())
+	    return;
+
+	Map<Long, IOFSwitch> mySwitches = getMySwitches();
+
+	for (FlowPathEntryPair flowPair : modifiedFlowEntries) {
+	    FlowPath flowPath = flowPair.flowPath;
+	    FlowEntry flowEntry = flowPair.flowEntry;
+
+	    if (! flowEntry.isValidFlowEntryId())
+		continue;
+
+	    //
+	    // Push the changes only on the instance responsible for the
+	    // first switch.
+	    //
+	    Dpid srcDpid = flowPath.dataPath().srcPort().dpid();
+	    IOFSwitch mySrcSwitch = mySwitches.get(srcDpid.value());
+	    if (mySrcSwitch == null)
+		continue;
+
+	    log.debug("Pushing Flow Entry To Database: {}", flowEntry.toString());
+	    //
+	    // Write the Flow Entry to the Network Map
+	    //
+	    // NOTE: We try a number of times, in case somehow some other
+	    // instances are writing at the same time.
+	    // Apparently, if other instances are writing at the same time
+	    // this will trigger an error.
+	    //
+	    for (int i = 0; i < 6; i++) {
 		try {
+		    //
+		    // Find the Flow Path in the Network MAP.
+		    //
+		    // NOTE: The Flow Path might not be found if the Flow was
+		    // just removed by some other controller instance.
+		    //
+		    IFlowPath flowObj =
+			dbHandlerInner.searchFlowPath(flowEntry.flowId());
+		    if (flowObj == null) {
+			String logMsg = "Cannot find Network MAP entry for Flow Path " + flowEntry.flowId();
+			log.error(logMsg);
+			break;
+		    }
+
+		    // Write the Flow Entry
 		    switch (flowEntry.flowEntryUserState()) {
 		    case FE_USER_ADD:
 			// FALLTHROUGH
@@ -975,7 +1052,7 @@
 			if (addFlowEntry(flowObj, flowEntry) == null) {
 			    String logMsg = "Cannot write to Network MAP Flow Entry " +
 				flowEntry.flowEntryId() +
-				" from Flow Path " + flowPath.flowId() +
+				" from Flow Path " + flowEntry.flowId() +
 				" on switch " + flowEntry.dpid();
 			    log.error(logMsg);
 			}
@@ -984,41 +1061,28 @@
 			if (deleteFlowEntry(flowObj, flowEntry) == false) {
 			    String logMsg = "Cannot remove from Network MAP Flow Entry " +
 				flowEntry.flowEntryId() +
-				" from Flow Path " + flowPath.flowId() +
+				" from Flow Path " + flowEntry.flowId() +
 				" on switch " + flowEntry.dpid();
 			    log.error(logMsg);
 			}
 			break;
 		    }
-		} catch (Exception e) {
-		    String logMsg = "Exception writing Flow Entry to Network MAP";
-		    log.debug(logMsg);
-		    dbHandler.rollback();
-		    continue;
-		}
-	    }
 
-	    //
-	    // Remove Flow Entries that were deleted
-	    //
-	    // NOTE: We create a new ArrayList, and add only the Flow Entries
-	    // that are NOT FE_USER_DELETE.
-	    // This is sub-optimal: if it adds notable processing cost,
-	    // the Flow Entries container should be changed to LinkedList
-	    // or some other container that has O(1) cost of removing an entry.
-	    //
-	    if (isFlowEntryDeleted) {
-		ArrayList<FlowEntry> newFlowEntries = new ArrayList<FlowEntry>();
-		for (FlowEntry flowEntry : flowPath.flowEntries()) {
-		    if (flowEntry.flowEntryUserState() !=
-			FlowEntryUserState.FE_USER_DELETE) {
-			newFlowEntries.add(flowEntry);
+		    // Commit to the database
+		    dbHandlerInner.commit();
+		    break;	// Success
+
+		} catch (Exception e) {
+		    log.debug("Exception writing Flow Entry to Network MAP: ", e);
+		    dbHandlerInner.rollback();
+		    // Wait a bit (random value [1ms, 20ms] and try again
+		    int delay = 1 + randomGenerator.nextInt() % 20;
+		    try {
+			Thread.sleep(delay);
+		    } catch (Exception e0) {
 		    }
 		}
-		flowPath.dataPath().setFlowEntries(newFlowEntries);
 	    }
 	}
-
-	dbHandler.commit();
     }
 }
diff --git a/src/main/java/net/onrc/onos/ofcontroller/topology/Topology.java b/src/main/java/net/onrc/onos/ofcontroller/topology/Topology.java
index 7d7d739..dbf9ada 100644
--- a/src/main/java/net/onrc/onos/ofcontroller/topology/Topology.java
+++ b/src/main/java/net/onrc/onos/ofcontroller/topology/Topology.java
@@ -121,7 +121,7 @@
 	Link reverseLink = reverseLinksMap.get(portId);
 	if (reverseLink != null) {
 	    // NOTE: reverseLink.myPort is the neighbor's outgoing port
-	    reverseLink.neighbor.removeLink(reverseLink.myPort);
+	    reverseLink.me.removeLink(reverseLink.myPort);
 	    removeReverseLink(reverseLink);
 	}
 
diff --git a/src/main/java/net/onrc/onos/ofcontroller/util/DataPath.java b/src/main/java/net/onrc/onos/ofcontroller/util/DataPath.java
index 7c6597d..044cc6d 100644
--- a/src/main/java/net/onrc/onos/ofcontroller/util/DataPath.java
+++ b/src/main/java/net/onrc/onos/ofcontroller/util/DataPath.java
@@ -101,6 +101,41 @@
     }
 
     /**
+     * Remove Flow Entries that were deleted.
+     */
+    public void removeDeletedFlowEntries() {
+	//
+	// NOTE: We create a new ArrayList, and add only the Flow Entries
+	// that are NOT FE_USER_DELETE.
+	// This is sub-optimal: if it adds notable processing cost,
+	// the Flow Entries container should be changed to LinkedList
+	// or some other container that has O(1) cost of removing an entry.
+	//
+
+	// Test first whether any Flow Entry was deleted
+	boolean foundDeletedFlowEntry = false;
+	for (FlowEntry flowEntry : this.flowEntries) {
+	    if (flowEntry.flowEntryUserState() ==
+		FlowEntryUserState.FE_USER_DELETE) {
+		foundDeletedFlowEntry = true;
+		break;
+	    }
+	}
+	if (! foundDeletedFlowEntry)
+	    return;			// Nothing to do
+
+	// Create a new collection and exclude the deleted flow entries
+	ArrayList<FlowEntry> newFlowEntries = new ArrayList<FlowEntry>();
+	for (FlowEntry flowEntry : this.flowEntries()) {
+	    if (flowEntry.flowEntryUserState() !=
+		FlowEntryUserState.FE_USER_DELETE) {
+		newFlowEntries.add(flowEntry);
+	    }
+	}
+	setFlowEntries(newFlowEntries);
+    }
+
+    /**
      * Get a string with the summary of the shortest-path data path
      * computation.
      *