blob: f24d8b621bad90ab83b02eb2fa0164fd0ab5ec71 [file] [log] [blame]
Brian O'Connor0ca538c2014-01-10 15:42:34 -08001package net.onrc.onos.ofcontroller.flowmanager;
2
3import java.util.ArrayList;
4import java.util.Collection;
Brian O'Connorc1e3c452014-01-10 16:13:42 -08005import java.util.HashSet;
6import java.util.Set;
Brian O'Connor0ca538c2014-01-10 15:42:34 -08007import java.util.concurrent.Callable;
8import java.util.concurrent.CompletionService;
9import java.util.concurrent.ExecutionException;
10import java.util.concurrent.ExecutorCompletionService;
11import java.util.concurrent.ExecutorService;
12import java.util.concurrent.Executors;
Brian O'Connorc1e3c452014-01-10 16:13:42 -080013import java.util.concurrent.Future;
Brian O'Connor0ca538c2014-01-10 15:42:34 -080014
15import org.slf4j.Logger;
16import org.slf4j.LoggerFactory;
17
Brian O'Connorc1e3c452014-01-10 16:13:42 -080018import net.onrc.onos.datagrid.IDatagridService;
Brian O'Connor0ca538c2014-01-10 15:42:34 -080019import net.onrc.onos.graph.GraphDBOperation;
20import net.onrc.onos.ofcontroller.core.INetMapTopologyObjects.IFlowPath;
Brian O'Connorc1e3c452014-01-10 16:13:42 -080021import net.onrc.onos.ofcontroller.util.Dpid;
Brian O'Connor0ca538c2014-01-10 15:42:34 -080022import net.onrc.onos.ofcontroller.util.FlowId;
23import net.onrc.onos.ofcontroller.util.FlowPath;
24
25public class ParallelFlowDatabaseOperation extends FlowDatabaseOperation {
26 private final static Logger log = LoggerFactory.getLogger(FlowDatabaseOperation.class);
27
28 private final static int numThreads = 20;
29 private final static ExecutorService executor = Executors.newFixedThreadPool(numThreads);
30
31 static ArrayList<FlowPath> getAllFlows(GraphDBOperation dbHandler) {
32 Iterable<IFlowPath> flowPathsObj = null;
33 ArrayList<FlowPath> flowPaths = new ArrayList<FlowPath>();
34
35 try {
36 flowPathsObj = dbHandler.getAllFlowPaths();
37 } catch (Exception e) {
38 // TODO: handle exceptions
39 dbHandler.rollback();
40 log.error(":getAllFlowPaths failed");
41 return flowPaths;
42 }
43 if ((flowPathsObj == null) || (flowPathsObj.iterator().hasNext() == false)) {
44 dbHandler.commit();
45 return flowPaths; // No Flows found
46 }
47
48 CompletionService<FlowPath> tasks = new ExecutorCompletionService<>(executor);
49 int numTasks = 0;
50 for(IFlowPath flowObj : flowPathsObj) {
51 tasks.submit(new ExtractFlowTask(flowObj));
52 numTasks++;
53 }
54 for(int i = 0; i < numTasks; i++) {
55 try {
56 FlowPath flowPath = tasks.take().get();
57 if(flowPath != null) {
58 flowPaths.add(flowPath);
59 }
60 } catch (InterruptedException | ExecutionException e) {
61 log.error("Error reading FlowPath from IFlowPath object");
62 }
63 }
64 dbHandler.commit();
65 return flowPaths;
66 }
67
Brian O'Connorc1e3c452014-01-10 16:13:42 -080068 /*
69 * params: collection of dpid
70 *
71 * return: all flows where src dpid is in collection
72 *
73 * Note: this function is implemented naively and inefficiently
74 */
75 static ArrayList<FlowPath> getFlowsForSwitches(GraphDBOperation dbHandler, Collection<Dpid> switches) {
76 Iterable<IFlowPath> flowPathsObj = null;
77 ArrayList<FlowPath> flowPaths = new ArrayList<FlowPath>();
78
79 try {
80 flowPathsObj = dbHandler.getAllFlowPaths();
81 } catch (Exception e) {
82 // TODO: handle exceptions
83 dbHandler.rollback();
84 log.error(":getAllFlowPaths failed");
85 return flowPaths;
86 }
87 if ((flowPathsObj == null) || (flowPathsObj.iterator().hasNext() == false)) {
88 dbHandler.commit();
89 return flowPaths; // No Flows found
90 }
91
92 // convert the collection of switch dpids into a set of strings
93 Set<String> switchSet = new HashSet<>();
94 for(Dpid dpid : switches) {
95 switchSet.add(dpid.toString());
96 }
97
98 CompletionService<FlowPath> tasks = new ExecutorCompletionService<>(executor);
99 int numTasks = 0;
100 for(IFlowPath flowObj : flowPathsObj) {
101 if(switchSet.contains(flowObj.getSrcSwitch())) {
102 tasks.submit(new ExtractFlowTask(flowObj));
103 numTasks++;
104 }
105 }
106 for(int i = 0; i < numTasks; i++) {
107 try {
108 FlowPath flowPath = tasks.take().get();
109 if(flowPath != null) {
110 flowPaths.add(flowPath);
111 }
112 } catch (InterruptedException | ExecutionException e) {
113 log.error("Error reading FlowPath from IFlowPath object");
114 }
115 }
116 dbHandler.commit();
117 return flowPaths;
118 }
119
Brian O'Connor0ca538c2014-01-10 15:42:34 -0800120 private final static class ExtractFlowTask implements Callable<FlowPath> {
121 private final IFlowPath flowObj;
122
123 ExtractFlowTask(IFlowPath flowObj){
124 this.flowObj = flowObj;
125 }
126 @Override
127 public FlowPath call() throws Exception {
128 return extractFlowPath(flowObj);
129 }
130 }
131
132 static ArrayList<FlowPath> getFlows(GraphDBOperation dbHandler,
Brian O'Connorc1e3c452014-01-10 16:13:42 -0800133 Collection<FlowId> flowIds) {
Brian O'Connor0ca538c2014-01-10 15:42:34 -0800134 ArrayList<FlowPath> flowPaths = new ArrayList<FlowPath>();
135
136 CompletionService<FlowPath> tasks = new ExecutorCompletionService<>(executor);
137 int numTasks = 0;
138 for (FlowId flowId : flowIds) {
139 tasks.submit(new GetFlowTask(dbHandler, flowId));
140 numTasks++;
141 }
142 for(int i = 0; i < numTasks; i++) {
143 try {
144 FlowPath flowPath = tasks.take().get();
145 if(flowPath != null) {
146 flowPaths.add(flowPath);
147 }
148 } catch (InterruptedException | ExecutionException e) {
149 log.error("Error reading FlowPath from database");
150 }
151 }
152 // TODO: should we commit?
153 //dbHandler.commit();
154 return flowPaths;
155 }
156
157 private final static class GetFlowTask implements Callable<FlowPath> {
158 private final GraphDBOperation dbHandler;
159 private final FlowId flowId;
160
161 GetFlowTask(GraphDBOperation dbHandler, FlowId flowId) {
162 this.dbHandler = dbHandler;
163 this.flowId = flowId;
164 }
165 @Override
166 public FlowPath call() throws Exception{
167 return getFlow(dbHandler, flowId);
168 }
169 }
170
171 static boolean addFlow(GraphDBOperation dbHandler, FlowPath flowPath) {
Brian O'Connorc1e3c452014-01-10 16:13:42 -0800172 Future<Boolean> result = executor.submit(new AddFlowTask(dbHandler, flowPath, null));
173 // NOTE: This function is blocking
174 try {
175 return result.get();
176 } catch (InterruptedException | ExecutionException e) {
177 return false;
178 }
179 }
180
181 static boolean addFlow(GraphDBOperation dbHandler, FlowPath flowPath, IDatagridService datagridService) {
182 executor.submit(new AddFlowTask(dbHandler, flowPath, datagridService));
Brian O'Connor0ca538c2014-01-10 15:42:34 -0800183 // TODO: If we need the results, submit returns a Future that contains
184 // the result.
185 return true;
Brian O'Connorc1e3c452014-01-10 16:13:42 -0800186
Brian O'Connor0ca538c2014-01-10 15:42:34 -0800187 }
188
189 private final static class AddFlowTask implements Callable<Boolean> {
190 private final GraphDBOperation dbHandler;
191 private final FlowPath flowPath;
Brian O'Connorc1e3c452014-01-10 16:13:42 -0800192 private final IDatagridService datagridService;
Brian O'Connor0ca538c2014-01-10 15:42:34 -0800193
Brian O'Connorc1e3c452014-01-10 16:13:42 -0800194 AddFlowTask(GraphDBOperation dbHandler,
195 FlowPath flowPath,
196 IDatagridService datagridService) {
Brian O'Connor0ca538c2014-01-10 15:42:34 -0800197 this.dbHandler = dbHandler;
198 this.flowPath = flowPath;
Brian O'Connorc1e3c452014-01-10 16:13:42 -0800199 this.datagridService = datagridService;
Brian O'Connor0ca538c2014-01-10 15:42:34 -0800200 }
201
202 @Override
203 public Boolean call() throws Exception {
Brian O'Connorc1e3c452014-01-10 16:13:42 -0800204 boolean success = FlowDatabaseOperation.addFlow(dbHandler, flowPath);
205 if(success) {
206 if(datagridService != null) {
207 datagridService.notificationSendFlowIdAdded(flowPath.flowId(),
208 flowPath.dataPath().srcPort().dpid());
209 }
210 }
211 else {
212 log.error("Error adding flow path {} to database", flowPath);
213 }
214 return success;
215
Brian O'Connor0ca538c2014-01-10 15:42:34 -0800216 }
217 }
218
219 static boolean deleteFlow(GraphDBOperation dbHandler, FlowId flowId) {
Brian O'Connorc1e3c452014-01-10 16:13:42 -0800220 Future<Boolean> result = executor.submit(new DeleteFlowTask(dbHandler, flowId, null));
221 // NOTE: This function is blocking
222 try {
223 return result.get();
224 } catch (InterruptedException | ExecutionException e) {
225 return false;
226 }
227 }
228
229 static boolean deleteFlow(GraphDBOperation dbHandler, FlowId flowId, IDatagridService datagridService) {
230 executor.submit(new DeleteFlowTask(dbHandler, flowId, datagridService));
Brian O'Connor0ca538c2014-01-10 15:42:34 -0800231 // TODO: If we need the results, submit returns a Future that contains
232 // the result.
233 return true;
234 }
235
236 private final static class DeleteFlowTask implements Callable<Boolean> {
237 private final GraphDBOperation dbHandler;
238 private final FlowId flowId;
Brian O'Connorc1e3c452014-01-10 16:13:42 -0800239 private final IDatagridService datagridService;
Brian O'Connor0ca538c2014-01-10 15:42:34 -0800240
Brian O'Connorc1e3c452014-01-10 16:13:42 -0800241 DeleteFlowTask(GraphDBOperation dbHandler, FlowId flowId, IDatagridService datagridService) {
Brian O'Connor0ca538c2014-01-10 15:42:34 -0800242 this.dbHandler = dbHandler;
243 this.flowId = flowId;
Brian O'Connorc1e3c452014-01-10 16:13:42 -0800244 this.datagridService = datagridService;
Brian O'Connor0ca538c2014-01-10 15:42:34 -0800245 }
246 @Override
247 public Boolean call() throws Exception{
Brian O'Connorc1e3c452014-01-10 16:13:42 -0800248 boolean success = FlowDatabaseOperation.deleteFlow(dbHandler, flowId);
249 if(success) {
250 if(datagridService != null) {
251 datagridService.notificationSendFlowIdRemoved(flowId);
252 }
253 }
254 else {
255 log.error("Error removing flow path {} from database", flowId);
256 }
257 return success;
Brian O'Connor0ca538c2014-01-10 15:42:34 -0800258 }
259 }
260}