blob: b5fd95dea625133a271e738ea752ef7997c03d2f [file] [log] [blame]
Brian O'Connor0ca538c2014-01-10 15:42:34 -08001package net.onrc.onos.ofcontroller.flowmanager;
2
3import java.util.ArrayList;
4import java.util.Collection;
Brian O'Connorc1e3c452014-01-10 16:13:42 -08005import java.util.HashSet;
6import java.util.Set;
Brian O'Connor0ca538c2014-01-10 15:42:34 -08007import java.util.concurrent.Callable;
8import java.util.concurrent.CompletionService;
9import java.util.concurrent.ExecutionException;
10import java.util.concurrent.ExecutorCompletionService;
11import java.util.concurrent.ExecutorService;
12import java.util.concurrent.Executors;
Brian O'Connorc1e3c452014-01-10 16:13:42 -080013import java.util.concurrent.Future;
Brian O'Connor0ca538c2014-01-10 15:42:34 -080014
15import org.slf4j.Logger;
16import org.slf4j.LoggerFactory;
17
Brian O'Connorc1e3c452014-01-10 16:13:42 -080018import net.onrc.onos.datagrid.IDatagridService;
Yuta HIGUCHI337e46d2014-01-10 22:49:27 -080019import net.onrc.onos.graph.DBOperation;
Brian O'Connor0ca538c2014-01-10 15:42:34 -080020import net.onrc.onos.ofcontroller.core.INetMapTopologyObjects.IFlowPath;
Brian O'Connorc1e3c452014-01-10 16:13:42 -080021import net.onrc.onos.ofcontroller.util.Dpid;
Pavlin Radoslavovbb8148a2014-01-10 17:21:12 -080022import net.onrc.onos.ofcontroller.util.FlowEntry;
23import net.onrc.onos.ofcontroller.util.FlowEntrySwitchState;
Brian O'Connor0ca538c2014-01-10 15:42:34 -080024import net.onrc.onos.ofcontroller.util.FlowId;
25import net.onrc.onos.ofcontroller.util.FlowPath;
Pavlin Radoslavov6b3b4ad2014-03-19 11:58:06 -070026import net.onrc.onos.ofcontroller.util.PerformanceMonitor;
Pavlin Radoslavov94aed682014-01-14 21:10:04 -080027import net.onrc.onos.ofcontroller.util.serializers.KryoFactory;
28
Yuta HIGUCHIcc8b1012014-01-24 09:14:39 -080029import com.esotericsoftware.kryo.Kryo;
Brian O'Connor0ca538c2014-01-10 15:42:34 -080030
Brian O'Connor7fb57862014-01-10 17:13:38 -080031/**
32 * Class for performing parallel Flow-related operations on the Database.
33 *
34 * This class is mostly a wrapper of FlowDatabaseOperation with a thread pool
35 * for parallelization.
36 *
37 * @author Brian O'Connor <brian@onlab.us>
38 */
Brian O'Connor0ca538c2014-01-10 15:42:34 -080039public class ParallelFlowDatabaseOperation extends FlowDatabaseOperation {
40 private final static Logger log = LoggerFactory.getLogger(FlowDatabaseOperation.class);
41
Brian O'Connor8e49d202014-01-10 17:17:18 -080042 private final static int numThreads = Integer.valueOf(System.getProperty("parallelFlowDatabase.numThreads", "32"));
Brian O'Connor0ca538c2014-01-10 15:42:34 -080043 private final static ExecutorService executor = Executors.newFixedThreadPool(numThreads);
44
Pavlin Radoslavov94aed682014-01-14 21:10:04 -080045 private static KryoFactory kryoFactory = new KryoFactory();
46
Brian O'Connor7fb57862014-01-10 17:13:38 -080047 /**
48 * Get all installed flows by first querying the database for all FlowPaths
49 * and then populating them from the database in parallel.
50 *
51 * @param dbHandler the Graph Database handler to use.
52 * @return the Flow Paths if found, otherwise an empty list.
53 */
Yuta HIGUCHI337e46d2014-01-10 22:49:27 -080054 static ArrayList<FlowPath> getAllFlows(DBOperation dbHandler) {
Brian O'Connor0ca538c2014-01-10 15:42:34 -080055 Iterable<IFlowPath> flowPathsObj = null;
56 ArrayList<FlowPath> flowPaths = new ArrayList<FlowPath>();
57
58 try {
59 flowPathsObj = dbHandler.getAllFlowPaths();
60 } catch (Exception e) {
61 // TODO: handle exceptions
62 dbHandler.rollback();
63 log.error(":getAllFlowPaths failed");
64 return flowPaths;
65 }
66 if ((flowPathsObj == null) || (flowPathsObj.iterator().hasNext() == false)) {
67 dbHandler.commit();
68 return flowPaths; // No Flows found
69 }
70
71 CompletionService<FlowPath> tasks = new ExecutorCompletionService<>(executor);
72 int numTasks = 0;
73 for(IFlowPath flowObj : flowPathsObj) {
74 tasks.submit(new ExtractFlowTask(flowObj));
75 numTasks++;
76 }
77 for(int i = 0; i < numTasks; i++) {
78 try {
79 FlowPath flowPath = tasks.take().get();
80 if(flowPath != null) {
81 flowPaths.add(flowPath);
82 }
83 } catch (InterruptedException | ExecutionException e) {
84 log.error("Error reading FlowPath from IFlowPath object");
85 }
86 }
87 dbHandler.commit();
88 return flowPaths;
89 }
90
Brian O'Connor7fb57862014-01-10 17:13:38 -080091 /**
92 * Query the database for all flow paths that have their source switch
93 * in the provided collection
Brian O'Connorc1e3c452014-01-10 16:13:42 -080094 *
95 * Note: this function is implemented naively and inefficiently
Brian O'Connor7fb57862014-01-10 17:13:38 -080096 *
97 * @param dbHandler the Graph Database handler to use.
98 * @param switches a collection of switches whose flow paths you want
99 * @return the Flow Paths if found, otherwise an empty list.
Brian O'Connorc1e3c452014-01-10 16:13:42 -0800100 */
Yuta HIGUCHI337e46d2014-01-10 22:49:27 -0800101 static ArrayList<FlowPath> getFlowsForSwitches(DBOperation dbHandler, Collection<Dpid> switches) {
Brian O'Connorc1e3c452014-01-10 16:13:42 -0800102 Iterable<IFlowPath> flowPathsObj = null;
103 ArrayList<FlowPath> flowPaths = new ArrayList<FlowPath>();
104
105 try {
106 flowPathsObj = dbHandler.getAllFlowPaths();
107 } catch (Exception e) {
108 // TODO: handle exceptions
109 dbHandler.rollback();
110 log.error(":getAllFlowPaths failed");
111 return flowPaths;
112 }
113 if ((flowPathsObj == null) || (flowPathsObj.iterator().hasNext() == false)) {
114 dbHandler.commit();
115 return flowPaths; // No Flows found
116 }
117
118 // convert the collection of switch dpids into a set of strings
119 Set<String> switchSet = new HashSet<>();
120 for(Dpid dpid : switches) {
121 switchSet.add(dpid.toString());
122 }
123
124 CompletionService<FlowPath> tasks = new ExecutorCompletionService<>(executor);
125 int numTasks = 0;
126 for(IFlowPath flowObj : flowPathsObj) {
127 if(switchSet.contains(flowObj.getSrcSwitch())) {
128 tasks.submit(new ExtractFlowTask(flowObj));
129 numTasks++;
130 }
131 }
132 for(int i = 0; i < numTasks; i++) {
133 try {
134 FlowPath flowPath = tasks.take().get();
135 if(flowPath != null) {
136 flowPaths.add(flowPath);
137 }
138 } catch (InterruptedException | ExecutionException e) {
139 log.error("Error reading FlowPath from IFlowPath object");
140 }
141 }
142 dbHandler.commit();
143 return flowPaths;
144 }
145
Brian O'Connor7fb57862014-01-10 17:13:38 -0800146 /**
147 * The basic parallelization unit for extracting FlowEntries from the database.
148 *
149 * This is simply a wrapper for FlowDatabaseOperation.extractFlowPath()
150 */
Brian O'Connor0ca538c2014-01-10 15:42:34 -0800151 private final static class ExtractFlowTask implements Callable<FlowPath> {
152 private final IFlowPath flowObj;
153
154 ExtractFlowTask(IFlowPath flowObj){
155 this.flowObj = flowObj;
156 }
157 @Override
158 public FlowPath call() throws Exception {
159 return extractFlowPath(flowObj);
160 }
161 }
162
Brian O'Connor7fb57862014-01-10 17:13:38 -0800163 /**
164 * Get a subset of installed flows in parallel.
165 *
166 * @param dbHandler the Graph Database handler to use.
167 * @param flowIds the collection of Flow IDs to get.
168 * @return the Flow Paths if found, otherwise an empty list.
169 */
Yuta HIGUCHI337e46d2014-01-10 22:49:27 -0800170 static ArrayList<FlowPath> getFlows(DBOperation dbHandler,
Brian O'Connorc1e3c452014-01-10 16:13:42 -0800171 Collection<FlowId> flowIds) {
Brian O'Connor0ca538c2014-01-10 15:42:34 -0800172 ArrayList<FlowPath> flowPaths = new ArrayList<FlowPath>();
173
174 CompletionService<FlowPath> tasks = new ExecutorCompletionService<>(executor);
175 int numTasks = 0;
176 for (FlowId flowId : flowIds) {
177 tasks.submit(new GetFlowTask(dbHandler, flowId));
178 numTasks++;
179 }
180 for(int i = 0; i < numTasks; i++) {
181 try {
182 FlowPath flowPath = tasks.take().get();
183 if(flowPath != null) {
184 flowPaths.add(flowPath);
185 }
186 } catch (InterruptedException | ExecutionException e) {
187 log.error("Error reading FlowPath from database");
188 }
189 }
190 // TODO: should we commit?
191 //dbHandler.commit();
192 return flowPaths;
193 }
194
Brian O'Connor7fb57862014-01-10 17:13:38 -0800195 /**
196 * The basic parallelization unit for getting FlowEntries.
197 *
198 * This is simply a wrapper for FlowDatabaseOperation.getFlow()
199 */
Brian O'Connor0ca538c2014-01-10 15:42:34 -0800200 private final static class GetFlowTask implements Callable<FlowPath> {
Yuta HIGUCHI337e46d2014-01-10 22:49:27 -0800201 private final DBOperation dbHandler;
Brian O'Connor0ca538c2014-01-10 15:42:34 -0800202 private final FlowId flowId;
Yuta HIGUCHI337e46d2014-01-10 22:49:27 -0800203
204 GetFlowTask(DBOperation dbHandler, FlowId flowId) {
Brian O'Connor0ca538c2014-01-10 15:42:34 -0800205 this.dbHandler = dbHandler;
206 this.flowId = flowId;
207 }
208 @Override
209 public FlowPath call() throws Exception{
210 return getFlow(dbHandler, flowId);
211 }
212 }
213
Brian O'Connor7fb57862014-01-10 17:13:38 -0800214 /**
215 * Add a flow by creating a database task, then waiting for the result.
216 * Mostly, a wrapper for FlowDatabaseOperation.addFlow() which overs little
217 * performance benefit.
218 *
219 * @param dbHandler the Graph Database handler to use.
220 * @param flowPath the Flow Path to install.
221 * @return true on success, otherwise false.
222 */
Yuta HIGUCHI337e46d2014-01-10 22:49:27 -0800223 static boolean addFlow(DBOperation dbHandler, FlowPath flowPath) {
Brian O'Connorc1e3c452014-01-10 16:13:42 -0800224 Future<Boolean> result = executor.submit(new AddFlowTask(dbHandler, flowPath, null));
225 // NOTE: This function is blocking
226 try {
227 return result.get();
228 } catch (InterruptedException | ExecutionException e) {
229 return false;
230 }
231 }
232
Brian O'Connor7fb57862014-01-10 17:13:38 -0800233 /**
234 * Add a flow asynchronously by creating a database task.
235 *
236 * @param dbHandler the Graph Database handler to use.
237 * @param flowPath the Flow Path to install.
238 * @param datagridService the notification service for when the task is completed
239 * @return true always
240 */
Yuta HIGUCHI337e46d2014-01-10 22:49:27 -0800241 static boolean addFlow(DBOperation dbHandler, FlowPath flowPath, IDatagridService datagridService) {
Brian O'Connorc1e3c452014-01-10 16:13:42 -0800242 executor.submit(new AddFlowTask(dbHandler, flowPath, datagridService));
Brian O'Connor0ca538c2014-01-10 15:42:34 -0800243 // TODO: If we need the results, submit returns a Future that contains
244 // the result.
245 return true;
Brian O'Connorc1e3c452014-01-10 16:13:42 -0800246
Brian O'Connor0ca538c2014-01-10 15:42:34 -0800247 }
248
Brian O'Connor7fb57862014-01-10 17:13:38 -0800249 /**
250 * The basic parallelization unit for adding FlowPaths.
251 *
252 * This is simply a wrapper for FlowDatabaseOperation.addFlow(),
253 * which also sends a notification if a datagrid services is provided
254 */
Brian O'Connor0ca538c2014-01-10 15:42:34 -0800255 private final static class AddFlowTask implements Callable<Boolean> {
Yuta HIGUCHI337e46d2014-01-10 22:49:27 -0800256 private final DBOperation dbHandler;
Brian O'Connor0ca538c2014-01-10 15:42:34 -0800257 private final FlowPath flowPath;
Brian O'Connorc1e3c452014-01-10 16:13:42 -0800258 private final IDatagridService datagridService;
Yuta HIGUCHI337e46d2014-01-10 22:49:27 -0800259
260 AddFlowTask(DBOperation dbHandler,
Brian O'Connorc1e3c452014-01-10 16:13:42 -0800261 FlowPath flowPath,
262 IDatagridService datagridService) {
Brian O'Connor0ca538c2014-01-10 15:42:34 -0800263 this.dbHandler = dbHandler;
Pavlin Radoslavov94aed682014-01-14 21:10:04 -0800264
265 // Create a copy of the FlowPath object
266 Kryo kryo = kryoFactory.newKryo();
267 this.flowPath = kryo.copy(flowPath);
268 kryoFactory.deleteKryo(kryo);
269
Brian O'Connorc1e3c452014-01-10 16:13:42 -0800270 this.datagridService = datagridService;
Brian O'Connor0ca538c2014-01-10 15:42:34 -0800271 }
272
273 @Override
274 public Boolean call() throws Exception {
Brian O'Connor2daf7a92014-01-14 11:26:35 -0800275// String tag1 = "FlowDatabaseOperation.AddFlow." + flowPath.flowId();
276 String tag1 = "FlowDatabaseOperation.AddFlow";
277// String tag2 = "FlowDatabaseOperation.NotificationSend.FlowEntry." + flowPath.flowId();
278 String tag2 = "FlowDatabaseOperation.NotificationSend.FlowEntry";
279 PerformanceMonitor.Measurement m;
280 m = PerformanceMonitor.start(tag1);
Brian O'Connorc1e3c452014-01-10 16:13:42 -0800281 boolean success = FlowDatabaseOperation.addFlow(dbHandler, flowPath);
brian6df88232014-01-14 17:05:07 -0800282// PerformanceMonitor.stop(tag1);
Brian O'Connor2daf7a92014-01-14 11:26:35 -0800283 m.stop();
284 m = PerformanceMonitor.start(tag2);
Brian O'Connorc1e3c452014-01-10 16:13:42 -0800285 if(success) {
286 if(datagridService != null) {
Pavlin Radoslavovbb8148a2014-01-10 17:21:12 -0800287 // Send notifications for each Flow Entry
288 for (FlowEntry flowEntry : flowPath.flowEntries()) {
289 if (flowEntry.flowEntrySwitchState() !=
290 FlowEntrySwitchState.FE_SWITCH_NOT_UPDATED) {
291 continue;
292 }
293 //
294 // Write the Flow Entry to the Datagrid
295 //
296 switch (flowEntry.flowEntryUserState()) {
297 case FE_USER_ADD:
298 datagridService.notificationSendFlowEntryAdded(flowEntry);
299 break;
300 case FE_USER_MODIFY:
301 datagridService.notificationSendFlowEntryUpdated(flowEntry);
302 break;
303 case FE_USER_DELETE:
304 datagridService.notificationSendFlowEntryRemoved(flowEntry.flowEntryId());
305 break;
306 case FE_USER_UNKNOWN:
307 assert(false);
308 break;
309 }
310 }
Brian O'Connorc1e3c452014-01-10 16:13:42 -0800311 }
312 }
313 else {
314 log.error("Error adding flow path {} to database", flowPath);
315 }
Brian O'Connor2daf7a92014-01-14 11:26:35 -0800316 m.stop();
317// PerformanceMonitor.report(tag1);
318// PerformanceMonitor.report(tag2);
Brian O'Connorc1e3c452014-01-10 16:13:42 -0800319 return success;
320
Brian O'Connor0ca538c2014-01-10 15:42:34 -0800321 }
322 }
323
Brian O'Connor7fb57862014-01-10 17:13:38 -0800324 /**
325 * Delete a previously added flow by creating a database task, then waiting
326 * for the result.
327 *
328 * Mostly, a wrapper for FlowDatabaseOperation.addFlow() which overs little
329 * performance benefit.
330 *
331 * @param dbHandler the Graph Database handler to use.
332 * @param flowId the Flow ID of the flow to delete.
333 * @return true on success, otherwise false.
334 */
Yuta HIGUCHI337e46d2014-01-10 22:49:27 -0800335 static boolean deleteFlow(DBOperation dbHandler, FlowId flowId) {
Brian O'Connorc1e3c452014-01-10 16:13:42 -0800336 Future<Boolean> result = executor.submit(new DeleteFlowTask(dbHandler, flowId, null));
337 // NOTE: This function is blocking
338 try {
339 return result.get();
340 } catch (InterruptedException | ExecutionException e) {
341 return false;
342 }
343 }
344
Brian O'Connor7fb57862014-01-10 17:13:38 -0800345 /**
346 * Delete a previously added flow asynchronously by creating a database task.
347 *
348 * @param dbHandler the Graph Database handler to use.
349 * @param flowId the Flow ID of the flow to delete.
350 * @param datagridService the notification service for when the task is completed
351 * @return true always
352 */
Yuta HIGUCHI337e46d2014-01-10 22:49:27 -0800353 static boolean deleteFlow(DBOperation dbHandler, FlowId flowId, IDatagridService datagridService) {
Brian O'Connorc1e3c452014-01-10 16:13:42 -0800354 executor.submit(new DeleteFlowTask(dbHandler, flowId, datagridService));
Brian O'Connor0ca538c2014-01-10 15:42:34 -0800355 // TODO: If we need the results, submit returns a Future that contains
356 // the result.
357 return true;
358 }
359
Brian O'Connor7fb57862014-01-10 17:13:38 -0800360 /**
361 * The basic parallelization unit for deleting FlowPaths.
362 *
363 * This is simply a wrapper for FlowDatabaseOperation.deleteFlow(),
364 * which also sends a notification if a datagrid services is provided
365 */
Brian O'Connor0ca538c2014-01-10 15:42:34 -0800366 private final static class DeleteFlowTask implements Callable<Boolean> {
Yuta HIGUCHI337e46d2014-01-10 22:49:27 -0800367 private final DBOperation dbHandler;
Brian O'Connor0ca538c2014-01-10 15:42:34 -0800368 private final FlowId flowId;
Brian O'Connorc1e3c452014-01-10 16:13:42 -0800369 private final IDatagridService datagridService;
Yuta HIGUCHI337e46d2014-01-10 22:49:27 -0800370
371 DeleteFlowTask(DBOperation dbHandler, FlowId flowId, IDatagridService datagridService) {
Brian O'Connor0ca538c2014-01-10 15:42:34 -0800372 this.dbHandler = dbHandler;
Pavlin Radoslavov94aed682014-01-14 21:10:04 -0800373
374 // Create a copy of the FlowId object
375 Kryo kryo = kryoFactory.newKryo();
376 this.flowId = kryo.copy(flowId);
377 kryoFactory.deleteKryo(kryo);
378
Brian O'Connorc1e3c452014-01-10 16:13:42 -0800379 this.datagridService = datagridService;
Brian O'Connor0ca538c2014-01-10 15:42:34 -0800380 }
381 @Override
382 public Boolean call() throws Exception{
Brian O'Connorc1e3c452014-01-10 16:13:42 -0800383 boolean success = FlowDatabaseOperation.deleteFlow(dbHandler, flowId);
384 if(success) {
385 if(datagridService != null) {
386 datagridService.notificationSendFlowIdRemoved(flowId);
387 }
388 }
389 else {
390 log.error("Error removing flow path {} from database", flowId);
391 }
392 return success;
Brian O'Connor0ca538c2014-01-10 15:42:34 -0800393 }
394 }
395}