Subscribe the FlowEventHandler to Switch Added/Removed notifications,
and update the local cache of flows as appropriate.
diff --git a/src/main/java/net/onrc/onos/ofcontroller/flowmanager/FlowEventHandler.java b/src/main/java/net/onrc/onos/ofcontroller/flowmanager/FlowEventHandler.java
index fa650f6..5c095d5 100644
--- a/src/main/java/net/onrc/onos/ofcontroller/flowmanager/FlowEventHandler.java
+++ b/src/main/java/net/onrc/onos/ofcontroller/flowmanager/FlowEventHandler.java
@@ -13,6 +13,8 @@
import java.util.concurrent.LinkedBlockingQueue;
import net.floodlightcontroller.core.IOFSwitch;
+import net.floodlightcontroller.core.IOFSwitchListener;
+
import net.onrc.onos.datagrid.IDatagridService;
import net.onrc.onos.graph.GraphDBOperation;
import net.onrc.onos.ofcontroller.topology.Topology;
@@ -47,14 +49,15 @@
* - Detect FlowPaths impacted by Topology change.
* - Recompute impacted FlowPath using cached Topology.
*/
-class FlowEventHandler extends Thread implements IFlowEventHandlerService {
+class FlowEventHandler extends Thread implements IFlowEventHandlerService,
+ IOFSwitchListener {
private boolean enableOnrc2014MeasurementsFlows = true;
private boolean enableOnrc2014MeasurementsTopology = true;
/** The logger. */
private final static Logger log = LoggerFactory.getLogger(FlowEventHandler.class);
-
+
private GraphDBOperation dbHandler;
private FlowManager flowManager; // The Flow Manager to use
@@ -77,6 +80,8 @@
new LinkedList<EventEntry<Pair<FlowId, Dpid>>>();
private List<EventEntry<Pair<FlowEntryId, Dpid>>> flowEntryIdEvents =
new LinkedList<EventEntry<Pair<FlowEntryId, Dpid>>>();
+ private List<EventEntry<Dpid>> switchDpidEvents =
+ new LinkedList<EventEntry<Dpid>>();
// All internally computed Flow Paths
private Map<Long, FlowPath> allFlowPaths = new HashMap<Long, FlowPath>();
@@ -240,6 +245,15 @@
flowIdEvents.add(flowIdEventEntry);
continue;
}
+
+ // Switch Dpid event
+ if (event.eventData() instanceof Dpid) {
+ EventEntry<Dpid> switchDpidEventEntry =
+ (EventEntry<Dpid>)event;
+ switchDpidEvents.add(switchDpidEventEntry);
+ continue;
+ }
+
// FlowEntryId event
// TODO: Fix the code below if we need again to handle
// the FlowEntryId events
@@ -272,12 +286,16 @@
if (enableOnrc2014MeasurementsFlows) {
- if (topologyEvents.isEmpty() && flowIdEvents.isEmpty()) {
+ if (topologyEvents.isEmpty() && flowIdEvents.isEmpty() &&
+ switchDpidEvents.isEmpty()) {
return; // Nothing to do
}
Map<Long, IOFSwitch> mySwitches = flowManager.getMySwitches();
+ // Process the Switch Dpid events
+ processSwitchDpidEvents();
+
// Process the Flow ID events
processFlowIdEvents(mySwitches);
@@ -491,6 +509,64 @@
}
/**
+ * Process the pending Switch Dpid events and update the local cache of Flow Paths.
+ */
+ private void processSwitchDpidEvents() {
+ Map<Long, Dpid> addedSwitches = new HashMap<Long, Dpid>();
+ Map<Long, Dpid> removedSwitches = new HashMap<Long, Dpid>();
+
+ // Process all Switch Dpid events and update the appropriate state.
+ // NOTE(review): switchDpidEvents is not cleared in this method;
+ // confirm that the caller clears it once the events are processed.
+ for (EventEntry<Dpid> eventEntry : switchDpidEvents) {
+ Dpid dpid = eventEntry.eventData();
+
+ log.debug("SwitchDpid Event: {} {}", eventEntry.eventType(), dpid);
+
+ // Compute the final added/removed sets: the last event for a DPID wins
+ switch (eventEntry.eventType()) {
+ case ENTRY_ADD:
+ addedSwitches.put(dpid.value(), dpid);
+ removedSwitches.remove(dpid.value());
+ break;
+ case ENTRY_REMOVE:
+ addedSwitches.remove(dpid.value());
+ removedSwitches.put(dpid.value(), dpid);
+ break;
+ }
+ }
+
+ //
+ // Collect the IDs of the cached Flows whose Source Switch is one
+ // of the removed switches.
+ //
+ // TODO: This search can be expensive for a large number of flows
+ // and should be optimized.
+ //
+ List<FlowId> deleteFlowIds = new LinkedList<FlowId>();
+ for (Dpid switchDpid : removedSwitches.values()) {
+ for (FlowPath flowPath : allFlowPaths.values()) {
+ Dpid srcDpid = flowPath.dataPath().srcPort().dpid();
+ if (srcDpid.value() == switchDpid.value())
+ deleteFlowIds.add(flowPath.flowId());
+ }
+ }
+ // Remove the collected Flows from the local cache. This is done
+ // after the scan above, so that allFlowPaths (a HashMap) is not
+ // modified while it is being iterated.
+ for (FlowId flowId : deleteFlowIds)
+ allFlowPaths.remove(flowId.value());
+
+ // Get the Flows for the added switches (via dbHandler) and add them to the local cache
+ Collection<FlowPath> flowPaths =
+ ParallelFlowDatabaseOperation.getFlowsForSwitches(dbHandler,
+ addedSwitches.values());
+ for (FlowPath flowPath : flowPaths) {
+ allFlowPaths.put(flowPath.flowId().value(), flowPath);
+ }
+ }
+
+ /**
* Process the Flow ID events.
*
* @param mySwitches the collection of my switches.
@@ -750,7 +826,7 @@
switch (eventEntry.eventType()) {
case ENTRY_ADD:
- isTopologyModified |= topology.addTopologyElement(topologyElement);
+ isTopologyModified |= topology.addTopologyElement(topologyElement);
break;
case ENTRY_REMOVE:
isTopologyModified |= topology.removeTopologyElement(topologyElement);
@@ -1452,6 +1528,40 @@
}
/**
+ * Handle a switch-added notification: add an ENTRY_ADD event for its Dpid to networkEvents.
+ *
+ * @param sw the switch that is added; its ID (sw.getId()) becomes the event's Dpid.
+ */
+ @Override
+ public void addedSwitch(IOFSwitch sw) {
+ Dpid dpid = new Dpid(sw.getId());
+ EventEntry<Dpid> eventEntry =
+ new EventEntry<Dpid>(EventEntry.Type.ENTRY_ADD, dpid);
+ networkEvents.add(eventEntry);
+ }
+
+ /**
+ * Handle a switch-removed notification: add an ENTRY_REMOVE event for its Dpid to networkEvents.
+ *
+ * @param sw the switch that is removed; its ID (sw.getId()) becomes the event's Dpid.
+ */
+ @Override
+ public void removedSwitch(IOFSwitch sw) {
+ Dpid dpid = new Dpid(sw.getId());
+ EventEntry<Dpid> eventEntry =
+ new EventEntry<Dpid>(EventEntry.Type.ENTRY_REMOVE, dpid);
+ networkEvents.add(eventEntry);
+ }
+
+ /**
+ * Receive a notification that the ports on the switch with ID {@code switchId} have changed.
+ */
+ @Override
+ public void switchPortChanged(Long switchId) {
+ // Nothing to do: port-change notifications are intentionally ignored here
+ }
+
+ /**
* Get a sorted copy of all Flow Paths.
*
* @return a sorted copy of all Flow Paths.
diff --git a/src/main/java/net/onrc/onos/ofcontroller/flowmanager/FlowManager.java b/src/main/java/net/onrc/onos/ofcontroller/flowmanager/FlowManager.java
index 9dfe999..b0a6642 100644
--- a/src/main/java/net/onrc/onos/ofcontroller/flowmanager/FlowManager.java
+++ b/src/main/java/net/onrc/onos/ofcontroller/flowmanager/FlowManager.java
@@ -102,6 +102,7 @@
*/
@Override
public void close() {
+ floodlightProvider.removeOFSwitchListener(flowEventHandler);
datagridService.deregisterFlowEventHandlerService(flowEventHandler);
dbHandlerApi.close();
dbHandlerInner.close();
@@ -228,6 +229,7 @@
// - startup
//
flowEventHandler = new FlowEventHandler(this, datagridService);
+ floodlightProvider.addOFSwitchListener(flowEventHandler);
datagridService.registerFlowEventHandlerService(flowEventHandler);
flowEventHandler.start();
}