Merge branch 'master' into ONOS-ONRC2014-Measurements
diff --git a/src/main/java/net/onrc/onos/datagrid/HazelcastDatagrid.java b/src/main/java/net/onrc/onos/datagrid/HazelcastDatagrid.java
index 33091b9..effbe81 100644
--- a/src/main/java/net/onrc/onos/datagrid/HazelcastDatagrid.java
+++ b/src/main/java/net/onrc/onos/datagrid/HazelcastDatagrid.java
@@ -73,6 +73,12 @@
     private MapFlowEntryListener mapFlowEntryListener = null;
     private String mapFlowEntryListenerId = null;
 
+    // State related to the Flow ID map
+    protected static final String mapFlowIdName = "mapFlowId";
+    private IMap<Long, byte[]> mapFlowId = null;
+    private MapFlowIdListener mapFlowIdListener = null;
+    private String mapFlowIdListenerId = null;
+
     // State related to the Network Topology map
     protected static final String mapTopologyName = "mapTopology";
     private IMap<String, byte[]> mapTopology = null;
@@ -238,6 +244,78 @@
     }
 
     /**
+     * Class for receiving notifications for FlowId state.
+     *
+     * The datagrid map is:
+     *  - Key : FlowId (Long)
+     *  - Value : Serialized FlowId (byte[])
+     */
+    class MapFlowIdListener implements EntryListener<Long, byte[]> {
+	/**
+	 * Receive a notification that an entry is added.
+	 *
+	 * @param event the notification event for the entry.
+	 */
+	public void entryAdded(EntryEvent<Long, byte[]> event) {
+	    byte[] valueBytes = (byte[])event.getValue();
+
+	    //
+	    // Decode the value and deliver the notification
+	    //
+	    Kryo kryo = kryoFactory.newKryo();
+	    Input input = new Input(valueBytes);
+	    FlowId flowId = kryo.readObject(input, FlowId.class);
+	    kryoFactory.deleteKryo(kryo);
+	    flowEventHandlerService.notificationRecvFlowIdAdded(flowId);
+	}
+
+	/**
+	 * Receive a notification that an entry is removed.
+	 *
+	 * @param event the notification event for the entry.
+	 */
+	public void entryRemoved(EntryEvent<Long, byte[]> event) {
+	    byte[] valueBytes = (byte[])event.getValue();
+
+	    //
+	    // Decode the value and deliver the notification
+	    //
+	    Kryo kryo = kryoFactory.newKryo();
+	    Input input = new Input(valueBytes);
+	    FlowId flowId = kryo.readObject(input, FlowId.class);
+	    kryoFactory.deleteKryo(kryo);
+	    flowEventHandlerService.notificationRecvFlowIdRemoved(flowId);
+	}
+
+	/**
+	 * Receive a notification that an entry is updated.
+	 *
+	 * @param event the notification event for the entry.
+	 */
+	public void entryUpdated(EntryEvent<Long, byte[]> event) {
+	    byte[] valueBytes = (byte[])event.getValue();
+
+	    //
+	    // Decode the value and deliver the notification
+	    //
+	    Kryo kryo = kryoFactory.newKryo();
+	    Input input = new Input(valueBytes);
+	    FlowId flowId = kryo.readObject(input, FlowId.class);
+	    kryoFactory.deleteKryo(kryo);
+	    flowEventHandlerService.notificationRecvFlowIdUpdated(flowId);
+	}
+
+	/**
+	 * Receive a notification that an entry is evicted.
+	 *
+	 * @param event the notification event for the entry.
+	 */
+	public void entryEvicted(EntryEvent<Long, byte[]> event) {
+	    // NOTE: We don't use eviction for this map
+	}
+    }
+
+    /**
      * Class for receiving notifications for Network Topology state.
      *
      * The datagrid map is:
@@ -521,6 +599,11 @@
 	mapFlowEntry = hazelcastInstance.getMap(mapFlowEntryName);
 	mapFlowEntryListenerId = mapFlowEntry.addEntryListener(mapFlowEntryListener, true);
 
+	// Initialize the FlowId-related map state
+	mapFlowIdListener = new MapFlowIdListener();
+	mapFlowId = hazelcastInstance.getMap(mapFlowIdName);
+	mapFlowIdListenerId = mapFlowId.addEntryListener(mapFlowIdListener, true);
+
 	// Initialize the Topology-related map state
 	mapTopologyListener = new MapTopologyListener();
 	mapTopology = hazelcastInstance.getMap(mapTopologyName);
@@ -548,6 +631,11 @@
 	mapFlowEntry = null;
 	mapFlowEntryListener = null;
 
+	// Clear the FlowId-related map state
+	mapFlowId.removeEntryListener(mapFlowIdListenerId);
+	mapFlowId = null;
+	mapFlowIdListener = null;
+
 	// Clear the Topology-related map state
 	mapTopology.removeEntryListener(mapTopologyListenerId);
 	mapTopology = null;
@@ -805,6 +893,101 @@
     }
 
     /**
+     * Get all Flow IDs that are currently in the datagrid.
+     *
+     * @return all Flow IDs that are currently in the datagrid.
+     */
+    @Override
+    public Collection<FlowId> getAllFlowIds() {
+	Collection<FlowId> allFlowIds = new LinkedList<FlowId>();
+
+	//
+	// Get all current entries
+	//
+	Collection<byte[]> values = mapFlowId.values();
+	Kryo kryo = kryoFactory.newKryo();
+	for (byte[] valueBytes : values) {
+	    //
+	    // Decode the value
+	    //
+	    Input input = new Input(valueBytes);
+	    FlowId flowId = kryo.readObject(input, FlowId.class);
+	    allFlowIds.add(flowId);
+	}
+	kryoFactory.deleteKryo(kryo);
+
+	return allFlowIds;
+    }
+
+    /**
+     * Send a notification that a FlowId is added.
+     *
+     * @param flowId the FlowId that is added.
+     */
+    @Override
+    public void notificationSendFlowIdAdded(FlowId flowId) {
+	//
+	// Encode the value
+	//
+	byte[] buffer = new byte[MAX_BUFFER_SIZE];
+	Kryo kryo = kryoFactory.newKryo();
+	Output output = new Output(buffer, -1);
+	kryo.writeObject(output, flowId);
+	byte[] valueBytes = output.toBytes();
+	kryoFactory.deleteKryo(kryo);
+
+	//
+	// Put the entry:
+	//  - Key : FlowId (Long)
+	//  - Value : Serialized FlowId (byte[])
+	//
+	mapFlowId.putAsync(flowId.value(), valueBytes);
+    }
+
+    /**
+     * Send a notification that a FlowId is removed.
+     *
+     * @param flowId the FlowId that is removed.
+     */
+    @Override
+    public void notificationSendFlowIdRemoved(FlowId flowId) {
+	//
+	// Remove the entry:
+	//  - Key : FlowId (Long)
+	//  - Value : Serialized FlowId (byte[])
+	//
+	mapFlowId.removeAsync(flowId.value());
+    }
+
+    /**
+     * Send a notification that a FlowId is updated.
+     *
+     * @param flowId the FlowId that is updated.
+     */
+    @Override
+    public void notificationSendFlowIdUpdated(FlowId flowId) {
+	// NOTE: Adding an entry with an existing key automatically updates it
+	notificationSendFlowIdAdded(flowId);
+    }
+
+    /**
+     * Send a notification that all Flow IDs are removed.
+     */
+    @Override
+    public void notificationSendAllFlowIdsRemoved() {
+	//
+	// Remove all entries
+	// NOTE: We remove the entries one-by-one so the per-entry
+	// notifications will be delivered.
+	//
+	// mapFlowId.clear();
+	Set<Long> keySet = mapFlowId.keySet();
+	for (Long key : keySet) {
+	    mapFlowId.removeAsync(key);
+	}
+    }
+
+    /**
      * Get all Topology Elements that are currently in the datagrid.
      *
      * @return all Topology Elements that are currently in the datagrid.
diff --git a/src/main/java/net/onrc/onos/datagrid/IDatagridService.java b/src/main/java/net/onrc/onos/datagrid/IDatagridService.java
index 0f03d77..d4e7b00 100644
--- a/src/main/java/net/onrc/onos/datagrid/IDatagridService.java
+++ b/src/main/java/net/onrc/onos/datagrid/IDatagridService.java
@@ -134,6 +134,39 @@
     void notificationSendAllFlowEntriesRemoved();
 
     /**
+     * Get all Flow IDs that are currently in the datagrid.
+     *
+     * @return all Flow IDs that are currently in the datagrid.
+     */
+    Collection<FlowId> getAllFlowIds();
+
+    /**
+     * Send a notification that a FlowId is added.
+     *
+     * @param flowId the FlowId that is added.
+     */
+    void notificationSendFlowIdAdded(FlowId flowId);
+
+    /**
+     * Send a notification that a FlowId is removed.
+     *
+     * @param flowId the FlowId that is removed.
+     */
+    void notificationSendFlowIdRemoved(FlowId flowId);
+
+    /**
+     * Send a notification that a FlowId is updated.
+     *
+     * @param flowId the FlowId that is updated.
+     */
+    void notificationSendFlowIdUpdated(FlowId flowId);
+
+    /**
+     * Send a notification that all Flow IDs are removed.
+     */
+    void notificationSendAllFlowIdsRemoved();
+
+    /**
      * Get all Topology Elements that are currently in the datagrid.
      *
      * @return all Topology Elements that are currently in the datagrid.
diff --git a/src/main/java/net/onrc/onos/ofcontroller/flowmanager/FlowDatabaseOperation.java b/src/main/java/net/onrc/onos/ofcontroller/flowmanager/FlowDatabaseOperation.java
index e075bad..d4a563b 100644
--- a/src/main/java/net/onrc/onos/ofcontroller/flowmanager/FlowDatabaseOperation.java
+++ b/src/main/java/net/onrc/onos/ofcontroller/flowmanager/FlowDatabaseOperation.java
@@ -5,7 +5,9 @@
 import java.util.ArrayList;
 import java.util.LinkedList;
 import java.util.List;
+import java.util.Map;
 
+import net.floodlightcontroller.core.IOFSwitch;
 import net.floodlightcontroller.util.MACAddress;
 
 import net.onrc.onos.graph.GraphDBOperation;
@@ -464,6 +466,45 @@
     }
 
     /**
+     * Get the source switch DPID of a previously added flow.
+     *
+     * @param dbHandler the Graph Database handler to use.
+     * @param flowId the Flow ID of the flow to get.
+     * @return the source switch DPID if found, otherwise null.
+     */
+    static Dpid getFlowSourceDpid(GraphDBOperation dbHandler, FlowId flowId) {
+	IFlowPath flowObj = null;
+	try {
+	    flowObj = dbHandler.searchFlowPath(flowId);
+	} catch (Exception e) {
+	    // TODO: handle exceptions
+	    dbHandler.rollback();
+	    log.error(":getFlowSourceDpid FlowId:{} failed", flowId);
+	    return null;
+	}
+	if (flowObj == null) {
+	    dbHandler.commit();
+	    return null;		// Flow not found
+	}
+
+	//
+	// Extract the Flow Source DPID
+	//
+	String srcSwitchStr = flowObj.getSrcSwitch();
+	if (srcSwitchStr == null) {
+	    // TODO: A work-around, becauuse of some bogus database objects
+	    dbHandler.commit();
+	    return null;
+	}
+
+	Dpid dpid = new Dpid(srcSwitchStr);
+
+	dbHandler.commit();
+
+	return dpid;
+    }
+
+    /**
      * Get all installed flows by all installers.
      *
      * @param dbHandler the Graph Database handler to use.
@@ -501,6 +542,60 @@
     }
 
     /**
+     * Get all installed flows whose Source Switch is controlled by this
+     * instance.
+     *
+     * @param dbHandler the Graph Database handler to use.
+     * @param mySwitches the collection of the switches controlled by this
+     * instance.
+     * @return the Flow Paths if found, otherwise null.
+     */
+    static ArrayList<FlowPath> getAllMyFlows(GraphDBOperation dbHandler,
+					     Map<Long, IOFSwitch> mySwitches) {
+	Iterable<IFlowPath> flowPathsObj = null;
+	ArrayList<FlowPath> flowPaths = new ArrayList<FlowPath>();
+
+	try {
+	    flowPathsObj = dbHandler.getAllFlowPaths();
+	} catch (Exception e) {
+	    // TODO: handle exceptions
+	    dbHandler.rollback();
+	    log.error(":getAllMyFlowPaths failed");
+	    return flowPaths;
+	}
+	if ((flowPathsObj == null) || (flowPathsObj.iterator().hasNext() == false)) {
+	    dbHandler.commit();
+	    return flowPaths;	// No Flows found
+	}
+
+	for (IFlowPath flowObj : flowPathsObj) {
+	    //
+	    // Extract the Source Switch DPID and ignore if the switch
+	    // is not controlled by this instance.
+	    //
+	    String srcSwitchStr = flowObj.getSrcSwitch();
+	    if (srcSwitchStr == null) {
+		// TODO: A work-around, becauuse of some bogus database objects
+		continue;
+	    }
+	    Dpid dpid = new Dpid(srcSwitchStr);
+	    if (mySwitches.get(dpid.value()) == null)
+		continue;
+
+	    //
+	    // Extract the Flow state
+	    //
+	    FlowPath flowPath = extractFlowPath(flowObj);
+	    if (flowPath != null)
+		flowPaths.add(flowPath);
+	}
+
+	dbHandler.commit();
+
+	return flowPaths;
+    }
+
+    /**
      * Extract Flow Path State from a Titan Database Object @ref IFlowPath.
      *
      * @param flowObj the object to extract the Flow Path State from.
diff --git a/src/main/java/net/onrc/onos/ofcontroller/flowmanager/FlowEventHandler.java b/src/main/java/net/onrc/onos/ofcontroller/flowmanager/FlowEventHandler.java
index 8b1f7c0..9d37a63 100644
--- a/src/main/java/net/onrc/onos/ofcontroller/flowmanager/FlowEventHandler.java
+++ b/src/main/java/net/onrc/onos/ofcontroller/flowmanager/FlowEventHandler.java
@@ -14,6 +14,7 @@
 
 import net.floodlightcontroller.core.IOFSwitch;
 import net.onrc.onos.datagrid.IDatagridService;
+import net.onrc.onos.graph.GraphDBOperation;
 import net.onrc.onos.ofcontroller.topology.Topology;
 import net.onrc.onos.ofcontroller.topology.TopologyElement;
 import net.onrc.onos.ofcontroller.topology.TopologyManager;
@@ -44,9 +45,14 @@
  * - Recompute impacted FlowPath using cached Topology.
  */
 class FlowEventHandler extends Thread implements IFlowEventHandlerService {
+
+    private boolean enableOnrc2014MeasurementsFlows = false;
+    private boolean enableOnrc2014MeasurementsTopology = false;
+
     /** The logger. */
     private final static Logger log = LoggerFactory.getLogger(FlowEventHandler.class);
-
+    
+    private GraphDBOperation dbHandler;
     private FlowManager flowManager;		// The Flow Manager to use
     private IDatagridService datagridService;	// The Datagrid Service to use
     private Topology topology;			// The network topology
@@ -63,6 +69,8 @@
 	new LinkedList<EventEntry<FlowPath>>();
     private List<EventEntry<FlowEntry>> flowEntryEvents =
 	new LinkedList<EventEntry<FlowEntry>>();
+    private List<EventEntry<FlowId>> flowIdEvents =
+	new LinkedList<EventEntry<FlowId>>();
 
     // All internally computed Flow Paths
     private Map<Long, FlowPath> allFlowPaths = new HashMap<Long, FlowPath>();
@@ -108,6 +116,8 @@
      * Startup processing.
      */
     private void startup() {
+	this.dbHandler = new GraphDBOperation("");
+
 	//
 	// Obtain the initial Topology state
 	//
@@ -137,6 +147,16 @@
 	    flowEntryEvents.add(eventEntry);
 	}
 
+	//
+	// Obtain the initial FlowId state
+	//
+	Collection<FlowId> flowIds = datagridService.getAllFlowIds();
+	for (FlowId flowId : flowIds) {
+	    EventEntry<FlowId> eventEntry =
+		new EventEntry<FlowId>(EventEntry.Type.ENTRY_ADD, flowId);
+	    flowIdEvents.add(eventEntry);
+	}
+
 	// Process the initial events (if any)
 	synchronized (allFlowPaths) {
 	    processEvents();
@@ -166,12 +186,14 @@
 		//  - EventEntry<TopologyElement>
 		//  - EventEntry<FlowPath>
 		//  - EventEntry<FlowEntry>
+		//  - EventEntry<FlowId>
 		//
 		for (EventEntry<?> event : collection) {
 		    // Topology event
 		    if (event.eventData() instanceof TopologyElement) {
 			EventEntry<TopologyElement> topologyEventEntry =
 			    (EventEntry<TopologyElement>)event;
+			
 			topologyEvents.add(topologyEventEntry);
 			continue;
 		    }
@@ -191,6 +213,14 @@
 			flowEntryEvents.add(flowEntryEventEntry);
 			continue;
 		    }
+
+		    // FlowId event
+		    if (event.eventData() instanceof FlowId) {
+			EventEntry<FlowId> flowIdEventEntry =
+			    (EventEntry<FlowId>)event;
+			flowIdEvents.add(flowIdEventEntry);
+			continue;
+		    }
 		}
 		collection.clear();
 
@@ -203,13 +233,63 @@
 	    log.debug("Exception processing Network Events: ", exception);
 	}
     }
