* Added the skeleton for the Path Computation implementation.
* Refactor the interaction between the Datagrid Service and the Flow Manager:
Now the Path Computation service itself receives the notifications
from the Datagrid Service without using the Flow Manager as a middle-man.
diff --git a/src/main/java/net/onrc/onos/datagrid/HazelcastDatagrid.java b/src/main/java/net/onrc/onos/datagrid/HazelcastDatagrid.java
index cfc6182..481002f 100644
--- a/src/main/java/net/onrc/onos/datagrid/HazelcastDatagrid.java
+++ b/src/main/java/net/onrc/onos/datagrid/HazelcastDatagrid.java
@@ -18,7 +18,7 @@
import net.floodlightcontroller.core.module.IFloodlightModule;
import net.floodlightcontroller.core.module.IFloodlightService;
-import net.onrc.onos.ofcontroller.flowmanager.IFlowService;
+import net.onrc.onos.ofcontroller.flowmanager.IPathComputationService;
import net.onrc.onos.ofcontroller.topology.TopologyElement;
import net.onrc.onos.ofcontroller.util.FlowId;
import net.onrc.onos.ofcontroller.util.FlowPath;
@@ -55,7 +55,7 @@
// State related to the Flow map
protected static final String mapFlowName = "mapFlow";
- private IFlowService flowService = null;
+ private IPathComputationService pathComputationService = null;
private IMap<Long, byte[]> mapFlow = null;
private MapFlowListener mapFlowListener = null;
private String mapFlowListenerId = null;
@@ -90,7 +90,7 @@
Input input = new Input(valueBytes);
FlowPath flowPath = kryo.readObject(input, FlowPath.class);
kryoFactory.deleteKryo(kryo);
- flowService.notificationRecvFlowAdded(flowPath);
+ pathComputationService.notificationRecvFlowAdded(flowPath);
}
/**
@@ -109,7 +109,7 @@
Input input = new Input(valueBytes);
FlowPath flowPath = kryo.readObject(input, FlowPath.class);
kryoFactory.deleteKryo(kryo);
- flowService.notificationRecvFlowRemoved(flowPath);
+ pathComputationService.notificationRecvFlowRemoved(flowPath);
}
/**
@@ -128,7 +128,7 @@
Input input = new Input(valueBytes);
FlowPath flowPath = kryo.readObject(input, FlowPath.class);
kryoFactory.deleteKryo(kryo);
- flowService.notificationRecvFlowUpdated(flowPath);
+ pathComputationService.notificationRecvFlowUpdated(flowPath);
}
/**
@@ -166,7 +166,7 @@
TopologyElement topologyElement =
kryo.readObject(input, TopologyElement.class);
kryoFactory.deleteKryo(kryo);
- flowService.notificationRecvTopologyElementAdded(topologyElement);
+ pathComputationService.notificationRecvTopologyElementAdded(topologyElement);
}
/**
@@ -186,7 +186,7 @@
TopologyElement topologyElement =
kryo.readObject(input, TopologyElement.class);
kryoFactory.deleteKryo(kryo);
- flowService.notificationRecvTopologyElementRemoved(topologyElement);
+ pathComputationService.notificationRecvTopologyElementRemoved(topologyElement);
}
/**
@@ -206,7 +206,7 @@
TopologyElement topologyElement =
kryo.readObject(input, TopologyElement.class);
kryoFactory.deleteKryo(kryo);
- flowService.notificationRecvTopologyElementUpdated(topologyElement);
+ pathComputationService.notificationRecvTopologyElementUpdated(topologyElement);
}
/**
@@ -332,15 +332,16 @@
}
/**
- * Register Flow Service for receiving Flow-related notifications.
+ * Register Path Computation Service for receiving Flow-related
+ * notifications.
*
- * NOTE: Only a single Flow Service can be registered.
+ * NOTE: Only a single Path Computation Service can be registered.
*
- * @param flowService the Flow Service to register.
+ * @param pathComputationService the Path Computation Service to register.
*/
@Override
- public void registerFlowService(IFlowService flowService) {
- this.flowService = flowService;
+ public void registerPathComputationService(IPathComputationService pathComputationService) {
+ this.pathComputationService = pathComputationService;
// Initialize the Flow-related map state
mapFlowListener = new MapFlowListener();
@@ -354,14 +355,16 @@
}
/**
- * De-register Flow Service for receiving Flow-related notifications.
+ * De-register Path Computation Service for receiving Flow-related
+ * notifications.
*
- * NOTE: Only a single Flow Service can be registered.
+ * NOTE: Only a single Path Computation Service can be registered.
*
- * @param flowService the Flow Service to de-register.
+ * @param pathComputationService the Path Computation Service to
+ * de-register.
*/
@Override
- public void deregisterFlowService(IFlowService flowService) {
+ public void deregisterPathComputationService(IPathComputationService pathComputationService) {
// Clear the Flow-related map state
mapFlow.removeEntryListener(mapFlowListenerId);
mapFlow = null;
@@ -372,7 +375,7 @@
mapTopology = null;
mapTopologyListener = null;
- this.flowService = null;
+ this.pathComputationService = null;
}
/**
diff --git a/src/main/java/net/onrc/onos/datagrid/IDatagridService.java b/src/main/java/net/onrc/onos/datagrid/IDatagridService.java
index bf8e41f..10cd1e4 100644
--- a/src/main/java/net/onrc/onos/datagrid/IDatagridService.java
+++ b/src/main/java/net/onrc/onos/datagrid/IDatagridService.java
@@ -4,7 +4,7 @@
import net.floodlightcontroller.core.module.IFloodlightService;
-import net.onrc.onos.ofcontroller.flowmanager.IFlowService;
+import net.onrc.onos.ofcontroller.flowmanager.IPathComputationService;
import net.onrc.onos.ofcontroller.topology.TopologyElement;
import net.onrc.onos.ofcontroller.util.FlowId;
import net.onrc.onos.ofcontroller.util.FlowPath;
@@ -14,22 +14,25 @@
*/
public interface IDatagridService extends IFloodlightService {
/**
- * Register Flow Service for receiving Flow-related notifications.
+ * Register Path Computation Service for receiving Flow-related
+ * notifications.
*
- * NOTE: Only a single Flow Service can be registered.
+ * NOTE: Only a single Path Computation Service can be registered.
*
- * @param flowService the Flow Service to register.
+ * @param pathComputationService the Path Computation Service to register.
*/
- void registerFlowService(IFlowService flowService);
+ void registerPathComputationService(IPathComputationService pathComputationService);
/**
- * De-register Flow Service for receiving Flow-related notifications.
+ * De-register Path Computation Service for receiving Flow-related
+ * notifications.
*
- * NOTE: Only a single Flow Service can be registered.
+ * NOTE: Only a single Path Computation Service can be registered.
*
- * @param flowService the Flow Service to de-register.
+ * @param pathComputationService the Path Computation Service to
+ * de-register.
*/
- void deregisterFlowService(IFlowService flowService);
+ void deregisterPathComputationService(IPathComputationService pathComputationService);
/**
* Get all Flows that are currently in the datagrid.
diff --git a/src/main/java/net/onrc/onos/ofcontroller/flowmanager/FlowManager.java b/src/main/java/net/onrc/onos/ofcontroller/flowmanager/FlowManager.java
index 6926705..31a6bf4 100644
--- a/src/main/java/net/onrc/onos/ofcontroller/flowmanager/FlowManager.java
+++ b/src/main/java/net/onrc/onos/ofcontroller/flowmanager/FlowManager.java
@@ -7,9 +7,7 @@
import java.util.LinkedList;
import java.util.Map;
import java.util.Random;
-import java.util.concurrent.BlockingQueue;
import java.util.concurrent.Executors;
-import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
@@ -50,6 +48,7 @@
protected volatile IDatagridService datagridService;
protected IRestApiService restApi;
protected FloodlightModuleContext context;
+ protected PathComputation pathComputation;
protected OFMessageDamper messageDamper;
@@ -73,14 +72,6 @@
private ScheduledExecutorService mapReaderScheduler;
private ScheduledExecutorService shortestPathReconcileScheduler;
- // The queue with Flow Path updates
- protected BlockingQueue<EventEntry<FlowPath>> flowPathEvents =
- new LinkedBlockingQueue<EventEntry<FlowPath>>();
-
- // The queue with Topology Element updates
- protected BlockingQueue<EventEntry<TopologyElement>> topologyEvents =
- new LinkedBlockingQueue<EventEntry<TopologyElement>>();
-
/**
* Periodic task for reading the Flow Entries and pushing changes
* into the switches.
@@ -393,7 +384,7 @@
*/
@Override
public void close() {
- datagridService.deregisterFlowService(this);
+ datagridService.deregisterPathComputationService(pathComputation);
dbHandler.close();
}
@@ -502,27 +493,15 @@
// Initialize the Flow Entry ID generator
nextFlowEntryIdPrefix = randomGenerator.nextInt();
- // Register with the Datagrid Service
- datagridService.registerFlowService(this);
+ //
+ // Create the Path Computation thread and register it with the
+ // Datagrid Service
+ //
+ pathComputation = new PathComputation(this, datagridService);
+ datagridService.registerPathComputationService(pathComputation);
- // Obtain the initial Topology state
- Collection<TopologyElement> topologyElements =
- datagridService.getAllTopologyElements();
- for (TopologyElement topologyElement : topologyElements) {
- EventEntry<TopologyElement> eventEntry =
- new EventEntry<TopologyElement>(EventEntry.Type.ENTRY_ADD, topologyElement);
- topologyEvents.add(eventEntry);
- }
-
- // Obtain the initial Flow state
- Collection<FlowPath> flowPaths = datagridService.getAllFlows();
- for (FlowPath flowPath : flowPaths) {
- EventEntry<FlowPath> eventEntry =
- new EventEntry<FlowPath>(EventEntry.Type.ENTRY_ADD, flowPath);
- flowPathEvents.add(eventEntry);
- }
-
- // Schedule the periodic tasks
+ // Schedule the threads and periodic tasks
+ pathComputation.start();
mapReaderScheduler.scheduleAtFixedRate(
mapReader, 3, 3, TimeUnit.SECONDS);
shortestPathReconcileScheduler.scheduleAtFixedRate(
@@ -830,64 +809,4 @@
//
return (installFlowEntry(mySwitch, flowPath, flowEntry));
}
-
- /**
- * Receive a notification that a Flow is added.
- *
- * @param flowPath the flow that is added.
- */
- @Override
- public void notificationRecvFlowAdded(FlowPath flowPath) {
- // TODO
- }
-
- /**
- * Receive a notification that a Flow is removed.
- *
- * @param flowPath the flow that is removed.
- */
- @Override
- public void notificationRecvFlowRemoved(FlowPath flowPath) {
- // TODO
- }
-
- /**
- * Receive a notification that a Flow is updated.
- *
- * @param flowPath the flow that is updated.
- */
- @Override
- public void notificationRecvFlowUpdated(FlowPath flowPath) {
- // TODO
- }
-
- /**
- * Receive a notification that a Topology Element is added.
- *
- * @param topologyElement the Topology Element that is added.
- */
- @Override
- public void notificationRecvTopologyElementAdded(TopologyElement topologyElement) {
- // TODO
- }
-
- /**
- * Receive a notification that a Topology Element is removed.
- *
- * @param topologyElement the Topology Element that is removed.
- */
- @Override
- public void notificationRecvTopologyElementRemoved(TopologyElement topologyElement) {
- // TODO
- }
-
- /**
- * Receive a notification that a Topology Element is updated.
- *
- * @param topologyElement the Topology Element that is updated.
- */
- @Override
- public void notificationRecvTopologyElementUpdated(TopologyElement topologyElement) {
- // TODO
- }
}
diff --git a/src/main/java/net/onrc/onos/ofcontroller/flowmanager/IFlowService.java b/src/main/java/net/onrc/onos/ofcontroller/flowmanager/IFlowService.java
index bd9819e..1f8cd5b 100644
--- a/src/main/java/net/onrc/onos/ofcontroller/flowmanager/IFlowService.java
+++ b/src/main/java/net/onrc/onos/ofcontroller/flowmanager/IFlowService.java
@@ -4,7 +4,6 @@
import net.floodlightcontroller.core.module.IFloodlightService;
import net.onrc.onos.ofcontroller.core.INetMapTopologyObjects.IFlowPath;
-import net.onrc.onos.ofcontroller.topology.TopologyElement;
import net.onrc.onos.ofcontroller.util.CallerId;
import net.onrc.onos.ofcontroller.util.DataPathEndpoints;
import net.onrc.onos.ofcontroller.util.FlowId;
@@ -113,46 +112,4 @@
* @return the added shortest-path flow on success, otherwise null.
*/
FlowPath addAndMaintainShortestPathFlow(FlowPath flowPath);
-
- /**
- * Receive a notification that a Flow is added.
- *
- * @param flowPath the flow that is added.
- */
- void notificationRecvFlowAdded(FlowPath flowPath);
-
- /**
- * Receive a notification that a Flow is removed.
- *
- * @param flowPath the flow that is removed.
- */
- void notificationRecvFlowRemoved(FlowPath flowPath);
-
- /**
- * Receive a notification that a Flow is updated.
- *
- * @param flowPath the flow that is updated.
- */
- void notificationRecvFlowUpdated(FlowPath flowPath);
-
- /**
- * Receive a notification that a Topology Element is added.
- *
- * @param topologyElement the Topology Element that is added.
- */
- void notificationRecvTopologyElementAdded(TopologyElement topologyElement);
-
- /**
- * Receive a notification that a Topology Element is removed.
- *
- * @param topologyElement the Topology Element that is removed.
- */
- void notificationRecvTopologyElementRemoved(TopologyElement topologyElement);
-
- /**
- * Receive a notification that a Topology Element is updated.
- *
- * @param topologyElement the Topology Element that is updated.
- */
- void notificationRecvTopologyElementUpdated(TopologyElement topologyElement);
}
diff --git a/src/main/java/net/onrc/onos/ofcontroller/flowmanager/IPathComputationService.java b/src/main/java/net/onrc/onos/ofcontroller/flowmanager/IPathComputationService.java
new file mode 100644
index 0000000..1bc0be1
--- /dev/null
+++ b/src/main/java/net/onrc/onos/ofcontroller/flowmanager/IPathComputationService.java
@@ -0,0 +1,51 @@
+package net.onrc.onos.ofcontroller.flowmanager;
+
+import net.onrc.onos.ofcontroller.topology.TopologyElement;
+import net.onrc.onos.ofcontroller.util.FlowPath;
+
+/**
+ * Interface for providing Path Computation Service to other modules.
+ */
+public interface IPathComputationService {
+ /**
+ * Receive a notification that a Flow is added.
+ *
+ * @param flowPath the flow that is added.
+ */
+ void notificationRecvFlowAdded(FlowPath flowPath);
+
+ /**
+ * Receive a notification that a Flow is removed.
+ *
+ * @param flowPath the flow that is removed.
+ */
+ void notificationRecvFlowRemoved(FlowPath flowPath);
+
+ /**
+ * Receive a notification that a Flow is updated.
+ *
+ * @param flowPath the flow that is updated.
+ */
+ void notificationRecvFlowUpdated(FlowPath flowPath);
+
+ /**
+ * Receive a notification that a Topology Element is added.
+ *
+ * @param topologyElement the Topology Element that is added.
+ */
+ void notificationRecvTopologyElementAdded(TopologyElement topologyElement);
+
+ /**
+ * Receive a notification that a Topology Element is removed.
+ *
+ * @param topologyElement the Topology Element that is removed.
+ */
+ void notificationRecvTopologyElementRemoved(TopologyElement topologyElement);
+
+ /**
+ * Receive a notification that a Topology Element is updated.
+ *
+ * @param topologyElement the Topology Element that is updated.
+ */
+ void notificationRecvTopologyElementUpdated(TopologyElement topologyElement);
+}
diff --git a/src/main/java/net/onrc/onos/ofcontroller/flowmanager/PathComputation.java b/src/main/java/net/onrc/onos/ofcontroller/flowmanager/PathComputation.java
new file mode 100644
index 0000000..d4950d2
--- /dev/null
+++ b/src/main/java/net/onrc/onos/ofcontroller/flowmanager/PathComputation.java
@@ -0,0 +1,196 @@
+package net.onrc.onos.ofcontroller.flowmanager;
+
+import java.util.Collection;
+import java.util.LinkedList;
+import java.util.List;
+
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.LinkedBlockingQueue;
+
+import net.onrc.onos.datagrid.IDatagridService;
+import net.onrc.onos.ofcontroller.topology.TopologyElement;
+import net.onrc.onos.ofcontroller.util.EventEntry;
+import net.onrc.onos.ofcontroller.util.FlowPath;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Class for implementing the Path Computation and Path Maintenance.
+ */
+class PathComputation extends Thread implements IPathComputationService {
+ /** The logger. */
+ private final static Logger log = LoggerFactory.getLogger(PathComputation.class);
+
+ private FlowManager flowManager; // The Flow Manager to use
+ private IDatagridService datagridService; // The Datagrid Service to use
+
+ // The queue with Flow Path and Topology Element updates
+ private BlockingQueue<EventEntry<?>> networkEvents =
+ new LinkedBlockingQueue<EventEntry<?>>();
+
+ // The pending Topology and Flow Path events
+ private List<EventEntry<TopologyElement>> topologyEvents =
+ new LinkedList<EventEntry<TopologyElement>>();
+ private List<EventEntry<FlowPath>> flowPathEvents =
+ new LinkedList<EventEntry<FlowPath>>();
+
+ /**
+ * Constructor for a given Flow Manager and Datagrid Service.
+ *
+ * @param flowManager the Flow Manager to use.
+ * @param datagridService the Datagrid Service to use.
+ */
+ PathComputation(FlowManager flowManager,
+ IDatagridService datagridService) {
+ this.flowManager = flowManager;
+ this.datagridService = datagridService;
+ }
+
+ /**
+ * Run the thread.
+ */
+ @Override
+ public void run() {
+ //
+ // Obtain the initial Topology state
+ //
+ Collection<TopologyElement> topologyElements =
+ datagridService.getAllTopologyElements();
+ for (TopologyElement topologyElement : topologyElements) {
+ EventEntry<TopologyElement> eventEntry =
+ new EventEntry<TopologyElement>(EventEntry.Type.ENTRY_ADD, topologyElement);
+ topologyEvents.add(eventEntry);
+ }
+ //
+ // Obtain the initial Flow Path state
+ //
+ Collection<FlowPath> flowPaths = datagridService.getAllFlows();
+ for (FlowPath flowPath : flowPaths) {
+ EventEntry<FlowPath> eventEntry =
+ new EventEntry<FlowPath>(EventEntry.Type.ENTRY_ADD, flowPath);
+ flowPathEvents.add(eventEntry);
+ }
+ // Process the events (if any)
+ processEvents();
+
+ //
+ // The main loop
+ //
+ Collection<EventEntry<?>> collection = new LinkedList<EventEntry<?>>();
+ try {
+ while (true) {
+ EventEntry<?> eventEntry = networkEvents.take();
+ collection.add(eventEntry);
+ networkEvents.drainTo(collection);
+
+ // Demultiplex all events
+ for (EventEntry<?> event : collection) {
+ if (event.eventData() instanceof TopologyElement) {
+ EventEntry<TopologyElement> topologyEventEntry =
+ (EventEntry<TopologyElement>)event;
+ topologyEvents.add(topologyEventEntry);
+ } else if (event.eventData() instanceof FlowPath) {
+ EventEntry<FlowPath> flowPathEventEntry =
+ (EventEntry<FlowPath>)event;
+ flowPathEvents.add(flowPathEventEntry);
+ }
+ }
+ collection.clear();
+ // Process the events (if any)
+ processEvents();
+ }
+ } catch (Exception exception) {
+ log.debug("Exception processing Network Events: ", exception);
+ }
+ }
+
+ /**
+ * Process the events (if any)
+ */
+ private void processEvents() {
+ if (topologyEvents.isEmpty() && flowPathEvents.isEmpty())
+ return; // Nothing to do
+
+ // TODO: Implement it!
+
+ System.out.println("PAVPAV: Topology Events = " + topologyEvents.size() + " Flow Path Events = " + flowPathEvents.size());
+
+ topologyEvents.clear();
+ flowPathEvents.clear();
+ }
+
+ /**
+ * Receive a notification that a Flow is added.
+ *
+ * @param flowPath the flow that is added.
+ */
+ @Override
+ public void notificationRecvFlowAdded(FlowPath flowPath) {
+ EventEntry<FlowPath> eventEntry =
+ new EventEntry<FlowPath>(EventEntry.Type.ENTRY_ADD, flowPath);
+ networkEvents.add(eventEntry);
+ }
+
+ /**
+ * Receive a notification that a Flow is removed.
+ *
+ * @param flowPath the flow that is removed.
+ */
+ @Override
+ public void notificationRecvFlowRemoved(FlowPath flowPath) {
+ EventEntry<FlowPath> eventEntry =
+ new EventEntry<FlowPath>(EventEntry.Type.ENTRY_REMOVE, flowPath);
+ networkEvents.add(eventEntry);
+ }
+
+ /**
+ * Receive a notification that a Flow is updated.
+ *
+ * @param flowPath the flow that is updated.
+ */
+ @Override
+ public void notificationRecvFlowUpdated(FlowPath flowPath) {
+ // NOTE: The ADD and UPDATE events are processed in same way
+ EventEntry<FlowPath> eventEntry =
+ new EventEntry<FlowPath>(EventEntry.Type.ENTRY_ADD, flowPath);
+ networkEvents.add(eventEntry);
+ }
+
+ /**
+ * Receive a notification that a Topology Element is added.
+ *
+ * @param topologyElement the Topology Element that is added.
+ */
+ @Override
+ public void notificationRecvTopologyElementAdded(TopologyElement topologyElement) {
+ EventEntry<TopologyElement> eventEntry =
+ new EventEntry<TopologyElement>(EventEntry.Type.ENTRY_ADD, topologyElement);
+ networkEvents.add(eventEntry);
+ }
+
+ /**
+ * Receive a notification that a Topology Element is removed.
+ *
+ * @param topologyElement the Topology Element that is removed.
+ */
+ @Override
+ public void notificationRecvTopologyElementRemoved(TopologyElement topologyElement) {
+ EventEntry<TopologyElement> eventEntry =
+ new EventEntry<TopologyElement>(EventEntry.Type.ENTRY_REMOVE, topologyElement);
+ networkEvents.add(eventEntry);
+ }
+
+ /**
+ * Receive a notification that a Topology Element is updated.
+ *
+ * @param topologyElement the Topology Element that is updated.
+ */
+ @Override
+ public void notificationRecvTopologyElementUpdated(TopologyElement topologyElement) {
+ // NOTE: The ADD and UPDATE events are processed in same way
+ EventEntry<TopologyElement> eventEntry =
+ new EventEntry<TopologyElement>(EventEntry.Type.ENTRY_ADD, topologyElement);
+ networkEvents.add(eventEntry);
+ }
+}