blob: 50834d2fd9e061c30619f78afc00339a3906d8eb [file] [log] [blame]
Brian O'Connor0ca538c2014-01-10 15:42:34 -08001package net.onrc.onos.ofcontroller.flowmanager;
2
3import java.util.ArrayList;
4import java.util.Collection;
Brian O'Connorc1e3c452014-01-10 16:13:42 -08005import java.util.HashSet;
6import java.util.Set;
Brian O'Connor0ca538c2014-01-10 15:42:34 -08007import java.util.concurrent.Callable;
8import java.util.concurrent.CompletionService;
9import java.util.concurrent.ExecutionException;
10import java.util.concurrent.ExecutorCompletionService;
11import java.util.concurrent.ExecutorService;
12import java.util.concurrent.Executors;
Brian O'Connorc1e3c452014-01-10 16:13:42 -080013import java.util.concurrent.Future;
Brian O'Connor0ca538c2014-01-10 15:42:34 -080014
15import org.slf4j.Logger;
16import org.slf4j.LoggerFactory;
17
Brian O'Connorc1e3c452014-01-10 16:13:42 -080018import net.onrc.onos.datagrid.IDatagridService;
Brian O'Connor0ca538c2014-01-10 15:42:34 -080019import net.onrc.onos.graph.GraphDBOperation;
20import net.onrc.onos.ofcontroller.core.INetMapTopologyObjects.IFlowPath;
Brian O'Connorc1e3c452014-01-10 16:13:42 -080021import net.onrc.onos.ofcontroller.util.Dpid;
Pavlin Radoslavovbb8148a2014-01-10 17:21:12 -080022import net.onrc.onos.ofcontroller.util.FlowEntry;
23import net.onrc.onos.ofcontroller.util.FlowEntrySwitchState;
Brian O'Connor0ca538c2014-01-10 15:42:34 -080024import net.onrc.onos.ofcontroller.util.FlowId;
25import net.onrc.onos.ofcontroller.util.FlowPath;
26
Brian O'Connor7fb57862014-01-10 17:13:38 -080027/**
28 * Class for performing parallel Flow-related operations on the Database.
29 *
30 * This class is mostly a wrapper of FlowDatabaseOperation with a thread pool
31 * for parallelization.
32 *
33 * @author Brian O'Connor <brian@onlab.us>
34 */
Brian O'Connor0ca538c2014-01-10 15:42:34 -080035public class ParallelFlowDatabaseOperation extends FlowDatabaseOperation {
36 private final static Logger log = LoggerFactory.getLogger(FlowDatabaseOperation.class);
37
Brian O'Connor8e49d202014-01-10 17:17:18 -080038 private final static int numThreads = Integer.valueOf(System.getProperty("parallelFlowDatabase.numThreads", "32"));
Brian O'Connor0ca538c2014-01-10 15:42:34 -080039 private final static ExecutorService executor = Executors.newFixedThreadPool(numThreads);
40
Brian O'Connor7fb57862014-01-10 17:13:38 -080041 /**
42 * Get all installed flows by first querying the database for all FlowPaths
43 * and then populating them from the database in parallel.
44 *
45 * @param dbHandler the Graph Database handler to use.
46 * @return the Flow Paths if found, otherwise an empty list.
47 */
Brian O'Connor0ca538c2014-01-10 15:42:34 -080048 static ArrayList<FlowPath> getAllFlows(GraphDBOperation dbHandler) {
49 Iterable<IFlowPath> flowPathsObj = null;
50 ArrayList<FlowPath> flowPaths = new ArrayList<FlowPath>();
51
52 try {
53 flowPathsObj = dbHandler.getAllFlowPaths();
54 } catch (Exception e) {
55 // TODO: handle exceptions
56 dbHandler.rollback();
57 log.error(":getAllFlowPaths failed");
58 return flowPaths;
59 }
60 if ((flowPathsObj == null) || (flowPathsObj.iterator().hasNext() == false)) {
61 dbHandler.commit();
62 return flowPaths; // No Flows found
63 }
64
65 CompletionService<FlowPath> tasks = new ExecutorCompletionService<>(executor);
66 int numTasks = 0;
67 for(IFlowPath flowObj : flowPathsObj) {
68 tasks.submit(new ExtractFlowTask(flowObj));
69 numTasks++;
70 }
71 for(int i = 0; i < numTasks; i++) {
72 try {
73 FlowPath flowPath = tasks.take().get();
74 if(flowPath != null) {
75 flowPaths.add(flowPath);
76 }
77 } catch (InterruptedException | ExecutionException e) {
78 log.error("Error reading FlowPath from IFlowPath object");
79 }
80 }
81 dbHandler.commit();
82 return flowPaths;
83 }
84
Brian O'Connor7fb57862014-01-10 17:13:38 -080085 /**
86 * Query the database for all flow paths that have their source switch
87 * in the provided collection
Brian O'Connorc1e3c452014-01-10 16:13:42 -080088 *
89 * Note: this function is implemented naively and inefficiently
Brian O'Connor7fb57862014-01-10 17:13:38 -080090 *
91 * @param dbHandler the Graph Database handler to use.
92 * @param switches a collection of switches whose flow paths you want
93 * @return the Flow Paths if found, otherwise an empty list.
Brian O'Connorc1e3c452014-01-10 16:13:42 -080094 */
95 static ArrayList<FlowPath> getFlowsForSwitches(GraphDBOperation dbHandler, Collection<Dpid> switches) {
96 Iterable<IFlowPath> flowPathsObj = null;
97 ArrayList<FlowPath> flowPaths = new ArrayList<FlowPath>();
98
99 try {
100 flowPathsObj = dbHandler.getAllFlowPaths();
101 } catch (Exception e) {
102 // TODO: handle exceptions
103 dbHandler.rollback();
104 log.error(":getAllFlowPaths failed");
105 return flowPaths;
106 }
107 if ((flowPathsObj == null) || (flowPathsObj.iterator().hasNext() == false)) {
108 dbHandler.commit();
109 return flowPaths; // No Flows found
110 }
111
112 // convert the collection of switch dpids into a set of strings
113 Set<String> switchSet = new HashSet<>();
114 for(Dpid dpid : switches) {
115 switchSet.add(dpid.toString());
116 }
117
118 CompletionService<FlowPath> tasks = new ExecutorCompletionService<>(executor);
119 int numTasks = 0;
120 for(IFlowPath flowObj : flowPathsObj) {
121 if(switchSet.contains(flowObj.getSrcSwitch())) {
122 tasks.submit(new ExtractFlowTask(flowObj));
123 numTasks++;
124 }
125 }
126 for(int i = 0; i < numTasks; i++) {
127 try {
128 FlowPath flowPath = tasks.take().get();
129 if(flowPath != null) {
130 flowPaths.add(flowPath);
131 }
132 } catch (InterruptedException | ExecutionException e) {
133 log.error("Error reading FlowPath from IFlowPath object");
134 }
135 }
136 dbHandler.commit();
137 return flowPaths;
138 }
139
Brian O'Connor7fb57862014-01-10 17:13:38 -0800140 /**
141 * The basic parallelization unit for extracting FlowEntries from the database.
142 *
143 * This is simply a wrapper for FlowDatabaseOperation.extractFlowPath()
144 */
Brian O'Connor0ca538c2014-01-10 15:42:34 -0800145 private final static class ExtractFlowTask implements Callable<FlowPath> {
146 private final IFlowPath flowObj;
147
148 ExtractFlowTask(IFlowPath flowObj){
149 this.flowObj = flowObj;
150 }
151 @Override
152 public FlowPath call() throws Exception {
153 return extractFlowPath(flowObj);
154 }
155 }
156
Brian O'Connor7fb57862014-01-10 17:13:38 -0800157 /**
158 * Get a subset of installed flows in parallel.
159 *
160 * @param dbHandler the Graph Database handler to use.
161 * @param flowIds the collection of Flow IDs to get.
162 * @return the Flow Paths if found, otherwise an empty list.
163 */
Brian O'Connor0ca538c2014-01-10 15:42:34 -0800164 static ArrayList<FlowPath> getFlows(GraphDBOperation dbHandler,
Brian O'Connorc1e3c452014-01-10 16:13:42 -0800165 Collection<FlowId> flowIds) {
Brian O'Connor0ca538c2014-01-10 15:42:34 -0800166 ArrayList<FlowPath> flowPaths = new ArrayList<FlowPath>();
167
168 CompletionService<FlowPath> tasks = new ExecutorCompletionService<>(executor);
169 int numTasks = 0;
170 for (FlowId flowId : flowIds) {
171 tasks.submit(new GetFlowTask(dbHandler, flowId));
172 numTasks++;
173 }
174 for(int i = 0; i < numTasks; i++) {
175 try {
176 FlowPath flowPath = tasks.take().get();
177 if(flowPath != null) {
178 flowPaths.add(flowPath);
179 }
180 } catch (InterruptedException | ExecutionException e) {
181 log.error("Error reading FlowPath from database");
182 }
183 }
184 // TODO: should we commit?
185 //dbHandler.commit();
186 return flowPaths;
187 }
188
Brian O'Connor7fb57862014-01-10 17:13:38 -0800189 /**
190 * The basic parallelization unit for getting FlowEntries.
191 *
192 * This is simply a wrapper for FlowDatabaseOperation.getFlow()
193 */
Brian O'Connor0ca538c2014-01-10 15:42:34 -0800194 private final static class GetFlowTask implements Callable<FlowPath> {
195 private final GraphDBOperation dbHandler;
196 private final FlowId flowId;
197
198 GetFlowTask(GraphDBOperation dbHandler, FlowId flowId) {
199 this.dbHandler = dbHandler;
200 this.flowId = flowId;
201 }
202 @Override
203 public FlowPath call() throws Exception{
204 return getFlow(dbHandler, flowId);
205 }
206 }
207
Brian O'Connor7fb57862014-01-10 17:13:38 -0800208 /**
209 * Add a flow by creating a database task, then waiting for the result.
210 * Mostly, a wrapper for FlowDatabaseOperation.addFlow() which overs little
211 * performance benefit.
212 *
213 * @param dbHandler the Graph Database handler to use.
214 * @param flowPath the Flow Path to install.
215 * @return true on success, otherwise false.
216 */
Brian O'Connor0ca538c2014-01-10 15:42:34 -0800217 static boolean addFlow(GraphDBOperation dbHandler, FlowPath flowPath) {
Brian O'Connorc1e3c452014-01-10 16:13:42 -0800218 Future<Boolean> result = executor.submit(new AddFlowTask(dbHandler, flowPath, null));
219 // NOTE: This function is blocking
220 try {
221 return result.get();
222 } catch (InterruptedException | ExecutionException e) {
223 return false;
224 }
225 }
226
Brian O'Connor7fb57862014-01-10 17:13:38 -0800227 /**
228 * Add a flow asynchronously by creating a database task.
229 *
230 * @param dbHandler the Graph Database handler to use.
231 * @param flowPath the Flow Path to install.
232 * @param datagridService the notification service for when the task is completed
233 * @return true always
234 */
Brian O'Connorc1e3c452014-01-10 16:13:42 -0800235 static boolean addFlow(GraphDBOperation dbHandler, FlowPath flowPath, IDatagridService datagridService) {
236 executor.submit(new AddFlowTask(dbHandler, flowPath, datagridService));
Brian O'Connor0ca538c2014-01-10 15:42:34 -0800237 // TODO: If we need the results, submit returns a Future that contains
238 // the result.
239 return true;
Brian O'Connorc1e3c452014-01-10 16:13:42 -0800240
Brian O'Connor0ca538c2014-01-10 15:42:34 -0800241 }
242
Brian O'Connor7fb57862014-01-10 17:13:38 -0800243 /**
244 * The basic parallelization unit for adding FlowPaths.
245 *
246 * This is simply a wrapper for FlowDatabaseOperation.addFlow(),
247 * which also sends a notification if a datagrid services is provided
248 */
Brian O'Connor0ca538c2014-01-10 15:42:34 -0800249 private final static class AddFlowTask implements Callable<Boolean> {
250 private final GraphDBOperation dbHandler;
251 private final FlowPath flowPath;
Brian O'Connorc1e3c452014-01-10 16:13:42 -0800252 private final IDatagridService datagridService;
Brian O'Connor0ca538c2014-01-10 15:42:34 -0800253
Brian O'Connorc1e3c452014-01-10 16:13:42 -0800254 AddFlowTask(GraphDBOperation dbHandler,
255 FlowPath flowPath,
256 IDatagridService datagridService) {
Brian O'Connor0ca538c2014-01-10 15:42:34 -0800257 this.dbHandler = dbHandler;
258 this.flowPath = flowPath;
Brian O'Connorc1e3c452014-01-10 16:13:42 -0800259 this.datagridService = datagridService;
Brian O'Connor0ca538c2014-01-10 15:42:34 -0800260 }
261
262 @Override
263 public Boolean call() throws Exception {
Brian O'Connor2daf7a92014-01-14 11:26:35 -0800264// String tag1 = "FlowDatabaseOperation.AddFlow." + flowPath.flowId();
265 String tag1 = "FlowDatabaseOperation.AddFlow";
266// String tag2 = "FlowDatabaseOperation.NotificationSend.FlowEntry." + flowPath.flowId();
267 String tag2 = "FlowDatabaseOperation.NotificationSend.FlowEntry";
268 PerformanceMonitor.Measurement m;
269 m = PerformanceMonitor.start(tag1);
Brian O'Connorc1e3c452014-01-10 16:13:42 -0800270 boolean success = FlowDatabaseOperation.addFlow(dbHandler, flowPath);
Pavlin Radoslavov9fbdc992014-01-12 20:56:58 -0800271 PerformanceMonitor.stop(tag1);
Brian O'Connor2daf7a92014-01-14 11:26:35 -0800272 m.stop();
273 m = PerformanceMonitor.start(tag2);
Brian O'Connorc1e3c452014-01-10 16:13:42 -0800274 if(success) {
275 if(datagridService != null) {
Pavlin Radoslavovbb8148a2014-01-10 17:21:12 -0800276 // Send notifications for each Flow Entry
277 for (FlowEntry flowEntry : flowPath.flowEntries()) {
278 if (flowEntry.flowEntrySwitchState() !=
279 FlowEntrySwitchState.FE_SWITCH_NOT_UPDATED) {
280 continue;
281 }
282 //
283 // Write the Flow Entry to the Datagrid
284 //
285 switch (flowEntry.flowEntryUserState()) {
286 case FE_USER_ADD:
287 datagridService.notificationSendFlowEntryAdded(flowEntry);
288 break;
289 case FE_USER_MODIFY:
290 datagridService.notificationSendFlowEntryUpdated(flowEntry);
291 break;
292 case FE_USER_DELETE:
293 datagridService.notificationSendFlowEntryRemoved(flowEntry.flowEntryId());
294 break;
295 case FE_USER_UNKNOWN:
296 assert(false);
297 break;
298 }
299 }
Brian O'Connorc1e3c452014-01-10 16:13:42 -0800300 }
301 }
302 else {
303 log.error("Error adding flow path {} to database", flowPath);
304 }
Brian O'Connor2daf7a92014-01-14 11:26:35 -0800305 m.stop();
306// PerformanceMonitor.report(tag1);
307// PerformanceMonitor.report(tag2);
Brian O'Connorc1e3c452014-01-10 16:13:42 -0800308 return success;
309
Brian O'Connor0ca538c2014-01-10 15:42:34 -0800310 }
311 }
312
Brian O'Connor7fb57862014-01-10 17:13:38 -0800313 /**
314 * Delete a previously added flow by creating a database task, then waiting
315 * for the result.
316 *
317 * Mostly, a wrapper for FlowDatabaseOperation.addFlow() which overs little
318 * performance benefit.
319 *
320 * @param dbHandler the Graph Database handler to use.
321 * @param flowId the Flow ID of the flow to delete.
322 * @return true on success, otherwise false.
323 */
Brian O'Connor0ca538c2014-01-10 15:42:34 -0800324 static boolean deleteFlow(GraphDBOperation dbHandler, FlowId flowId) {
Brian O'Connorc1e3c452014-01-10 16:13:42 -0800325 Future<Boolean> result = executor.submit(new DeleteFlowTask(dbHandler, flowId, null));
326 // NOTE: This function is blocking
327 try {
328 return result.get();
329 } catch (InterruptedException | ExecutionException e) {
330 return false;
331 }
332 }
333
Brian O'Connor7fb57862014-01-10 17:13:38 -0800334 /**
335 * Delete a previously added flow asynchronously by creating a database task.
336 *
337 * @param dbHandler the Graph Database handler to use.
338 * @param flowId the Flow ID of the flow to delete.
339 * @param datagridService the notification service for when the task is completed
340 * @return true always
341 */
Brian O'Connorc1e3c452014-01-10 16:13:42 -0800342 static boolean deleteFlow(GraphDBOperation dbHandler, FlowId flowId, IDatagridService datagridService) {
343 executor.submit(new DeleteFlowTask(dbHandler, flowId, datagridService));
Brian O'Connor0ca538c2014-01-10 15:42:34 -0800344 // TODO: If we need the results, submit returns a Future that contains
345 // the result.
346 return true;
347 }
348
Brian O'Connor7fb57862014-01-10 17:13:38 -0800349 /**
350 * The basic parallelization unit for deleting FlowPaths.
351 *
352 * This is simply a wrapper for FlowDatabaseOperation.deleteFlow(),
353 * which also sends a notification if a datagrid services is provided
354 */
Brian O'Connor0ca538c2014-01-10 15:42:34 -0800355 private final static class DeleteFlowTask implements Callable<Boolean> {
356 private final GraphDBOperation dbHandler;
357 private final FlowId flowId;
Brian O'Connorc1e3c452014-01-10 16:13:42 -0800358 private final IDatagridService datagridService;
Brian O'Connor0ca538c2014-01-10 15:42:34 -0800359
Brian O'Connorc1e3c452014-01-10 16:13:42 -0800360 DeleteFlowTask(GraphDBOperation dbHandler, FlowId flowId, IDatagridService datagridService) {
Brian O'Connor0ca538c2014-01-10 15:42:34 -0800361 this.dbHandler = dbHandler;
362 this.flowId = flowId;
Brian O'Connorc1e3c452014-01-10 16:13:42 -0800363 this.datagridService = datagridService;
Brian O'Connor0ca538c2014-01-10 15:42:34 -0800364 }
365 @Override
366 public Boolean call() throws Exception{
Brian O'Connorc1e3c452014-01-10 16:13:42 -0800367 boolean success = FlowDatabaseOperation.deleteFlow(dbHandler, flowId);
368 if(success) {
369 if(datagridService != null) {
370 datagridService.notificationSendFlowIdRemoved(flowId);
371 }
372 }
373 else {
374 log.error("Error removing flow path {} from database", flowId);
375 }
376 return success;
Brian O'Connor0ca538c2014-01-10 15:42:34 -0800377 }
378 }
379}