Merge branch 'ONOS-ONRC2014-Measurements' of github.com:OPENNETWORKINGLAB/ONOS into RAMCloud-merge
Conflicts:
src/main/java/net/onrc/onos/graph/GraphDBOperation.java
src/main/java/net/onrc/onos/graph/IDBOperation.java
src/main/java/net/onrc/onos/graph/LocalTopologyEventListener.java
src/main/java/net/onrc/onos/ofcontroller/core/internal/DeviceStorageImpl.java
src/main/java/net/onrc/onos/ofcontroller/core/internal/LinkStorageImpl.java
src/main/java/net/onrc/onos/ofcontroller/core/internal/TopoLinkServiceImpl.java
src/main/java/net/onrc/onos/ofcontroller/core/internal/TopoSwitchServiceImpl.java
src/main/java/net/onrc/onos/ofcontroller/flowmanager/FlowDatabaseOperation.java
src/main/java/net/onrc/onos/ofcontroller/flowmanager/FlowEventHandler.java
src/main/java/net/onrc/onos/ofcontroller/flowmanager/FlowManager.java
src/main/java/net/onrc/onos/ofcontroller/topology/Topology.java
diff --git a/src/main/java/net/onrc/onos/ofcontroller/flowmanager/FlowEventHandler.java b/src/main/java/net/onrc/onos/ofcontroller/flowmanager/FlowEventHandler.java
index 001fb3c..cf9b67d 100644
--- a/src/main/java/net/onrc/onos/ofcontroller/flowmanager/FlowEventHandler.java
+++ b/src/main/java/net/onrc/onos/ofcontroller/flowmanager/FlowEventHandler.java
@@ -15,11 +15,15 @@
import java.util.concurrent.LinkedBlockingQueue;
import net.floodlightcontroller.core.IOFSwitch;
+import net.floodlightcontroller.core.IOFSwitchListener;
import net.onrc.onos.datagrid.IDatagridService;
+import net.onrc.onos.graph.DBOperation;
+import net.onrc.onos.graph.GraphDBManager;
import net.onrc.onos.ofcontroller.topology.Topology;
import net.onrc.onos.ofcontroller.topology.TopologyElement;
import net.onrc.onos.ofcontroller.topology.TopologyManager;
import net.onrc.onos.ofcontroller.util.DataPath;
+import net.onrc.onos.ofcontroller.util.Dpid;
import net.onrc.onos.ofcontroller.util.EventEntry;
import net.onrc.onos.ofcontroller.util.FlowEntry;
import net.onrc.onos.ofcontroller.util.FlowEntryAction;
@@ -31,6 +35,8 @@
import net.onrc.onos.ofcontroller.util.FlowId;
import net.onrc.onos.ofcontroller.util.FlowPath;
import net.onrc.onos.ofcontroller.util.FlowPathUserState;
+import net.onrc.onos.ofcontroller.util.Pair;
+import net.onrc.onos.ofcontroller.util.Port;
import net.onrc.onos.ofcontroller.util.serializers.KryoFactory;
import com.esotericsoftware.kryo2.Kryo;
@@ -46,17 +52,16 @@
* - Detect FlowPaths impacted by Topology change.
* - Recompute impacted FlowPath using cached Topology.
*/
-class FlowEventHandler extends Thread implements IFlowEventHandlerService {
+class FlowEventHandler extends Thread implements IFlowEventHandlerService,
+ IOFSwitchListener {
+
+ private boolean enableOnrc2014MeasurementsFlows = true;
+ private boolean enableOnrc2014MeasurementsTopology = true;
+
/** The logger. */
private final static Logger log = LoggerFactory.getLogger(FlowEventHandler.class);
- // Flag to refresh Topology object periodically
- private final static boolean refreshTopology = false;
- // Refresh delay(ms)
- private final static long refreshTopologyDelay = 5000;
- // Refresh interval(ms)
- private final static long refreshTopologyInterval = 2000;
- private Timer refreshTopologyTimer;
+ private DBOperation dbHandler;
private FlowManager flowManager; // The Flow Manager to use
private IDatagridService datagridService; // The Datagrid Service to use
@@ -74,6 +79,12 @@
new LinkedList<EventEntry<FlowPath>>();
private List<EventEntry<FlowEntry>> flowEntryEvents =
new LinkedList<EventEntry<FlowEntry>>();
+ private List<EventEntry<Pair<FlowId, Dpid>>> flowIdEvents =
+ new LinkedList<EventEntry<Pair<FlowId, Dpid>>>();
+ private List<EventEntry<Pair<FlowEntryId, Dpid>>> flowEntryIdEvents =
+ new LinkedList<EventEntry<Pair<FlowEntryId, Dpid>>>();
+ private List<EventEntry<Dpid>> switchDpidEvents =
+ new LinkedList<EventEntry<Dpid>>();
// All internally computed Flow Paths
private Map<Long, FlowPath> allFlowPaths = new HashMap<Long, FlowPath>();
@@ -119,6 +130,8 @@
* Startup processing.
*/
private void startup() {
+ this.dbHandler = GraphDBManager.getDBOperation("ramcloud", "/tmp/ramcloudconf");
+
//
// Obtain the initial Topology state
//
@@ -148,32 +161,33 @@
flowEntryEvents.add(eventEntry);
}
+ //
+ // Obtain the initial FlowId state
+ //
+ Collection<Pair<FlowId, Dpid>> flowIds =
+ datagridService.getAllFlowIds();
+ for (Pair<FlowId, Dpid> pair : flowIds) {
+ EventEntry<Pair<FlowId, Dpid>> eventEntry =
+ new EventEntry<Pair<FlowId, Dpid>>(EventEntry.Type.ENTRY_ADD, pair);
+ flowIdEvents.add(eventEntry);
+ }
+
+ //
+ // Obtain the initial FlowEntryId state
+ //
+ Collection<Pair<FlowEntryId, Dpid>> flowEntryIds =
+ datagridService.getAllFlowEntryIds();
+ for (Pair<FlowEntryId, Dpid> pair : flowEntryIds) {
+ EventEntry<Pair<FlowEntryId, Dpid>> eventEntry =
+ new EventEntry<Pair<FlowEntryId, Dpid>>(EventEntry.Type.ENTRY_ADD, pair);
+ flowEntryIdEvents.add(eventEntry);
+ }
+
// Process the initial events (if any)
synchronized (allFlowPaths) {
processEvents();
}
- if (refreshTopology) {
- refreshTopologyTimer = new Timer();
- refreshTopologyTimer.schedule(new TimerTask() {
- @Override
- public void run() {
- PerfMon pm = PerfMon.getInstance();
- log.debug("[BEFORE] {}", topology);
- long begin, end;
- synchronized(topology) {
- begin = System.nanoTime();
- pm.read_whole_topology_start();
- topology.readFromDatabase(flowManager.dbHandlerInner);
- pm.read_whole_topology_end();
- end = System.nanoTime();
- }
- // FIXME level raised for measurement. Was debug
- log.error("[AFTER] {}", topology);
- log.error("refresh takes : {}[us]", (end - begin) / 1000.0);
- }
- }, refreshTopologyDelay, refreshTopologyInterval);
- }
}
/**
@@ -199,12 +213,15 @@
// - EventEntry<TopologyElement>
// - EventEntry<FlowPath>
// - EventEntry<FlowEntry>
+ // - EventEntry<Pair<FlowId, Dpid>>
+ // - EventEntry<Pair<FlowEntryId, Dpid>>
//
for (EventEntry<?> event : collection) {
// Topology event
if (event.eventData() instanceof TopologyElement) {
EventEntry<TopologyElement> topologyEventEntry =
(EventEntry<TopologyElement>)event;
+
topologyEvents.add(topologyEventEntry);
continue;
}
@@ -224,6 +241,34 @@
flowEntryEvents.add(flowEntryEventEntry);
continue;
}
+
+ // FlowId event
+ if (event.eventData() instanceof Pair) {
+ EventEntry<Pair<FlowId, Dpid>> flowIdEventEntry =
+ (EventEntry<Pair<FlowId, Dpid>>)event;
+ flowIdEvents.add(flowIdEventEntry);
+ continue;
+ }
+
+ // Switch Dpid event
+ if (event.eventData() instanceof Dpid) {
+ EventEntry<Dpid> switchDpidEventEntry =
+ (EventEntry<Dpid>)event;
+ switchDpidEvents.add(switchDpidEventEntry);
+ continue;
+ }
+
+ // FlowEntryId event
+ // TODO: Fix the code below if we need again to handle
+ // the FlowEntryId events
+ /*
+ if (event.eventData() instanceof Pair) {
+ EventEntry<Pair<FlowEntryId, Dpid>> flowEntryIdEventEntry =
+ (EventEntry<Pair<FlowEntryId, Dpid>>)event;
+ flowEntryIdEvents.add(flowEntryIdEventEntry);
+ continue;
+ }
+ */
}
collection.clear();
@@ -236,13 +281,89 @@
log.debug("Exception processing Network Events: ", exception);
}
}
-
+
/**
* Process the events (if any)
*/
private void processEvents() {
Collection<FlowEntry> modifiedFlowEntries;
+ if (enableOnrc2014MeasurementsFlows) {
+
+ if (topologyEvents.isEmpty() && flowIdEvents.isEmpty() &&
+ switchDpidEvents.isEmpty()) {
+ return; // Nothing to do
+ }
+
+ Map<Long, IOFSwitch> mySwitches = flowManager.getMySwitches();
+
+ // Process the Switch Dpid events
+ processSwitchDpidEvents();
+
+ // Process the Flow ID events
+ processFlowIdEvents(mySwitches);
+
+ // Fetch the topology
+ processTopologyEvents();
+
+ // Recompute all affected Flow Paths and keep only the modified
+ for (FlowPath flowPath : shouldRecomputeFlowPaths.values()) {
+ if (recomputeFlowPath(flowPath))
+ modifiedFlowPaths.put(flowPath.flowId().value(), flowPath);
+ }
+
+ // Assign the Flow Entry ID as needed
+ for (FlowPath flowPath : modifiedFlowPaths.values()) {
+ for (FlowEntry flowEntry : flowPath.flowEntries()) {
+ if (! flowEntry.isValidFlowEntryId()) {
+ long id = flowManager.getNextFlowEntryId();
+ flowEntry.setFlowEntryId(new FlowEntryId(id));
+ }
+ }
+ }
+
+ //
+ // Push the modified state to the database
+ //
+ for (FlowPath flowPath : modifiedFlowPaths.values()) {
+ //
+ // Delete the Flow Path from the Network Map
+ //
+ if (flowPath.flowPathUserState() ==
+ FlowPathUserState.FP_USER_DELETE) {
+ log.debug("Deleting Flow Path From Database: {}", flowPath);
+ // TODO: For now the deleting of a Flow Path is blocking
+ ParallelFlowDatabaseOperation.deleteFlow(dbHandler,
+ flowPath.flowId());
+ // Send the notifications for the deleted Flow Entries
+ for (FlowEntry flowEntry : flowPath.flowEntries()) {
+ datagridService.notificationSendFlowEntryRemoved(flowEntry.flowEntryId());
+ }
+
+ continue;
+ }
+
+ log.debug("Pushing Flow Path To Database: {}", flowPath);
+ //
+ // Write the Flow Path to the Network Map
+ //
+ ParallelFlowDatabaseOperation.addFlow(dbHandler, flowPath,
+ datagridService);
+ }
+
+ // Cleanup
+ topologyEvents.clear();
+ flowIdEvents.clear();
+ switchDpidEvents.clear();
+ //
+ // NOTE: Keep a cache with my Flow Paths
+ // allFlowPaths.clear();
+ shouldRecomputeFlowPaths.clear();
+ modifiedFlowPaths.clear();
+
+ return;
+ }
+
if (topologyEvents.isEmpty() && flowPathEvents.isEmpty() &&
flowEntryEvents.isEmpty()) {
return; // Nothing to do
@@ -379,6 +500,246 @@
}
/**
+     * Fill in the inPort and outPort of every Flow Entry of a Flow Path
+     * that was fetched from the database.
+     * <p>
+     * Bug workaround: FlowDatabaseOperation.extractFlowEntry() doesn't fetch
+     * the inPort and outPort, hence they are derived here from the Flow
+     * Entry match (inPort) and from the first output action (outPort).
+     * Ports that are already assigned are left untouched.
+     *
+     * @param flowPath the Flow to fix.
+     */
+    private void fixFlowFromDatabase(FlowPath flowPath) {
+        for (FlowEntry flowEntry : flowPath.flowEntries()) {
+            //
+            // Set the inPort, unless it is already assigned or the match
+            // has no in-port to copy from. The extra inPort() null check
+            // avoids dereferencing a match that carries no in-port.
+            //
+            if (flowEntry.inPort() == null
+                && flowEntry.flowEntryMatch() != null
+                && flowEntry.flowEntryMatch().inPort() != null) {
+                Port inPort = new Port(flowEntry.flowEntryMatch().inPort().value());
+                flowEntry.setInPort(inPort);
+            }
+
+            //
+            // Set the outPort from the first action that has an output
+            // component, unless the outPort is already assigned.
+            //
+            if (flowEntry.outPort() == null) {
+                for (FlowEntryAction fa : flowEntry.flowEntryActions().actions()) {
+                    if (fa.actionOutput() != null) {
+                        Port outPort = new Port(fa.actionOutput().port().value());
+                        flowEntry.setOutPort(outPort);
+                        break;
+                    }
+                }
+            }
+        }
+    }
+
+    /**
+     * Apply the queued Switch Dpid events to the local Flow Path cache.
+     * <p>
+     * The events are first reduced to the final sets of added and removed
+     * switches (a later event for the same Dpid overrides an earlier one).
+     * Cached Flow Paths whose Source Switch was removed are then dropped,
+     * and the Flow Paths for the added switches are fetched from the
+     * database and stored in the cache.
+     */
+    private void processSwitchDpidEvents() {
+        Map<Long, Dpid> switchesUp = new HashMap<Long, Dpid>();
+        Map<Long, Dpid> switchesDown = new HashMap<Long, Dpid>();
+
+        // Reduce the event stream to the net set of added/removed switches
+        for (EventEntry<Dpid> event : switchDpidEvents) {
+            Dpid eventDpid = event.eventData();
+
+            log.debug("SwitchDpid Event: {} {}", event.eventType(), eventDpid);
+
+            switch (event.eventType()) {
+            case ENTRY_ADD:
+                switchesUp.put(eventDpid.value(), eventDpid);
+                switchesDown.remove(eventDpid.value());
+                break;
+            case ENTRY_REMOVE:
+                switchesUp.remove(eventDpid.value());
+                switchesDown.put(eventDpid.value(), eventDpid);
+                break;
+            }
+        }
+
+        //
+        // Collect the cached Flows whose Source Switch has been removed.
+        //
+        // TODO: This search can be expensive for a large number of flows
+        // and should be optimized.
+        //
+        List<FlowId> staleFlowIds = new LinkedList<FlowId>();
+        for (Dpid downDpid : switchesDown.values()) {
+            for (FlowPath cachedPath : allFlowPaths.values()) {
+                Dpid sourceDpid = cachedPath.dataPath().srcPort().dpid();
+                if (sourceDpid.value() == downDpid.value()) {
+                    staleFlowIds.add(cachedPath.flowId());
+                }
+            }
+        }
+
+        // Drop the collected Flows from the local cache
+        for (FlowId staleFlowId : staleFlowIds) {
+            allFlowPaths.remove(staleFlowId.value());
+        }
+
+        // Load the Flows of the added switches into the local cache
+        Collection<FlowPath> fetchedPaths =
+            ParallelFlowDatabaseOperation.getFlowsForSwitches(dbHandler,
+                                                              switchesUp.values());
+        for (FlowPath fetchedPath : fetchedPaths) {
+            allFlowPaths.put(fetchedPath.flowId().value(), fetchedPath);
+        }
+    }
+
+ /**
+ * Process the Flow ID events.
+ *
+ * Events whose Dpid is not found in mySwitches are ignored. For the
+ * remaining events:
+ * - ENTRY_ADD: the Flow ID is scheduled to be fetched from the database.
+ * - ENTRY_REMOVE: the cached Flow Path (if any) and its Flow Entries are
+ * marked for deletion, the Flow Path is removed from allFlowPaths and
+ * shouldRecomputeFlowPaths, and it is added to modifiedFlowPaths.
+ *
+ * The scheduled Flows are then fetched, fixed by fixFlowFromDatabase(),
+ * dispatched by Flow Path type and stored in allFlowPaths.
+ *
+ * This method does not clear flowIdEvents.
+ *
+ * @param mySwitches the collection of my switches.
+ */
+ private void processFlowIdEvents(Map<Long, IOFSwitch> mySwitches) {
+ // The Flow IDs collected from the ENTRY_ADD events, fetched in one batch below
+ List<FlowId> shouldFetchMyFlowIds = new LinkedList<FlowId>();
+
+ //
+ // Process all Flow Id events and update the appropriate state
+ //
+ for (EventEntry<Pair<FlowId, Dpid>> eventEntry : flowIdEvents) {
+ Pair<FlowId, Dpid> pair = eventEntry.eventData();
+ FlowId flowId = pair.first;
+ Dpid dpid = pair.second;
+
+ log.debug("Flow ID Event: {} {} {}", eventEntry.eventType(),
+ flowId, dpid);
+
+ //
+ // Ignore Flows if the Source Switch is not controlled by this
+ // instance.
+ //
+ if (mySwitches.get(dpid.value()) == null)
+ continue;
+
+ switch (eventEntry.eventType()) {
+ case ENTRY_ADD: {
+ //
+ // Add a new Flow Path
+ //
+ if (allFlowPaths.get(flowId.value()) != null) {
+ //
+ // TODO: What to do if the Flow Path already exists?
+ // For now, we just re-add it.
+ //
+ }
+ shouldFetchMyFlowIds.add(flowId);
+
+ break;
+ }
+
+ case ENTRY_REMOVE: {
+ //
+ // Remove an existing Flow Path.
+ //
+ // Find the Flow Path, and mark the Flow and its Flow Entries
+ // for deletion.
+ //
+ FlowPath existingFlowPath =
+ allFlowPaths.get(flowId.value());
+ // NOTE: This "continue" applies to the enclosing for loop
+ if (existingFlowPath == null)
+ continue; // Nothing to do
+
+ existingFlowPath.setFlowPathUserState(FlowPathUserState.FP_USER_DELETE);
+ for (FlowEntry flowEntry : existingFlowPath.flowEntries()) {
+ flowEntry.setFlowEntryUserState(FlowEntryUserState.FE_USER_DELETE);
+ flowEntry.setFlowEntrySwitchState(FlowEntrySwitchState.FE_SWITCH_NOT_UPDATED);
+ }
+
+ // Remove the Flow Path from the internal state
+ Long key = existingFlowPath.flowId().value();
+ allFlowPaths.remove(key);
+ shouldRecomputeFlowPaths.remove(key);
+ // Keep the deleted Flow Path in the modified set
+ modifiedFlowPaths.put(key, existingFlowPath);
+
+ break;
+ }
+ }
+ }
+
+ // Get my Flows
+ Collection<FlowPath> myFlows =
+ ParallelFlowDatabaseOperation.getFlows(dbHandler,
+ shouldFetchMyFlowIds);
+
+ for (FlowPath flowPath : myFlows) {
+ // Assign the inPort and outPort of the fetched Flow Entries
+ fixFlowFromDatabase(flowPath);
+
+ switch (flowPath.flowPathType()) {
+ case FP_TYPE_SHORTEST_PATH:
+ //
+ // Reset the Data Path, in case it was set already, because
+ // we are going to recompute it anyway.
+ //
+ flowPath.flowEntries().clear();
+ shouldRecomputeFlowPaths.put(flowPath.flowId().value(),
+ flowPath);
+ break;
+ case FP_TYPE_EXPLICIT_PATH:
+ //
+ // Mark all Flow Entries for installation in the switches.
+ //
+ for (FlowEntry flowEntry : flowPath.flowEntries()) {
+ flowEntry.setFlowEntrySwitchState(FlowEntrySwitchState.FE_SWITCH_NOT_UPDATED);
+ }
+ modifiedFlowPaths.put(flowPath.flowId().value(), flowPath);
+ break;
+ case FP_TYPE_UNKNOWN:
+ log.error("FlowPath event with unknown type");
+ break;
+ }
+ // NOTE: The Flow Path is cached for every type, including FP_TYPE_UNKNOWN
+ allFlowPaths.put(flowPath.flowId().value(), flowPath);
+ }
+ }
+
+    /**
+     * Process the Flow Entry ID events.
+     * <p>
+     * For every queued event whose switch is found in mySwitches, the
+     * corresponding Flow Entry is fetched from the database. The event type
+     * is only logged; it does not affect the processing.
+     *
+     * @param mySwitches the collection of my switches.
+     * @return a collection of modified Flow Entries this instance needs
+     * to push to its own switches.
+     */
+    private Collection<FlowEntry> processFlowEntryIdEvents(Map<Long, IOFSwitch> mySwitches) {
+        List<FlowEntry> fetchedEntries = new LinkedList<FlowEntry>();
+
+        // Walk all Flow Entry ID events and fetch the matching Flow Entries
+        for (EventEntry<Pair<FlowEntryId, Dpid>> event : flowEntryIdEvents) {
+            Pair<FlowEntryId, Dpid> idAndSwitch = event.eventData();
+            FlowEntryId entryId = idAndSwitch.first;
+            Dpid switchDpid = idAndSwitch.second;
+
+            log.debug("Flow Entry ID Event: {} {} {}", event.eventType(),
+                      entryId, switchDpid);
+
+            // Only the switches controlled by this instance are of interest
+            if (mySwitches.get(switchDpid.value()) != null) {
+                FlowEntry fetched =
+                    FlowDatabaseOperation.getFlowEntry(dbHandler, entryId);
+                if (fetched != null) {
+                    fetchedEntries.add(fetched);
+                } else {
+                    log.debug("Flow Entry ID {} : Flow Entry not found!",
+                              entryId);
+                }
+            }
+        }
+
+        return fetchedEntries;
+    }
+
+ /**
* Process the Flow Path events.
*/
private void processFlowPathEvents() {
@@ -464,15 +825,34 @@
* Process the Topology events.
*/
private void processTopologyEvents() {
+ boolean isTopologyModified = false;
+
+ if (enableOnrc2014MeasurementsTopology) {
+ if (topologyEvents.isEmpty())
+ return;
+
+ // TODO: Code for debugging purpose only
+ for (EventEntry<TopologyElement> eventEntry : topologyEvents) {
+ TopologyElement topologyElement = eventEntry.eventData();
+ log.debug("Topology Event: {} {}", eventEntry.eventType(),
+ topologyElement.toString());
+ }
+
+ log.debug("[BEFORE] {}", topology.toString());
+ topology.readFromDatabase(dbHandler);
+ log.debug("[AFTER] {}", topology.toString());
+ shouldRecomputeFlowPaths.putAll(allFlowPaths);
+ return;
+ }
+
//
// Process all Topology events and update the appropriate state
//
- boolean isTopologyModified = false;
for (EventEntry<TopologyElement> eventEntry : topologyEvents) {
TopologyElement topologyElement = eventEntry.eventData();
-
+
log.debug("Topology Event: {} {}", eventEntry.eventType(),
- topologyElement);
+ topologyElement.toString());
switch (eventEntry.eventType()) {
case ENTRY_ADD:
@@ -752,6 +1132,19 @@
private boolean recomputeFlowPath(FlowPath flowPath) {
boolean hasChanged = false;
+ if (enableOnrc2014MeasurementsFlows) {
+ // Cleanup the deleted Flow Entries from the earlier iteration
+ flowPath.dataPath().removeDeletedFlowEntries();
+
+ //
+ // TODO: Fake it that the Flow Entries have been already pushed
+ // into the switches, so we don't push them again.
+ //
+ for (FlowEntry flowEntry : flowPath.flowEntries()) {
+ flowEntry.setFlowEntrySwitchState(FlowEntrySwitchState.FE_SWITCH_UPDATED);
+ }
+ }
+
//
// Test whether the Flow Path needs to be recomputed
//
@@ -772,7 +1165,6 @@
newDataPath = TopologyManager.computeNetworkPath(topology,
flowPath);
}
-
if (newDataPath == null) {
// We need the DataPath to compare the paths
newDataPath = new DataPath();
@@ -982,6 +1374,13 @@
*/
@Override
public void notificationRecvFlowEntryAdded(FlowEntry flowEntry) {
+ if (enableOnrc2014MeasurementsFlows) {
+ Collection entries = new ArrayList();
+ entries.add(flowEntry);
+ flowManager.pushModifiedFlowEntriesToSwitches(entries);
+ return;
+ }
+
EventEntry<FlowEntry> eventEntry =
new EventEntry<FlowEntry>(EventEntry.Type.ENTRY_ADD, flowEntry);
networkEvents.add(eventEntry);
@@ -994,6 +1393,19 @@
*/
@Override
public void notificationRecvFlowEntryRemoved(FlowEntry flowEntry) {
+ if (enableOnrc2014MeasurementsFlows) {
+ //
+ // NOTE: Must update the state to DELETE, because
+ // the notification contains the original state.
+ //
+ flowEntry.setFlowEntryUserState(FlowEntryUserState.FE_USER_DELETE);
+
+ Collection entries = new ArrayList();
+ entries.add(flowEntry);
+ flowManager.pushModifiedFlowEntriesToSwitches(entries);
+ return;
+ }
+
EventEntry<FlowEntry> eventEntry =
new EventEntry<FlowEntry>(EventEntry.Type.ENTRY_REMOVE, flowEntry);
networkEvents.add(eventEntry);
@@ -1006,6 +1418,13 @@
*/
@Override
public void notificationRecvFlowEntryUpdated(FlowEntry flowEntry) {
+ if (enableOnrc2014MeasurementsFlows) {
+ Collection entries = new ArrayList();
+ entries.add(flowEntry);
+ flowManager.pushModifiedFlowEntriesToSwitches(entries);
+ return;
+ }
+
// NOTE: The ADD and UPDATE events are processed in same way
EventEntry<FlowEntry> eventEntry =
new EventEntry<FlowEntry>(EventEntry.Type.ENTRY_ADD, flowEntry);
@@ -1013,6 +1432,101 @@
}
/**
+     * Receive a notification that a FlowId is added.
+     * <p>
+     * The (FlowId, Dpid) pair is wrapped in an ENTRY_ADD event and added
+     * to networkEvents.
+     *
+     * @param flowId the FlowId that is added.
+     * @param dpid the Source Switch Dpid for the corresponding Flow.
+     */
+    @Override
+    public void notificationRecvFlowIdAdded(FlowId flowId, Dpid dpid) {
+        // Parameterized (not raw) Pair, so the event payload is type-checked
+        Pair<FlowId, Dpid> flowIdPair = new Pair<FlowId, Dpid>(flowId, dpid);
+
+        EventEntry<Pair<FlowId, Dpid>> eventEntry =
+            new EventEntry<Pair<FlowId, Dpid>>(EventEntry.Type.ENTRY_ADD, flowIdPair);
+        networkEvents.add(eventEntry);
+    }
+
+    /**
+     * Receive a notification that a FlowId is removed.
+     * <p>
+     * The (FlowId, Dpid) pair is wrapped in an ENTRY_REMOVE event and added
+     * to networkEvents.
+     *
+     * @param flowId the FlowId that is removed.
+     * @param dpid the Source Switch Dpid for the corresponding Flow.
+     */
+    @Override
+    public void notificationRecvFlowIdRemoved(FlowId flowId, Dpid dpid) {
+        // Parameterized (not raw) Pair, so the event payload is type-checked
+        Pair<FlowId, Dpid> flowIdPair = new Pair<FlowId, Dpid>(flowId, dpid);
+
+        EventEntry<Pair<FlowId, Dpid>> eventEntry =
+            new EventEntry<Pair<FlowId, Dpid>>(EventEntry.Type.ENTRY_REMOVE, flowIdPair);
+        networkEvents.add(eventEntry);
+    }
+
+    /**
+     * Receive a notification that a FlowId is updated.
+     * <p>
+     * The (FlowId, Dpid) pair is wrapped in an ENTRY_ADD event (updates are
+     * queued exactly like additions) and added to networkEvents.
+     *
+     * @param flowId the FlowId that is updated.
+     * @param dpid the Source Switch Dpid for the corresponding Flow.
+     */
+    @Override
+    public void notificationRecvFlowIdUpdated(FlowId flowId, Dpid dpid) {
+        // Parameterized (not raw) Pair, so the event payload is type-checked
+        Pair<FlowId, Dpid> flowIdPair = new Pair<FlowId, Dpid>(flowId, dpid);
+
+        // NOTE: The ADD and UPDATE events are processed in same way
+        EventEntry<Pair<FlowId, Dpid>> eventEntry =
+            new EventEntry<Pair<FlowId, Dpid>>(EventEntry.Type.ENTRY_ADD, flowIdPair);
+        networkEvents.add(eventEntry);
+    }
+
+    /**
+     * Receive a notification that a FlowEntryId is added.
+     * <p>
+     * The (FlowEntryId, Dpid) pair is wrapped in an ENTRY_ADD event and
+     * added to networkEvents.
+     * <p>
+     * NOTE(review): the event dispatch loop in this class tests
+     * "eventData() instanceof Pair" and files every Pair under the FlowId
+     * events (the FlowEntryId branch is commented out), so this event looks
+     * like it would later be read as a (FlowId, Dpid) pair and fail with a
+     * ClassCastException - confirm before relying on this notification.
+     *
+     * @param flowEntryId the FlowEntryId that is added.
+     * @param dpid the Switch Dpid for the corresponding Flow Entry.
+     */
+    @Override
+    public void notificationRecvFlowEntryIdAdded(FlowEntryId flowEntryId,
+                                                 Dpid dpid) {
+        // Parameterized (not raw) Pair, so the event payload is type-checked
+        Pair<FlowEntryId, Dpid> flowEntryIdPair =
+            new Pair<FlowEntryId, Dpid>(flowEntryId, dpid);
+
+        EventEntry<Pair<FlowEntryId, Dpid>> eventEntry =
+            new EventEntry<Pair<FlowEntryId, Dpid>>(EventEntry.Type.ENTRY_ADD, flowEntryIdPair);
+        networkEvents.add(eventEntry);
+    }
+
+    /**
+     * Receive a notification that a FlowEntryId is removed.
+     * <p>
+     * The (FlowEntryId, Dpid) pair is wrapped in an ENTRY_REMOVE event and
+     * added to networkEvents.
+     * <p>
+     * NOTE(review): the event dispatch loop in this class tests
+     * "eventData() instanceof Pair" and files every Pair under the FlowId
+     * events (the FlowEntryId branch is commented out), so this event looks
+     * like it would later be read as a (FlowId, Dpid) pair and fail with a
+     * ClassCastException - confirm before relying on this notification.
+     *
+     * @param flowEntryId the FlowEntryId that is removed.
+     * @param dpid the Switch Dpid for the corresponding Flow Entry.
+     */
+    @Override
+    public void notificationRecvFlowEntryIdRemoved(FlowEntryId flowEntryId,
+                                                   Dpid dpid) {
+        // Parameterized (not raw) Pair, so the event payload is type-checked
+        Pair<FlowEntryId, Dpid> flowEntryIdPair =
+            new Pair<FlowEntryId, Dpid>(flowEntryId, dpid);
+
+        EventEntry<Pair<FlowEntryId, Dpid>> eventEntry =
+            new EventEntry<Pair<FlowEntryId, Dpid>>(EventEntry.Type.ENTRY_REMOVE, flowEntryIdPair);
+        networkEvents.add(eventEntry);
+    }
+
+    /**
+     * Receive a notification that a FlowEntryId is updated.
+     * <p>
+     * The (FlowEntryId, Dpid) pair is wrapped in an ENTRY_ADD event (updates
+     * are queued exactly like additions) and added to networkEvents.
+     * <p>
+     * NOTE(review): the event dispatch loop in this class tests
+     * "eventData() instanceof Pair" and files every Pair under the FlowId
+     * events (the FlowEntryId branch is commented out), so this event looks
+     * like it would later be read as a (FlowId, Dpid) pair and fail with a
+     * ClassCastException - confirm before relying on this notification.
+     *
+     * @param flowEntryId the FlowEntryId that is updated.
+     * @param dpid the Switch Dpid for the corresponding Flow Entry.
+     */
+    @Override
+    public void notificationRecvFlowEntryIdUpdated(FlowEntryId flowEntryId,
+                                                   Dpid dpid) {
+        // Parameterized (not raw) Pair, so the event payload is type-checked
+        Pair<FlowEntryId, Dpid> flowEntryIdPair =
+            new Pair<FlowEntryId, Dpid>(flowEntryId, dpid);
+
+        // NOTE: The ADD and UPDATE events are processed in same way
+        EventEntry<Pair<FlowEntryId, Dpid>> eventEntry =
+            new EventEntry<Pair<FlowEntryId, Dpid>>(EventEntry.Type.ENTRY_ADD, flowEntryIdPair);
+        networkEvents.add(eventEntry);
+    }
+
+ /**
* Receive a notification that a Topology Element is added.
*
* @param topologyElement the Topology Element that is added.
@@ -1050,6 +1564,40 @@
}
/**
+     * Receive a notification that a switch is added to this instance.
+     * <p>
+     * An ENTRY_ADD event carrying the Dpid of the switch is added to
+     * networkEvents.
+     *
+     * @param sw the switch that is added.
+     */
+    @Override
+    public void addedSwitch(IOFSwitch sw) {
+        Dpid switchDpid = new Dpid(sw.getId());
+        networkEvents.add(
+            new EventEntry<Dpid>(EventEntry.Type.ENTRY_ADD, switchDpid));
+    }
+
+    /**
+     * Receive a notification that a switch is removed from this instance.
+     * <p>
+     * An ENTRY_REMOVE event carrying the Dpid of the switch is added to
+     * networkEvents.
+     *
+     * @param sw the switch that is removed.
+     */
+    @Override
+    public void removedSwitch(IOFSwitch sw) {
+        Dpid switchDpid = new Dpid(sw.getId());
+        networkEvents.add(
+            new EventEntry<Dpid>(EventEntry.Type.ENTRY_REMOVE, switchDpid));
+    }
+
+ /**
+ * Receive a notification that the ports on a switch have changed.
+ *
+ * This handler ignores the notification: no event is queued.
+ *
+ * @param switchId presumably the ID of the switch whose ports have
+ * changed (TODO confirm); it is not used here.
+ */
+ @Override
+ public void switchPortChanged(Long switchId) {
+ // Nothing to do
+ }
+
+ /**
* Get a sorted copy of all Flow Paths.
*
* @return a sorted copy of all Flow Paths.