Merge branch 'ONOS-ONRC2014-Measurements' of github.com:OPENNETWORKINGLAB/ONOS into RAMCloud-merge

Conflicts:
	src/main/java/net/onrc/onos/graph/GraphDBOperation.java
	src/main/java/net/onrc/onos/graph/IDBOperation.java
	src/main/java/net/onrc/onos/graph/LocalTopologyEventListener.java
	src/main/java/net/onrc/onos/ofcontroller/core/internal/DeviceStorageImpl.java
	src/main/java/net/onrc/onos/ofcontroller/core/internal/LinkStorageImpl.java
	src/main/java/net/onrc/onos/ofcontroller/core/internal/TopoLinkServiceImpl.java
	src/main/java/net/onrc/onos/ofcontroller/core/internal/TopoSwitchServiceImpl.java
	src/main/java/net/onrc/onos/ofcontroller/flowmanager/FlowDatabaseOperation.java
	src/main/java/net/onrc/onos/ofcontroller/flowmanager/FlowEventHandler.java
	src/main/java/net/onrc/onos/ofcontroller/flowmanager/FlowManager.java
	src/main/java/net/onrc/onos/ofcontroller/topology/Topology.java
diff --git a/src/main/java/net/onrc/onos/ofcontroller/flowmanager/FlowEventHandler.java b/src/main/java/net/onrc/onos/ofcontroller/flowmanager/FlowEventHandler.java
index 001fb3c..cf9b67d 100644
--- a/src/main/java/net/onrc/onos/ofcontroller/flowmanager/FlowEventHandler.java
+++ b/src/main/java/net/onrc/onos/ofcontroller/flowmanager/FlowEventHandler.java
@@ -15,11 +15,15 @@
 import java.util.concurrent.LinkedBlockingQueue;
 
 import net.floodlightcontroller.core.IOFSwitch;
+import net.floodlightcontroller.core.IOFSwitchListener;
 import net.onrc.onos.datagrid.IDatagridService;
+import net.onrc.onos.graph.DBOperation;
+import net.onrc.onos.graph.GraphDBManager;
 import net.onrc.onos.ofcontroller.topology.Topology;
 import net.onrc.onos.ofcontroller.topology.TopologyElement;
 import net.onrc.onos.ofcontroller.topology.TopologyManager;
 import net.onrc.onos.ofcontroller.util.DataPath;
+import net.onrc.onos.ofcontroller.util.Dpid;
 import net.onrc.onos.ofcontroller.util.EventEntry;
 import net.onrc.onos.ofcontroller.util.FlowEntry;
 import net.onrc.onos.ofcontroller.util.FlowEntryAction;
@@ -31,6 +35,8 @@
 import net.onrc.onos.ofcontroller.util.FlowId;
 import net.onrc.onos.ofcontroller.util.FlowPath;
 import net.onrc.onos.ofcontroller.util.FlowPathUserState;
+import net.onrc.onos.ofcontroller.util.Pair;
+import net.onrc.onos.ofcontroller.util.Port;
 import net.onrc.onos.ofcontroller.util.serializers.KryoFactory;
 
 import com.esotericsoftware.kryo2.Kryo;
@@ -46,17 +52,16 @@
  * - Detect FlowPaths impacted by Topology change.
  * - Recompute impacted FlowPath using cached Topology.
  */
