Added notifications to the add/delete flow operations and added getFlowsForSwitches to ParallelFlowDatabaseOperation.java
diff --git a/src/main/java/net/onrc/onos/ofcontroller/flowmanager/ParallelFlowDatabaseOperation.java b/src/main/java/net/onrc/onos/ofcontroller/flowmanager/ParallelFlowDatabaseOperation.java
index 3264c0a..f24d8b6 100644
--- a/src/main/java/net/onrc/onos/ofcontroller/flowmanager/ParallelFlowDatabaseOperation.java
+++ b/src/main/java/net/onrc/onos/ofcontroller/flowmanager/ParallelFlowDatabaseOperation.java
@@ -2,18 +2,23 @@
import java.util.ArrayList;
import java.util.Collection;
+import java.util.HashSet;
+import java.util.Set;
import java.util.concurrent.Callable;
import java.util.concurrent.CompletionService;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorCompletionService;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import net.onrc.onos.datagrid.IDatagridService;
import net.onrc.onos.graph.GraphDBOperation;
import net.onrc.onos.ofcontroller.core.INetMapTopologyObjects.IFlowPath;
+import net.onrc.onos.ofcontroller.util.Dpid;
import net.onrc.onos.ofcontroller.util.FlowId;
import net.onrc.onos.ofcontroller.util.FlowPath;
@@ -60,6 +65,58 @@
return flowPaths;
}
+    /**
+     * Gets all flows whose source switch DPID is in the given collection.
+     *
+     * Note: this function is implemented naively and inefficiently: every
+     * flow path in the database is fetched and then filtered by comparing
+     * its source switch string against the string form of each DPID.
+     *
+     * @param dbHandler the graph database handler to use.
+     * @param switches the DPIDs of the source switches. A null or empty
+     * collection matches nothing.
+     * @return the matching flows. The list is empty if nothing matches or
+     * if reading the flow paths from the database fails, and may be partial
+     * if the calling thread is interrupted while collecting the results.
+     */
+    static ArrayList<FlowPath> getFlowsForSwitches(GraphDBOperation dbHandler, Collection<Dpid> switches) {
+        ArrayList<FlowPath> flowPaths = new ArrayList<FlowPath>();
+        if ((switches == null) || switches.isEmpty()) {
+            return flowPaths;        // Nothing can match: skip the database scan
+        }
+
+        Iterable<IFlowPath> flowPathsObj = null;
+        try {
+            flowPathsObj = dbHandler.getAllFlowPaths();
+        } catch (Exception e) {
+            // TODO: handle exceptions
+            dbHandler.rollback();
+            log.error(":getAllFlowPaths failed", e);
+            return flowPaths;
+        }
+        if ((flowPathsObj == null) || !flowPathsObj.iterator().hasNext()) {
+            dbHandler.commit();
+            return flowPaths;        // No Flows found
+        }
+
+        // convert the collection of switch dpids into a set of strings
+        Set<String> switchSet = new HashSet<>();
+        for (Dpid dpid : switches) {
+            switchSet.add(dpid.toString());
+        }
+
+        // Extract the matching flows in parallel
+        CompletionService<FlowPath> tasks = new ExecutorCompletionService<>(executor);
+        int numTasks = 0;
+        for (IFlowPath flowObj : flowPathsObj) {
+            if (switchSet.contains(flowObj.getSrcSwitch())) {
+                tasks.submit(new ExtractFlowTask(flowObj));
+                numTasks++;
+            }
+        }
+        for (int i = 0; i < numTasks; i++) {
+            try {
+                FlowPath flowPath = tasks.take().get();
+                if (flowPath != null) {
+                    flowPaths.add(flowPath);
+                }
+            } catch (ExecutionException e) {
+                log.error("Error reading FlowPath from IFlowPath object", e);
+            } catch (InterruptedException e) {
+                log.error("Error reading FlowPath from IFlowPath object", e);
+                // Restore the interrupt status and stop waiting for the
+                // remaining tasks instead of blocking on take() again
+                Thread.currentThread().interrupt();
+                break;
+            }
+        }
+        dbHandler.commit();
+        return flowPaths;
+    }
+
private final static class ExtractFlowTask implements Callable<FlowPath> {
private final IFlowPath flowObj;
@@ -73,7 +130,7 @@
}
static ArrayList<FlowPath> getFlows(GraphDBOperation dbHandler,
- Collection<FlowId> flowIds) {
+ Collection<FlowId> flowIds) {
ArrayList<FlowPath> flowPaths = new ArrayList<FlowPath>();
CompletionService<FlowPath> tasks = new ExecutorCompletionService<>(executor);
@@ -112,29 +169,65 @@
}
+    /**
+     * Adds a flow to the database and waits for the operation to complete.
+     * No datagrid notification is sent by this variant.
+     *
+     * @param dbHandler the graph database handler to use.
+     * @param flowPath the flow path to add.
+     * @return true if the flow was added, otherwise false (including when
+     * the task throws or the calling thread is interrupted while waiting).
+     */
static boolean addFlow(GraphDBOperation dbHandler, FlowPath flowPath) {
- executor.submit(new AddFlowTask(dbHandler, flowPath));
+        Future<Boolean> result = executor.submit(new AddFlowTask(dbHandler, flowPath, null));
+        // NOTE: This function is blocking
+        try {
+            return result.get();
+        } catch (InterruptedException e) {
+            // Restore the interrupt status so the caller can observe it
+            Thread.currentThread().interrupt();
+            log.error("Interrupted while adding flow path {} to database", flowPath);
+            return false;
+        } catch (ExecutionException e) {
+            log.error("Exception while adding flow path {} to database", flowPath, e);
+            return false;
+        }
+    }
+
+    /**
+     * Submits a task that adds a flow to the database and returns without
+     * waiting for it to finish. The submitted AddFlowTask sends a "flow ID
+     * added" notification through the datagrid service when the database
+     * write succeeds and the service is not null.
+     *
+     * @param dbHandler the graph database handler to use.
+     * @param flowPath the flow path to add.
+     * @param datagridService the datagrid service used for the notification.
+     * @return always true: the outcome of the submitted task is not checked.
+     */
+ static boolean addFlow(GraphDBOperation dbHandler, FlowPath flowPath, IDatagridService datagridService) {
+ executor.submit(new AddFlowTask(dbHandler, flowPath, datagridService));
// TODO: If we need the results, submit returns a Future that contains
// the result.
return true;
+
}
+    /**
+     * Task that writes a single flow path to the database and, when the
+     * write succeeds, optionally announces the new flow on the datagrid.
+     */
private final static class AddFlowTask implements Callable<Boolean> {
private final GraphDBOperation dbHandler;
private final FlowPath flowPath;
+        // May be null, in which case no notification is sent
+        private final IDatagridService datagridService;
- AddFlowTask(GraphDBOperation dbHandler, FlowPath flowPath) {
+        AddFlowTask(GraphDBOperation dbHandler, FlowPath flowPath, IDatagridService datagridService) {
this.dbHandler = dbHandler;
this.flowPath = flowPath;
+            this.datagridService = datagridService;
}
+        /**
+         * Adds the flow path to the database.
+         *
+         * @return true if the database write succeeded, otherwise false.
+         */
@Override
public Boolean call() throws Exception {
- return FlowDatabaseOperation.addFlow(dbHandler, flowPath);
+            if (!FlowDatabaseOperation.addFlow(dbHandler, flowPath)) {
+                log.error("Error adding flow path {} to database", flowPath);
+                return false;
+            }
+            // The write succeeded: notify only if a datagrid service was supplied
+            if (datagridService != null) {
+                datagridService.notificationSendFlowIdAdded(flowPath.flowId(),
+                        flowPath.dataPath().srcPort().dpid());
+            }
+            return true;
}
}
+    /**
+     * Deletes a flow from the database and waits for the operation to
+     * complete. No datagrid notification is sent by this variant.
+     *
+     * @param dbHandler the graph database handler to use.
+     * @param flowId the ID of the flow to delete.
+     * @return true if the flow was deleted, otherwise false (including when
+     * the task throws or the calling thread is interrupted while waiting).
+     */
static boolean deleteFlow(GraphDBOperation dbHandler, FlowId flowId) {
- executor.submit(new DeleteFlowTask(dbHandler, flowId));
+        Future<Boolean> result = executor.submit(new DeleteFlowTask(dbHandler, flowId, null));
+        // NOTE: This function is blocking
+        try {
+            return result.get();
+        } catch (InterruptedException e) {
+            // Restore the interrupt status so the caller can observe it
+            Thread.currentThread().interrupt();
+            log.error("Interrupted while removing flow path {} from database", flowId);
+            return false;
+        } catch (ExecutionException e) {
+            log.error("Exception while removing flow path {} from database", flowId, e);
+            return false;
+        }
+    }
+
+ static boolean deleteFlow(GraphDBOperation dbHandler, FlowId flowId, IDatagridService datagridService) {
+ executor.submit(new DeleteFlowTask(dbHandler, flowId, datagridService));
// TODO: If we need the results, submit returns a Future that contains
// the result.
return true;
@@ -143,14 +236,25 @@
+    /**
+     * Task that removes a single flow from the database and, when the
+     * removal succeeds, optionally announces it on the datagrid.
+     */
private final static class DeleteFlowTask implements Callable<Boolean> {
private final GraphDBOperation dbHandler;
private final FlowId flowId;
+        // May be null, in which case no notification is sent
+        private final IDatagridService datagridService;
- DeleteFlowTask(GraphDBOperation dbHandler, FlowId flowId) {
+        DeleteFlowTask(GraphDBOperation dbHandler,
+                       FlowId flowId,
+                       IDatagridService datagridService) {
this.dbHandler = dbHandler;
this.flowId = flowId;
+            this.datagridService = datagridService;
}
+        /**
+         * Deletes the flow from the database.
+         *
+         * @return true if the database delete succeeded, otherwise false.
+         */
@Override
public Boolean call() throws Exception{
- return FlowDatabaseOperation.deleteFlow(dbHandler, flowId);
+            if (!FlowDatabaseOperation.deleteFlow(dbHandler, flowId)) {
+                log.error("Error removing flow path {} from database", flowId);
+                return false;
+            }
+            // The delete succeeded: notify only if a datagrid service was supplied
+            if (datagridService != null) {
+                datagridService.notificationSendFlowIdRemoved(flowId);
+            }
+            return true;
}
}
}