Merge branch 'ONOS-ONRC2014-Measurements' of github.com:OPENNETWORKINGLAB/ONOS into RAMCloud-merge

Conflicts:
	src/main/java/net/onrc/onos/graph/GraphDBOperation.java
	src/main/java/net/onrc/onos/graph/IDBOperation.java
	src/main/java/net/onrc/onos/graph/LocalTopologyEventListener.java
	src/main/java/net/onrc/onos/ofcontroller/core/internal/DeviceStorageImpl.java
	src/main/java/net/onrc/onos/ofcontroller/core/internal/LinkStorageImpl.java
	src/main/java/net/onrc/onos/ofcontroller/core/internal/TopoLinkServiceImpl.java
	src/main/java/net/onrc/onos/ofcontroller/core/internal/TopoSwitchServiceImpl.java
	src/main/java/net/onrc/onos/ofcontroller/flowmanager/FlowDatabaseOperation.java
	src/main/java/net/onrc/onos/ofcontroller/flowmanager/FlowEventHandler.java
	src/main/java/net/onrc/onos/ofcontroller/flowmanager/FlowManager.java
	src/main/java/net/onrc/onos/ofcontroller/topology/Topology.java
diff --git a/src/main/java/net/onrc/onos/datagrid/HazelcastDatagrid.java b/src/main/java/net/onrc/onos/datagrid/HazelcastDatagrid.java
index 6483121..c195f82 100644
--- a/src/main/java/net/onrc/onos/datagrid/HazelcastDatagrid.java
+++ b/src/main/java/net/onrc/onos/datagrid/HazelcastDatagrid.java
@@ -21,10 +21,12 @@
 import net.onrc.onos.ofcontroller.proxyarp.ArpMessage;
 import net.onrc.onos.ofcontroller.proxyarp.IArpEventHandler;
 import net.onrc.onos.ofcontroller.topology.TopologyElement;
+import net.onrc.onos.ofcontroller.util.Dpid;
 import net.onrc.onos.ofcontroller.util.FlowEntry;
 import net.onrc.onos.ofcontroller.util.FlowEntryId;
 import net.onrc.onos.ofcontroller.util.FlowId;
 import net.onrc.onos.ofcontroller.util.FlowPath;
+import net.onrc.onos.ofcontroller.util.Pair;
 import net.onrc.onos.ofcontroller.util.serializers.KryoFactory;
 
 import org.slf4j.Logger;
@@ -73,12 +75,24 @@
     private MapFlowEntryListener mapFlowEntryListener = null;
     private String mapFlowEntryListenerId = null;
 
+    // State related to the Flow ID map
+    protected static final String mapFlowIdName = "mapFlowId";
+    private IMap<Long, byte[]> mapFlowId = null;
+    private MapFlowIdListener mapFlowIdListener = null;
+    private String mapFlowIdListenerId = null;
+
+    // State related to the Flow Entry ID map
+    protected static final String mapFlowEntryIdName = "mapFlowEntryId";
+    private IMap<Long, byte[]> mapFlowEntryId = null;
+    private MapFlowEntryIdListener mapFlowEntryIdListener = null;
+    private String mapFlowEntryIdListenerId = null;
+
     // State related to the Network Topology map
     protected static final String mapTopologyName = "mapTopology";
     private IMap<String, byte[]> mapTopology = null;
     private MapTopologyListener mapTopologyListener = null;
     private String mapTopologyListenerId = null;
-    
+
     // State related to the ARP map
     protected static final String arpMapName = "arpMap";
     private IMap<ArpMessage, byte[]> arpMap = null;
@@ -98,8 +112,9 @@
 	 *
 	 * @param event the notification event for the entry.
 	 */