-class FlowEventHandler extends Thread implements IFlowEventHandlerService {
+class FlowEventHandler extends Thread implements IFlowEventHandlerService,
+						 IOFSwitchListener {
+
+    private boolean enableOnrc2014MeasurementsFlows = true;
+    private boolean enableOnrc2014MeasurementsTopology = true;
+
     /** The logger. */
     private final static Logger log = LoggerFactory.getLogger(FlowEventHandler.class);
 
-    // Flag to refresh Topology object periodically
-    private final static boolean refreshTopology = false;
-    // Refresh delay(ms)
-    private final static long refreshTopologyDelay = 5000;
-    // Refresh interval(ms)
-    private final static long refreshTopologyInterval = 2000;
-    private Timer refreshTopologyTimer;
+    private DBOperation dbHandler;
 
     private FlowManager flowManager;		// The Flow Manager to use
     private IDatagridService datagridService;	// The Datagrid Service to use
@@ -74,6 +79,12 @@
 	new LinkedList<EventEntry<FlowPath>>();
     private List<EventEntry<FlowEntry>> flowEntryEvents =
 	new LinkedList<EventEntry<FlowEntry>>();
+    private List<EventEntry<Pair<FlowId, Dpid>>> flowIdEvents =
+	new LinkedList<EventEntry<Pair<FlowId, Dpid>>>();
+    private List<EventEntry<Pair<FlowEntryId, Dpid>>> flowEntryIdEvents =
+	new LinkedList<EventEntry<Pair<FlowEntryId, Dpid>>>();
+    private List<EventEntry<Dpid>> switchDpidEvents =
+	new LinkedList<EventEntry<Dpid>>();
 
     // All internally computed Flow Paths
     private Map<Long, FlowPath> allFlowPaths = new HashMap<Long, FlowPath>();
@@ -119,6 +130,8 @@
      * Startup processing.
      */
     private void startup() {
+	this.dbHandler = GraphDBManager.getDBOperation("ramcloud", "/tmp/ramcloudconf");
+
 	//
 	// Obtain the initial Topology state
 	//
@@ -148,32 +161,33 @@
 	    flowEntryEvents.add(eventEntry);
 	}
 
+	//
+	// Obtain the initial FlowId state
+	//
+	Collection<Pair<FlowId, Dpid>> flowIds =
+	    datagridService.getAllFlowIds();
+	for (Pair<FlowId, Dpid> pair : flowIds) {
+	    EventEntry<Pair<FlowId, Dpid>> eventEntry =
+		new EventEntry<Pair<FlowId, Dpid>>(EventEntry.Type.ENTRY_ADD, pair);
+	    flowIdEvents.add(eventEntry);
+	}
+
+	//
+	// Obtain the initial FlowEntryId state
+	//
+	Collection<Pair<FlowEntryId, Dpid>> flowEntryIds =
+	    datagridService.getAllFlowEntryIds();
+	for (Pair<FlowEntryId, Dpid> pair : flowEntryIds) {
+	    EventEntry<Pair<FlowEntryId, Dpid>> eventEntry =
+		new EventEntry<Pair<FlowEntryId, Dpid>>(EventEntry.Type.ENTRY_ADD, pair);
+	    flowEntryIdEvents.add(eventEntry);
+	}
+
 	// Process the initial events (if any)
 	synchronized (allFlowPaths) {
 	    processEvents();
 	}
 
-	if (refreshTopology) {
-		refreshTopologyTimer = new Timer();
-		refreshTopologyTimer.schedule(new TimerTask() {
-			@Override
-			public void run() {
-				PerfMon pm = PerfMon.getInstance();
-				log.debug("[BEFORE] {}", topology);
-				long begin, end;
-				synchronized(topology) {
-					begin = System.nanoTime();
-					pm.read_whole_topology_start();
-					topology.readFromDatabase(flowManager.dbHandlerInner);
-					pm.read_whole_topology_end();
-					end = System.nanoTime();
-				}
-				// FIXME level raised for measurement. Was debug
-				log.error("[AFTER] {}", topology);
-				log.error("refresh takes : {}[us]", (end - begin) / 1000.0);
-			}
-		}, refreshTopologyDelay, refreshTopologyInterval);
-	}
     }
 
     /**
@@ -199,12 +213,15 @@
 		//  - EventEntry<TopologyElement>
 		//  - EventEntry<FlowPath>
 		//  - EventEntry<FlowEntry>
+		//  - EventEntry<Pair<FlowId, Dpid>>
+		//  - EventEntry<Pair<FlowEntryId, Dpid>>
 		//
 		for (EventEntry<?> event : collection) {
 		    // Topology event
 		    if (event.eventData() instanceof TopologyElement) {
 			EventEntry<TopologyElement> topologyEventEntry =
 			    (EventEntry<TopologyElement>)event;
+			
 			topologyEvents.add(topologyEventEntry);
 			continue;
 		    }
@@ -224,6 +241,34 @@
 			flowEntryEvents.add(flowEntryEventEntry);
 			continue;
 		    }
+
+		    // FlowId event
+		    if (event.eventData() instanceof Pair) {
+			EventEntry<Pair<FlowId, Dpid>> flowIdEventEntry =
+			    (EventEntry<Pair<FlowId, Dpid>>)event;
+			flowIdEvents.add(flowIdEventEntry);
+			continue;
+		    }
+
+		    // Switch Dpid event
+		    if (event.eventData() instanceof Dpid) {
+			EventEntry<Dpid> switchDpidEventEntry =
+			    (EventEntry<Dpid>)event;
+			switchDpidEvents.add(switchDpidEventEntry);
+			continue;
+		    }
+
+		    // FlowEntryId event
+		    // TODO: Fix the code below if we need again to handle
+		    // the FlowEntryId events
+		    /*
+		    if (event.eventData() instanceof Pair) {
+			EventEntry<Pair<FlowEntryId, Dpid>> flowEntryIdEventEntry =
+			    (EventEntry<Pair<FlowEntryId, Dpid>>)event;
+			flowEntryIdEvents.add(flowEntryIdEventEntry);
+			continue;
+		    }
+		    */
 		}
 		collection.clear();
 
