blob: 3264c0abb9e339135b4c2432a8a260ee748f3249 [file] [log] [blame]
Brian O'Connor0ca538c2014-01-10 15:42:34 -08001package net.onrc.onos.ofcontroller.flowmanager;
2
3import java.util.ArrayList;
4import java.util.Collection;
5import java.util.concurrent.Callable;
6import java.util.concurrent.CompletionService;
7import java.util.concurrent.ExecutionException;
8import java.util.concurrent.ExecutorCompletionService;
9import java.util.concurrent.ExecutorService;
10import java.util.concurrent.Executors;
11
12import org.slf4j.Logger;
13import org.slf4j.LoggerFactory;
14
15import net.onrc.onos.graph.GraphDBOperation;
16import net.onrc.onos.ofcontroller.core.INetMapTopologyObjects.IFlowPath;
17import net.onrc.onos.ofcontroller.util.FlowId;
18import net.onrc.onos.ofcontroller.util.FlowPath;
19
20public class ParallelFlowDatabaseOperation extends FlowDatabaseOperation {
21 private final static Logger log = LoggerFactory.getLogger(FlowDatabaseOperation.class);
22
23 private final static int numThreads = 20;
24 private final static ExecutorService executor = Executors.newFixedThreadPool(numThreads);
25
26 static ArrayList<FlowPath> getAllFlows(GraphDBOperation dbHandler) {
27 Iterable<IFlowPath> flowPathsObj = null;
28 ArrayList<FlowPath> flowPaths = new ArrayList<FlowPath>();
29
30 try {
31 flowPathsObj = dbHandler.getAllFlowPaths();
32 } catch (Exception e) {
33 // TODO: handle exceptions
34 dbHandler.rollback();
35 log.error(":getAllFlowPaths failed");
36 return flowPaths;
37 }
38 if ((flowPathsObj == null) || (flowPathsObj.iterator().hasNext() == false)) {
39 dbHandler.commit();
40 return flowPaths; // No Flows found
41 }
42
43 CompletionService<FlowPath> tasks = new ExecutorCompletionService<>(executor);
44 int numTasks = 0;
45 for(IFlowPath flowObj : flowPathsObj) {
46 tasks.submit(new ExtractFlowTask(flowObj));
47 numTasks++;
48 }
49 for(int i = 0; i < numTasks; i++) {
50 try {
51 FlowPath flowPath = tasks.take().get();
52 if(flowPath != null) {
53 flowPaths.add(flowPath);
54 }
55 } catch (InterruptedException | ExecutionException e) {
56 log.error("Error reading FlowPath from IFlowPath object");
57 }
58 }
59 dbHandler.commit();
60 return flowPaths;
61 }
62
63 private final static class ExtractFlowTask implements Callable<FlowPath> {
64 private final IFlowPath flowObj;
65
66 ExtractFlowTask(IFlowPath flowObj){
67 this.flowObj = flowObj;
68 }
69 @Override
70 public FlowPath call() throws Exception {
71 return extractFlowPath(flowObj);
72 }
73 }
74
75 static ArrayList<FlowPath> getFlows(GraphDBOperation dbHandler,
76 Collection<FlowId> flowIds) {
77 ArrayList<FlowPath> flowPaths = new ArrayList<FlowPath>();
78
79 CompletionService<FlowPath> tasks = new ExecutorCompletionService<>(executor);
80 int numTasks = 0;
81 for (FlowId flowId : flowIds) {
82 tasks.submit(new GetFlowTask(dbHandler, flowId));
83 numTasks++;
84 }
85 for(int i = 0; i < numTasks; i++) {
86 try {
87 FlowPath flowPath = tasks.take().get();
88 if(flowPath != null) {
89 flowPaths.add(flowPath);
90 }
91 } catch (InterruptedException | ExecutionException e) {
92 log.error("Error reading FlowPath from database");
93 }
94 }
95 // TODO: should we commit?
96 //dbHandler.commit();
97 return flowPaths;
98 }
99
100 private final static class GetFlowTask implements Callable<FlowPath> {
101 private final GraphDBOperation dbHandler;
102 private final FlowId flowId;
103
104 GetFlowTask(GraphDBOperation dbHandler, FlowId flowId) {
105 this.dbHandler = dbHandler;
106 this.flowId = flowId;
107 }
108 @Override
109 public FlowPath call() throws Exception{
110 return getFlow(dbHandler, flowId);
111 }
112 }
113
114 static boolean addFlow(GraphDBOperation dbHandler, FlowPath flowPath) {
115 executor.submit(new AddFlowTask(dbHandler, flowPath));
116 // TODO: If we need the results, submit returns a Future that contains
117 // the result.
118 return true;
119 }
120
121 private final static class AddFlowTask implements Callable<Boolean> {
122 private final GraphDBOperation dbHandler;
123 private final FlowPath flowPath;
124
125 AddFlowTask(GraphDBOperation dbHandler, FlowPath flowPath) {
126 this.dbHandler = dbHandler;
127 this.flowPath = flowPath;
128 }
129
130 @Override
131 public Boolean call() throws Exception {
132 return FlowDatabaseOperation.addFlow(dbHandler, flowPath);
133 }
134 }
135
136 static boolean deleteFlow(GraphDBOperation dbHandler, FlowId flowId) {
137 executor.submit(new DeleteFlowTask(dbHandler, flowId));
138 // TODO: If we need the results, submit returns a Future that contains
139 // the result.
140 return true;
141 }
142
143 private final static class DeleteFlowTask implements Callable<Boolean> {
144 private final GraphDBOperation dbHandler;
145 private final FlowId flowId;
146
147 DeleteFlowTask(GraphDBOperation dbHandler, FlowId flowId) {
148 this.dbHandler = dbHandler;
149 this.flowId = flowId;
150 }
151 @Override
152 public Boolean call() throws Exception{
153 return FlowDatabaseOperation.deleteFlow(dbHandler, flowId);
154 }
155 }
156}