+	@Override
 	public void entryAdded(EntryEvent<Long, byte[]> event) {
-	    byte[] valueBytes = (byte[])event.getValue();
+	    byte[] valueBytes = event.getValue();
 
 	    //
 	    // Decode the value and deliver the notification
@@ -116,8 +131,9 @@
 	 *
 	 * @param event the notification event for the entry.
 	 */
+	@Override
 	public void entryRemoved(EntryEvent<Long, byte[]> event) {
-	    byte[] valueBytes = (byte[])event.getValue();
+	    byte[] valueBytes = event.getValue();
 
 	    //
 	    // Decode the value and deliver the notification
@@ -134,8 +150,9 @@
 	 *
 	 * @param event the notification event for the entry.
 	 */
+	@Override
 	public void entryUpdated(EntryEvent<Long, byte[]> event) {
-	    byte[] valueBytes = (byte[])event.getValue();
+	    byte[] valueBytes = event.getValue();
 
 	    //
 	    // Decode the value and deliver the notification
@@ -152,6 +169,7 @@
 	 *
 	 * @param event the notification event for the entry.
 	 */
+	@Override
 	public void entryEvicted(EntryEvent<Long, byte[]> event) {
 	    // NOTE: We don't use eviction for this map
 	}
@@ -170,8 +188,9 @@
 	 *
 	 * @param event the notification event for the entry.
 	 */
+	@Override
 	public void entryAdded(EntryEvent<Long, byte[]> event) {
-	    byte[] valueBytes = (byte[])event.getValue();
+	    byte[] valueBytes = event.getValue();
 
 	    //
 	    // Decode the value and deliver the notification
@@ -188,8 +207,9 @@
 	 *
 	 * @param event the notification event for the entry.
 	 */
+	@Override
 	public void entryRemoved(EntryEvent<Long, byte[]> event) {
-	    byte[] valueBytes = (byte[])event.getValue();
+	    byte[] valueBytes = event.getValue();
 
 	    //
 	    // Decode the value and deliver the notification
@@ -206,8 +226,9 @@
 	 *
 	 * @param event the notification event for the entry.
 	 */
+	@Override
 	public void entryUpdated(EntryEvent<Long, byte[]> event) {
-	    byte[] valueBytes = (byte[])event.getValue();
+	    byte[] valueBytes = event.getValue();
 
 	    //
 	    // Decode the value and deliver the notification
@@ -224,6 +245,169 @@
 	 *
 	 * @param event the notification event for the entry.
 	 */
+	@Override
+	public void entryEvicted(EntryEvent<Long, byte[]> event) {
+	    // NOTE: We don't use eviction for this map
+	}
+    }
+
+    /**
+     * Class for receiving notifications for FlowId state.
+     *
+     * The datagrid map is:
+     *  - Key : FlowId (Long)
+     *  - Value : Serialized Switch Dpid (byte[])
+     */
+    class MapFlowIdListener implements EntryListener<Long, byte[]> {
+	/**
+	 * Receive a notification that an entry is added.
+	 *
+	 * @param event the notification event for the entry.
+	 */
+	public void entryAdded(EntryEvent<Long, byte[]> event) {
+	    Long keyLong = event.getKey();
+	    FlowId flowId = new FlowId(keyLong);
+
+	    byte[] valueBytes = event.getValue();
+
+	    //
+	    // Decode the value and deliver the notification
+	    //
+	    Kryo kryo = kryoFactory.newKryo();
+	    Input input = new Input(valueBytes);
+	    Dpid dpid = kryo.readObject(input, Dpid.class);
+	    kryoFactory.deleteKryo(kryo);
+	    flowEventHandlerService.notificationRecvFlowIdAdded(flowId, dpid);
+	}
+
+	/**
+	 * Receive a notification that an entry is removed.
+	 *
+	 * @param event the notification event for the entry.
+	 */
+	public void entryRemoved(EntryEvent<Long, byte[]> event) {
+	    Long keyLong = event.getKey();
+	    FlowId flowId = new FlowId(keyLong);
+
+	    byte[] valueBytes = event.getValue();
+
+	    //
+	    // Decode the value and deliver the notification
+	    //
+	    Kryo kryo = kryoFactory.newKryo();
+	    Input input = new Input(valueBytes);
+	    Dpid dpid = kryo.readObject(input, Dpid.class);
+	    kryoFactory.deleteKryo(kryo);
+	    flowEventHandlerService.notificationRecvFlowIdRemoved(flowId, dpid);
+	}
+
+	/**
+	 * Receive a notification that an entry is updated.
+	 *
+	 * @param event the notification event for the entry.
+	 */
+	public void entryUpdated(EntryEvent<Long, byte[]> event) {
+	    Long keyLong = event.getKey();
+	    FlowId flowId = new FlowId(keyLong);
+
+	    byte[] valueBytes = event.getValue();
+
+	    //
+	    // Decode the value and deliver the notification
+	    //
+	    Kryo kryo = kryoFactory.newKryo();
+	    Input input = new Input(valueBytes);
+	    Dpid dpid = kryo.readObject(input, Dpid.class);
+	    kryoFactory.deleteKryo(kryo);
+	    flowEventHandlerService.notificationRecvFlowIdUpdated(flowId, dpid);
+	}
+
+	/**
+	 * Receive a notification that an entry is evicted.
+	 *
+	 * @param event the notification event for the entry.
+	 */
+	public void entryEvicted(EntryEvent<Long, byte[]> event) {
+	    // NOTE: We don't use eviction for this map
+	}
+    }
+
+    /**
+     * Class for receiving notifications for FlowEntryId state.
+     *
+     * The datagrid map is:
+     *  - Key : FlowEntryId (Long)
+     *  - Value : Serialized Switch Dpid (byte[])
+     */
+    class MapFlowEntryIdListener implements EntryListener<Long, byte[]> {
+	/**
+	 * Receive a notification that an entry is added.
+	 *
+	 * @param event the notification event for the entry.
+	 */
+	public void entryAdded(EntryEvent<Long, byte[]> event) {
+	    Long keyLong = event.getKey();
+	    FlowEntryId flowEntryId = new FlowEntryId(keyLong);
+
+	    byte[] valueBytes = event.getValue();
+
+	    //
+	    // Decode the value and deliver the notification
+	    //
+	    Kryo kryo = kryoFactory.newKryo();
+	    Input input = new Input(valueBytes);
+	    Dpid dpid = kryo.readObject(input, Dpid.class);
+	    kryoFactory.deleteKryo(kryo);
+	    flowEventHandlerService.notificationRecvFlowEntryIdAdded(flowEntryId, dpid);
+	}
+
+	/**
+	 * Receive a notification that an entry is removed.
+	 *
+	 * @param event the notification event for the entry.
+	 */
+	public void entryRemoved(EntryEvent<Long, byte[]> event) {
+	    Long keyLong = event.getKey();
+	    FlowEntryId flowEntryId = new FlowEntryId(keyLong);
+
+	    byte[] valueBytes = event.getValue();
+
+	    //
+	    // Decode the value and deliver the notification
+	    //
+	    Kryo kryo = kryoFactory.newKryo();
+	    Input input = new Input(valueBytes);
+	    Dpid dpid = kryo.readObject(input, Dpid.class);
+	    kryoFactory.deleteKryo(kryo);
+	    flowEventHandlerService.notificationRecvFlowEntryIdRemoved(flowEntryId, dpid);
+	}
+
+	/**
+	 * Receive a notification that an entry is updated.
+	 *
+	 * @param event the notification event for the entry.
+	 */
+	public void entryUpdated(EntryEvent<Long, byte[]> event) {
+	    Long keyLong = event.getKey();
+	    FlowEntryId flowEntryId = new FlowEntryId(keyLong);
+
+	    byte[] valueBytes = event.getValue();
+
+	    //
+	    // Decode the value and deliver the notification
+	    //
+	    Kryo kryo = kryoFactory.newKryo();
+	    Input input = new Input(valueBytes);
+	    Dpid dpid = kryo.readObject(input, Dpid.class);
+	    kryoFactory.deleteKryo(kryo);
+	    flowEventHandlerService.notificationRecvFlowEntryIdUpdated(flowEntryId, dpid);
+	}
+
+	/**
+	 * Receive a notification that an entry is evicted.
+	 *
+	 * @param event the notification event for the entry.
+	 */
 	public void entryEvicted(EntryEvent<Long, byte[]> event) {
 	    // NOTE: We don't use eviction for this map
 	}
@@ -242,8 +426,9 @@
 	 *
 	 * @param event the notification event for the entry.
 	 */
+	@Override
 	public void entryAdded(EntryEvent<String, byte[]> event) {
-	    byte[] valueBytes = (byte[])event.getValue();
+	    byte[] valueBytes = event.getValue();
 
 	    //
 	    // Decode the value and deliver the notification
@@ -261,8 +446,9 @@
 	 *
 	 * @param event the notification event for the entry.
 	 */
+	@Override
 	public void entryRemoved(EntryEvent<String, byte[]> event) {
-	    byte[] valueBytes = (byte[])event.getValue();
+	    byte[] valueBytes = event.getValue();
 
 	    //
 	    // Decode the value and deliver the notification
@@ -280,8 +466,9 @@
 	 *
 	 * @param event the notification event for the entry.
 	 */
+	@Override
 	public void entryUpdated(EntryEvent<String, byte[]> event) {
-	    byte[] valueBytes = (byte[])event.getValue();
+	    byte[] valueBytes = event.getValue();
 
 	    //
 	    // Decode the value and deliver the notification
@@ -299,11 +486,12 @@
 	 *
 	 * @param event the notification event for the entry.
 	 */
+	@Override
 	public void entryEvicted(EntryEvent<String, byte[]> event) {
 	    // NOTE: We don't use eviction for this map
 	}
     }
-    
+
     /**
      * Class for receiving notifications for ARP requests.
      *
@@ -317,11 +505,12 @@
 		 *
 		 * @param event the notification event for the entry.
 		 */
+		@Override
 		public void entryAdded(EntryEvent<ArpMessage, byte[]> event) {
 		    for (IArpEventHandler arpEventHandler : arpEventHandlers) {
 		    	arpEventHandler.arpRequestNotification(event.getKey());
 		    }
-		    
+
 		    //
 		    // Decode the value and deliver the notification
 		    //
@@ -334,30 +523,33 @@
 		    flowEventHandlerService.notificationRecvTopologyElementAdded(topologyElement);
 		    */
 		}
-	
+
 		/**
 		 * Receive a notification that an entry is removed.
 		 *
 		 * @param event the notification event for the entry.
 		 */
+		@Override
 		public void entryRemoved(EntryEvent<ArpMessage, byte[]> event) {
 			// Not used
 		}
-	
+
 		/**
 		 * Receive a notification that an entry is updated.
 		 *
 		 * @param event the notification event for the entry.
 		 */
+		@Override
 		public void entryUpdated(EntryEvent<ArpMessage, byte[]> event) {
 			// Not used
 		}
-	
+
 		/**
 		 * Receive a notification that an entry is evicted.
 		 *
 		 * @param event the notification event for the entry.
 		 */
+		@Override
 		public void entryEvicted(EntryEvent<ArpMessage, byte[]> event) {
 		    // Not used
 		}
@@ -374,7 +566,7 @@
 	System.setProperty("hazelcast.socket.send.buffer.size", "32");
 	*/
 	// System.setProperty("hazelcast.heartbeat.interval.seconds", "100");
-	
+
 	// Init from configuration file
 	try {
 	    hazelcastConfig = new FileSystemXmlConfig(configFilename);
@@ -395,7 +587,8 @@
     /**
      * Shutdown the Hazelcast Datagrid operation.
      */
-    public void finalize() {
+    @Override
+    protected void finalize() {
 	close();
     }
 
@@ -413,7 +606,7 @@
      */
     @Override
     public Collection<Class<? extends IFloodlightService>> getModuleServices() {
-        Collection<Class<? extends IFloodlightService>> l = 
+        Collection<Class<? extends IFloodlightService>> l =
             new ArrayList<Class<? extends IFloodlightService>>();
         l.add(IDatagridService.class);
         return l;
@@ -425,10 +618,10 @@
      * @return the collection of implemented services.
      */
     @Override
-    public Map<Class<? extends IFloodlightService>, IFloodlightService> 
+    public Map<Class<? extends IFloodlightService>, IFloodlightService>
 			       getServiceImpls() {
         Map<Class<? extends IFloodlightService>,
-	    IFloodlightService> m = 
+	    IFloodlightService> m =
             new HashMap<Class<? extends IFloodlightService>,
                 IFloodlightService>();
         m.put(IDatagridService.class, this);
@@ -441,7 +634,7 @@
      * @return the collection of modules this module depends on.
      */
     @Override
-    public Collection<Class<? extends IFloodlightService>> 
+    public Collection<Class<? extends IFloodlightService>>
                                                     getModuleDependencies() {
 	Collection<Class<? extends IFloodlightService>> l =
 	    new ArrayList<Class<? extends IFloodlightService>>();
@@ -477,7 +670,7 @@
 	hazelcastInstance = Hazelcast.newHazelcastInstance(hazelcastConfig);
 
 	restApi.addRestletRoutable(new DatagridWebRoutable());
-	
+
 	arpMap = hazelcastInstance.getMap(arpMapName);
 	arpMap.addEntryListener(new ArpMapListener(), true);
     }
@@ -504,6 +697,16 @@
 	mapFlowEntry = hazelcastInstance.getMap(mapFlowEntryName);
 	mapFlowEntryListenerId = mapFlowEntry.addEntryListener(mapFlowEntryListener, true);
 
+	// Initialize the FlowId-related map state
+	mapFlowIdListener = new MapFlowIdListener();
+	mapFlowId = hazelcastInstance.getMap(mapFlowIdName);
+	mapFlowIdListenerId = mapFlowId.addEntryListener(mapFlowIdListener, true);
+
+	// Initialize the FlowEntryId-related map state
+	mapFlowEntryIdListener = new MapFlowEntryIdListener();
+	mapFlowEntryId = hazelcastInstance.getMap(mapFlowEntryIdName);
+	mapFlowEntryIdListenerId = mapFlowEntryId.addEntryListener(mapFlowEntryIdListener, true);
+
 	// Initialize the Topology-related map state
 	mapTopologyListener = new MapTopologyListener();
 	mapTopology = hazelcastInstance.getMap(mapTopologyName);
@@ -531,6 +734,16 @@
 	mapFlowEntry = null;
 	mapFlowEntryListener = null;
 
+	// Clear the FlowId-related map state
+	mapFlowId.removeEntryListener(mapFlowIdListenerId);
+	mapFlowId = null;
+	mapFlowIdListener = null;
+
+	// Clear the FlowEntryId-related map state
+	mapFlowEntryId.removeEntryListener(mapFlowEntryIdListenerId);
+	mapFlowEntryId = null;
+	mapFlowEntryIdListener = null;
+
 	// Clear the Topology-related map state
 	mapTopology.removeEntryListener(mapTopologyListenerId);
 	mapTopology = null;
@@ -538,19 +751,19 @@
 
 	this.flowEventHandlerService = null;
     }
-    
+
     @Override
     public void registerArpEventHandler(IArpEventHandler arpEventHandler) {
     	if (arpEventHandler != null) {
     		arpEventHandlers.add(arpEventHandler);
     	}
     }
-    
+
     @Override
     public void deregisterArpEventHandler(IArpEventHandler arpEventHandler) {
     	arpEventHandlers.remove(arpEventHandler);
     }
-    
+
     /**
      * Get all Flows that are currently in the datagrid.
      *
@@ -788,6 +1001,216 @@
     }
 
     /**
+     * Get all Flow IDs that are currently in the datagrid.
+     *
+     * @return all Flow IDs that are currently in the datagrid.
+     */
+    @Override
+	public Collection<Pair<FlowId, Dpid>> getAllFlowIds() {
+	Collection<Pair<FlowId, Dpid>> allFlowIds =
+	    new LinkedList<Pair<FlowId, Dpid>>();
+
+	//
+	// Get all current entries
+	//
+	Kryo kryo = kryoFactory.newKryo();
+	for (Map.Entry<Long, byte[]> entry : mapFlowId.entrySet()) {
+	    Long key = entry.getKey();
+	    byte[] valueBytes = entry.getValue();
+
+	    FlowId flowId = new FlowId(key);
+
+	    //
+	    // Decode the value
+	    //
+	    Input input = new Input(valueBytes);
+	    Dpid dpid = kryo.readObject(input, Dpid.class);
+
+	    Pair<FlowId, Dpid> pair = new Pair(flowId, dpid);
+	    allFlowIds.add(pair);
+	}
+	kryoFactory.deleteKryo(kryo);
+
+	return allFlowIds;
+    }
+
+    /**
+     * Get all Flow Entry IDs that are currently in the datagrid.
+     *
+     * @return all Flow Entry IDs that ae currently in the datagrid.
+     */
+    @Override
+    public Collection<Pair<FlowEntryId, Dpid>> getAllFlowEntryIds() {
+	Collection<Pair<FlowEntryId, Dpid>> allFlowEntryIds =
+	    new LinkedList<Pair<FlowEntryId, Dpid>>();
+
+	//
+	// Get all current entries
+	//
+	Kryo kryo = kryoFactory.newKryo();
+	for (Map.Entry<Long, byte[]> entry : mapFlowEntryId.entrySet()) {
+	    Long key = entry.getKey();
+	    byte[] valueBytes = entry.getValue();
+
+	    FlowEntryId flowEntryId = new FlowEntryId(key);
+
+	    //
+	    // Decode the value
+	    //
+	    Input input = new Input(valueBytes);
+	    Dpid dpid = kryo.readObject(input, Dpid.class);
+
+	    Pair<FlowEntryId, Dpid> pair = new Pair(flowEntryId, dpid);
+	    allFlowEntryIds.add(pair);
+	}
+	kryoFactory.deleteKryo(kryo);
+
+	return allFlowEntryIds;
+    }
+
+    /**
+     * Send a notification that a FlowId is added.
+     *
+     * @param flowId the FlowId that is added.
+     * @param dpid the Source Switch Dpid.
+     */
+    @Override
+    public void notificationSendFlowIdAdded(FlowId flowId, Dpid dpid) {
+	//
+	// Encode the value
+	//
+	byte[] buffer = new byte[MAX_BUFFER_SIZE];
+	Kryo kryo = kryoFactory.newKryo();
+	Output output = new Output(buffer, -1);
+	kryo.writeObject(output, dpid);
+	byte[] valueBytes = output.toBytes();
+	kryoFactory.deleteKryo(kryo);
+
+	//
+	// Put the entry:
+	//  - Key : FlowId (Long)
+	//  - Value : Serialized Switch Dpid (byte[])
+	//
+	mapFlowId.putAsync(flowId.value(), valueBytes);
+    }
+
+    /**
+     * Send a notification that a FlowId is removed.
+     *
+     * @param flowId the FlowId that is removed.
+     */
+    @Override
+    public void notificationSendFlowIdRemoved(FlowId flowId) {
+	//
+	// Remove the entry:
+	//  - Key : FlowId (Long)
+	//  - Value : Serialized Switch Dpid (byte[])
+	//
+	mapFlowId.removeAsync(flowId.value());
+    }
+
+    /**
+     * Send a notification that a FlowId is updated.
+     *
+     * @param flowId the FlowId that is updated.
+     * @param dpid the Source Switch Dpid.
+     */
+    @Override
+    public void notificationSendFlowIdUpdated(FlowId flowId, Dpid dpid) {
+	// NOTE: Adding an entry with an existing key automatically updates it
+	notificationSendFlowIdAdded(flowId, dpid);
+    }
+
+    /**
+     * Send a notification that all Flow IDs are removed.
+     */
+    @Override
+    public void notificationSendAllFlowIdsRemoved() {
+	//
+	// Remove all entries
+	// NOTE: We remove the entries one-by-one so the per-entry
+	// notifications will be delivered.
+	//
+	// mapFlowId.clear();
+	Set<Long> keySet = mapFlowId.keySet();
+	for (Long key : keySet) {
+	    mapFlowId.removeAsync(key);
+	}
+    }
+
+    /**
+     * Send a notification that a FlowEntryId is added.
+     *
+     * @param flowEntryId the FlowEntryId that is added.
+     * @param dpid the Switch Dpid.
+     */
+    @Override
+    public void notificationSendFlowEntryIdAdded(FlowEntryId flowEntryId,
+						 Dpid dpid) {
+	//
+	// Encode the value
+	//
+	byte[] buffer = new byte[MAX_BUFFER_SIZE];
+	Kryo kryo = kryoFactory.newKryo();
+	Output output = new Output(buffer, -1);
+	kryo.writeObject(output, dpid);
+	byte[] valueBytes = output.toBytes();
+	kryoFactory.deleteKryo(kryo);
+
+	//
+	// Put the entry:
+	//  - Key : FlowEntryId (Long)
+	//  - Value : Serialized Switch Dpid (byte[])
+	//
+	mapFlowEntryId.putAsync(flowEntryId.value(), valueBytes);
+    }
+
+    /**
+     * Send a notification that a FlowEntryId is removed.
+     *
+     * @param flowEntryId the FlowEntryId that is removed.
+     */
+    @Override
+    public void notificationSendFlowEntryIdRemoved(FlowEntryId flowEntryId) {
+	//
+	// Remove the entry:
+	//  - Key : FlowEntryId (Long)
+	//  - Value : Serialized Switch Dpid (byte[])
+	//
+	mapFlowEntryId.removeAsync(flowEntryId.value());
+    }
+
+    /**
+     * Send a notification that a FlowEntryId is updated.
+     *
+     * @param flowEntryId the FlowEntryId that is updated.
+     * @param dpid the Switch Dpid.
+     */
+    @Override
+    public void notificationSendFlowEntryIdUpdated(FlowEntryId flowEntryId,
+						   Dpid dpid) {
+	// NOTE: Adding an entry with an existing key automatically updates it
+	notificationSendFlowEntryIdAdded(flowEntryId, dpid);
+    }
+
+    /**
+     * Send a notification that all Flow Entry IDs are removed.
+     */
+    @Override
+    public void notificationSendAllFlowEntryIdsRemoved() {
+	//
+	// Remove all entries
+	// NOTE: We remove the entries one-by-one so the per-entry
+	// notifications will be delivered.
+	//
+	// mapFlowEntryId.clear();
+	Set<Long> keySet = mapFlowEntryId.keySet();
+	for (Long key : keySet) {
+	    mapFlowEntryId.removeAsync(key);
+	}
+    }
+
+    /**
      * Get all Topology Elements that are currently in the datagrid.
      *
      * @return all Topology Elements that are currently in the datagrid.
@@ -883,7 +1306,7 @@
 	    mapTopology.removeAsync(key);
 	}
     }
-    
+
     @Override
     public void sendArpRequest(ArpMessage arpMessage) {
     	//log.debug("ARP bytes: {}", HexString.toHexString(arpRequest));
diff --git a/src/main/java/net/onrc/onos/datagrid/IDatagridService.java b/src/main/java/net/onrc/onos/datagrid/IDatagridService.java
index 0f03d77..a855798 100644
--- a/src/main/java/net/onrc/onos/datagrid/IDatagridService.java
+++ b/src/main/java/net/onrc/onos/datagrid/IDatagridService.java
@@ -7,10 +7,12 @@
 import net.onrc.onos.ofcontroller.proxyarp.ArpMessage;
 import net.onrc.onos.ofcontroller.proxyarp.IArpEventHandler;
 import net.onrc.onos.ofcontroller.topology.TopologyElement;
+import net.onrc.onos.ofcontroller.util.Dpid;
 import net.onrc.onos.ofcontroller.util.FlowEntry;
 import net.onrc.onos.ofcontroller.util.FlowEntryId;
 import net.onrc.onos.ofcontroller.util.FlowId;
 import net.onrc.onos.ofcontroller.util.FlowPath;
+import net.onrc.onos.ofcontroller.util.Pair;
 
 /**
  * Interface for providing Datagrid Service to other modules.
@@ -134,6 +136,77 @@
     void notificationSendAllFlowEntriesRemoved();
 
     /**
+     * Get all Flow IDs that are currently in the datagrid.
+     *
+     * @return all Flow IDs that ae currently in the datagrid.
+     */
+    Collection<Pair<FlowId, Dpid>> getAllFlowIds();
+
+    /**
+     * Send a notification that a FlowId is added.
+     *
+     * @param flowId the FlowId that is added.
+     * @param dpid the Source Switch Dpid.
+     */
+    void notificationSendFlowIdAdded(FlowId flowId, Dpid dpid);
+
+    /**
+     * Send a notification that a FlowId is removed.
+     *
+     * @param flowId the FlowId that is removed.
+     */
+    void notificationSendFlowIdRemoved(FlowId flowId);
+
+    /**
+     * Send a notification that a FlowId is updated.
+     *
+     * @param flowId the FlowId that is updated.
+     * @param dpid the Source Switch Dpid.
+     */
+    void notificationSendFlowIdUpdated(FlowId flowId, Dpid dpid);
+
+    /**
+     * Send a notification that all Flow IDs are removed.
+     */
+    void notificationSendAllFlowIdsRemoved();
+
+    /**
+     * Get all Flow Entry IDs that are currently in the datagrid.
+     *
+     * @return all Flow Entry IDs that ae currently in the datagrid.
+     */
+    Collection<Pair<FlowEntryId, Dpid>> getAllFlowEntryIds();
+
+    /**
+     * Send a notification that a FlowEntryId is added.
+     *
+     * @param flowEntryId the FlowEntryId that is added.
+     * @param dpid the Switch Dpid.
+     */
+    void notificationSendFlowEntryIdAdded(FlowEntryId flowEntryId, Dpid dpid);
+
+    /**
+     * Send a notification that a FlowEntryId is removed.
+     *
+     * @param flowEntryId the FlowEntryId that is removed.
+     */
+    void notificationSendFlowEntryIdRemoved(FlowEntryId flowEntryId);
+
+    /**
+     * Send a notification that a FlowEntryId is updated.
+     *
+     * @param flowEntryId the FlowEntryId that is updated.
+     * @param dpid the Switch Dpid.
+     */
+    void notificationSendFlowEntryIdUpdated(FlowEntryId flowEntryId,
+					    Dpid dpid);
+
+    /**
+     * Send a notification that all Flow Entry IDs are removed.
+     */
+    void notificationSendAllFlowEntryIdsRemoved();
+
+    /**
      * Get all Topology Elements that are currently in the datagrid.
      *
      * @return all Topology Elements that are currently in the datagrid.
diff --git a/src/main/java/net/onrc/onos/graph/LocalTopologyEventListener.java b/src/main/java/net/onrc/onos/graph/LocalTopologyEventListener.java
index 80a6338..27de2c0 100644
--- a/src/main/java/net/onrc/onos/graph/LocalTopologyEventListener.java
+++ b/src/main/java/net/onrc/onos/graph/LocalTopologyEventListener.java
@@ -1,5 +1,7 @@
 package net.onrc.onos.graph;
 
+import java.util.Map;
+
 import net.onrc.onos.ofcontroller.core.INetMapTopologyObjects.IPortObject;
 
 import org.slf4j.Logger;
@@ -35,6 +37,8 @@
 
 	}
 
+	@Override
+	//public void edgeRemoved(Edge e, Map<String, Object> arg1) {
 	public void edgeRemoved(Edge e) {
 		// TODO Auto-generated method stub
 		// Fire NetMapEvents (LinkRemoved, FlowEntryRemoved, HostRemoved, PortRemoved)
@@ -72,6 +76,8 @@
 
 	}
 
+	@Override
+	//public void vertexRemoved(Vertex vertex, Map<String, Object> arg1) {
 	public void vertexRemoved(Vertex vertex) {
 		// TODO Auto-generated method stub
 		// Generate NetMapEvents 
diff --git a/src/main/java/net/onrc/onos/ofcontroller/core/internal/DeviceStorageImpl.java b/src/main/java/net/onrc/onos/ofcontroller/core/internal/DeviceStorageImpl.java
index f469911..ddc7527 100644
--- a/src/main/java/net/onrc/onos/ofcontroller/core/internal/DeviceStorageImpl.java
+++ b/src/main/java/net/onrc/onos/ofcontroller/core/internal/DeviceStorageImpl.java
@@ -60,7 +60,7 @@
 	 * It will close the DB connection. This is for Java garbage collection.
 	 */
 	@Override
-	public void finalize() {
+	protected void finalize() {
 		close();
 	}
 
diff --git a/src/main/java/net/onrc/onos/ofcontroller/core/internal/LinkStorageImpl.java b/src/main/java/net/onrc/onos/ofcontroller/core/internal/LinkStorageImpl.java
index b34a2fc..8882a55 100644
--- a/src/main/java/net/onrc/onos/ofcontroller/core/internal/LinkStorageImpl.java
+++ b/src/main/java/net/onrc/onos/ofcontroller/core/internal/LinkStorageImpl.java
@@ -27,6 +27,7 @@
 	protected DBOperation dbop;
 	private static PerfMon pm = PerfMon.getInstance();
 
+
 	/**
 	 * Initialize the object. Open LinkStorage using given configuration file.
 	 * @param conf Path (absolute path for now) to configuration file.
@@ -432,7 +433,7 @@
 	 * Finalize the object.
 	 */
 	@Override
-	public void finalize() {
+	protected void finalize() {
 		close();
 	}
 
@@ -493,6 +494,8 @@
 				log.error("LinkStorageImpl:addLinkImpl failed link exists {} {} src {} dst {}",
 						new Object[]{dbop, lt, vportSrc, vportDst});
 			}
+		} else {
+			log.error("Ports not found : {}", lt);
 		}
 
 		return success;
diff --git a/src/main/java/net/onrc/onos/ofcontroller/core/internal/SwitchStorageImpl.java b/src/main/java/net/onrc/onos/ofcontroller/core/internal/SwitchStorageImpl.java
index 7c474bd..0dcf4297 100644
--- a/src/main/java/net/onrc/onos/ofcontroller/core/internal/SwitchStorageImpl.java
+++ b/src/main/java/net/onrc/onos/ofcontroller/core/internal/SwitchStorageImpl.java
@@ -48,7 +48,7 @@
 	 * It will close the DB connection.
 	 */
 	@Override
-	public void finalize() {
+	protected void finalize() {
 		close();
 	}
 
diff --git a/src/main/java/net/onrc/onos/ofcontroller/core/internal/TopoLinkServiceImpl.java b/src/main/java/net/onrc/onos/ofcontroller/core/internal/TopoLinkServiceImpl.java
index cf73c9c..24392df 100644
--- a/src/main/java/net/onrc/onos/ofcontroller/core/internal/TopoLinkServiceImpl.java
+++ b/src/main/java/net/onrc/onos/ofcontroller/core/internal/TopoLinkServiceImpl.java
@@ -23,18 +23,18 @@
 	protected DBOperation dbop;
 	protected final static Logger log = LoggerFactory.getLogger(TopoLinkServiceImpl.class);
 
-	public void finalize() {
+	@Override
+	protected void finalize() {
 		close();
 	}
-	
+
 	@Override
 	public void close() {
 		dbop.close();
 	}
- 
+
 	@Override
 	public List<Link> getActiveLinks() {
-		// TODO Auto-generated method stub
 		dbop = GraphDBManager.getDBOperation("ramcloud", "/tmp/ramcloudconf");
 		//dbop = GraphDBManager.getDBOperation("", "");
 		//dbop.commit(); //Commit to ensure we see latest data
@@ -47,12 +47,12 @@
 			pipe.start(sw.asVertex());
 			pipe.enablePath(true);
 			pipe.out("on").out("link").in("on").path().step(extractor);
-					
+
 			while (pipe.hasNext() ) {
 				Link l = pipe.next();
 				links.add(l);
 			}
-						
+
 		}
 		dbop.commit();
 		return links;
@@ -68,7 +68,7 @@
 		pipe.start(sw.asVertex());
 		pipe.enablePath(true);
 		pipe.out("on").out("link").in("on").path().step(extractor);
-			
+
 		while (pipe.hasNext() ) {
 			Link l = pipe.next();
 			links.add(l);
@@ -84,7 +84,7 @@
 			long d_dpid = 0;
 			short s_port = 0;
 			short d_port = 0;
-			
+
 			List<?> V = pipe.next();
 			Vertex src_sw = (Vertex)V.get(0);
 			Vertex dest_sw = (Vertex)V.get(3);
@@ -94,9 +94,9 @@
 			d_dpid = HexString.toLong((String) dest_sw.getProperty("dpid"));
 			s_port = (Short) src_port.getProperty("number");
 			d_port = (Short) dest_port.getProperty("number");
-			
+
 			Link l = new Link(s_dpid,s_port,d_dpid,d_port);
-			
+
 			return l;
 		}
 	}
diff --git a/src/main/java/net/onrc/onos/ofcontroller/core/internal/TopoSwitchServiceImpl.java b/src/main/java/net/onrc/onos/ofcontroller/core/internal/TopoSwitchServiceImpl.java
index 3f7090b..f417d96 100644
--- a/src/main/java/net/onrc/onos/ofcontroller/core/internal/TopoSwitchServiceImpl.java
+++ b/src/main/java/net/onrc/onos/ofcontroller/core/internal/TopoSwitchServiceImpl.java
@@ -10,7 +10,7 @@
 import org.slf4j.LoggerFactory;
 
 public class TopoSwitchServiceImpl implements ITopoSwitchService {
-	
+
 	private DBOperation op;
 	protected final static Logger log = LoggerFactory.getLogger(TopoSwitchServiceImpl.class);
 
@@ -22,16 +22,17 @@
 	public TopoSwitchServiceImpl() {
 		this("","");
 	}
-	
-	public void finalize() {
+
+	@Override
+	protected void finalize() {
 		close();
 	}
-	
+
 	@Override
 	public void close() {
 		op.close();
 	}
-	
+
 	@Override
 	public Iterable<ISwitchObject> getActiveSwitches() {
 		// TODO Auto-generated method stub
@@ -67,5 +68,5 @@
 	public IPortObject getPortOnSwitch(String dpid, short port_num) {
 		// TODO Auto-generated method stub
 		return null;
-	}	
+	}
 }
diff --git a/src/main/java/net/onrc/onos/ofcontroller/flowmanager/FlowDatabaseOperation.java b/src/main/java/net/onrc/onos/ofcontroller/flowmanager/FlowDatabaseOperation.java
index af7bca8..1f0c163 100644
--- a/src/main/java/net/onrc/onos/ofcontroller/flowmanager/FlowDatabaseOperation.java
+++ b/src/main/java/net/onrc/onos/ofcontroller/flowmanager/FlowDatabaseOperation.java
@@ -3,9 +3,11 @@
 import java.io.PrintWriter;
 import java.io.StringWriter;
 import java.util.ArrayList;
+import java.util.Collection;
 import java.util.LinkedList;
 import java.util.Map;
 
+import net.floodlightcontroller.core.IOFSwitch;
 import net.floodlightcontroller.util.MACAddress;
 import net.onrc.onos.graph.DBOperation;
 import net.onrc.onos.ofcontroller.core.INetMapTopologyObjects.IBaseObject;
@@ -139,7 +141,7 @@
 	//
 	flowProp.setFlowId(flowPath.flowId().toString());
 	if ( measureONOSFlowTimeProp ) {
-	    numPropsSet += 2;
+	    numPropsSet += 1;
 	}
 
 	//
@@ -613,11 +615,7 @@
 	    }
 	}
 
-	// TODO: Hacks with hard-coded state names!
-	if (found)
-		flowProp.setUserState("FE_USER_MODIFY");
-	else
-		flowProp.setUserState("FE_USER_ADD");
+	flowProp.setUserState(flowEntry.flowEntryUserState().toString());
 	flowProp.setSwitchState(flowEntry.flowEntrySwitchState().toString());
 	if (measureONOSFlowEntryTimeProp) {
 	    numProperties += 2;
@@ -804,6 +802,77 @@
     }
 
     /**
+     * Get a previously added flow entry.
+     *
+     * @param dbHandler the Graph Database handler to use.
+     * @param flowEntryId the Flow Entry ID of the flow entry to get.
+     * @return the Flow Entry if found, otherwise null.
+     */
+    static FlowEntry getFlowEntry(DBOperation dbHandler,
+				  FlowEntryId flowEntryId) {
+	IFlowEntry flowEntryObj = null;
+	try {
+	    flowEntryObj = dbHandler.searchFlowEntry(flowEntryId);
+	} catch (Exception e) {
+	    // TODO: handle exceptions
+	    dbHandler.rollback();
+	    log.error(":getFlowEntry FlowEntryId:{} failed", flowEntryId);
+	    return null;
+	}
+	if (flowEntryObj == null) {
+	    dbHandler.commit();
+	    return null;		// Flow not found
+	}
+
+	//
+	// Extract the Flow Entry state
+	//
+	FlowEntry flowEntry = extractFlowEntry(flowEntryObj);
+	dbHandler.commit();
+
+	return flowEntry;
+    }
+
+    /**
+     * Get the source switch DPID of a previously added flow.
+     *
+     * @param dbHandler the Graph Database handler to use.
+     * @param flowId the Flow ID of the flow to get.
+     * @return the source switch DPID if found, otherwise null.
+     */
+    static Dpid getFlowSourceDpid(DBOperation dbHandler, FlowId flowId) {
+	IFlowPath flowObj = null;
+	try {
+	    flowObj = dbHandler.searchFlowPath(flowId);
+	} catch (Exception e) {
+	    // TODO: handle exceptions
+	    dbHandler.rollback();
+	    log.error(":getFlowSourceDpid FlowId:{} failed", flowId);
+	    return null;
+	}
+	if (flowObj == null) {
+	    dbHandler.commit();
+	    return null;		// Flow not found
+	}
+
+	//
+	// Extract the Flow Source DPID
+	//
+	String srcSwitchStr = flowObj.getSrcSwitch();
+	if (srcSwitchStr == null) {
+	    // TODO: A work-around, becauuse of some bogus database objects
+	    dbHandler.commit();
+	    return null;
+	}
+
+	Dpid dpid = new Dpid(srcSwitchStr);
+
+	dbHandler.commit();
+
+	return dpid;
+    }
+
+    /**
      * Get all installed flows by all installers.
      *
      * @param dbHandler the Graph Database handler to use.
@@ -841,12 +910,88 @@
     }
 
     /**
+     * Get all installed flows whose Source Switch is controlled by this
+     * instance.
+     *
+     * @param dbHandler the Graph Database handler to use.
+     * @param mySwitches the collection of the switches controlled by this
+     * instance.
+     * @return the Flow Paths if found, otherwise null.
+     */
+    static ArrayList<FlowPath> getAllMyFlows(DBOperation dbHandler,
+					     Map<Long, IOFSwitch> mySwitches) {
+	Iterable<IFlowPath> flowPathsObj = null;
+	ArrayList<FlowPath> flowPaths = new ArrayList<FlowPath>();
+
+	try {
+	    flowPathsObj = dbHandler.getAllFlowPaths();
+	} catch (Exception e) {
+	    // TODO: handle exceptions
+	    dbHandler.rollback();
+	    log.error(":getAllMyFlowPaths failed");
+	    return flowPaths;
+	}
+	if ((flowPathsObj == null) || (flowPathsObj.iterator().hasNext() == false)) {
+	    dbHandler.commit();
+	    return flowPaths;	// No Flows found
+	}
+
+	for (IFlowPath flowObj : flowPathsObj) {
+	    //
+	    // Extract the Source Switch DPID and ignore if the switch
+	    // is not controlled by this instance.
+	    //
+	    String srcSwitchStr = flowObj.getSrcSwitch();
+	    if (srcSwitchStr == null) {
+		// TODO: A work-around, becauuse of some bogus database objects
+		continue;
+	    }
+	    Dpid dpid = new Dpid(srcSwitchStr);
+	    if (mySwitches.get(dpid.value()) == null)
+		continue;
+
+	    //
+	    // Extract the Flow state
+	    //
+	    FlowPath flowPath = extractFlowPath(flowObj);
+	    if (flowPath != null)
+		flowPaths.add(flowPath);
+	}
+
+	dbHandler.commit();
+
+	return flowPaths;
+    }
+
+    /**
+     * Get a subset of installed flows.
+     *
+     * @param dbHandler the Graph Database handler to use.
+     * @param flowIds the collection of Flow IDs to get.
+     * @return the Flow Paths if found, otherwise null.
+     */
+    static ArrayList<FlowPath> getFlows(DBOperation dbHandler,
+					Collection<FlowId> flowIds) {
+	ArrayList<FlowPath> flowPaths = new ArrayList<FlowPath>();
+
+	// TODO: This implementation should use threads
+	for (FlowId flowId : flowIds) {
+	    FlowPath flowPath = getFlow(dbHandler, flowId);
+	    if (flowPath != null)
+		flowPaths.add(flowPath);
+	}
+	// dbHandler.commit();
+
+	return flowPaths;
+    }
+
+    /**
      * Extract Flow Path State from a Titan Database Object @ref IFlowPath.
      *
      * @param flowObj the object to extract the Flow Path State from.
      * @return the extracted Flow Path State.
      */
-    private static FlowPath extractFlowPath(IFlowPath flowObj) {
+    static FlowPath extractFlowPath(IFlowPath flowObj) {
 	//
 	// Extract the Flow state
 	//
diff --git a/src/main/java/net/onrc/onos/ofcontroller/flowmanager/FlowEventHandler.java b/src/main/java/net/onrc/onos/ofcontroller/flowmanager/FlowEventHandler.java
index 001fb3c..cf9b67d 100644
--- a/src/main/java/net/onrc/onos/ofcontroller/flowmanager/FlowEventHandler.java
+++ b/src/main/java/net/onrc/onos/ofcontroller/flowmanager/FlowEventHandler.java
@@ -15,11 +15,15 @@
 import java.util.concurrent.LinkedBlockingQueue;
 
 import net.floodlightcontroller.core.IOFSwitch;
+import net.floodlightcontroller.core.IOFSwitchListener;
 import net.onrc.onos.datagrid.IDatagridService;
+import net.onrc.onos.graph.DBOperation;
+import net.onrc.onos.graph.GraphDBManager;
 import net.onrc.onos.ofcontroller.topology.Topology;
 import net.onrc.onos.ofcontroller.topology.TopologyElement;
 import net.onrc.onos.ofcontroller.topology.TopologyManager;
 import net.onrc.onos.ofcontroller.util.DataPath;
+import net.onrc.onos.ofcontroller.util.Dpid;
 import net.onrc.onos.ofcontroller.util.EventEntry;
 import net.onrc.onos.ofcontroller.util.FlowEntry;
 import net.onrc.onos.ofcontroller.util.FlowEntryAction;
@@ -31,6 +35,8 @@
 import net.onrc.onos.ofcontroller.util.FlowId;
 import net.onrc.onos.ofcontroller.util.FlowPath;
 import net.onrc.onos.ofcontroller.util.FlowPathUserState;
+import net.onrc.onos.ofcontroller.util.Pair;
+import net.onrc.onos.ofcontroller.util.Port;
 import net.onrc.onos.ofcontroller.util.serializers.KryoFactory;
 
 import com.esotericsoftware.kryo2.Kryo;
@@ -46,17 +52,16 @@
  * - Detect FlowPaths impacted by Topology change.
  * - Recompute impacted FlowPath using cached Topology.
  */
-class FlowEventHandler extends Thread implements IFlowEventHandlerService {
+class FlowEventHandler extends Thread implements IFlowEventHandlerService,
+						 IOFSwitchListener {
+
+    private boolean enableOnrc2014MeasurementsFlows = true;
+    private boolean enableOnrc2014MeasurementsTopology = true;
+
     /** The logger. */
     private final static Logger log = LoggerFactory.getLogger(FlowEventHandler.class);
 
-    // Flag to refresh Topology object periodically
-    private final static boolean refreshTopology = false;
-    // Refresh delay(ms)
-    private final static long refreshTopologyDelay = 5000;
-    // Refresh interval(ms)
-    private final static long refreshTopologyInterval = 2000;
-    private Timer refreshTopologyTimer;
+    private DBOperation dbHandler;
 
     private FlowManager flowManager;		// The Flow Manager to use
     private IDatagridService datagridService;	// The Datagrid Service to use
@@ -74,6 +79,12 @@
 	new LinkedList<EventEntry<FlowPath>>();
     private List<EventEntry<FlowEntry>> flowEntryEvents =
 	new LinkedList<EventEntry<FlowEntry>>();
+    private List<EventEntry<Pair<FlowId, Dpid>>> flowIdEvents =
+	new LinkedList<EventEntry<Pair<FlowId, Dpid>>>();
+    private List<EventEntry<Pair<FlowEntryId, Dpid>>> flowEntryIdEvents =
+	new LinkedList<EventEntry<Pair<FlowEntryId, Dpid>>>();
+    private List<EventEntry<Dpid>> switchDpidEvents =
+	new LinkedList<EventEntry<Dpid>>();
 
     // All internally computed Flow Paths
     private Map<Long, FlowPath> allFlowPaths = new HashMap<Long, FlowPath>();
@@ -119,6 +130,8 @@
      * Startup processing.
      */
     private void startup() {
+	this.dbHandler = GraphDBManager.getDBOperation("ramcloud", "/tmp/ramcloudconf");
+
 	//
 	// Obtain the initial Topology state
 	//
@@ -148,32 +161,33 @@
 	    flowEntryEvents.add(eventEntry);
 	}
 
+	//
+	// Obtain the initial FlowId state
+	//
+	Collection<Pair<FlowId, Dpid>> flowIds =
+	    datagridService.getAllFlowIds();
+	for (Pair<FlowId, Dpid> pair : flowIds) {
+	    EventEntry<Pair<FlowId, Dpid>> eventEntry =
+		new EventEntry<Pair<FlowId, Dpid>>(EventEntry.Type.ENTRY_ADD, pair);
+	    flowIdEvents.add(eventEntry);
+	}
+
+	//
+	// Obtain the initial FlowEntryId state
+	//
+	Collection<Pair<FlowEntryId, Dpid>> flowEntryIds =
+	    datagridService.getAllFlowEntryIds();
+	for (Pair<FlowEntryId, Dpid> pair : flowEntryIds) {
+	    EventEntry<Pair<FlowEntryId, Dpid>> eventEntry =
+		new EventEntry<Pair<FlowEntryId, Dpid>>(EventEntry.Type.ENTRY_ADD, pair);
+	    flowEntryIdEvents.add(eventEntry);
+	}
+
 	// Process the initial events (if any)
 	synchronized (allFlowPaths) {
 	    processEvents();
 	}
 
-	if (refreshTopology) {
-		refreshTopologyTimer = new Timer();
-		refreshTopologyTimer.schedule(new TimerTask() {
-			@Override
-			public void run() {
-				PerfMon pm = PerfMon.getInstance();
-				log.debug("[BEFORE] {}", topology);
-				long begin, end;
-				synchronized(topology) {
-					begin = System.nanoTime();
-					pm.read_whole_topology_start();
-					topology.readFromDatabase(flowManager.dbHandlerInner);
-					pm.read_whole_topology_end();
-					end = System.nanoTime();
-				}
-				// FIXME level raised for measurement. Was debug
-				log.error("[AFTER] {}", topology);
-				log.error("refresh takes : {}[us]", (end - begin) / 1000.0);
-			}
-		}, refreshTopologyDelay, refreshTopologyInterval);
-	}
     }
 
     /**
@@ -199,12 +213,15 @@
 		//  - EventEntry<TopologyElement>
 		//  - EventEntry<FlowPath>
 		//  - EventEntry<FlowEntry>
+		//  - EventEntry<Pair<FlowId, Dpid>>
+		//  - EventEntry<Pair<FlowEntryId, Dpid>>
 		//
 		for (EventEntry<?> event : collection) {
 		    // Topology event
 		    if (event.eventData() instanceof TopologyElement) {
 			EventEntry<TopologyElement> topologyEventEntry =
 			    (EventEntry<TopologyElement>)event;
+			
 			topologyEvents.add(topologyEventEntry);
 			continue;
 		    }
@@ -224,6 +241,34 @@
 			flowEntryEvents.add(flowEntryEventEntry);
 			continue;
 		    }
+
+		    // FlowId event
+		    if (event.eventData() instanceof Pair) {
+			EventEntry<Pair<FlowId, Dpid>> flowIdEventEntry =
+			    (EventEntry<Pair<FlowId, Dpid>>)event;
+			flowIdEvents.add(flowIdEventEntry);
+			continue;
+		    }
+
+		    // Switch Dpid event
+		    if (event.eventData() instanceof Dpid) {
+			EventEntry<Dpid> switchDpidEventEntry =
+			    (EventEntry<Dpid>)event;
+			switchDpidEvents.add(switchDpidEventEntry);
+			continue;
+		    }
+
+		    // FlowEntryId event
+		    // TODO: Fix the code below if we need again to handle
+		    // the FlowEntryId events
+		    /*
+		    if (event.eventData() instanceof Pair) {
+			EventEntry<Pair<FlowEntryId, Dpid>> flowEntryIdEventEntry =
+			    (EventEntry<Pair<FlowEntryId, Dpid>>)event;
+			flowEntryIdEvents.add(flowEntryIdEventEntry);
+			continue;
+		    }
+		    */
 		}
 		collection.clear();
 
@@ -236,13 +281,89 @@
 	    log.debug("Exception processing Network Events: ", exception);
 	}
     }
-
+    
     /**
      * Process the events (if any)
      */
     private void processEvents() {
 	Collection<FlowEntry> modifiedFlowEntries;
 
+	if (enableOnrc2014MeasurementsFlows) {
+
+	    if (topologyEvents.isEmpty() && flowIdEvents.isEmpty() &&
+		switchDpidEvents.isEmpty()) {
+		return;		// Nothing to do
+	    }
+
+	    Map<Long, IOFSwitch> mySwitches = flowManager.getMySwitches();
+
+	    // Process the Switch Dpid events
+	    processSwitchDpidEvents();
+
+	    // Process the Flow ID events
+	    processFlowIdEvents(mySwitches);
+
+	    // Fetch the topology
+	    processTopologyEvents();
+
+	    // Recompute all affected Flow Paths and keep only the modified
+	    for (FlowPath flowPath : shouldRecomputeFlowPaths.values()) {
+		if (recomputeFlowPath(flowPath))
+		    modifiedFlowPaths.put(flowPath.flowId().value(), flowPath);
+	    }
+
+	    // Assign the Flow Entry ID as needed
+	    for (FlowPath flowPath : modifiedFlowPaths.values()) {
+		for (FlowEntry flowEntry : flowPath.flowEntries()) {
+		    if (! flowEntry.isValidFlowEntryId()) {
+			long id = flowManager.getNextFlowEntryId();
+			flowEntry.setFlowEntryId(new FlowEntryId(id));
+		    }
+		}
+	    }
+
+	    //
+	    // Push the modified state to the database
+	    //
+	    for (FlowPath flowPath : modifiedFlowPaths.values()) {
+		//
+		// Delete the Flow Path from the Network Map
+		//
+		if (flowPath.flowPathUserState() ==
+		    FlowPathUserState.FP_USER_DELETE) {
+		    log.debug("Deleting Flow Path From Database: {}", flowPath);
+		    // TODO: For now the deleting of a Flow Path is blocking
+		    ParallelFlowDatabaseOperation.deleteFlow(dbHandler,
+							     flowPath.flowId());
+		    // Send the notifications for the deleted Flow Entries
+		    for (FlowEntry flowEntry : flowPath.flowEntries()) {
+			datagridService.notificationSendFlowEntryRemoved(flowEntry.flowEntryId());
+		    }
+
+		    continue;
+		}
+
+		log.debug("Pushing Flow Path To Database: {}", flowPath);
+		//
+		// Write the Flow Path to the Network Map
+		//
+		ParallelFlowDatabaseOperation.addFlow(dbHandler, flowPath,
+						      datagridService);
+	    }
+
+	    // Cleanup
+	    topologyEvents.clear();
+	    flowIdEvents.clear();
+	    switchDpidEvents.clear();
+	    //
+	    // NOTE: Keep a cache with my Flow Paths
+	    // allFlowPaths.clear();
+	    shouldRecomputeFlowPaths.clear();
+	    modifiedFlowPaths.clear();
+
+	    return;
+	}
+
 	if (topologyEvents.isEmpty() && flowPathEvents.isEmpty() &&
 	    flowEntryEvents.isEmpty()) {
 	    return;		// Nothing to do
@@ -379,6 +500,246 @@
     }
 
     /**
+     * Fix a flow fetched from the database.
+     *
+     * @param flowPath the Flow to fix.
+     */
+    private void fixFlowFromDatabase(FlowPath flowPath) {
+	//
+	// TODO: Bug workaround / fix :
+	// method FlowDatabaseOperation.extractFlowEntry() doesn't
+	// fetch the inPort and outPort, hence we assign them here.
+	//
+	// Assign the inPort and outPort for the Flow Entries
+	for (FlowEntry flowEntry : flowPath.flowEntries()) {
+	    // Set the inPort
+	    do {
+		if (flowEntry.inPort() != null)
+		    break;
+		if (flowEntry.flowEntryMatch() == null)
+		    break;
+		Port inPort = new Port(flowEntry.flowEntryMatch().inPort().value());
+		flowEntry.setInPort(inPort);
+	    } while (false);
+
+	    // Set the outPort
+	    do {
+		if (flowEntry.outPort() != null)
+		    break;
+		for (FlowEntryAction fa : flowEntry.flowEntryActions().actions()) {
+		    if (fa.actionOutput() != null) {
+			Port outPort = new Port(fa.actionOutput().port().value());
+			flowEntry.setOutPort(outPort);
+			break;
+		    }
+		}
+	    } while (false);
+	}
+    }
+
+    /**
+     * Process the Switch Dpid events.
+     */
+    private void processSwitchDpidEvents() {
+	Map<Long, Dpid> addedSwitches = new HashMap<Long, Dpid>();
+	Map<Long, Dpid> removedSwitches = new HashMap<Long, Dpid>();
+
+	//
+	// Process all Switch Dpid events and update the appropriate state
+	//
+	for (EventEntry<Dpid> eventEntry : switchDpidEvents) {
+	    Dpid dpid = eventEntry.eventData();
+			
+	    log.debug("SwitchDpid Event: {} {}", eventEntry.eventType(), dpid);
+
+	    // Compute the final set of added and removed switches
+	    switch (eventEntry.eventType()) {
+	    case ENTRY_ADD:
+		addedSwitches.put(dpid.value(), dpid);
+		removedSwitches.remove(dpid.value());
+		break;
+	    case ENTRY_REMOVE:
+		addedSwitches.remove(dpid.value());
+		removedSwitches.put(dpid.value(), dpid);
+		break;
+	    }
+	}
+
+	//
+	// Remove the Flows from the local cache if the removed
+	// switch is the Source Switch.
+	//
+	// TODO: This search can be expensive for a large number of flows
+	// and should be optmized.
+	//
+	List<FlowId> deleteFlowIds = new LinkedList<FlowId>();
+	for (Dpid switchDpid : removedSwitches.values()) {
+	    for (FlowPath flowPath : allFlowPaths.values()) {
+		Dpid srcDpid = flowPath.dataPath().srcPort().dpid();
+		if (srcDpid.value() == switchDpid.value())
+		    deleteFlowIds.add(flowPath.flowId());
+	    }
+	}
+	//
+	// Remove the Flows from the local cache
+	//
+	for (FlowId flowId : deleteFlowIds)
+	    allFlowPaths.remove(flowId.value());
+
+	// Get the Flows for the added switches
+	Collection<FlowPath> flowPaths =
+	    ParallelFlowDatabaseOperation.getFlowsForSwitches(dbHandler,
+							      addedSwitches.values());
+	for (FlowPath flowPath : flowPaths) {
+	    allFlowPaths.put(flowPath.flowId().value(), flowPath);
+	}
+    }
+
+    /**
+     * Process the Flow ID events.
+     *
+     * @param mySwitches the collection of my switches.
+     */
+    private void processFlowIdEvents(Map<Long, IOFSwitch> mySwitches) {
+	List<FlowId> shouldFetchMyFlowIds = new LinkedList<FlowId>();
+
+	//
+	// Process all Flow Id events and update the appropriate state
+	//
+	for (EventEntry<Pair<FlowId, Dpid>> eventEntry : flowIdEvents) {
+	    Pair<FlowId, Dpid> pair = eventEntry.eventData();
+	    FlowId flowId = pair.first;
+	    Dpid dpid = pair.second;
+
+	    log.debug("Flow ID Event: {} {} {}", eventEntry.eventType(),
+		      flowId, dpid);
+
+	    //
+	    // Ignore Flows if the Source Switch is not controlled by this
+	    // instance.
+	    //
+	    if (mySwitches.get(dpid.value()) == null)
+		continue;
+
+	    switch (eventEntry.eventType()) {
+	    case ENTRY_ADD: {
+		//
+		// Add a new Flow Path
+		//
+		if (allFlowPaths.get(flowId.value()) != null) {
+		    //
+		    // TODO: What to do if the Flow Path already exists?
+		    // Fow now, we just re-add it.
+		    //
+		}
+		shouldFetchMyFlowIds.add(flowId);
+
+		break;
+	    }
+
+	    case ENTRY_REMOVE: {
+		//
+		// Remove an existing Flow Path.
+		//
+		// Find the Flow Path, and mark the Flow and its Flow Entries
+		// for deletion.
+		//
+		FlowPath existingFlowPath =
+		    allFlowPaths.get(flowId.value());
+		if (existingFlowPath == null)
+		    continue;		// Nothing to do
+
+		existingFlowPath.setFlowPathUserState(FlowPathUserState.FP_USER_DELETE);
+		for (FlowEntry flowEntry : existingFlowPath.flowEntries()) {
+		    flowEntry.setFlowEntryUserState(FlowEntryUserState.FE_USER_DELETE);
+		    flowEntry.setFlowEntrySwitchState(FlowEntrySwitchState.FE_SWITCH_NOT_UPDATED);
+		}
+
+		// Remove the Flow Path from the internal state
+		Long key = existingFlowPath.flowId().value();
+		allFlowPaths.remove(key);
+		shouldRecomputeFlowPaths.remove(key);
+		modifiedFlowPaths.put(key, existingFlowPath);
+
+		break;
+	    }
+	    }
+	}
+
+	// Get my Flows
+	Collection<FlowPath> myFlows =
+	    ParallelFlowDatabaseOperation.getFlows(dbHandler,
+						   shouldFetchMyFlowIds);
+
+	for (FlowPath flowPath : myFlows) {
+	    fixFlowFromDatabase(flowPath);
+
+	    switch (flowPath.flowPathType()) {
+	    case FP_TYPE_SHORTEST_PATH:
+		//
+		// Reset the Data Path, in case it was set already, because
+		// we are going to recompute it anyway.
+		//
+		flowPath.flowEntries().clear();
+		shouldRecomputeFlowPaths.put(flowPath.flowId().value(),
+					     flowPath);
+		break;
+	    case FP_TYPE_EXPLICIT_PATH:
+		//
+		// Mark all Flow Entries for installation in the switches.
+		//
+		for (FlowEntry flowEntry : flowPath.flowEntries()) {
+		    flowEntry.setFlowEntrySwitchState(FlowEntrySwitchState.FE_SWITCH_NOT_UPDATED);
+		}
+		modifiedFlowPaths.put(flowPath.flowId().value(), flowPath);
+		break;
+	    case FP_TYPE_UNKNOWN:
+		log.error("FlowPath event with unknown type");
+		break;
+	    }
+	    allFlowPaths.put(flowPath.flowId().value(), flowPath);
+	}
+    }
+
+    /**
+     * Process the Flow Entry ID events.
+     *
+     * @param mySwitches the collection of my switches.
+     * @return a collection of modified Flow Entries this instance needs
+     * to push to its own switches.
+     */
+    private Collection<FlowEntry> processFlowEntryIdEvents(Map<Long, IOFSwitch> mySwitches) {
+	List<FlowEntry> modifiedFlowEntries = new LinkedList<FlowEntry>();
+
+	//
+	// Process all Flow ID events and update the appropriate state
+	//
+	for (EventEntry<Pair<FlowEntryId, Dpid>> eventEntry : flowEntryIdEvents) {
+	    Pair<FlowEntryId, Dpid> pair = eventEntry.eventData();
+	    FlowEntryId flowEntryId = pair.first;
+	    Dpid dpid = pair.second;
+
+	    log.debug("Flow Entry ID Event: {} {} {}", eventEntry.eventType(),
+		      flowEntryId, dpid);
+
+	    if (mySwitches.get(dpid.value()) == null)
+		continue;
+
+	    // Fetch the Flow Entry
+	    FlowEntry flowEntry = FlowDatabaseOperation.getFlowEntry(dbHandler,
+								     flowEntryId);
+	    if (flowEntry == null) {
+		log.debug("Flow Entry ID {} : Flow Entry not found!",
+			  flowEntryId);
+		continue;
+	    }
+	    modifiedFlowEntries.add(flowEntry);
+	}
+
+	return modifiedFlowEntries;
+    }
+
+    /**
      * Process the Flow Path events.
      */
     private void processFlowPathEvents() {
@@ -464,15 +825,34 @@
      * Process the Topology events.
      */
     private void processTopologyEvents() {
+	boolean isTopologyModified = false;
+
+	if (enableOnrc2014MeasurementsTopology) {
+	    if (topologyEvents.isEmpty())
+		return;
+
+	    // TODO: Code for debugging purpose only
+	    for (EventEntry<TopologyElement> eventEntry : topologyEvents) {
+		TopologyElement topologyElement = eventEntry.eventData();
+		log.debug("Topology Event: {} {}", eventEntry.eventType(),
+			  topologyElement.toString());
+	    }
+
+	    log.debug("[BEFORE] {}", topology.toString());
+	    topology.readFromDatabase(dbHandler);
+	    log.debug("[AFTER] {}", topology.toString());
+	    shouldRecomputeFlowPaths.putAll(allFlowPaths);
+	    return;
+	}
+
 	//
 	// Process all Topology events and update the appropriate state
 	//
-	boolean isTopologyModified = false;
 	for (EventEntry<TopologyElement> eventEntry : topologyEvents) {
 	    TopologyElement topologyElement = eventEntry.eventData();
-
+			
 	    log.debug("Topology Event: {} {}", eventEntry.eventType(),
-		      topologyElement);
+		      topologyElement.toString());
 
 	    switch (eventEntry.eventType()) {
 	    case ENTRY_ADD:
@@ -752,6 +1132,19 @@
     private boolean recomputeFlowPath(FlowPath flowPath) {
 	boolean hasChanged = false;
 
+	if (enableOnrc2014MeasurementsFlows) {
+	    // Cleanup the deleted Flow Entries from the earlier iteration
+	    flowPath.dataPath().removeDeletedFlowEntries();
+
+	    //
+	    // TODO: Fake it that the Flow Entries have been already pushed
+	    // into the switches, so we don't push them again.
+	    //
+	    for (FlowEntry flowEntry : flowPath.flowEntries()) {
+		flowEntry.setFlowEntrySwitchState(FlowEntrySwitchState.FE_SWITCH_UPDATED);
+	    }
+	}
+
 	//
 	// Test whether the Flow Path needs to be recomputed
 	//
@@ -772,7 +1165,6 @@
 	newDataPath = TopologyManager.computeNetworkPath(topology,
 								  flowPath);
 	}
-
 	if (newDataPath == null) {
 	    // We need the DataPath to compare the paths
 	    newDataPath = new DataPath();
@@ -982,6 +1374,13 @@
      */
     @Override
     public void notificationRecvFlowEntryAdded(FlowEntry flowEntry) {
+	if (enableOnrc2014MeasurementsFlows) {
+	    Collection entries = new ArrayList();
+	    entries.add(flowEntry);
+	    flowManager.pushModifiedFlowEntriesToSwitches(entries);
+	    return;
+	}
+
 	EventEntry<FlowEntry> eventEntry =
 	    new EventEntry<FlowEntry>(EventEntry.Type.ENTRY_ADD, flowEntry);
 	networkEvents.add(eventEntry);
@@ -994,6 +1393,19 @@
      */
     @Override
     public void notificationRecvFlowEntryRemoved(FlowEntry flowEntry) {
+	if (enableOnrc2014MeasurementsFlows) {
+	    //
+	    // NOTE: Must update the state to DELETE, because
+	    // the notification contains the original state.
+	    //
+	    flowEntry.setFlowEntryUserState(FlowEntryUserState.FE_USER_DELETE);
+
+	    Collection entries = new ArrayList();
+	    entries.add(flowEntry);
+	    flowManager.pushModifiedFlowEntriesToSwitches(entries);
+	    return;
+	}
+
 	EventEntry<FlowEntry> eventEntry =
 	    new EventEntry<FlowEntry>(EventEntry.Type.ENTRY_REMOVE, flowEntry);
 	networkEvents.add(eventEntry);
@@ -1006,6 +1418,13 @@
      */
     @Override
     public void notificationRecvFlowEntryUpdated(FlowEntry flowEntry) {
+	if (enableOnrc2014MeasurementsFlows) {
+	    Collection entries = new ArrayList();
+	    entries.add(flowEntry);
+	    flowManager.pushModifiedFlowEntriesToSwitches(entries);
+	    return;
+	}
+
 	// NOTE: The ADD and UPDATE events are processed in same way
 	EventEntry<FlowEntry> eventEntry =
 	    new EventEntry<FlowEntry>(EventEntry.Type.ENTRY_ADD, flowEntry);
@@ -1013,6 +1432,101 @@
     }
 
     /**
+     * Receive a notification that a FlowId is added.
+     *
+     * @param flowId the FlowId that is added.
+     * @param dpid the Source Switch Dpid for the corresponding Flow.
+     */
+    @Override
+    public void notificationRecvFlowIdAdded(FlowId flowId, Dpid dpid) {
+	Pair flowIdPair = new Pair(flowId, dpid);
+
+	EventEntry<Pair<FlowId, Dpid>> eventEntry =
+	    new EventEntry<Pair<FlowId, Dpid>>(EventEntry.Type.ENTRY_ADD, flowIdPair);
+	networkEvents.add(eventEntry);
+    }
+
+    /**
+     * Receive a notification that a FlowId is removed.
+     *
+     * @param flowId the FlowId that is removed.
+     * @param dpid the Source Switch Dpid for the corresponding Flow.
+     */
+    @Override
+    public void notificationRecvFlowIdRemoved(FlowId flowId, Dpid dpid) {
+	Pair flowIdPair = new Pair(flowId, dpid);
+
+	EventEntry<Pair<FlowId, Dpid>> eventEntry =
+	    new EventEntry<Pair<FlowId, Dpid>>(EventEntry.Type.ENTRY_REMOVE, flowIdPair);
+	networkEvents.add(eventEntry);
+    }
+
+    /**
+     * Receive a notification that a FlowId is updated.
+     *
+     * @param flowId the FlowId that is updated.
+     * @param dpid the Source Switch Dpid for the corresponding Flow.
+     */
+    @Override
+    public void notificationRecvFlowIdUpdated(FlowId flowId, Dpid dpid) {
+	Pair flowIdPair = new Pair(flowId, dpid);
+
+	// NOTE: The ADD and UPDATE events are processed in same way
+	EventEntry<Pair<FlowId, Dpid>> eventEntry =
+	    new EventEntry<Pair<FlowId, Dpid>>(EventEntry.Type.ENTRY_ADD, flowIdPair);
+	networkEvents.add(eventEntry);
+    }
+
+    /**
+     * Receive a notification that a FlowEntryId is added.
+     *
+     * @param flowEntryId the FlowEntryId that is added.
+     * @param dpid the Switch Dpid for the corresponding Flow Entry.
+     */
+    @Override
+    public void notificationRecvFlowEntryIdAdded(FlowEntryId flowEntryId,
+						 Dpid dpid) {
+	Pair flowEntryIdPair = new Pair(flowEntryId, dpid);
+
+	EventEntry<Pair<FlowEntryId, Dpid>> eventEntry =
+	    new EventEntry<Pair<FlowEntryId, Dpid>>(EventEntry.Type.ENTRY_ADD, flowEntryIdPair);
+	networkEvents.add(eventEntry);
+    }
+
+    /**
+     * Receive a notification that a FlowEntryId is removed.
+     *
+     * @param flowEntryId the FlowEntryId that is removed.
+     * @param dpid the Switch Dpid for the corresponding Flow Entry.
+     */
+    @Override
+    public void notificationRecvFlowEntryIdRemoved(FlowEntryId flowEntryId,
+						   Dpid dpid) {
+	Pair flowEntryIdPair = new Pair(flowEntryId, dpid);
+
+	EventEntry<Pair<FlowEntryId, Dpid>> eventEntry =
+	    new EventEntry<Pair<FlowEntryId, Dpid>>(EventEntry.Type.ENTRY_REMOVE, flowEntryIdPair);
+	networkEvents.add(eventEntry);
+    }
+
+    /**
+     * Receive a notification that a FlowEntryId is updated.
+     *
+     * @param flowEntryId the FlowEntryId that is updated.
+     * @param dpid the Switch Dpid for the corresponding Flow Entry.
+     */
+    @Override
+    public void notificationRecvFlowEntryIdUpdated(FlowEntryId flowEntryId,
+						   Dpid dpid) {
+	Pair flowEntryIdPair = new Pair(flowEntryId, dpid);
+
+	// NOTE: The ADD and UPDATE events are processed in same way
+	EventEntry<Pair<FlowEntryId, Dpid>> eventEntry =
+	    new EventEntry<Pair<FlowEntryId, Dpid>>(EventEntry.Type.ENTRY_ADD, flowEntryIdPair);
+	networkEvents.add(eventEntry);
+    }
+
+    /**
      * Receive a notification that a Topology Element is added.
      *
      * @param topologyElement the Topology Element that is added.
@@ -1050,6 +1564,40 @@
     }
 
     /**
+     * Receive a notification that a switch is added to this instance.
+     *
+     * @param sw the switch that is added.
+     */
+    @Override
+    public void addedSwitch(IOFSwitch sw) {
+	Dpid dpid = new Dpid(sw.getId());
+	EventEntry<Dpid> eventEntry =
+	    new EventEntry<Dpid>(EventEntry.Type.ENTRY_ADD, dpid);
+	networkEvents.add(eventEntry);
+    }
+
+    /**
+     * Receive a notification that a switch is removed from this instance.
+     *
+     * @param sw the switch that is removed.
+     */
+    @Override
+    public void removedSwitch(IOFSwitch sw) {
+	Dpid dpid = new Dpid(sw.getId());
+	EventEntry<Dpid> eventEntry =
+	    new EventEntry<Dpid>(EventEntry.Type.ENTRY_REMOVE, dpid);
+	networkEvents.add(eventEntry);
+    }
+
+    /**
+     * Receive a notification that the ports on a switch have changed.
+     */
+    @Override
+    public void switchPortChanged(Long switchId) {
+	// Nothing to do
+    }
+    
+    /**
      * Get a sorted copy of all Flow Paths.
      *
      * @return a sorted copy of all Flow Paths.
diff --git a/src/main/java/net/onrc/onos/ofcontroller/flowmanager/FlowManager.java b/src/main/java/net/onrc/onos/ofcontroller/flowmanager/FlowManager.java
index 611b5c9..fc5ae34 100644
--- a/src/main/java/net/onrc/onos/ofcontroller/flowmanager/FlowManager.java
+++ b/src/main/java/net/onrc/onos/ofcontroller/flowmanager/FlowManager.java
@@ -8,6 +8,7 @@
 import java.util.Map;
 import java.util.Random;
 import java.util.SortedMap;
+import java.util.TreeMap;
 import java.util.concurrent.BlockingQueue;
 import java.util.concurrent.LinkedBlockingQueue;
 
@@ -50,8 +51,9 @@
  * Flow Manager class for handling the network flows.
  */
 public class FlowManager implements IFloodlightModule, IFlowService, INetMapStorage {
-    // flag to use FlowPusher instead of FlowSwitchOperation/MessageDamper
-    private final static boolean enableFlowPusher = false;
+
+    private boolean enableOnrc2014MeasurementsFlows = true;
+
     protected DBOperation dbHandlerApi;
     protected DBOperation dbHandlerInner;
 
@@ -98,7 +100,7 @@
      * Shutdown the Flow Manager operation.
      */
     @Override
-    public void finalize() {
+    protected void finalize() {
     	close();
     }
 
@@ -107,6 +109,7 @@
      */
     @Override
     public void close() {
+	floodlightProvider.removeOFSwitchListener(flowEventHandler);
 	datagridService.deregisterFlowEventHandlerService(flowEventHandler);
     	dbHandlerApi.close();
     	dbHandlerInner.close();
@@ -233,6 +236,7 @@
 	//  - startup
 	//
 	flowEventHandler = new FlowEventHandler(this, datagridService);
+	floodlightProvider.addOFSwitchListener(flowEventHandler);
 	datagridService.registerFlowEventHandlerService(flowEventHandler);
 	flowEventHandler.start();
     }
@@ -273,7 +277,13 @@
 	}
 
 	if (FlowDatabaseOperation.addFlow(dbHandlerApi, flowPath)) {
-	    datagridService.notificationSendFlowAdded(flowPath);
+	    if (enableOnrc2014MeasurementsFlows) {
+		datagridService.notificationSendFlowIdAdded(flowPath.flowId(),
+							    flowPath.dataPath().srcPort().dpid());
+	    } else {
+		datagridService.notificationSendFlowAdded(flowPath);
+	    }
+
 	    return flowPath.flowId();
 	}
 	return null;
@@ -287,7 +297,11 @@
     @Override
     public boolean deleteAllFlows() {
 	if (FlowDatabaseOperation.deleteAllFlows(dbHandlerApi)) {
-	    datagridService.notificationSendAllFlowsRemoved();
+	    if (enableOnrc2014MeasurementsFlows) {
+		datagridService.notificationSendAllFlowIdsRemoved();
+	    } else {
+		datagridService.notificationSendAllFlowsRemoved();
+	    }
 	    return true;
 	}
 	return false;
@@ -302,7 +316,11 @@
     @Override
     public boolean deleteFlow(FlowId flowId) {
 	if (FlowDatabaseOperation.deleteFlow(dbHandlerApi, flowId)) {
-	    datagridService.notificationSendFlowRemoved(flowId);
+	    if (enableOnrc2014MeasurementsFlows) {
+		datagridService.notificationSendFlowIdRemoved(flowId);
+	    } else {
+		datagridService.notificationSendFlowRemoved(flowId);
+	    }
 	    return true;
 	}
 	return false;
@@ -320,6 +338,26 @@
     }
 
     /**
+     * Get a previously added flow entry.
+     *
+     * @param flowEntryId the Flow Entry ID of the flow entry to get.
+     * @return the Flow Entry if found, otherwise null.
+     */
+    public FlowEntry getFlowEntry(FlowEntryId flowEntryId) {
+	return FlowDatabaseOperation.getFlowEntry(dbHandlerApi, flowEntryId);
+    }
+
+    /**
+     * Get the source switch DPID of a previously added flow.
+     *
+     * @param flowId the Flow ID of the flow to get.
+     * @return the source switch DPID if found, otherwise null.
+     */
+    public Dpid getFlowSourceDpid(FlowId flowId) {
+	return FlowDatabaseOperation.getFlowSourceDpid(dbHandlerApi, flowId);
+    }
+
+    /**
      * Get all installed flows by all installers.
      *
      * @return the Flow Paths if found, otherwise null.
@@ -330,6 +368,18 @@
     }
 
     /**
+     * Get all installed flows whose Source Switch is controlled by this
+     * instance.
+     *
+     * @param mySwitches the collection of the switches controlled by this
+     * instance.
+     * @return the Flow Paths if found, otherwise null.
+     */
+    public ArrayList<FlowPath> getAllMyFlows(Map<Long, IOFSwitch> mySwitches) {
+	return FlowDatabaseOperation.getAllMyFlows(dbHandlerApi, mySwitches);
+    }
+
+    /**
      * Get summary of all installed flows by all installers in a given range.
      *
      * @param flowId the Flow ID of the first flow in the flow range to get.
@@ -341,7 +391,17 @@
 						  int maxFlows) {
     	ArrayList<FlowPath> flowPaths = new ArrayList<FlowPath>();
 	SortedMap<Long, FlowPath> sortedFlowPaths =
-	    flowEventHandler.getAllFlowPathsCopy();
+	    new TreeMap<Long, FlowPath>();
+
+	if (enableOnrc2014MeasurementsFlows) {
+	    Collection<FlowPath> databaseFlowPaths =
+		ParallelFlowDatabaseOperation.getAllFlows(dbHandlerApi);
+	    for (FlowPath flowPath : databaseFlowPaths) {
+		sortedFlowPaths.put(flowPath.flowId().value(), flowPath);
+	    }
+	} else {
+	    sortedFlowPaths = flowEventHandler.getAllFlowPathsCopy();
+	}
 
 	//
 	// Truncate each Flow Path and Flow Entry
@@ -421,6 +481,9 @@
     public void flowEntriesPushedToSwitch(
 		Collection<Pair<IOFSwitch, FlowEntry>> entries) {
 
+	if (enableOnrc2014MeasurementsFlows)
+	    return;
+
 	//
 	// Process all entries
 	//
@@ -490,8 +553,12 @@
 	//  - Flow Paths to the database
 	//
 	pushModifiedFlowEntriesToSwitches(modifiedFlowEntries);
-	pushModifiedFlowPathsToDatabase(modifiedFlowPaths);
-	cleanupDeletedFlowEntriesFromDatagrid(modifiedFlowEntries);
+	if (enableOnrc2014MeasurementsFlows) {
+	    writeModifiedFlowPathsToDatabase(modifiedFlowPaths);
+	} else {
+	    pushModifiedFlowPathsToDatabase(modifiedFlowPaths);
+	    cleanupDeletedFlowEntriesFromDatagrid(modifiedFlowEntries);
+	}
     }
 
     /**
@@ -502,7 +569,7 @@
      *
      * @param modifiedFlowEntries the collection of modified Flow Entries.
      */
-    private void pushModifiedFlowEntriesToSwitches(
+    void pushModifiedFlowEntriesToSwitches(
 			Collection<FlowEntry> modifiedFlowEntries) {
 	if (modifiedFlowEntries.isEmpty())
 	    return;
@@ -679,7 +746,7 @@
      *
      * @param modifiedFlowPaths the collection of Flow Paths to write.
      */
-    private void writeModifiedFlowPathsToDatabase(
+    void writeModifiedFlowPathsToDatabase(
 		Collection<FlowPath> modifiedFlowPaths) {
 	if (modifiedFlowPaths.isEmpty())
 	    return;
@@ -719,7 +786,7 @@
 			retry = true;
 		    } catch (Exception e) {
 			log.error("Exception deleting Flow Path from Network MAP: {}", e);
-		    } 
+		    }
 		} while (retry);
 
 		continue;
@@ -738,10 +805,12 @@
 		    allValid = false;
 		    break;
 		}
-		if (flowEntry.flowEntrySwitchState() !=
-		    FlowEntrySwitchState.FE_SWITCH_UPDATED) {
-		    allValid = false;
-		    break;
+		if (! enableOnrc2014MeasurementsFlows) {
+		    if (flowEntry.flowEntrySwitchState() !=
+			FlowEntrySwitchState.FE_SWITCH_UPDATED) {
+			allValid = false;
+			break;
+		    }
 		}
 	    }
 	    if (! allValid)
@@ -777,6 +846,36 @@
                     //log.error("Performance %% Flow path total time {} : {}", endTime - startTime, flowPath.toString());
 		}
 	    } while (retry);
+
+	    if (enableOnrc2014MeasurementsFlows) {
+		// Send the notifications
+
+		for (FlowEntry flowEntry : flowPath.flowEntries()) {
+		    if (flowEntry.flowEntrySwitchState() !=
+			FlowEntrySwitchState.FE_SWITCH_NOT_UPDATED) {
+			continue;
+		    }
+		    // datagridService.notificationSendFlowEntryIdAdded(flowEntry.flowEntryId(), flowEntry.dpid());
+
+		    //
+		    // Write the Flow Entry to the Datagrid
+		    //
+		    switch (flowEntry.flowEntryUserState()) {
+		    case FE_USER_ADD:
+			datagridService.notificationSendFlowEntryAdded(flowEntry);
+			break;
+		    case FE_USER_MODIFY:
+			datagridService.notificationSendFlowEntryUpdated(flowEntry);
+			break;
+		    case FE_USER_DELETE:
+			datagridService.notificationSendFlowEntryRemoved(flowEntry.flowEntryId());
+			break;
+		    case FE_USER_UNKNOWN:
+			assert(false);
+			break;
+		    }
+		}
+	    }
 	}
     }
 }
diff --git a/src/main/java/net/onrc/onos/ofcontroller/flowmanager/IFlowEventHandlerService.java b/src/main/java/net/onrc/onos/ofcontroller/flowmanager/IFlowEventHandlerService.java
index 78562e1..a44a898 100644
--- a/src/main/java/net/onrc/onos/ofcontroller/flowmanager/IFlowEventHandlerService.java
+++ b/src/main/java/net/onrc/onos/ofcontroller/flowmanager/IFlowEventHandlerService.java
@@ -1,7 +1,10 @@
 package net.onrc.onos.ofcontroller.flowmanager;
 
 import net.onrc.onos.ofcontroller.topology.TopologyElement;
+import net.onrc.onos.ofcontroller.util.Dpid;
 import net.onrc.onos.ofcontroller.util.FlowEntry;
+import net.onrc.onos.ofcontroller.util.FlowEntryId;
+import net.onrc.onos.ofcontroller.util.FlowId;
 import net.onrc.onos.ofcontroller.util.FlowPath;
 
 /**
@@ -51,6 +54,56 @@
     void notificationRecvFlowEntryUpdated(FlowEntry flowEntry);
 
     /**
+     * Receive a notification that a FlowId is added.
+     *
+     * @param flowId the FlowId that is added.
+     * @param dpid the Source Switch Dpid for the corresponding Flow.
+     */
+    void notificationRecvFlowIdAdded(FlowId flowId, Dpid dpid);
+
+    /**
+     * Receive a notification that a FlowId is removed.
+     *
+     * @param flowId the FlowId that is removed.
+     * @param dpid the Source Switch Dpid for the corresponding Flow.
+     */
+    void notificationRecvFlowIdRemoved(FlowId flowId, Dpid dpid);
+
+    /**
+     * Receive a notification that a FlowId is updated.
+     *
+     * @param flowId the FlowId that is updated.
+     * @param dpid the Source Switch Dpid for the corresponding Flow.
+     */
+    void notificationRecvFlowIdUpdated(FlowId flowId, Dpid dpid);
+
+    /**
+     * Receive a notification that a FlowEntryId is added.
+     *
+     * @param flowEntryId the FlowEntryId that is added.
+     * @param dpid the Switch Dpid for the corresponding Flow Entry.
+     */
+    void notificationRecvFlowEntryIdAdded(FlowEntryId flowEntryId, Dpid dpid);
+
+    /**
+     * Receive a notification that a FlowEntryId is removed.
+     *
+     * @param flowEntryId the FlowEntryId that is removed.
+     * @param dpid the Switch Dpid for the corresponding Flow Entry.
+     */
+    void notificationRecvFlowEntryIdRemoved(FlowEntryId flowEntryId,
+					    Dpid dpid);
+
+    /**
+     * Receive a notification that a FlowEntryId is updated.
+     *
+     * @param flowEntryId the FlowEntryId that is updated.
+     * @param dpid the Switch Dpid for the corresponding Flow Entry.
+     */
+    void notificationRecvFlowEntryIdUpdated(FlowEntryId flowEntryId,
+					    Dpid dpid);
+
+    /**
      * Receive a notification that a Topology Element is added.
      *
      * @param topologyElement the Topology Element that is added.
diff --git a/src/main/java/net/onrc/onos/ofcontroller/flowmanager/ParallelFlowDatabaseOperation.java b/src/main/java/net/onrc/onos/ofcontroller/flowmanager/ParallelFlowDatabaseOperation.java
new file mode 100644
index 0000000..212c59f
--- /dev/null
+++ b/src/main/java/net/onrc/onos/ofcontroller/flowmanager/ParallelFlowDatabaseOperation.java
@@ -0,0 +1,367 @@
+package net.onrc.onos.ofcontroller.flowmanager;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashSet;
+import java.util.Set;
+import java.util.concurrent.Callable;
+import java.util.concurrent.CompletionService;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorCompletionService;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import net.onrc.onos.datagrid.IDatagridService;
+import net.onrc.onos.graph.DBOperation;
+import net.onrc.onos.ofcontroller.core.INetMapTopologyObjects.IFlowPath;
+import net.onrc.onos.ofcontroller.util.Dpid;
+import net.onrc.onos.ofcontroller.util.FlowEntry;
+import net.onrc.onos.ofcontroller.util.FlowEntrySwitchState;
+import net.onrc.onos.ofcontroller.util.FlowId;
+import net.onrc.onos.ofcontroller.util.FlowPath;
+
+/**
+ * Class for performing parallel Flow-related operations on the Database.
+ * 
+ * This class is mostly a wrapper of FlowDatabaseOperation with a thread pool
+ * for parallelization.
+ * 
+ * @author Brian O'Connor <brian@onlab.us>
+ */
+public class ParallelFlowDatabaseOperation extends FlowDatabaseOperation {
+    private final static Logger log = LoggerFactory.getLogger(FlowDatabaseOperation.class);
+
+    private final static int numThreads = Integer.valueOf(System.getProperty("parallelFlowDatabase.numThreads", "32"));
+    private final static ExecutorService executor = Executors.newFixedThreadPool(numThreads);
+
+    /**
+     * Get all installed flows by first querying the database for all FlowPaths
+     * and then populating them from the database in parallel.
+     * 
+     * @param dbHandler the Graph Database handler to use.
+     * @return the Flow Paths if found, otherwise an empty list.
+     */
+    static ArrayList<FlowPath> getAllFlows(DBOperation dbHandler) {
+	Iterable<IFlowPath> flowPathsObj = null;
+	ArrayList<FlowPath> flowPaths = new ArrayList<FlowPath>();
+
+	try {
+	    flowPathsObj = dbHandler.getAllFlowPaths();
+	} catch (Exception e) {
+	    // TODO: handle exceptions
+	    dbHandler.rollback();
+	    log.error(":getAllFlowPaths failed");
+	    return flowPaths;
+	}
+	if ((flowPathsObj == null) || (flowPathsObj.iterator().hasNext() == false)) {
+	    dbHandler.commit();
+	    return flowPaths;	// No Flows found
+	}
+	
+	CompletionService<FlowPath> tasks = new ExecutorCompletionService<>(executor);
+	int numTasks = 0;
+	for(IFlowPath flowObj : flowPathsObj) {
+	    tasks.submit(new ExtractFlowTask(flowObj));
+	    numTasks++;
+	}
+	for(int i = 0; i < numTasks; i++) {
+	    try {
+		FlowPath flowPath = tasks.take().get();
+		if(flowPath != null) {
+		    flowPaths.add(flowPath);
+		}
+	    } catch (InterruptedException | ExecutionException e) {
+		log.error("Error reading FlowPath from IFlowPath object");
+	    }
+	}
+	dbHandler.commit();
+	return flowPaths;	
+    }
+    
+    /**
+     * Query the database for all flow paths that have their source switch
+     * in the provided collection
+     * 
+     * Note: this function is implemented naively and inefficiently
+     * 
+     * @param dbHandler the Graph Database handler to use.
+     * @param switches a collection of switches whose flow paths you want
+     * @return the Flow Paths if found, otherwise an empty list.
+     */
+    static ArrayList<FlowPath> getFlowsForSwitches(DBOperation dbHandler, Collection<Dpid> switches) {
+	Iterable<IFlowPath> flowPathsObj = null;
+	ArrayList<FlowPath> flowPaths = new ArrayList<FlowPath>();
+
+	try {
+	    flowPathsObj = dbHandler.getAllFlowPaths();
+	} catch (Exception e) {
+	    // TODO: handle exceptions
+	    dbHandler.rollback();
+	    log.error(":getAllFlowPaths failed");
+	    return flowPaths;
+	}
+	if ((flowPathsObj == null) || (flowPathsObj.iterator().hasNext() == false)) {
+	    dbHandler.commit();
+	    return flowPaths;	// No Flows found
+	}
+	
+	// convert the collection of switch dpids into a set of strings
+	Set<String> switchSet = new HashSet<>();
+	for(Dpid dpid : switches) {
+	    switchSet.add(dpid.toString());
+	}
+	
+	CompletionService<FlowPath> tasks = new ExecutorCompletionService<>(executor);
+	int numTasks = 0;
+	for(IFlowPath flowObj : flowPathsObj) {
+	    if(switchSet.contains(flowObj.getSrcSwitch())) {
+		tasks.submit(new ExtractFlowTask(flowObj));
+		numTasks++;
+	    }
+	}
+	for(int i = 0; i < numTasks; i++) {
+	    try {
+		FlowPath flowPath = tasks.take().get();
+		if(flowPath != null) {
+		    flowPaths.add(flowPath);
+		}
+	    } catch (InterruptedException | ExecutionException e) {
+		log.error("Error reading FlowPath from IFlowPath object");
+	    }
+	}
+	dbHandler.commit();
+	return flowPaths;	
+    }
+    
+    /**
+     * The basic parallelization unit for extracting FlowEntries from the database.
+     * 
+     * This is simply a wrapper for FlowDatabaseOperation.extractFlowPath()
+     */
+    private final static class ExtractFlowTask implements Callable<FlowPath> {
+	private final IFlowPath flowObj;
+	
+	ExtractFlowTask(IFlowPath flowObj){
+	    this.flowObj = flowObj;
+	}
+	@Override 
+	public FlowPath call() throws Exception {
+	    return extractFlowPath(flowObj);
+	}
+    }
+
+    /**
+     * Get a subset of installed flows in parallel.
+     *
+     * @param dbHandler the Graph Database handler to use.
+     * @param flowIds the collection of Flow IDs to get.
+     * @return the Flow Paths if found, otherwise an empty list.
+     */
+    static ArrayList<FlowPath> getFlows(DBOperation dbHandler,
+		  			Collection<FlowId> flowIds) {
+	ArrayList<FlowPath> flowPaths = new ArrayList<FlowPath>();
+
+	CompletionService<FlowPath> tasks = new ExecutorCompletionService<>(executor);
+	int numTasks = 0;
+	for (FlowId flowId : flowIds) {
+	    tasks.submit(new GetFlowTask(dbHandler, flowId));
+	    numTasks++;
+	}
+	for(int i = 0; i < numTasks; i++) {
+	    try {
+		FlowPath flowPath = tasks.take().get();
+		if(flowPath != null) {
+		    flowPaths.add(flowPath);
+		}
+	    } catch (InterruptedException | ExecutionException e) {
+		log.error("Error reading FlowPath from database");
+	    }
+	}
+	// TODO: should we commit?
+	//dbHandler.commit();
+	return flowPaths;
+    }
+    
+    /**
+     * The basic parallelization unit for getting FlowEntries.
+     * 
+     * This is simply a wrapper for FlowDatabaseOperation.getFlow()
+     */
+    private final static class GetFlowTask implements Callable<FlowPath> {
+	private final DBOperation dbHandler;
+	private final FlowId flowId;
+
+	GetFlowTask(DBOperation dbHandler, FlowId flowId) {
+	    this.dbHandler = dbHandler;
+	    this.flowId = flowId;
+	}
+	@Override
+	public FlowPath call() throws Exception{
+	    return getFlow(dbHandler, flowId);
+	}
+    }
+    
+    /**
+     * Add a flow by creating a database task, then waiting for the result.
+     * Mostly, a wrapper for FlowDatabaseOperation.addFlow() which overs little
+     * performance benefit.
+     *
+     * @param dbHandler the Graph Database handler to use.
+     * @param flowPath the Flow Path to install.
+     * @return true on success, otherwise false.
+     */
+    static boolean addFlow(DBOperation dbHandler, FlowPath flowPath) {
+	Future<Boolean> result = executor.submit(new AddFlowTask(dbHandler, flowPath, null));
+	// NOTE: This function is blocking
+	try {
+	    return result.get();
+	} catch (InterruptedException | ExecutionException e) {
+	    return false;
+	}
+    }
+    
+    /**
+     * Add a flow asynchronously by creating a database task.
+     *
+     * @param dbHandler the Graph Database handler to use.
+     * @param flowPath the Flow Path to install.
+     * @param datagridService the notification service for when the task is completed
+     * @return true always
+     */
+    static boolean addFlow(DBOperation dbHandler, FlowPath flowPath, IDatagridService datagridService) {
+	executor.submit(new AddFlowTask(dbHandler, flowPath, datagridService));
+	// TODO: If we need the results, submit returns a Future that contains
+	// the result. 
+	return true;
+
+    }
+    
+    /**
+     * The basic parallelization unit for adding FlowPaths.
+     * 
+     * This is simply a wrapper for FlowDatabaseOperation.addFlow(), 
+     * which also sends a notification if a datagrid services is provided
+     */
+    private final static class AddFlowTask implements Callable<Boolean> {
+	private final DBOperation dbHandler;
+	private final FlowPath flowPath;
+	private final IDatagridService datagridService;
+
+	AddFlowTask(DBOperation dbHandler,
+		    FlowPath flowPath,
+		    IDatagridService datagridService) {
+	    this.dbHandler = dbHandler;
+	    this.flowPath = flowPath;
+	    this.datagridService = datagridService;
+	}
+	
+	@Override
+	public Boolean call() throws Exception {
+	    boolean success = FlowDatabaseOperation.addFlow(dbHandler, flowPath);
+	    if(success) {
+		if(datagridService != null) {
+		    // Send notifications for each Flow Entry
+		    for (FlowEntry flowEntry : flowPath.flowEntries()) {
+			if (flowEntry.flowEntrySwitchState() !=
+			    FlowEntrySwitchState.FE_SWITCH_NOT_UPDATED) {
+			    continue;
+			}
+			//
+			// Write the Flow Entry to the Datagrid
+			//
+			switch (flowEntry.flowEntryUserState()) {
+			case FE_USER_ADD:
+			    datagridService.notificationSendFlowEntryAdded(flowEntry);
+			    break;
+			case FE_USER_MODIFY:
+			    datagridService.notificationSendFlowEntryUpdated(flowEntry);
+			    break;
+			case FE_USER_DELETE:
+			    datagridService.notificationSendFlowEntryRemoved(flowEntry.flowEntryId());
+			    break;
+			case FE_USER_UNKNOWN:
+			    assert(false);
+			    break;
+			}
+		    }
+		}
+	    }
+	    else {
+		log.error("Error adding flow path {} to database", flowPath);
+	    }
+	    return success;
+
+	}
+    }
+
+    /**
+     * Delete a previously added flow by creating a database task, then waiting 
+     * for the result.
+     * 
+     * Mostly, a wrapper for FlowDatabaseOperation.addFlow() which overs little
+     * performance benefit.
+     *
+     * @param dbHandler the Graph Database handler to use.
+     * @param flowId the Flow ID of the flow to delete.
+     * @return true on success, otherwise false.
+     */
+    static boolean deleteFlow(DBOperation dbHandler, FlowId flowId) {
+	Future<Boolean> result = executor.submit(new DeleteFlowTask(dbHandler, flowId, null));
+	// NOTE: This function is blocking
+	try {
+	    return result.get();
+	} catch (InterruptedException | ExecutionException e) {
+	    return false;
+	}
+    }    
+    
+    /**
+     * Delete a previously added flow asynchronously by creating a database task.
+     *
+     * @param dbHandler the Graph Database handler to use.
+     * @param flowId the Flow ID of the flow to delete.
+     * @param datagridService the notification service for when the task is completed
+     * @return true always
+     */
+    static boolean deleteFlow(DBOperation dbHandler, FlowId flowId, IDatagridService datagridService) {
+	executor.submit(new DeleteFlowTask(dbHandler, flowId, datagridService));
+	// TODO: If we need the results, submit returns a Future that contains
+	// the result. 
+	return true;
+    }
+    
+    /**
+     * The basic parallelization unit for deleting FlowPaths.
+     * 
+     * This is simply a wrapper for FlowDatabaseOperation.deleteFlow(),
+     * which also sends a notification if a datagrid services is provided
+     */
+    private final static class DeleteFlowTask implements Callable<Boolean> {
+	private final DBOperation dbHandler;
+	private final FlowId flowId;
+	private final IDatagridService datagridService;
+
+	DeleteFlowTask(DBOperation dbHandler, FlowId flowId, IDatagridService datagridService) {
+	    this.dbHandler = dbHandler;
+	    this.flowId = flowId;
+	    this.datagridService = datagridService;
+	}
+	@Override
+	public Boolean call() throws Exception{
+	    boolean success = FlowDatabaseOperation.deleteFlow(dbHandler, flowId);
+	    if(success) {
+		if(datagridService != null) {
+		    datagridService.notificationSendFlowIdRemoved(flowId);
+		}
+	    }
+	    else {
+		log.error("Error removing flow path {} from database", flowId);
+	    }
+	    return success;
+	}
+    }
+}
diff --git a/src/main/java/net/onrc/onos/ofcontroller/flowmanager/PerformanceMonitor.java b/src/main/java/net/onrc/onos/ofcontroller/flowmanager/PerformanceMonitor.java
new file mode 100644
index 0000000..13319e7
--- /dev/null
+++ b/src/main/java/net/onrc/onos/ofcontroller/flowmanager/PerformanceMonitor.java
@@ -0,0 +1,162 @@
+package net.onrc.onos.ofcontroller.flowmanager;
+
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Class for collecting performance measurements
+ */
+public class PerformanceMonitor {
+    private final static Map<String, Measurement> map = new ConcurrentHashMap<String, Measurement>();;
+    private final static Logger log = LoggerFactory.getLogger(PerformanceMonitor.class);
+    private static long overhead;    
+    
+    /**
+     * Start a performance measurement, identified by a tag
+     * 
+     * Note: Only a single measurement can use the same tag at a time.
+     * 
+     * @param tag for performance measurement
+     */
+    public static void start(String tag) {
+	long start = System.nanoTime();
+	Measurement m = new Measurement();
+	if(map.put(tag, m) != null) {
+	    // if there was a previous entry, we have just overwritten it
+	    log.error("Tag {} already exists", tag);
+	}
+	m.start();
+	overhead += System.nanoTime() - start;
+    }
+    
+    /**
+     * Stop a performance measurement. 
+     * 
+     * You must have already started a measurement with tag.
+     * 
+     * @param tag for performance measurement
+     */
+    public static void stop(String tag) {
+	long time = System.nanoTime();
+	Measurement m = map.get(tag);
+	if(m == null) {
+	    log.error("Tag {} does not exist", tag);
+	}
+	else {
+	    map.get(tag).stop(time);
+	}
+	overhead += System.nanoTime() - time;
+    }
+    
+    /**
+     * Find a measurement, identified by tag, and return the result
+     * 
+     * @param tag for performance measurement
+     * @return the time in nanoseconds
+     */
+    public static long result(String tag) {
+	Measurement m = map.get(tag);
+	if(m != null) {
+	    return m.elapsed();
+	}
+	else {
+	    return -1;
+	}
+    }
+    
+    /**
+     * Clear all performance measurements.
+     */
+    public static void clear() {
+	map.clear();
+	overhead = 0;
+    }
+    
+    /**
+     * Write all performance measurements to the log
+     */
+    public static void report() {
+	double overheadMilli = overhead / Math.pow(10, 6);
+	log.error("Performance Results: {} with measurement overhead: {} ms", map, overheadMilli);
+    }
+    
+    /**
+     * A single performance measurement
+     */
+    static class Measurement {
+	long start;
+	long stop;
+	
+	/** 
+	 * Start the measurement
+	 */
+	public void start() {
+	    start = System.nanoTime();
+	}
+	
+	/**
+	 * Stop the measurement
+	 */
+	public void stop() {
+	    stop = System.nanoTime();
+	}
+	
+	/**
+	 * Stop the measurement at a specific time
+	 * @param time to stop
+	 */
+	public void stop(long time){
+	    stop = time;
+	}
+	
+	/**
+	 * Compute the elapsed time of the measurement in nanoseconds
+	 * 
+	 * @return the measurement time in nanoseconds, or -1 if the measurement is stil running.
+	 */
+	public long elapsed() {
+	    if(stop == 0) {
+		return -1;
+	    }
+	    else {
+		return stop - start;
+	    }
+	}
+	
+	/**
+	 * Returns the number of milliseconds for the measurement as a String.
+	 */
+	public String toString() {
+	    double milli = elapsed() / Math.pow(10, 6);
+	    return Double.toString(milli) + "ms";
+	}
+    }
+    
+    public static void main(String args[]){
+	// test the measurement overhead
+	String tag;
+	for(int i = 0; i < 100; i++){
+	    tag = "foo foo foo";
+	    start(tag); stop(tag);
+	    tag = "bar";
+	    start(tag); stop(tag);
+	    tag = "baz";
+	    start(tag); stop(tag);
+	    report();
+	    clear();
+	}
+	for(int i = 0; i < 100; i++){
+	    tag = "a";
+	    start(tag); stop(tag);
+	    tag = "b";
+	    start(tag); stop(tag);
+	    tag = "c";
+	    start(tag); stop(tag);
+	    report();
+	    clear();
+	}
+    }
+}
diff --git a/src/main/java/net/onrc/onos/ofcontroller/flowprogrammer/FlowSynchronizer.java b/src/main/java/net/onrc/onos/ofcontroller/flowprogrammer/FlowSynchronizer.java
index 98fe262..949cc7b 100644
--- a/src/main/java/net/onrc/onos/ofcontroller/flowprogrammer/FlowSynchronizer.java
+++ b/src/main/java/net/onrc/onos/ofcontroller/flowprogrammer/FlowSynchronizer.java
@@ -242,12 +242,15 @@
      *
      */
     class FlowEntryWrapper {
-	FlowEntryId flowEntryId;
-	OFFlowStatisticsReply statisticsReply;
+    FlowEntryId flowEntryId;
+    IFlowEntry iFlowEntry;
+    OFFlowStatisticsReply statisticsReply;
+
 
 	public FlowEntryWrapper(IFlowEntry entry) {
 	    flowEntryId = new FlowEntryId(entry.getFlowEntryId());
-	}
+	    iFlowEntry = entry;
+    }
 
 	public FlowEntryWrapper(OFFlowStatisticsReply entry) {
 	    flowEntryId = new FlowEntryId(entry.getCookie());
@@ -268,18 +271,14 @@
 
 	    double startDB = System.nanoTime();
 	    // Get the Flow Entry state from the Network Graph
-	    IFlowEntry iFlowEntry = null;
-	    try {
-		iFlowEntry = dbHandler.searchFlowEntry(flowEntryId);
-	    } catch (Exception e) {
-		log.error("Error finding flow entry {} in Network Graph",
-			  flowEntryId);
-		return;
-	    }
 	    if (iFlowEntry == null) {
-		log.error("Cannot add flow entry {} to sw {} : flow entry not found",
-			  flowEntryId, sw.getId());
-		return;
+            try {
+            	iFlowEntry = dbHandler.searchFlowEntry(flowEntryId);
+            } catch (Exception e) {
+            	log.error("Error finding flow entry {} in Network Graph",
+            			flowEntryId);
+            	return;
+            }
 	    }
 	    dbTime = System.nanoTime() - startDB;
 
diff --git a/src/main/java/net/onrc/onos/ofcontroller/linkdiscovery/internal/LinkDiscoveryManager.java b/src/main/java/net/onrc/onos/ofcontroller/linkdiscovery/internal/LinkDiscoveryManager.java
index 1dbfdcb..3860e05 100644
--- a/src/main/java/net/onrc/onos/ofcontroller/linkdiscovery/internal/LinkDiscoveryManager.java
+++ b/src/main/java/net/onrc/onos/ofcontroller/linkdiscovery/internal/LinkDiscoveryManager.java
@@ -112,7 +112,7 @@
  */
 @LogMessageCategory("Network Topology")
 public class LinkDiscoveryManager
-implements IOFMessageListener, IOFSwitchListener, 
+implements IOFMessageListener, IOFSwitchListener,
 ILinkDiscoveryService, IFloodlightModule {
 	protected IFloodlightProviderService controller;
     protected final static Logger log = LoggerFactory.getLogger(LinkDiscoveryManager.class);
@@ -125,7 +125,7 @@
 
 
     // LLDP and BDDP fields
-    private static final byte[] LLDP_STANDARD_DST_MAC_STRING = 
+    private static final byte[] LLDP_STANDARD_DST_MAC_STRING =
             HexString.fromHexString("01:80:c2:00:00:0e");
     private static final long LINK_LOCAL_MASK  = 0xfffffffffff0L;
     private static final long LINK_LOCAL_VALUE = 0x0180c2000000L;
@@ -135,27 +135,27 @@
     private static final String LLDP_BSN_DST_MAC_STRING = "ff:ff:ff:ff:ff:ff";
 
 
-    // Direction TLVs are used to indicate if the LLDPs were sent 
+    // Direction TLVs are used to indicate if the LLDPs were sent
     // periodically or in response to a recieved LLDP
     private static final byte TLV_DIRECTION_TYPE = 0x73;
     private static final short TLV_DIRECTION_LENGTH = 1;  // 1 byte
     private static final byte TLV_DIRECTION_VALUE_FORWARD[] = {0x01};
     private static final byte TLV_DIRECTION_VALUE_REVERSE[] = {0x02};
-    private static final LLDPTLV forwardTLV 
+    private static final LLDPTLV forwardTLV
     = new LLDPTLV().
-    setType((byte)TLV_DIRECTION_TYPE).
-    setLength((short)TLV_DIRECTION_LENGTH).
+    setType(TLV_DIRECTION_TYPE).
+    setLength(TLV_DIRECTION_LENGTH).
     setValue(TLV_DIRECTION_VALUE_FORWARD);
 
-    private static final LLDPTLV reverseTLV 
+    private static final LLDPTLV reverseTLV
     = new LLDPTLV().
-    setType((byte)TLV_DIRECTION_TYPE).
-    setLength((short)TLV_DIRECTION_LENGTH).
+    setType(TLV_DIRECTION_TYPE).
+    setLength(TLV_DIRECTION_LENGTH).
     setValue(TLV_DIRECTION_VALUE_REVERSE);
 
     // Link discovery task details.
     protected SingletonTask discoveryTask;
-    protected final int DISCOVERY_TASK_INTERVAL = 1; 
+    protected final int DISCOVERY_TASK_INTERVAL = 1;
     protected final int LINK_TIMEOUT = 35; // original 35 secs, aggressive 5 secs
     protected final int LLDP_TO_ALL_INTERVAL = 15 ; //original 15 seconds, aggressive 2 secs.
     protected long lldpClock = 0;
@@ -206,7 +206,7 @@
     /* topology aware components are called in the order they were added to the
      * the array */
     protected ArrayList<ILinkDiscoveryListener> linkDiscoveryAware;
-    
+
     protected class LinkUpdate extends LDUpdate {
 
 		public LinkUpdate(LDUpdate old) {
@@ -263,7 +263,7 @@
      */
     protected Map<NodePortTuple, Long> broadcastDomainPortTimeMap;
 
-    /** 
+    /**
      * Get the LLDP sending period in seconds.
      * @return LLDP sending period in seconds.
      */
@@ -283,6 +283,7 @@
         return portLinks;
     }
 
+    @Override
     public Set<NodePortTuple> getSuppressLLDPsInfo() {
         return suppressLinkDiscovery;
     }
@@ -291,6 +292,7 @@
      * Add a switch port to the suppressed LLDP list.
      * Remove any known links on the switch port.
      */
+    @Override
     public void AddToSuppressLLDPs(long sw, short port)
     {
         NodePortTuple npt = new NodePortTuple(sw, port);
@@ -302,7 +304,8 @@
      * Remove a switch port from the suppressed LLDP list.
      * Discover links on that switchport.
      */
-    public void RemoveFromSuppressLLDPs(long sw, short port) 
+    @Override
+    public void RemoveFromSuppressLLDPs(long sw, short port)
     {
         NodePortTuple npt = new NodePortTuple(sw, port);
         this.suppressLinkDiscovery.remove(npt);
@@ -317,6 +320,7 @@
         return false;
     }
 
+    @Override
     public ILinkDiscovery.LinkType getLinkType(Link lt, LinkInfo info) {
         if (info.getUnicastValidTime() != null) {
             return ILinkDiscovery.LinkType.DIRECT_LINK;
@@ -326,7 +330,7 @@
         return ILinkDiscovery.LinkType.INVALID_LINK;
     }
 
-    
+
     private boolean isLinkDiscoverySuppressed(long sw, short portNumber) {
         return this.suppressLinkDiscovery.contains(new NodePortTuple(sw, portNumber));
     }
@@ -437,6 +441,7 @@
         }
     }
 
+    @Override
     public Set<Short> getQuarantinedPorts(long sw) {
         Set<Short> qPorts = new HashSet<Short>();
 
@@ -468,12 +473,12 @@
         else operation = UpdateOperation.PORT_DOWN;
 
         LinkUpdate update = new LinkUpdate(new LDUpdate(sw, port, operation));
-        
-        
+
+
         controller.publishUpdate(update);
     }
 
-    /** 
+    /**
      * Send LLDP on known ports
      */
     protected void discoverOnKnownLinkPorts() {
@@ -500,7 +505,7 @@
      */
     protected IOFSwitch addRemoteSwitch(long sw, short port) {
     	IOnosRemoteSwitch remotesw = null;
-    	
+
     	// add a switch if we have not seen it before
     	remotesw = remoteSwitches.get(sw);
 
@@ -510,26 +515,26 @@
         	remoteSwitches.put(remotesw.getId(), remotesw);
         	log.debug("addRemoteSwitch(): added fake remote sw {}", remotesw);
         }
-        
+
         // add the port if we have not seen it before
         if (remotesw.getPort(port) == null) {
         	OFPhysicalPort remoteport = new OFPhysicalPort();
         	remoteport.setPortNumber(port);
         	remoteport.setName("fake_" + port);
-        	remoteport.setConfig(0); 
+        	remoteport.setConfig(0);
         	remoteport.setState(0);
         	remotesw.setPort(remoteport);
         	log.debug("addRemoteSwitch(): added fake remote port {} to sw {}", remoteport, remotesw.getId());
         }
-        
+
         return remotesw;
     }
-    
+
     /**
      * Send link discovery message out of a given switch port.
      * The discovery message may be a standard LLDP or a modified
-     * LLDP, where the dst mac address is set to :ff.  
-     * 
+     * LLDP, where the dst mac address is set to :ff.
+     *
      * TODO: The modified LLDP will updated in the future and may
      * use a different eth-type.
      * @param sw
@@ -565,7 +570,7 @@
 
         if (isLinkDiscoverySuppressed(sw, port)) {
             /* Dont send LLDPs out of this port as suppressLLDPs set
-             * 
+             *
              */
             return;
         }
@@ -881,9 +886,9 @@
 
         addOrUpdateLink(lt, newLinkInfo);
 
-        // Check if reverse link exists. 
-        // If it doesn't exist and if the forward link was seen 
-        // first seen within a small interval, send probe on the 
+        // Check if reverse link exists.
+        // If it doesn't exist and if the forward link was seen
+        // first seen within a small interval, send probe on the
         // reverse link.
 
         newLinkInfo = links.get(lt);
@@ -927,8 +932,8 @@
 
     protected Command handlePacketIn(long sw, OFPacketIn pi,
                                      FloodlightContext cntx) {
-        Ethernet eth = 
-                IFloodlightProviderService.bcStore.get(cntx, 
+        Ethernet eth =
+                IFloodlightProviderService.bcStore.get(cntx,
                                                        IFloodlightProviderService.CONTEXT_PI_PAYLOAD);
 
         if(eth.getEtherType() == Ethernet.TYPE_BSN) {
@@ -999,8 +1004,8 @@
                 newInfo.setFirstSeenTime(oldInfo.getFirstSeenTime());
 
             if (log.isTraceEnabled()) {
-                log.trace("addOrUpdateLink: {} {}", 
-                          lt, 
+                log.trace("addOrUpdateLink: {} {}",
+                          lt,
                           (newInfo.getMulticastValidTime()!=null) ? "multicast" : "unicast");
             }
 
@@ -1033,7 +1038,7 @@
                 // Add to portNOFLinks if the unicast valid time is null
                 if (newInfo.getUnicastValidTime() == null)
                     addLinkToBroadcastDomain(lt);
-                
+
                 // ONOS: Distinguish added event separately from updated event
                 updateOperation = UpdateOperation.LINK_ADDED;
                 linkChanged = true;
@@ -1119,6 +1124,7 @@
         return linkChanged;
     }
 
+    @Override
     public Map<Long, Set<Link>> getSwitchLinks() {
         return this.switchLinks;
     }
@@ -1198,7 +1204,7 @@
 
         // ONOS: If we do not control this switch, then we should not process its port status messages
         if (!registryService.hasControl(iofSwitch.getId())) return Command.CONTINUE;
-        
+
         if (log.isTraceEnabled()) {
             log.trace("handlePortStatus: Switch {} port #{} reason {}; " +
                     "config is {} state is {}",
@@ -1225,7 +1231,7 @@
                 LinkUpdate update = new LinkUpdate(new LDUpdate(sw, port, UpdateOperation.PORT_DOWN));
                 controller.publishUpdate(update);
                 linkDeleted = true;
-                } 
+                }
             else if (ps.getReason() ==
                     (byte)OFPortReason.OFPPR_MODIFY.ordinal()) {
                 // If ps is a port modification and the port state has changed
@@ -1237,7 +1243,7 @@
                         assert(linkInfo != null);
                         Integer updatedSrcPortState = null;
                         Integer updatedDstPortState = null;
-                        if (lt.getSrc() == npt.getNodeId() && 
+                        if (lt.getSrc() == npt.getNodeId() &&
                                 lt.getSrcPort() == npt.getPortId() &&
                                 (linkInfo.getSrcPortState() !=
                                 ps.getDesc().getState())) {
@@ -1264,7 +1270,7 @@
                                                      getLinkType(lt, linkInfo),
                                                      operation));
                             controller.publishUpdate(update);
-                            
+
                             linkInfoChanged = true;
                         }
                     }
@@ -1378,9 +1384,9 @@
             lock.writeLock().unlock();
         }
     }
-    
+
     /**
-     * We don't react the port changed notifications here. we listen for 
+     * We don't react the port changed notifications here. we listen for
      * OFPortStatus messages directly. Might consider using this notifier
      * instead
      */
@@ -1389,7 +1395,7 @@
         // no-op
     }
 
-    /** 
+    /**
      * Delete links incident on a given switch port.
      * @param npt
      * @param reason
@@ -1409,7 +1415,7 @@
         }
     }
 
-    /** 
+    /**
      * Iterates through the list of links and deletes if the
      * last discovery message reception time exceeds timeout values.
      */
@@ -1430,7 +1436,7 @@
 
                 // Timeout the unicast and multicast LLDP valid times
                 // independently.
-                if ((info.getUnicastValidTime() != null) && 
+                if ((info.getUnicastValidTime() != null) &&
                         (info.getUnicastValidTime() + (this.LINK_TIMEOUT * 1000) < curTime)){
                     info.setUnicastValidTime(null);
 
@@ -1440,7 +1446,7 @@
                     // the link would be deleted, which would trigger updateClusters().
                     linkChanged = true;
                 }
-                if ((info.getMulticastValidTime()!= null) && 
+                if ((info.getMulticastValidTime()!= null) &&
                         (info.getMulticastValidTime()+ (this.LINK_TIMEOUT * 1000) < curTime)) {
                     info.setMulticastValidTime(null);
                     // if uTime is not null, then link will remain as openflow
@@ -1451,7 +1457,7 @@
                 }
                 // Add to the erase list only if the unicast
                 // time is null.
-                if (info.getUnicastValidTime() == null && 
+                if (info.getUnicastValidTime() == null &&
                         info.getMulticastValidTime() == null){
                     eraseList.add(entry.getKey());
                 } else if (linkChanged) {
@@ -1510,11 +1516,11 @@
         srcNpt = new NodePortTuple(lt.getSrc(), lt.getSrcPort());
         dstNpt = new NodePortTuple(lt.getDst(), lt.getDstPort());
 
-        if (!portBroadcastDomainLinks.containsKey(lt.getSrc()))
+        if (!portBroadcastDomainLinks.containsKey(srcNpt))
             portBroadcastDomainLinks.put(srcNpt, new HashSet<Link>());
         portBroadcastDomainLinks.get(srcNpt).add(lt);
 
-        if (!portBroadcastDomainLinks.containsKey(lt.getDst()))
+        if (!portBroadcastDomainLinks.containsKey(dstNpt))
             portBroadcastDomainLinks.put(dstNpt, new HashSet<Link>());
         portBroadcastDomainLinks.get(dstNpt).add(lt);
     }
@@ -1575,7 +1581,7 @@
 
     @Override
     public Collection<Class<? extends IFloodlightService>> getModuleServices() {
-        Collection<Class<? extends IFloodlightService>> l = 
+        Collection<Class<? extends IFloodlightService>> l =
                 new ArrayList<Class<? extends IFloodlightService>>();
         l.add(ILinkDiscoveryService.class);
         //l.add(ITopologyService.class);
@@ -1586,7 +1592,7 @@
     public Map<Class<? extends IFloodlightService>, IFloodlightService>
     getServiceImpls() {
         Map<Class<? extends IFloodlightService>,
-        IFloodlightService> m = 
+        IFloodlightService> m =
         new HashMap<Class<? extends IFloodlightService>,
         IFloodlightService>();
         // We are the class that implements the service
@@ -1596,7 +1602,7 @@
 
     @Override
     public Collection<Class<? extends IFloodlightService>> getModuleDependencies() {
-        Collection<Class<? extends IFloodlightService>> l = 
+        Collection<Class<? extends IFloodlightService>> l =
                 new ArrayList<Class<? extends IFloodlightService>>();
         l.add(IFloodlightProviderService.class);
         l.add(IThreadPoolService.class);
@@ -1679,7 +1685,7 @@
                     log.error("Exception in LLDP send timer.", e);
                 } finally {
                     if (!shuttingDown) {
-                    	// Always reschedule link discovery if we're not 
+                    	// Always reschedule link discovery if we're not
                     	// shutting down (no chance of SLAVE role now)
                         log.trace("Rescheduling discovery task");
                         discoveryTask.reschedule(DISCOVERY_TASK_INTERVAL,
@@ -1729,7 +1735,7 @@
         if ((sw.getChannel() != null) &&
                 (SocketAddress.class.isInstance(
                                                 sw.getChannel().getRemoteAddress()))) {
-            evTopoSwitch.ipv4Addr = 
+            evTopoSwitch.ipv4Addr =
                     IPv4.toIPv4Address(((InetSocketAddress)(sw.getChannel().
                             getRemoteAddress())).getAddress().getAddress());
             evTopoSwitch.l4Port   =
@@ -1787,10 +1793,12 @@
         evTopoCluster = evHistTopologyCluster.put(evTopoCluster, action);
     }
 
+    @Override
     public boolean isAutoPortFastFeature() {
         return autoPortFastFeature;
     }
 
+    @Override
     public void setAutoPortFastFeature(boolean autoPortFastFeature) {
         this.autoPortFastFeature = autoPortFastFeature;
     }
diff --git a/src/main/java/net/onrc/onos/ofcontroller/topology/Topology.java b/src/main/java/net/onrc/onos/ofcontroller/topology/Topology.java
index 1674cf0..9fca86a 100644
--- a/src/main/java/net/onrc/onos/ofcontroller/topology/Topology.java
+++ b/src/main/java/net/onrc/onos/ofcontroller/topology/Topology.java
@@ -1,6 +1,5 @@
 package net.onrc.onos.ofcontroller.topology;
 
-import java.util.ArrayList;
 import java.util.List;
 import java.util.LinkedList;
 import java.util.Map;
@@ -12,7 +11,6 @@
 import net.onrc.onos.ofcontroller.core.INetMapTopologyObjects.ISwitchObject;
 import net.onrc.onos.ofcontroller.core.ISwitchStorage.SwitchState;
 
-import org.apache.commons.lang.StringUtils;
 import org.openflow.util.HexString;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -195,7 +193,10 @@
  */
 public class Topology {
     private final static Logger log = LoggerFactory.getLogger(Topology.class);
-
+    
+    // flag to use optimized readFromDatabase() method.
+    private static final boolean enableOptimizedRead = false;
+    
     private Map<Long, Node> nodesMap;	// The dpid->Node mapping
 
     /**
@@ -392,289 +393,173 @@
      * @param dbHandler the Graph Database handler to use.
      */
     public void readFromDatabase(DBOperation dbHandler) {
-		//
-		// Fetch the relevant info from the Switch and Port vertices
-		// from the Titan Graph.
-		//
-    	nodesMap.clear();
+    	if (enableOptimizedRead) {
+    		readFromDatabaseBodyOptimized(dbHandler);
+    	} else {
+    		readFromDatabaseBody(dbHandler);
+    	}
 
-        // Load all switches into Map
-        Iterable<ISwitchObject> switches = dbHandler.getAllSwitches();
-        for (ISwitchObject switchObj : switches) {
-        	// Ignore inactive ports
-            if (!switchObj.getState().equals(SwitchState.ACTIVE.toString())) {
-            	continue;
-            }
-            Vertex nodeVertex = switchObj.asVertex();
-            //
-            // The Switch info
-            //
-            String nodeDpid = nodeVertex.getProperty("dpid").toString();
-            long nodeId = HexString.toLong(nodeDpid);
-            addNode(nodeId);
-        }
-
-        //
-        // Get All Ports
-        //
-        Iterable<IPortObject> ports = dbHandler.getAllPorts(); //TODO: Add to DB operations
-        for (IPortObject myPortObj : ports) {
-            Vertex myPortVertex = myPortObj.asVertex();
-
-            // Ignore inactive ports
-            if (! myPortVertex.getProperty("state").toString().equals("ACTIVE")) {
-            	continue;
-            }
-
-            short myPort = 0;
-            String idStr = myPortObj.getPortId();
-            String[] splitter = idStr.split(IDBOperation.PORT_ID_DELIM);
-            if (splitter.length != 2) {
-            	log.error("Invalid port_id : {}", idStr);
-            	continue;
-            }
-            String myDpid = splitter[0];
-            myPort = Short.parseShort(splitter[1]);
-            long myId = HexString.toLong(myDpid);
-            Node me = nodesMap.get(myId);
-
-            if (me == null) {
-                // cannot proceed ports and switches are out of sync
-                //TODO: Restart the whole read
-                continue;
-            }
-
-            if (me.getPort(myPort) == null) {
-            	me.addPort(myPort);
-            } else if (me.getLink(myPort) != null) {
-                // Link already added..probably by neighbor
-                continue;
-            }
-
-            //
-            // The neighbor Port info
-            //
-            for (Vertex neighborPortVertex : myPortVertex.getVertices(Direction.OUT, "link")) {
-//            	log.debug("state : {}", neighborPortVertex.getProperty("state"));
-//            	log.debug("port id : {}", neighborPortVertex.getProperty("port_id"));
-                // Ignore inactive ports
-                if (! neighborPortVertex.getProperty("state").toString().equals("ACTIVE")) {
-                	continue;
-                }
-                int neighborPort = 0;
-                idStr = neighborPortVertex.getProperty("port_id").toString();
-                splitter = idStr.split(IDBOperation.PORT_ID_DELIM);
-                if (splitter.length != 2) {
-                	log.error("Invalid port_id : {}", idStr);
-                	continue;
-                }
-                String neighborDpid = splitter[0];
-                neighborPort = Short.parseShort(splitter[1]);
-                long neighborId = HexString.toLong(neighborDpid);
-                Node neighbor = nodesMap.get(neighborId);
-//                log.debug("dpid {},{}  port {}", neighborDpid, neighborId, neighborPort);
-                if (neighbor == null) {
-                	continue;
-                }
-                me.addLink(myPort, neighbor, neighborPort);
-            }
-        }
-        dbHandler.commit();
     }
 
-
-    // Only for debug use
-    List<Long> logGetSw = new ArrayList<Long>(100);
-    List<Long> logGetPt = new ArrayList<Long>(100);
-    List<Long> logAddSw = new ArrayList<Long>(100);
-    List<Long> logAddPt = new ArrayList<Long>(100);
-    List<Long> logAddLk = new ArrayList<Long>(100);
-    List<Long> logCommit = new ArrayList<Long>(100);
-    List<Integer> logGetVertices = new ArrayList<Integer>(100);
-    List<Integer> logGetProperty = new ArrayList<Integer>(100);
-       public void readFromDatabaseBreakdown(DBOperation dbHandler) {
-    	int getVerticesCount = 0;
-    	int getPropertyCount = 0;
-    	int getVCount_sw = 0;
-    	int getVCount_pt = 0;
-    	int getVCount_lk = 0;
-    	int getPCount_sw = 0;
-    	int getPCount_pt = 0;
-    	int getPCount_lk = 0;
-
-		//
-		// Fetch the relevant info from the Switch and Port vertices
-		// from the Titan Graph.
-		//
+    private void readFromDatabaseBody(DBOperation dbHandler) {
+    	//
+    	// Fetch the relevant info from the Switch and Port vertices
+    	// from the Titan Graph.
+    	//
 
     	nodesMap.clear();
-    	long t1 = System.nanoTime();
+    	Iterable<ISwitchObject> activeSwitches = dbHandler.getActiveSwitches();
+    	for (ISwitchObject switchObj : activeSwitches) {
+    	    Vertex nodeVertex = switchObj.asVertex();
+    	    //
+    	    // The Switch info
+    	    //
+    	    String nodeDpid = nodeVertex.getProperty("dpid").toString();
+    	    long nodeId = HexString.toLong(nodeDpid);
+    	    Node me = nodesMap.get(nodeId);
+    	    if (me == null)
+    		me = addNode(nodeId);
 
-        // Load all switches into Map
-        Iterable<ISwitchObject> switches = dbHandler.getAllSwitches();
+    	    //
+    	    // The local Port info
+    	    //
+    	    for (Vertex myPortVertex : nodeVertex.getVertices(Direction.OUT, "on")) {
+    		// Ignore inactive ports
+    		if (! myPortVertex.getProperty("state").toString().equals("ACTIVE"))
+    		    continue;
 
-        long t2 = System.nanoTime();
+    		int myPort = 0;
+    		Object obj = myPortVertex.getProperty("number");
+    		if (obj instanceof Short) {
+    		    myPort = (Short)obj;
+    		} else if (obj instanceof Integer) {
+    		    myPort = (Integer)obj;
+    		}
+    		me.addPort(myPort);
 
-        long t_addSw = 0;
-        for (ISwitchObject switchObj : switches) {
-            long t3 = System.nanoTime();
-            long t4;
+    		for (Vertex neighborPortVertex : myPortVertex.getVertices(Direction.OUT, "link")) {
+    		    // Ignore inactive ports
+    		    if (! neighborPortVertex.getProperty("state").toString().equals("ACTIVE")) {
+    		    	continue;
+    		    }
 
-        	// Ignore inactive ports
-            ++getPropertyCount;
-            ++getPCount_sw;
-            if (!switchObj.getState().equals(SwitchState.ACTIVE.toString())) {
-                t4 = System.nanoTime();
-                t_addSw += t4 - t3;
-            	continue;
-            }
-            Vertex nodeVertex = switchObj.asVertex();
-            //
-            // The Switch info
-            //
-            ++getPropertyCount;
-            ++getPCount_sw;
-            String nodeDpid = nodeVertex.getProperty("dpid").toString();
-            long nodeId = HexString.toLong(nodeDpid);
-            addNode(nodeId);
-            t4 = System.nanoTime();
-            t_addSw += t4 - t3;
-        }
+    		    int neighborPort = 0;
+    		    obj = neighborPortVertex.getProperty("number");
+    		    if (obj instanceof Short) {
+    			neighborPort = (Short)obj;
+    		    } else if (obj instanceof Integer) {
+    			neighborPort = (Integer)obj;
+    		    }
+    		    //
+    		    // The neighbor Switch info
+    		    //
+    		    for (Vertex neighborVertex : neighborPortVertex.getVertices(Direction.IN, "on")) {
+    			// Ignore inactive switches
+    			String state = neighborVertex.getProperty("state").toString();
+    			if (! state.equals(SwitchState.ACTIVE.toString()))
+    			    continue;
 
-        long t5 = System.nanoTime();
-        //
-        // Get All Ports
-        //
-        Iterable<IPortObject> ports = dbHandler.getAllPorts(); //TODO: Add to DB operations
-
-        long t6 = System.nanoTime();
-        long t_addPort = 0;
-        long t_addLink = 0;
-
-        for (IPortObject myPortObj : ports) {
-            long t7 = System.nanoTime();
-            long t8;
-            Vertex myPortVertex = myPortObj.asVertex();
-
-            // Ignore inactive ports
-            ++getPropertyCount;
-            ++getPCount_pt;
-            if (! myPortVertex.getProperty("state").toString().equals("ACTIVE")) {
-                t8 = System.nanoTime();
-                t_addPort += t8 - t7;
-            	continue;
-            }
-
-            short myPort = 0;
-            ++getPropertyCount;
-            ++getPCount_pt;
-            String idStr = myPortObj.getPortId();
-            String[] splitter = idStr.split(IDBOperation.PORT_ID_DELIM);
-            if (splitter.length != 2) {
-            	log.error("Invalid port_id : {}", idStr);
-                t8 = System.nanoTime();
-                t_addPort += t8 - t7;
-            	continue;
-            }
-            String myDpid = splitter[0];
-            myPort = Short.parseShort(splitter[1]);
-            long myId = HexString.toLong(myDpid);
-            Node me = nodesMap.get(myId);
-
-            if (me == null) {
-                // cannot proceed ports and switches are out of sync
-                //TODO: Restart the whole read
-                t8 = System.nanoTime();
-                t_addPort += t8 - t7;
-                continue;
-            }
-
-            if (me.getPort(myPort) == null) {
-            	me.addPort(myPort);
-            } else if (me.getLink(myPort) != null) {
-                // Link already added..probably by neighbor
-                t8 = System.nanoTime();
-                t_addPort += t8 - t7;
-                continue;
-            }
-            t8 = System.nanoTime();
-            t_addPort += t8 - t7;
-
-            //
-            // The neighbor Port info
-            //
-            ++getVerticesCount;
-            ++getVCount_pt;
-            for (Vertex neighborPortVertex : myPortVertex.getVertices(Direction.OUT, "link")) {
-//            	log.debug("state : {}", neighborPortVertex.getProperty("state"));
-//            	log.debug("port id : {}", neighborPortVertex.getProperty("port_id"));
-
-                long t9 = System.nanoTime();
-                long t10;
-
-                // Ignore inactive ports
-                ++getPropertyCount;
-                ++getPCount_lk;
-                if (! neighborPortVertex.getProperty("state").toString().equals("ACTIVE")) {
-                    t10 = System.nanoTime();
-                    t_addLink += t10 - t9;
-                	continue;
-                }
-                int neighborPort = 0;
-                ++getPropertyCount;
-                ++getPCount_lk;
-                idStr = neighborPortVertex.getProperty("port_id").toString();
-                splitter = idStr.split(IDBOperation.PORT_ID_DELIM);
-                if (splitter.length != 2) {
-                	log.error("Invalid port_id : {}", idStr);
-                    t10 = System.nanoTime();
-                    t_addLink += t10 - t9;
-                	continue;
-                }
-                String neighborDpid = splitter[0];
-                neighborPort = Short.parseShort(splitter[1]);
-                long neighborId = HexString.toLong(neighborDpid);
-                Node neighbor = nodesMap.get(neighborId);
-//                log.debug("dpid {},{}  port {}", neighborDpid, neighborId, neighborPort);
-                if (neighbor == null) {
-                    t10 = System.nanoTime();
-                    t_addLink += t10 - t9;
-                	continue;
-                }
-                me.addLink(myPort, neighbor, neighborPort);
-
-                t10 = System.nanoTime();
-                t_addLink += t10 - t9;
-            }
-        }
-        long t11 = System.nanoTime();
-        dbHandler.commit();
-        long t12 = System.nanoTime();
-
-        logGetSw.add((t2-t1)/1000);
-        logGetPt.add((t6-t5)/1000);
-        logAddSw.add(t_addSw/1000);
-        logAddPt.add(t_addPort/1000);
-        logAddLk.add(t_addLink/1000);
-        logCommit.add((t12-t11)/1000);
-        logGetVertices.add(getVerticesCount);
-        logGetProperty.add(getPropertyCount);
-        log.debug("getVertices[N({}),P({}),L({})] getProperty[N({}),P({}),L({})]",
-        		new Object[]{getVCount_sw,getVCount_pt,getVCount_lk,
-        		getPCount_sw,getPCount_pt,getPCount_lk});
+    			String neighborDpid = neighborVertex.getProperty("dpid").toString();
+    			long neighborId = HexString.toLong(neighborDpid);
+    			Node neighbor = nodesMap.get(neighborId);
+    			if (neighbor == null)
+    			    neighbor = addNode(neighborId);
+    			neighbor.addPort(neighborPort);
+    			me.addLink(myPort, neighbor, neighborPort);
+    		    }
+    		}
+    	    }
+    	}
+    	dbHandler.commit();
     }
 
-    public void printMeasuredLog() {
-    	log.debug("getsw: {}", StringUtils.join(logGetSw, ","));
-    	log.debug("getpt: {}", StringUtils.join(logGetPt, ","));
-    	log.debug("addsw: {}", StringUtils.join(logAddSw, ","));
-    	log.debug("addpt: {}", StringUtils.join(logAddPt, ","));
-    	log.debug("addlk: {}", StringUtils.join(logAddLk, ","));
-    	log.debug("commit: {}", StringUtils.join(logCommit, ","));
-    	log.debug("getvertices: {}", StringUtils.join(logGetVertices, ","));
-    	log.debug("getproperty: {}", StringUtils.join(logGetProperty, ","));
+    private void readFromDatabaseBodyOptimized(DBOperation dbHandler) {
+	    nodesMap.clear();
+		    
+		// Load all switches into Map
+		Iterable<ISwitchObject> switches = dbHandler.getAllSwitches();
+		for (ISwitchObject switchObj : switches) {
+		        // Ignore inactive ports
+		    if (!switchObj.getState().equals(SwitchState.ACTIVE.toString())) {
+	            continue;
+		    }
+		    Vertex nodeVertex = switchObj.asVertex();
+		    //
+		    // The Switch info
+		    //
+		    String nodeDpid = nodeVertex.getProperty("dpid").toString();
+		    long nodeId = HexString.toLong(nodeDpid);
+		    addNode(nodeId);
+		}
+		
+		//
+		// Get All Ports
+		//
+		Iterable<IPortObject> ports = dbHandler.getAllPorts(); //TODO: Add to DB operations
+		for (IPortObject myPortObj : ports) {
+		    Vertex myPortVertex = myPortObj.asVertex();
+		    
+		    // Ignore inactive ports
+		    if (! myPortVertex.getProperty("state").toString().equals("ACTIVE")) {
+	            continue;
+		    }
+		    
+		    short myPort = 0;
+		    String idStr = myPortObj.getPortId();
+		    String[] splitter = idStr.split(IDBOperation.PORT_ID_DELIM);
+		    if (splitter.length != 2) {
+	            log.error("Invalid port_id : {}", idStr);
+	            continue;
+		    }
+		    String myDpid = splitter[0];
+		    myPort = Short.parseShort(splitter[1]);
+		    long myId = HexString.toLong(myDpid);
+		    Node me = nodesMap.get(myId);
+		    
+		    if (me == null) {
+		        // cannot proceed ports and switches are out of sync
+		        //TODO: Restart the whole read
+		        continue;
+		    }
+		    
+		    if (me.getPort((int)myPort) == null) {
+	            me.addPort((int)myPort);
+		    } else if (me.getLink((int)myPort) != null) {
+		        // Link already added..probably by neighbor
+		        continue;
+		    }
+		
+		    //
+		    // The neighbor Port info
+		    //
+		    for (Vertex neighborPortVertex : myPortVertex.getVertices(Direction.OUT, "link")) {
+		        // Ignore inactive ports
+		        if (! neighborPortVertex.getProperty("state").toString().equals("ACTIVE")) {
+	                continue;
+		        }
+		        int neighborPort = 0;
+		        idStr = neighborPortVertex.getProperty("port_id").toString();
+		        splitter = idStr.split(IDBOperation.PORT_ID_DELIM);
+		        if (splitter.length != 2) {
+	                log.error("Invalid port_id : {}", idStr);
+	                continue;
+		        }
+		        String neighborDpid = splitter[0];
+		        neighborPort = Short.parseShort(splitter[1]);
+		        long neighborId = HexString.toLong(neighborDpid);                                
+		        Node neighbor = nodesMap.get(neighborId);
+		        if (neighbor == null) {
+	                continue;
+		        }
+		        if (neighbor.getPort(neighborPort) == null) {
+		        	neighbor.addPort(neighborPort);
+		        }
+		        me.addLink(myPort, neighbor, neighborPort);
+		    }
+		}
+		dbHandler.commit();
     }
-
+    
     // Only for debug use
     @Override
     public String toString() {
diff --git a/src/main/java/net/onrc/onos/ofcontroller/topology/TopologyManager.java b/src/main/java/net/onrc/onos/ofcontroller/topology/TopologyManager.java
index cbc3224..a074d19 100644
--- a/src/main/java/net/onrc/onos/ofcontroller/topology/TopologyManager.java
+++ b/src/main/java/net/onrc/onos/ofcontroller/topology/TopologyManager.java
@@ -90,7 +90,8 @@
     /**
      * Shutdown the Topology Manager operation.
      */
-    public void finalize() {
+    @Override
+    protected void finalize() {
 	close();
     }
 
@@ -108,7 +109,7 @@
      */
     @Override
     public Collection<Class<? extends IFloodlightService>> getModuleServices() {
-        Collection<Class<? extends IFloodlightService>> l = 
+        Collection<Class<? extends IFloodlightService>> l =
             new ArrayList<Class<? extends IFloodlightService>>();
         l.add(ITopologyNetService.class);
         return l;
@@ -120,10 +121,10 @@
      * @return the collection of implemented services.
      */
     @Override
-    public Map<Class<? extends IFloodlightService>, IFloodlightService> 
+    public Map<Class<? extends IFloodlightService>, IFloodlightService>
 			       getServiceImpls() {
         Map<Class<? extends IFloodlightService>,
-	    IFloodlightService> m = 
+	    IFloodlightService> m =
             new HashMap<Class<? extends IFloodlightService>,
 	    IFloodlightService>();
         m.put(ITopologyNetService.class, this);
@@ -136,7 +137,7 @@
      * @return the collection of modules this module depends on.
      */
     @Override
-    public Collection<Class<? extends IFloodlightService>> 
+    public Collection<Class<? extends IFloodlightService>>
                                                     getModuleDependencies() {
 	Collection<Class<? extends IFloodlightService>> l =
 	    new ArrayList<Class<? extends IFloodlightService>>();
@@ -200,6 +201,7 @@
      *
      * @return the allocated topology handler.
      */
+    @Override
     public Topology newDatabaseTopology() {
 	Topology topology = new Topology();
 	topology.readFromDatabase(dbHandler);
@@ -216,6 +218,7 @@
      *
      * @param topology the topology to release.
      */
+    @Override
     public void dropTopology(Topology topology) {
 	topology = null;
     }
@@ -312,6 +315,7 @@
      * @return the data path with the computed shortest path if
      * found, otherwise null.
      */
+    @Override
     public DataPath getTopologyShortestPath(Topology topology,
 					    SwitchPort src, SwitchPort dest) {
 	return ShortestPath.getTopologyShortestPath(topology, src, dest);