@@ -236,13 +281,89 @@
 	    log.debug("Exception processing Network Events: ", exception);
 	}
     }
-
+    
     /**
      * Process the events (if any)
      */
     private void processEvents() {
 	Collection<FlowEntry> modifiedFlowEntries;
 
+	if (enableOnrc2014MeasurementsFlows) {
+
+	    if (topologyEvents.isEmpty() && flowIdEvents.isEmpty() &&
+		switchDpidEvents.isEmpty()) {
+		return;		// Nothing to do
+	    }
+
+	    Map<Long, IOFSwitch> mySwitches = flowManager.getMySwitches();
+
+	    // Process the Switch Dpid events
+	    processSwitchDpidEvents();
+
+	    // Process the Flow ID events
+	    processFlowIdEvents(mySwitches);
+
+	    // Fetch the topology
+	    processTopologyEvents();
+
+	    // Recompute all affected Flow Paths and keep only the modified
+	    for (FlowPath flowPath : shouldRecomputeFlowPaths.values()) {
+		if (recomputeFlowPath(flowPath))
+		    modifiedFlowPaths.put(flowPath.flowId().value(), flowPath);
+	    }
+
+	    // Assign the Flow Entry ID as needed
+	    for (FlowPath flowPath : modifiedFlowPaths.values()) {
+		for (FlowEntry flowEntry : flowPath.flowEntries()) {
+		    if (! flowEntry.isValidFlowEntryId()) {
+			long id = flowManager.getNextFlowEntryId();
+			flowEntry.setFlowEntryId(new FlowEntryId(id));
+		    }
+		}
+	    }
+
+	    //
+	    // Push the modified state to the database
+	    //
+	    for (FlowPath flowPath : modifiedFlowPaths.values()) {
+		//
+		// Delete the Flow Path from the Network Map
+		//
+		if (flowPath.flowPathUserState() ==
+		    FlowPathUserState.FP_USER_DELETE) {
+		    log.debug("Deleting Flow Path From Database: {}", flowPath);
+		    // TODO: For now the deleting of a Flow Path is blocking
+		    ParallelFlowDatabaseOperation.deleteFlow(dbHandler,
+							     flowPath.flowId());
+		    // Send the notifications for the deleted Flow Entries
+		    for (FlowEntry flowEntry : flowPath.flowEntries()) {
+			datagridService.notificationSendFlowEntryRemoved(flowEntry.flowEntryId());
+		    }
+
+		    continue;
+		}
+
+		log.debug("Pushing Flow Path To Database: {}", flowPath);
+		//
+		// Write the Flow Path to the Network Map
+		//
+		ParallelFlowDatabaseOperation.addFlow(dbHandler, flowPath,
+						      datagridService);
+	    }
+
+	    // Cleanup
+	    topologyEvents.clear();
+	    flowIdEvents.clear();
+	    switchDpidEvents.clear();
+	    //
+	    // NOTE: Keep a cache with my Flow Paths
+	    // allFlowPaths.clear();
+	    shouldRecomputeFlowPaths.clear();
+	    modifiedFlowPaths.clear();
+
+	    return;
+	}
+
 	if (topologyEvents.isEmpty() && flowPathEvents.isEmpty() &&
 	    flowEntryEvents.isEmpty()) {
 	    return;		// Nothing to do
@@ -379,6 +500,246 @@
     }
 
     /**
+     * Fix a flow fetched from the database.
+     *
+     * @param flowPath the Flow to fix.
+     */
+    private void fixFlowFromDatabase(FlowPath flowPath) {
+	//
+	// TODO: Bug workaround / fix :
+	// method FlowDatabaseOperation.extractFlowEntry() doesn't
+	// fetch the inPort and outPort, hence we assign them here.
+	//
+	// Assign the inPort and outPort for the Flow Entries
+	for (FlowEntry flowEntry : flowPath.flowEntries()) {
+	    // Set the inPort
+	    do {
+		if (flowEntry.inPort() != null)
+		    break;
+		if (flowEntry.flowEntryMatch() == null)
+		    break;
+		Port inPort = new Port(flowEntry.flowEntryMatch().inPort().value());
+		flowEntry.setInPort(inPort);
+	    } while (false);
+
+	    // Set the outPort
+	    do {
+		if (flowEntry.outPort() != null)
+		    break;
+		for (FlowEntryAction fa : flowEntry.flowEntryActions().actions()) {
+		    if (fa.actionOutput() != null) {
+			Port outPort = new Port(fa.actionOutput().port().value());
+			flowEntry.setOutPort(outPort);
+			break;
+		    }
+		}
+	    } while (false);
+	}
+    }
+
+    /**
+     * Process the Switch Dpid events.
+     */
+    private void processSwitchDpidEvents() {
+	Map<Long, Dpid> addedSwitches = new HashMap<Long, Dpid>();
+	Map<Long, Dpid> removedSwitches = new HashMap<Long, Dpid>();
+
+	//
+	// Process all Switch Dpid events and update the appropriate state
+	//
+	for (EventEntry<Dpid> eventEntry : switchDpidEvents) {
+	    Dpid dpid = eventEntry.eventData();
+			
+	    log.debug("SwitchDpid Event: {} {}", eventEntry.eventType(), dpid);
+
+	    // Compute the final set of added and removed switches
+	    switch (eventEntry.eventType()) {
+	    case ENTRY_ADD:
+		addedSwitches.put(dpid.value(), dpid);
+		removedSwitches.remove(dpid.value());
+		break;
+	    case ENTRY_REMOVE:
+		addedSwitches.remove(dpid.value());
+		removedSwitches.put(dpid.value(), dpid);
+		break;
+	    }
+	}
+
+	//
+	// Remove the Flows from the local cache if the removed
+	// switch is the Source Switch.
+	//
+	// TODO: This search can be expensive for a large number of flows
+	// and should be optmized.
+	//
+	List<FlowId> deleteFlowIds = new LinkedList<FlowId>();
+	for (Dpid switchDpid : removedSwitches.values()) {
+	    for (FlowPath flowPath : allFlowPaths.values()) {
+		Dpid srcDpid = flowPath.dataPath().srcPort().dpid();
+		if (srcDpid.value() == switchDpid.value())
+		    deleteFlowIds.add(flowPath.flowId());
+	    }
+	}
+	//
+	// Remove the Flows from the local cache
+	//
+	for (FlowId flowId : deleteFlowIds)
+	    allFlowPaths.remove(flowId.value());
+
+	// Get the Flows for the added switches
+	Collection<FlowPath> flowPaths =
+	    ParallelFlowDatabaseOperation.getFlowsForSwitches(dbHandler,
+							      addedSwitches.values());
+	for (FlowPath flowPath : flowPaths) {
+	    allFlowPaths.put(flowPath.flowId().value(), flowPath);
+	}
+    }
+
+    /**
+     * Process the Flow ID events.
+     *
+     * @param mySwitches the collection of my switches.
+     */
+    private void processFlowIdEvents(Map<Long, IOFSwitch> mySwitches) {
+	List<FlowId> shouldFetchMyFlowIds = new LinkedList<FlowId>();
+
+	//
+	// Process all Flow Id events and update the appropriate state
+	//
+	for (EventEntry<Pair<FlowId, Dpid>> eventEntry : flowIdEvents) {
+	    Pair<FlowId, Dpid> pair = eventEntry.eventData();
+	    FlowId flowId = pair.first;
+	    Dpid dpid = pair.second;
+
+	    log.debug("Flow ID Event: {} {} {}", eventEntry.eventType(),
+		      flowId, dpid);
+
+	    //
+	    // Ignore Flows if the Source Switch is not controlled by this
+	    // instance.
+	    //
+	    if (mySwitches.get(dpid.value()) == null)
+		continue;
+
+	    switch (eventEntry.eventType()) {
+	    case ENTRY_ADD: {
+		//
+		// Add a new Flow Path
+		//
+		if (allFlowPaths.get(flowId.value()) != null) {
+		    //
+		    // TODO: What to do if the Flow Path already exists?
+		    // Fow now, we just re-add it.
+		    //
+		}
+		shouldFetchMyFlowIds.add(flowId);
+
+		break;
+	    }
+
+	    case ENTRY_REMOVE: {
+		//
+		// Remove an existing Flow Path.
+		//
+		// Find the Flow Path, and mark the Flow and its Flow Entries
+		// for deletion.
+		//
+		FlowPath existingFlowPath =
+		    allFlowPaths.get(flowId.value());
+		if (existingFlowPath == null)
+		    continue;		// Nothing to do
+
+		existingFlowPath.setFlowPathUserState(FlowPathUserState.FP_USER_DELETE);
+		for (FlowEntry flowEntry : existingFlowPath.flowEntries()) {
+		    flowEntry.setFlowEntryUserState(FlowEntryUserState.FE_USER_DELETE);
+		    flowEntry.setFlowEntrySwitchState(FlowEntrySwitchState.FE_SWITCH_NOT_UPDATED);
+		}
+
+		// Remove the Flow Path from the internal state
+		Long key = existingFlowPath.flowId().value();
+		allFlowPaths.remove(key);
+		shouldRecomputeFlowPaths.remove(key);
+		modifiedFlowPaths.put(key, existingFlowPath);
+
+		break;
+	    }
+	    }
+	}
+
+	// Get my Flows
+	Collection<FlowPath> myFlows =
+	    ParallelFlowDatabaseOperation.getFlows(dbHandler,
+						   shouldFetchMyFlowIds);
+
+	for (FlowPath flowPath : myFlows) {
+	    fixFlowFromDatabase(flowPath);
+
+	    switch (flowPath.flowPathType()) {
+	    case FP_TYPE_SHORTEST_PATH:
+		//
+		// Reset the Data Path, in case it was set already, because
+		// we are going to recompute it anyway.
+		//
+		flowPath.flowEntries().clear();
+		shouldRecomputeFlowPaths.put(flowPath.flowId().value(),
+					     flowPath);
+		break;
+	    case FP_TYPE_EXPLICIT_PATH:
+		//
+		// Mark all Flow Entries for installation in the switches.
+		//
+		for (FlowEntry flowEntry : flowPath.flowEntries()) {
+		    flowEntry.setFlowEntrySwitchState(FlowEntrySwitchState.FE_SWITCH_NOT_UPDATED);
+		}
+		modifiedFlowPaths.put(flowPath.flowId().value(), flowPath);
+		break;
+	    case FP_TYPE_UNKNOWN:
+		log.error("FlowPath event with unknown type");
+		break;
+	    }
+	    allFlowPaths.put(flowPath.flowId().value(), flowPath);
+	}
+    }
+
+    /**
+     * Process the Flow Entry ID events.
+     *
+     * @param mySwitches the collection of my switches.
+     * @return a collection of modified Flow Entries this instance needs
+     * to push to its own switches.
+     */
+    private Collection<FlowEntry> processFlowEntryIdEvents(Map<Long, IOFSwitch> mySwitches) {
+	List<FlowEntry> modifiedFlowEntries = new LinkedList<FlowEntry>();
+
+	//
+	// Process all Flow ID events and update the appropriate state
+	//
+	for (EventEntry<Pair<FlowEntryId, Dpid>> eventEntry : flowEntryIdEvents) {
+	    Pair<FlowEntryId, Dpid> pair = eventEntry.eventData();
+	    FlowEntryId flowEntryId = pair.first;
+	    Dpid dpid = pair.second;
+
+	    log.debug("Flow Entry ID Event: {} {} {}", eventEntry.eventType(),
+		      flowEntryId, dpid);
+
+	    if (mySwitches.get(dpid.value()) == null)
+		continue;
+
+	    // Fetch the Flow Entry
+	    FlowEntry flowEntry = FlowDatabaseOperation.getFlowEntry(dbHandler,
+								     flowEntryId);
+	    if (flowEntry == null) {
+		log.debug("Flow Entry ID {} : Flow Entry not found!",
+			  flowEntryId);
+		continue;
+	    }
+	    modifiedFlowEntries.add(flowEntry);
+	}
+
+	return modifiedFlowEntries;
+    }
+
+    /**
      * Process the Flow Path events.
      */
     private void processFlowPathEvents() {
@@ -464,15 +825,34 @@
      * Process the Topology events.
      */
     private void processTopologyEvents() {
+	boolean isTopologyModified = false;
+
+	if (enableOnrc2014MeasurementsTopology) {
+	    if (topologyEvents.isEmpty())
+		return;
+
+	    // TODO: Code for debugging purpose only
+	    for (EventEntry<TopologyElement> eventEntry : topologyEvents) {
+		TopologyElement topologyElement = eventEntry.eventData();
+		log.debug("Topology Event: {} {}", eventEntry.eventType(),
+			  topologyElement.toString());
+	    }
+
+	    log.debug("[BEFORE] {}", topology.toString());
+	    topology.readFromDatabase(dbHandler);
+	    log.debug("[AFTER] {}", topology.toString());
+	    shouldRecomputeFlowPaths.putAll(allFlowPaths);
+	    return;
+	}
+
 	//
 	// Process all Topology events and update the appropriate state
 	//
-	boolean isTopologyModified = false;
 	for (EventEntry<TopologyElement> eventEntry : topologyEvents) {
 	    TopologyElement topologyElement = eventEntry.eventData();
-
+			
 	    log.debug("Topology Event: {} {}", eventEntry.eventType(),
-		      topologyElement);
+		      topologyElement.toString());
 
 	    switch (eventEntry.eventType()) {
 	    case ENTRY_ADD:
@@ -752,6 +1132,19 @@
     private boolean recomputeFlowPath(FlowPath flowPath) {
 	boolean hasChanged = false;
 
+	if (enableOnrc2014MeasurementsFlows) {
+	    // Cleanup the deleted Flow Entries from the earlier iteration
+	    flowPath.dataPath().removeDeletedFlowEntries();
+
+	    //
+	    // TODO: Fake it that the Flow Entries have been already pushed
+	    // into the switches, so we don't push them again.
+	    //
+	    for (FlowEntry flowEntry : flowPath.flowEntries()) {
+		flowEntry.setFlowEntrySwitchState(FlowEntrySwitchState.FE_SWITCH_UPDATED);
+	    }
+	}
+
 	//
 	// Test whether the Flow Path needs to be recomputed
 	//
@@ -772,7 +1165,6 @@
 	newDataPath = TopologyManager.computeNetworkPath(topology,
 								  flowPath);
 	}
-
 	if (newDataPath == null) {
 	    // We need the DataPath to compare the paths
 	    newDataPath = new DataPath();
@@ -982,6 +1374,13 @@
      */
     @Override
     public void notificationRecvFlowEntryAdded(FlowEntry flowEntry) {
+	if (enableOnrc2014MeasurementsFlows) {
+	    Collection entries = new ArrayList();
+	    entries.add(flowEntry);
+	    flowManager.pushModifiedFlowEntriesToSwitches(entries);
+	    return;
+	}
+
 	EventEntry<FlowEntry> eventEntry =
 	    new EventEntry<FlowEntry>(EventEntry.Type.ENTRY_ADD, flowEntry);
 	networkEvents.add(eventEntry);
@@ -994,6 +1393,19 @@
      */
     @Override
     public void notificationRecvFlowEntryRemoved(FlowEntry flowEntry) {
+	if (enableOnrc2014MeasurementsFlows) {
+	    //
+	    // NOTE: Must update the state to DELETE, because
+	    // the notification contains the original state.
+	    //
+	    flowEntry.setFlowEntryUserState(FlowEntryUserState.FE_USER_DELETE);
+
+	    Collection entries = new ArrayList();
+	    entries.add(flowEntry);
+	    flowManager.pushModifiedFlowEntriesToSwitches(entries);
+	    return;
+	}
+
 	EventEntry<FlowEntry> eventEntry =
 	    new EventEntry<FlowEntry>(EventEntry.Type.ENTRY_REMOVE, flowEntry);
 	networkEvents.add(eventEntry);
@@ -1006,6 +1418,13 @@
      */
     @Override
     public void notificationRecvFlowEntryUpdated(FlowEntry flowEntry) {
+	if (enableOnrc2014MeasurementsFlows) {
+	    Collection entries = new ArrayList();
+	    entries.add(flowEntry);
+	    flowManager.pushModifiedFlowEntriesToSwitches(entries);
+	    return;
+	}
+
 	// NOTE: The ADD and UPDATE events are processed in same way
 	EventEntry<FlowEntry> eventEntry =
 	    new EventEntry<FlowEntry>(EventEntry.Type.ENTRY_ADD, flowEntry);
@@ -1013,6 +1432,101 @@
     }
 
     /**
+     * Receive a notification that a FlowId is added.
+     *
+     * @param flowId the FlowId that is added.
+     * @param dpid the Source Switch Dpid for the corresponding Flow.
+     */
+    @Override
+    public void notificationRecvFlowIdAdded(FlowId flowId, Dpid dpid) {
+	Pair flowIdPair = new Pair(flowId, dpid);
+
+	EventEntry<Pair<FlowId, Dpid>> eventEntry =
+	    new EventEntry<Pair<FlowId, Dpid>>(EventEntry.Type.ENTRY_ADD, flowIdPair);
+	networkEvents.add(eventEntry);
+    }
+
+    /**
+     * Receive a notification that a FlowId is removed.
+     *
+     * @param flowId the FlowId that is removed.
+     * @param dpid the Source Switch Dpid for the corresponding Flow.
+     */
+    @Override
+    public void notificationRecvFlowIdRemoved(FlowId flowId, Dpid dpid) {
+	Pair flowIdPair = new Pair(flowId, dpid);
+
+	EventEntry<Pair<FlowId, Dpid>> eventEntry =
+	    new EventEntry<Pair<FlowId, Dpid>>(EventEntry.Type.ENTRY_REMOVE, flowIdPair);
+	networkEvents.add(eventEntry);
+    }
+
+    /**
+     * Receive a notification that a FlowId is updated.
+     *
+     * @param flowId the FlowId that is updated.
+     * @param dpid the Source Switch Dpid for the corresponding Flow.
+     */
+    @Override
+    public void notificationRecvFlowIdUpdated(FlowId flowId, Dpid dpid) {
+	Pair flowIdPair = new Pair(flowId, dpid);
+
+	// NOTE: The ADD and UPDATE events are processed in same way
+	EventEntry<Pair<FlowId, Dpid>> eventEntry =
+	    new EventEntry<Pair<FlowId, Dpid>>(EventEntry.Type.ENTRY_ADD, flowIdPair);
+	networkEvents.add(eventEntry);
+    }
+
+    /**
+     * Receive a notification that a FlowEntryId is added.
+     *
+     * @param flowEntryId the FlowEntryId that is added.
+     * @param dpid the Switch Dpid for the corresponding Flow Entry.
+     */
+    @Override
+    public void notificationRecvFlowEntryIdAdded(FlowEntryId flowEntryId,
+						 Dpid dpid) {
+	Pair flowEntryIdPair = new Pair(flowEntryId, dpid);
+
+	EventEntry<Pair<FlowEntryId, Dpid>> eventEntry =
+	    new EventEntry<Pair<FlowEntryId, Dpid>>(EventEntry.Type.ENTRY_ADD, flowEntryIdPair);
+	networkEvents.add(eventEntry);
+    }
+
+    /**
+     * Receive a notification that a FlowEntryId is removed.
+     *
+     * @param flowEntryId the FlowEntryId that is removed.
+     * @param dpid the Switch Dpid for the corresponding Flow Entry.
+     */
+    @Override
+    public void notificationRecvFlowEntryIdRemoved(FlowEntryId flowEntryId,
+						   Dpid dpid) {
+	Pair flowEntryIdPair = new Pair(flowEntryId, dpid);
+
+	EventEntry<Pair<FlowEntryId, Dpid>> eventEntry =
+	    new EventEntry<Pair<FlowEntryId, Dpid>>(EventEntry.Type.ENTRY_REMOVE, flowEntryIdPair);
+	networkEvents.add(eventEntry);
+    }
+
+    /**
+     * Receive a notification that a FlowEntryId is updated.
+     *
+     * @param flowEntryId the FlowEntryId that is updated.
+     * @param dpid the Switch Dpid for the corresponding Flow Entry.
+     */
+    @Override
+    public void notificationRecvFlowEntryIdUpdated(FlowEntryId flowEntryId,
+						   Dpid dpid) {
+	Pair flowEntryIdPair = new Pair(flowEntryId, dpid);
+
+	// NOTE: The ADD and UPDATE events are processed in same way
+	EventEntry<Pair<FlowEntryId, Dpid>> eventEntry =
+	    new EventEntry<Pair<FlowEntryId, Dpid>>(EventEntry.Type.ENTRY_ADD, flowEntryIdPair);
+	networkEvents.add(eventEntry);
+    }
+
+    /**
      * Receive a notification that a Topology Element is added.
      *
      * @param topologyElement the Topology Element that is added.
@@ -1050,6 +1564,40 @@
     }
 
     /**
+     * Receive a notification that a switch is added to this instance.
+     *
+     * @param sw the switch that is added.
+     */
+    @Override
+    public void addedSwitch(IOFSwitch sw) {
+	Dpid dpid = new Dpid(sw.getId());
+	EventEntry<Dpid> eventEntry =
+	    new EventEntry<Dpid>(EventEntry.Type.ENTRY_ADD, dpid);
+	networkEvents.add(eventEntry);
+    }
+
+    /**
+     * Receive a notification that a switch is removed from this instance.
+     *
+     * @param sw the switch that is removed.
+     */
+    @Override
+    public void removedSwitch(IOFSwitch sw) {
+	Dpid dpid = new Dpid(sw.getId());
+	EventEntry<Dpid> eventEntry =
+	    new EventEntry<Dpid>(EventEntry.Type.ENTRY_REMOVE, dpid);
+	networkEvents.add(eventEntry);
+    }
+
+    /**
+     * Receive a notification that the ports on a switch have changed.
+     */
+    @Override
+    public void switchPortChanged(Long switchId) {
+	// Nothing to do
+    }
+    
+    /**
      * Get a sorted copy of all Flow Paths.
      *
      * @return a sorted copy of all Flow Paths.