-
+    
     /**
      * Process the events (if any)
      */
     private void processEvents() {
 	Collection<FlowEntry> modifiedFlowEntries;
 
+	if (enableOnrc2014MeasurementsFlows) {
+
+	    if (topologyEvents.isEmpty() && flowIdEvents.isEmpty()) {
+		return;		// Nothing to do
+	    }
+
+	    Map<Long, IOFSwitch> mySwitches = flowManager.getMySwitches();
+
+	    // Fetch and prepare my flows
+	    prepareMyFlows(mySwitches);
+
+	    // Fetch the topology
+	    processTopologyEvents();
+
+	    // Recompute all affected Flow Paths and keep only the modified
+	    for (FlowPath flowPath : shouldRecomputeFlowPaths.values()) {
+		if (recomputeFlowPath(flowPath))
+		    modifiedFlowPaths.put(flowPath.flowId().value(), flowPath);
+	    }
+
+	    // Assign the Flow Entry ID as needed
+	    for (FlowPath flowPath : modifiedFlowPaths.values()) {
+		for (FlowEntry flowEntry : flowPath.flowEntries()) {
+		    if (! flowEntry.isValidFlowEntryId()) {
+			long id = flowManager.getNextFlowEntryId();
+			flowEntry.setFlowEntryId(new FlowEntryId(id));
+		    }
+		}
+	    }
+
+	    // Extract my modified Flow Entries
+	    modifiedFlowEntries = processFlowIdEvents(mySwitches);
+
+	    //
+	    // Push the modified state to the Flow Manager
+	    //
+	    flowManager.pushModifiedFlowState(modifiedFlowPaths.values(),
+					      modifiedFlowEntries);
+
+	    // Cleanup
+	    topologyEvents.clear();
+	    flowIdEvents.clear();
+	    //
+	    allFlowPaths.clear();
+	    shouldRecomputeFlowPaths.clear();
+	    modifiedFlowPaths.clear();
+
+	    return;
+	}
+
 	if (topologyEvents.isEmpty() && flowPathEvents.isEmpty() &&
 	    flowEntryEvents.isEmpty()) {
 	    return;		// Nothing to do
@@ -346,6 +426,65 @@
     }
 
     /**
+     * Prepare my flows.
+     *
+     * @param mySwitches the collection of my switches.
+     */
+    private void prepareMyFlows(Map<Long, IOFSwitch> mySwitches) {
+	ArrayList<FlowPath> myFlows = flowManager.getAllMyFlows(mySwitches);
+	for (FlowPath flowPath : myFlows) {
+	    allFlowPaths.put(flowPath.flowId().value(), flowPath);
+	}
+    }
+
+    /**
+     * Process the Flow ID events.
+     *
+     * @param mySwitches the collection of my switches.
+     * @return a collection of modified Flow Entries this instance needs
+     * to push to its own switches.
+     */
+    private Collection<FlowEntry> processFlowIdEvents(Map<Long, IOFSwitch> mySwitches) {
+	List<FlowEntry> modifiedFlowEntries = new LinkedList<FlowEntry>();
+
+	//
+	// Process all Flow ID events and update the appropriate state
+	//
+	for (EventEntry<FlowId> eventEntry : flowIdEvents) {
+	    FlowId flowId = eventEntry.eventData();
+
+	    log.debug("Flow ID Event: {} {}", eventEntry.eventType(), flowId);
+
+	    //
+	    // Lookup the Flow ID in the Flows that were read from the
+	    // database in in a previous step. If not found, read from
+	    // the database.
+	    //
+	    FlowPath flowPath = allFlowPaths.get(flowId.value());
+	    if (flowPath == null)
+		flowPath = flowManager.getFlow(flowId);
+	    if (flowPath == null) {
+		log.debug("Flow ID {} : Flow not found!", flowId);
+		continue;
+	    }
+
+	    // Collect only my flow entries that are not updated.
+	    for (FlowEntry flowEntry : flowPath.flowEntries()) {
+		if (flowEntry.flowEntrySwitchState() !=
+		    FlowEntrySwitchState.FE_SWITCH_NOT_UPDATED) {
+		    continue;
+		}
+		IOFSwitch mySwitch = mySwitches.get(flowEntry.dpid().value());
+		if (mySwitch == null)
+		    continue;
+		modifiedFlowEntries.add(flowEntry);
+	    }
+	}
+
+	return modifiedFlowEntries;
+    }
+
+    /**
      * Process the Flow Path events.
      */
     private void processFlowPathEvents() {
@@ -431,15 +570,25 @@
      * Process the Topology events.
      */
     private void processTopologyEvents() {
+	if (enableOnrc2014MeasurementsTopology) {
+	    if (topologyEvents.isEmpty())
+		return;
+	    log.debug("[BEFORE] {}", topology.toString());
+	    topology.readFromDatabase(dbHandler);
+	    log.debug("[AFTER] {}", topology.toString());
+	    shouldRecomputeFlowPaths.putAll(allFlowPaths);
+	    return;
+	}
+
 	//
 	// Process all Topology events and update the appropriate state
 	//
 	boolean isTopologyModified = false;
 	for (EventEntry<TopologyElement> eventEntry : topologyEvents) {
 	    TopologyElement topologyElement = eventEntry.eventData();
-
+			
 	    log.debug("Topology Event: {} {}", eventEntry.eventType(),
-		      topologyElement);
+		      topologyElement.toString());
 
 	    switch (eventEntry.eventType()) {
 	    case ENTRY_ADD:
@@ -972,6 +1121,43 @@
     }
 
     /**
+     * Receive a notification that a FlowId is added.
+     *
+     * @param flowId the FlowId that is added.
+     */
+    @Override
+    public void notificationRecvFlowIdAdded(FlowId flowId) {
+	EventEntry<FlowId> eventEntry =
+	    new EventEntry<FlowId>(EventEntry.Type.ENTRY_ADD, flowId);
+	networkEvents.add(eventEntry);
+    }
+
+    /**
+     * Receive a notification that a FlowId is removed.
+     *
+     * @param flowId the FlowId that is removed.
+     */
+    @Override
+    public void notificationRecvFlowIdRemoved(FlowId flowId) {
+	EventEntry<FlowId> eventEntry =
+	    new EventEntry<FlowId>(EventEntry.Type.ENTRY_REMOVE, flowId);
+	networkEvents.add(eventEntry);
+    }
+
+    /**
+     * Receive a notification that a FlowId is updated.
+     *
+     * @param flowId the FlowId that is updated.
+     */
+    @Override
+    public void notificationRecvFlowIdUpdated(FlowId flowId) {
+	// NOTE: The ADD and UPDATE events are processed in same way
+	EventEntry<FlowId> eventEntry =
+	    new EventEntry<FlowId>(EventEntry.Type.ENTRY_ADD, flowId);
+	networkEvents.add(eventEntry);
+    }
+
+    /**
      * Receive a notification that a Topology Element is added.
      *
      * @param topologyElement the Topology Element that is added.
diff --git a/src/main/java/net/onrc/onos/ofcontroller/flowmanager/FlowManager.java b/src/main/java/net/onrc/onos/ofcontroller/flowmanager/FlowManager.java
index dd98f4e..121c72a 100644
--- a/src/main/java/net/onrc/onos/ofcontroller/flowmanager/FlowManager.java
+++ b/src/main/java/net/onrc/onos/ofcontroller/flowmanager/FlowManager.java
@@ -48,6 +48,9 @@
  * Flow Manager class for handling the network flows.
  */
 public class FlowManager implements IFloodlightModule, IFlowService, INetMapStorage {
+
+    private boolean enableOnrc2014MeasurementsFlows = false;
+
     protected GraphDBOperation dbHandlerApi;
     protected GraphDBOperation dbHandlerInner;
 
@@ -265,7 +268,12 @@
 	}
 
 	if (FlowDatabaseOperation.addFlow(dbHandlerApi, flowPath)) {
-	    datagridService.notificationSendFlowAdded(flowPath);
+	    if (enableOnrc2014MeasurementsFlows) {
+		datagridService.notificationSendFlowIdAdded(flowPath.flowId());
+	    } else {
+		datagridService.notificationSendFlowAdded(flowPath);
+	    }
+
 	    return flowPath.flowId();
 	}
 	return null;
@@ -279,7 +287,11 @@
     @Override
     public boolean deleteAllFlows() {
 	if (FlowDatabaseOperation.deleteAllFlows(dbHandlerApi)) {
-	    datagridService.notificationSendAllFlowsRemoved();
+	    if (enableOnrc2014MeasurementsFlows) {
+		datagridService.notificationSendAllFlowIdsRemoved();
+	    } else {
+		datagridService.notificationSendAllFlowsRemoved();
+	    }
 	    return true;
 	}
 	return false;
@@ -294,7 +306,11 @@
     @Override
     public boolean deleteFlow(FlowId flowId) {
 	if (FlowDatabaseOperation.deleteFlow(dbHandlerApi, flowId)) {
-	    datagridService.notificationSendFlowRemoved(flowId);
+	    if (enableOnrc2014MeasurementsFlows) {
+		datagridService.notificationSendFlowIdRemoved(flowId);
+	    } else {
+		datagridService.notificationSendFlowRemoved(flowId);
+	    }
 	    return true;
 	}
 	return false;
@@ -312,6 +328,16 @@
     }
 
     /**
+     * Get the source switch DPID of a previously added flow.
+     *
+     * @param flowId the Flow ID of the flow to get.
+     * @return the source switch DPID if found, otherwise null.
+     */
+    public Dpid getFlowSourceDpid(FlowId flowId) {
+	return FlowDatabaseOperation.getFlowSourceDpid(dbHandlerApi, flowId);
+    }
+
+    /**
      * Get all installed flows by all installers.
      *
      * @return the Flow Paths if found, otherwise null.
@@ -322,6 +348,18 @@
     }
 
     /**
+     * Get all installed flows whose Source Switch is controlled by this
+     * instance.
+     *
+     * @param mySwitches the collection of the switches controlled by this
+     * instance.
+     * @return the Flow Paths if found, otherwise null.
+     */
+    public ArrayList<FlowPath> getAllMyFlows(Map<Long, IOFSwitch> mySwitches) {
+	return FlowDatabaseOperation.getAllMyFlows(dbHandlerApi, mySwitches);
+    }
+
+    /**
      * Get summary of all installed flows by all installers in a given range.
      *
      * @param flowId the Flow ID of the first flow in the flow range to get.
@@ -413,6 +451,9 @@
     public void flowEntriesPushedToSwitch(
 		Collection<Pair<IOFSwitch, FlowEntry>> entries) {
 
+	if (enableOnrc2014MeasurementsFlows)
+	    return;
+
 	//
 	// Process all entries
 	//
@@ -483,7 +524,9 @@
 	//
 	pushModifiedFlowEntriesToSwitches(modifiedFlowEntries);
 	pushModifiedFlowPathsToDatabase(modifiedFlowPaths);
-	cleanupDeletedFlowEntriesFromDatagrid(modifiedFlowEntries);
+	if (! enableOnrc2014MeasurementsFlows) {
+	    cleanupDeletedFlowEntriesFromDatagrid(modifiedFlowEntries);
+	}
     }
 
     /**
@@ -714,6 +757,11 @@
 		    }
 		} while (retry);
 
+		if (enableOnrc2014MeasurementsFlows) {
+		    // Send the notification
+		    datagridService.notificationSendFlowIdRemoved(flowPath.flowId());
+		}
+
 		continue;
 	    }
 
@@ -730,10 +778,12 @@
 		    allValid = false;
 		    break;
 		}
-		if (flowEntry.flowEntrySwitchState() !=
-		    FlowEntrySwitchState.FE_SWITCH_UPDATED) {
-		    allValid = false;
-		    break;
+		if (! enableOnrc2014MeasurementsFlows) {
+		    if (flowEntry.flowEntrySwitchState() !=
+			FlowEntrySwitchState.FE_SWITCH_UPDATED) {
+			allValid = false;
+			break;
+		    }
 		}
 	    }
 	    if (! allValid)
@@ -759,6 +809,11 @@
 		    log.error("Exception writing Flow Path to Network MAP: ", e);
 		}
 	    } while (retry);
+
+	    if (enableOnrc2014MeasurementsFlows) {
+		// Send the notification
+		datagridService.notificationSendFlowIdAdded(flowPath.flowId());
+	    }
 	}
     }
 }
diff --git a/src/main/java/net/onrc/onos/ofcontroller/flowmanager/IFlowEventHandlerService.java b/src/main/java/net/onrc/onos/ofcontroller/flowmanager/IFlowEventHandlerService.java
index 78562e1..62edf70 100644
--- a/src/main/java/net/onrc/onos/ofcontroller/flowmanager/IFlowEventHandlerService.java
+++ b/src/main/java/net/onrc/onos/ofcontroller/flowmanager/IFlowEventHandlerService.java
@@ -2,6 +2,7 @@
 
 import net.onrc.onos.ofcontroller.topology.TopologyElement;
 import net.onrc.onos.ofcontroller.util.FlowEntry;
+import net.onrc.onos.ofcontroller.util.FlowId;
 import net.onrc.onos.ofcontroller.util.FlowPath;
 
 /**
@@ -51,6 +52,27 @@
     void notificationRecvFlowEntryUpdated(FlowEntry flowEntry);
 
     /**
+     * Receive a notification that a FlowId is added.
+     *
+     * @param flowId the FlowId that is added.
+     */
+    void notificationRecvFlowIdAdded(FlowId flowId);
+
+    /**
+     * Receive a notification that a FlowId is removed.
+     *
+     * @param flowId the FlowId that is removed.
+     */
+    void notificationRecvFlowIdRemoved(FlowId flowId);
+
+    /**
+     * Receive a notification that a FlowId is updated.
+     *
+     * @param flowId the FlowId that is updated.
+     */
+    void notificationRecvFlowIdUpdated(FlowId flowId);
+
+    /**
      * Receive a notification that a Topology Element is added.
      *
      * @param topologyElement the Topology Element that is added.
diff --git a/src/main/java/net/onrc/onos/ofcontroller/topology/Topology.java b/src/main/java/net/onrc/onos/ofcontroller/topology/Topology.java
index fc75591..92da9ba 100644
--- a/src/main/java/net/onrc/onos/ofcontroller/topology/Topology.java
+++ b/src/main/java/net/onrc/onos/ofcontroller/topology/Topology.java
@@ -387,6 +387,8 @@
 	// Fetch the relevant info from the Switch and Port vertices
 	// from the Titan Graph.
 	//
+    	nodesMap = new TreeMap<Long,Node>();
+	
 	Iterable<ISwitchObject> activeSwitches = dbHandler.getActiveSwitches();
 	for (ISwitchObject switchObj : activeSwitches) {
 	    Vertex nodeVertex = switchObj.asVertex();
@@ -451,4 +453,74 @@
 	}
 	dbHandler.commit();
     }
+    
+    // TODO Merge into loops in readFromDatabase() can reduce execution time.
+    /**
+     * Check given two topology are identical or not.
+     * @param topo1
+     * @param topo2
+     * @return true if identical
+     */
+    private boolean compareTopology(Map<Long,Node> topo1, Map<Long,Node> topo2) {
+    	if (topo1.size() != topo2.size()) {
+    		return false;
+    	}
+    	
+    	for (Map.Entry<Long,Node> nodeEntry : topo1.entrySet()) {
+    		Long dpid = nodeEntry.getKey();
+    		if (! topo2.containsKey(dpid)) {
+    			return false;
+    		}
+    		
+    		Node n1 = nodeEntry.getValue();
+    		Node n2 = topo2.get(dpid);
+    		
+    		// check port identity
+    		if (n1.ports().size() != n2.ports().size()) {
+    			return false;
+    		}
+    		for (Integer port : n1.ports().keySet()) {
+    			if (! n2.ports().containsKey(port)) {
+    				return false;
+    			}
+    		}
+    		
+    		// check link identity
+    		if (n1.links.size() != n2.links.size()) {
+    			return false;
+    		}
+    		for (Map.Entry<Integer, Node.Link> linkEntry : n1.links.entrySet()) {
+    			Integer p1 = linkEntry.getKey();
+    			Node.Link l1 = linkEntry.getValue();
+    			
+    			if (! n2.links.containsKey(p1)) {
+    				return false;
+    			}
+    			Node.Link l2 = n2.links.get(p1);
+    			
+    		   	// Supposition: Link's "me" and "neighbor" is properly set.
+    			if (l1.myPort != l2.myPort ||
+    				l1.neighborPort != l2.neighborPort) {
+    				return false;
+    			}
+    		}
+    	}
+    	return true;
+    }
+    
+    // Only for debug use
+    @Override
+    public String toString() {
+    	long numNodes = nodesMap.size();
+    	long numLinks = 0;
+    	for (Map.Entry<Long, Node> entry : nodesMap.entrySet()) {
+    		Node n = entry.getValue();
+    		for (Map.Entry<Integer, Node.Link> linkEntry : n.links.entrySet()) {
+    			if (n.nodeId > linkEntry.getValue().neighbor.nodeId) {
+    				++numLinks;
+    			}
+    		}
+    	}
+    	return "Topology has " + numNodes + " Nodes and " + numLinks + " Links.";
+    }
 }