blob: ec03d09bb64acb5a591aef92c4a8693b28bf8b5c [file] [log] [blame]
Brian O'Connor0ca538c2014-01-10 15:42:34 -08001package net.onrc.onos.ofcontroller.flowmanager;
2
3import java.util.ArrayList;
4import java.util.Collection;
Brian O'Connorc1e3c452014-01-10 16:13:42 -08005import java.util.HashSet;
6import java.util.Set;
Brian O'Connor0ca538c2014-01-10 15:42:34 -08007import java.util.concurrent.Callable;
8import java.util.concurrent.CompletionService;
9import java.util.concurrent.ExecutionException;
10import java.util.concurrent.ExecutorCompletionService;
11import java.util.concurrent.ExecutorService;
12import java.util.concurrent.Executors;
Brian O'Connorc1e3c452014-01-10 16:13:42 -080013import java.util.concurrent.Future;
Brian O'Connor0ca538c2014-01-10 15:42:34 -080014
15import org.slf4j.Logger;
16import org.slf4j.LoggerFactory;
17
Brian O'Connorc1e3c452014-01-10 16:13:42 -080018import net.onrc.onos.datagrid.IDatagridService;
Yuta HIGUCHI337e46d2014-01-10 22:49:27 -080019import net.onrc.onos.graph.DBOperation;
Brian O'Connor0ca538c2014-01-10 15:42:34 -080020import net.onrc.onos.ofcontroller.core.INetMapTopologyObjects.IFlowPath;
Brian O'Connorc1e3c452014-01-10 16:13:42 -080021import net.onrc.onos.ofcontroller.util.Dpid;
Pavlin Radoslavovbb8148a2014-01-10 17:21:12 -080022import net.onrc.onos.ofcontroller.util.FlowEntry;
23import net.onrc.onos.ofcontroller.util.FlowEntrySwitchState;
Brian O'Connor0ca538c2014-01-10 15:42:34 -080024import net.onrc.onos.ofcontroller.util.FlowId;
25import net.onrc.onos.ofcontroller.util.FlowPath;
Pavlin Radoslavov94aed682014-01-14 21:10:04 -080026import net.onrc.onos.ofcontroller.util.serializers.KryoFactory;
27
28import com.esotericsoftware.kryo2.Kryo;
Brian O'Connor0ca538c2014-01-10 15:42:34 -080029
Brian O'Connor7fb57862014-01-10 17:13:38 -080030/**
31 * Class for performing parallel Flow-related operations on the Database.
32 *
33 * This class is mostly a wrapper of FlowDatabaseOperation with a thread pool
34 * for parallelization.
35 *
36 * @author Brian O'Connor <brian@onlab.us>
37 */
Brian O'Connor0ca538c2014-01-10 15:42:34 -080038public class ParallelFlowDatabaseOperation extends FlowDatabaseOperation {
39 private final static Logger log = LoggerFactory.getLogger(FlowDatabaseOperation.class);
40
Brian O'Connor8e49d202014-01-10 17:17:18 -080041 private final static int numThreads = Integer.valueOf(System.getProperty("parallelFlowDatabase.numThreads", "32"));
Brian O'Connor0ca538c2014-01-10 15:42:34 -080042 private final static ExecutorService executor = Executors.newFixedThreadPool(numThreads);
43
Pavlin Radoslavov94aed682014-01-14 21:10:04 -080044 private static KryoFactory kryoFactory = new KryoFactory();
45
Brian O'Connor7fb57862014-01-10 17:13:38 -080046 /**
47 * Get all installed flows by first querying the database for all FlowPaths
48 * and then populating them from the database in parallel.
49 *
50 * @param dbHandler the Graph Database handler to use.
51 * @return the Flow Paths if found, otherwise an empty list.
52 */
Yuta HIGUCHI337e46d2014-01-10 22:49:27 -080053 static ArrayList<FlowPath> getAllFlows(DBOperation dbHandler) {
Brian O'Connor0ca538c2014-01-10 15:42:34 -080054 Iterable<IFlowPath> flowPathsObj = null;
55 ArrayList<FlowPath> flowPaths = new ArrayList<FlowPath>();
56
57 try {
58 flowPathsObj = dbHandler.getAllFlowPaths();
59 } catch (Exception e) {
60 // TODO: handle exceptions
61 dbHandler.rollback();
62 log.error(":getAllFlowPaths failed");
63 return flowPaths;
64 }
65 if ((flowPathsObj == null) || (flowPathsObj.iterator().hasNext() == false)) {
66 dbHandler.commit();
67 return flowPaths; // No Flows found
68 }
69
70 CompletionService<FlowPath> tasks = new ExecutorCompletionService<>(executor);
71 int numTasks = 0;
72 for(IFlowPath flowObj : flowPathsObj) {
73 tasks.submit(new ExtractFlowTask(flowObj));
74 numTasks++;
75 }
76 for(int i = 0; i < numTasks; i++) {
77 try {
78 FlowPath flowPath = tasks.take().get();
79 if(flowPath != null) {
80 flowPaths.add(flowPath);
81 }
82 } catch (InterruptedException | ExecutionException e) {
83 log.error("Error reading FlowPath from IFlowPath object");
84 }
85 }
86 dbHandler.commit();
87 return flowPaths;
88 }
89
Brian O'Connor7fb57862014-01-10 17:13:38 -080090 /**
91 * Query the database for all flow paths that have their source switch
92 * in the provided collection
Brian O'Connorc1e3c452014-01-10 16:13:42 -080093 *
94 * Note: this function is implemented naively and inefficiently
Brian O'Connor7fb57862014-01-10 17:13:38 -080095 *
96 * @param dbHandler the Graph Database handler to use.
97 * @param switches a collection of switches whose flow paths you want
98 * @return the Flow Paths if found, otherwise an empty list.
Brian O'Connorc1e3c452014-01-10 16:13:42 -080099 */
Yuta HIGUCHI337e46d2014-01-10 22:49:27 -0800100 static ArrayList<FlowPath> getFlowsForSwitches(DBOperation dbHandler, Collection<Dpid> switches) {
Brian O'Connorc1e3c452014-01-10 16:13:42 -0800101 Iterable<IFlowPath> flowPathsObj = null;
102 ArrayList<FlowPath> flowPaths = new ArrayList<FlowPath>();
103
104 try {
105 flowPathsObj = dbHandler.getAllFlowPaths();
106 } catch (Exception e) {
107 // TODO: handle exceptions
108 dbHandler.rollback();
109 log.error(":getAllFlowPaths failed");
110 return flowPaths;
111 }
112 if ((flowPathsObj == null) || (flowPathsObj.iterator().hasNext() == false)) {
113 dbHandler.commit();
114 return flowPaths; // No Flows found
115 }
116
117 // convert the collection of switch dpids into a set of strings
118 Set<String> switchSet = new HashSet<>();
119 for(Dpid dpid : switches) {
120 switchSet.add(dpid.toString());
121 }
122
123 CompletionService<FlowPath> tasks = new ExecutorCompletionService<>(executor);
124 int numTasks = 0;
125 for(IFlowPath flowObj : flowPathsObj) {
126 if(switchSet.contains(flowObj.getSrcSwitch())) {
127 tasks.submit(new ExtractFlowTask(flowObj));
128 numTasks++;
129 }
130 }
131 for(int i = 0; i < numTasks; i++) {
132 try {
133 FlowPath flowPath = tasks.take().get();
134 if(flowPath != null) {
135 flowPaths.add(flowPath);
136 }
137 } catch (InterruptedException | ExecutionException e) {
138 log.error("Error reading FlowPath from IFlowPath object");
139 }
140 }
141 dbHandler.commit();
142 return flowPaths;
143 }
144
Brian O'Connor7fb57862014-01-10 17:13:38 -0800145 /**
146 * The basic parallelization unit for extracting FlowEntries from the database.
147 *
148 * This is simply a wrapper for FlowDatabaseOperation.extractFlowPath()
149 */
Brian O'Connor0ca538c2014-01-10 15:42:34 -0800150 private final static class ExtractFlowTask implements Callable<FlowPath> {
151 private final IFlowPath flowObj;
152
153 ExtractFlowTask(IFlowPath flowObj){
154 this.flowObj = flowObj;
155 }
156 @Override
157 public FlowPath call() throws Exception {
158 return extractFlowPath(flowObj);
159 }
160 }
161
Brian O'Connor7fb57862014-01-10 17:13:38 -0800162 /**
163 * Get a subset of installed flows in parallel.
164 *
165 * @param dbHandler the Graph Database handler to use.
166 * @param flowIds the collection of Flow IDs to get.
167 * @return the Flow Paths if found, otherwise an empty list.
168 */
Yuta HIGUCHI337e46d2014-01-10 22:49:27 -0800169 static ArrayList<FlowPath> getFlows(DBOperation dbHandler,
Brian O'Connorc1e3c452014-01-10 16:13:42 -0800170 Collection<FlowId> flowIds) {
Brian O'Connor0ca538c2014-01-10 15:42:34 -0800171 ArrayList<FlowPath> flowPaths = new ArrayList<FlowPath>();
172
173 CompletionService<FlowPath> tasks = new ExecutorCompletionService<>(executor);
174 int numTasks = 0;
175 for (FlowId flowId : flowIds) {
176 tasks.submit(new GetFlowTask(dbHandler, flowId));
177 numTasks++;
178 }
179 for(int i = 0; i < numTasks; i++) {
180 try {
181 FlowPath flowPath = tasks.take().get();
182 if(flowPath != null) {
183 flowPaths.add(flowPath);
184 }
185 } catch (InterruptedException | ExecutionException e) {
186 log.error("Error reading FlowPath from database");
187 }
188 }
189 // TODO: should we commit?
190 //dbHandler.commit();
191 return flowPaths;
192 }
193
Brian O'Connor7fb57862014-01-10 17:13:38 -0800194 /**
195 * The basic parallelization unit for getting FlowEntries.
196 *
197 * This is simply a wrapper for FlowDatabaseOperation.getFlow()
198 */
Brian O'Connor0ca538c2014-01-10 15:42:34 -0800199 private final static class GetFlowTask implements Callable<FlowPath> {
Yuta HIGUCHI337e46d2014-01-10 22:49:27 -0800200 private final DBOperation dbHandler;
Brian O'Connor0ca538c2014-01-10 15:42:34 -0800201 private final FlowId flowId;
Yuta HIGUCHI337e46d2014-01-10 22:49:27 -0800202
203 GetFlowTask(DBOperation dbHandler, FlowId flowId) {
Brian O'Connor0ca538c2014-01-10 15:42:34 -0800204 this.dbHandler = dbHandler;
205 this.flowId = flowId;
206 }
207 @Override
208 public FlowPath call() throws Exception{
209 return getFlow(dbHandler, flowId);
210 }
211 }
212
Brian O'Connor7fb57862014-01-10 17:13:38 -0800213 /**
214 * Add a flow by creating a database task, then waiting for the result.
215 * Mostly, a wrapper for FlowDatabaseOperation.addFlow() which overs little
216 * performance benefit.
217 *
218 * @param dbHandler the Graph Database handler to use.
219 * @param flowPath the Flow Path to install.
220 * @return true on success, otherwise false.
221 */
Yuta HIGUCHI337e46d2014-01-10 22:49:27 -0800222 static boolean addFlow(DBOperation dbHandler, FlowPath flowPath) {
Brian O'Connorc1e3c452014-01-10 16:13:42 -0800223 Future<Boolean> result = executor.submit(new AddFlowTask(dbHandler, flowPath, null));
224 // NOTE: This function is blocking
225 try {
226 return result.get();
227 } catch (InterruptedException | ExecutionException e) {
228 return false;
229 }
230 }
231
Brian O'Connor7fb57862014-01-10 17:13:38 -0800232 /**
233 * Add a flow asynchronously by creating a database task.
234 *
235 * @param dbHandler the Graph Database handler to use.
236 * @param flowPath the Flow Path to install.
237 * @param datagridService the notification service for when the task is completed
238 * @return true always
239 */
Yuta HIGUCHI337e46d2014-01-10 22:49:27 -0800240 static boolean addFlow(DBOperation dbHandler, FlowPath flowPath, IDatagridService datagridService) {
Brian O'Connorc1e3c452014-01-10 16:13:42 -0800241 executor.submit(new AddFlowTask(dbHandler, flowPath, datagridService));
Brian O'Connor0ca538c2014-01-10 15:42:34 -0800242 // TODO: If we need the results, submit returns a Future that contains
243 // the result.
244 return true;
Brian O'Connorc1e3c452014-01-10 16:13:42 -0800245
Brian O'Connor0ca538c2014-01-10 15:42:34 -0800246 }
247
Brian O'Connor7fb57862014-01-10 17:13:38 -0800248 /**
249 * The basic parallelization unit for adding FlowPaths.
250 *
251 * This is simply a wrapper for FlowDatabaseOperation.addFlow(),
252 * which also sends a notification if a datagrid services is provided
253 */
Brian O'Connor0ca538c2014-01-10 15:42:34 -0800254 private final static class AddFlowTask implements Callable<Boolean> {
Yuta HIGUCHI337e46d2014-01-10 22:49:27 -0800255 private final DBOperation dbHandler;
Brian O'Connor0ca538c2014-01-10 15:42:34 -0800256 private final FlowPath flowPath;
Brian O'Connorc1e3c452014-01-10 16:13:42 -0800257 private final IDatagridService datagridService;
Yuta HIGUCHI337e46d2014-01-10 22:49:27 -0800258
259 AddFlowTask(DBOperation dbHandler,
Brian O'Connorc1e3c452014-01-10 16:13:42 -0800260 FlowPath flowPath,
261 IDatagridService datagridService) {
Brian O'Connor0ca538c2014-01-10 15:42:34 -0800262 this.dbHandler = dbHandler;
Pavlin Radoslavov94aed682014-01-14 21:10:04 -0800263
264 // Create a copy of the FlowPath object
265 Kryo kryo = kryoFactory.newKryo();
266 this.flowPath = kryo.copy(flowPath);
267 kryoFactory.deleteKryo(kryo);
268
Brian O'Connorc1e3c452014-01-10 16:13:42 -0800269 this.datagridService = datagridService;
Brian O'Connor0ca538c2014-01-10 15:42:34 -0800270 }
271
272 @Override
273 public Boolean call() throws Exception {
Brian O'Connor2daf7a92014-01-14 11:26:35 -0800274// String tag1 = "FlowDatabaseOperation.AddFlow." + flowPath.flowId();
275 String tag1 = "FlowDatabaseOperation.AddFlow";
276// String tag2 = "FlowDatabaseOperation.NotificationSend.FlowEntry." + flowPath.flowId();
277 String tag2 = "FlowDatabaseOperation.NotificationSend.FlowEntry";
278 PerformanceMonitor.Measurement m;
279 m = PerformanceMonitor.start(tag1);
Brian O'Connorc1e3c452014-01-10 16:13:42 -0800280 boolean success = FlowDatabaseOperation.addFlow(dbHandler, flowPath);
brian6df88232014-01-14 17:05:07 -0800281// PerformanceMonitor.stop(tag1);
Brian O'Connor2daf7a92014-01-14 11:26:35 -0800282 m.stop();
283 m = PerformanceMonitor.start(tag2);
Brian O'Connorc1e3c452014-01-10 16:13:42 -0800284 if(success) {
285 if(datagridService != null) {
Pavlin Radoslavovbb8148a2014-01-10 17:21:12 -0800286 // Send notifications for each Flow Entry
287 for (FlowEntry flowEntry : flowPath.flowEntries()) {
288 if (flowEntry.flowEntrySwitchState() !=
289 FlowEntrySwitchState.FE_SWITCH_NOT_UPDATED) {
290 continue;
291 }
292 //
293 // Write the Flow Entry to the Datagrid
294 //
295 switch (flowEntry.flowEntryUserState()) {
296 case FE_USER_ADD:
297 datagridService.notificationSendFlowEntryAdded(flowEntry);
298 break;
299 case FE_USER_MODIFY:
300 datagridService.notificationSendFlowEntryUpdated(flowEntry);
301 break;
302 case FE_USER_DELETE:
303 datagridService.notificationSendFlowEntryRemoved(flowEntry.flowEntryId());
304 break;
305 case FE_USER_UNKNOWN:
306 assert(false);
307 break;
308 }
309 }
Brian O'Connorc1e3c452014-01-10 16:13:42 -0800310 }
311 }
312 else {
313 log.error("Error adding flow path {} to database", flowPath);
314 }
Brian O'Connor2daf7a92014-01-14 11:26:35 -0800315 m.stop();
316// PerformanceMonitor.report(tag1);
317// PerformanceMonitor.report(tag2);
Brian O'Connorc1e3c452014-01-10 16:13:42 -0800318 return success;
319
Brian O'Connor0ca538c2014-01-10 15:42:34 -0800320 }
321 }
322
Brian O'Connor7fb57862014-01-10 17:13:38 -0800323 /**
324 * Delete a previously added flow by creating a database task, then waiting
325 * for the result.
326 *
327 * Mostly, a wrapper for FlowDatabaseOperation.addFlow() which overs little
328 * performance benefit.
329 *
330 * @param dbHandler the Graph Database handler to use.
331 * @param flowId the Flow ID of the flow to delete.
332 * @return true on success, otherwise false.
333 */
Yuta HIGUCHI337e46d2014-01-10 22:49:27 -0800334 static boolean deleteFlow(DBOperation dbHandler, FlowId flowId) {
Brian O'Connorc1e3c452014-01-10 16:13:42 -0800335 Future<Boolean> result = executor.submit(new DeleteFlowTask(dbHandler, flowId, null));
336 // NOTE: This function is blocking
337 try {
338 return result.get();
339 } catch (InterruptedException | ExecutionException e) {
340 return false;
341 }
342 }
343
Brian O'Connor7fb57862014-01-10 17:13:38 -0800344 /**
345 * Delete a previously added flow asynchronously by creating a database task.
346 *
347 * @param dbHandler the Graph Database handler to use.
348 * @param flowId the Flow ID of the flow to delete.
349 * @param datagridService the notification service for when the task is completed
350 * @return true always
351 */
Yuta HIGUCHI337e46d2014-01-10 22:49:27 -0800352 static boolean deleteFlow(DBOperation dbHandler, FlowId flowId, IDatagridService datagridService) {
Brian O'Connorc1e3c452014-01-10 16:13:42 -0800353 executor.submit(new DeleteFlowTask(dbHandler, flowId, datagridService));
Brian O'Connor0ca538c2014-01-10 15:42:34 -0800354 // TODO: If we need the results, submit returns a Future that contains
355 // the result.
356 return true;
357 }
358
Brian O'Connor7fb57862014-01-10 17:13:38 -0800359 /**
360 * The basic parallelization unit for deleting FlowPaths.
361 *
362 * This is simply a wrapper for FlowDatabaseOperation.deleteFlow(),
363 * which also sends a notification if a datagrid services is provided
364 */
Brian O'Connor0ca538c2014-01-10 15:42:34 -0800365 private final static class DeleteFlowTask implements Callable<Boolean> {
Yuta HIGUCHI337e46d2014-01-10 22:49:27 -0800366 private final DBOperation dbHandler;
Brian O'Connor0ca538c2014-01-10 15:42:34 -0800367 private final FlowId flowId;
Brian O'Connorc1e3c452014-01-10 16:13:42 -0800368 private final IDatagridService datagridService;
Yuta HIGUCHI337e46d2014-01-10 22:49:27 -0800369
370 DeleteFlowTask(DBOperation dbHandler, FlowId flowId, IDatagridService datagridService) {
Brian O'Connor0ca538c2014-01-10 15:42:34 -0800371 this.dbHandler = dbHandler;
Pavlin Radoslavov94aed682014-01-14 21:10:04 -0800372
373 // Create a copy of the FlowId object
374 Kryo kryo = kryoFactory.newKryo();
375 this.flowId = kryo.copy(flowId);
376 kryoFactory.deleteKryo(kryo);
377
Brian O'Connorc1e3c452014-01-10 16:13:42 -0800378 this.datagridService = datagridService;
Brian O'Connor0ca538c2014-01-10 15:42:34 -0800379 }
380 @Override
381 public Boolean call() throws Exception{
Brian O'Connorc1e3c452014-01-10 16:13:42 -0800382 boolean success = FlowDatabaseOperation.deleteFlow(dbHandler, flowId);
383 if(success) {
384 if(datagridService != null) {
385 datagridService.notificationSendFlowIdRemoved(flowId);
386 }
387 }
388 else {
389 log.error("Error removing flow path {} from database", flowId);
390 }
391 return success;
Brian O'Connor0ca538c2014-01-10 15:42:34 -0800392 }
393 }
394}