blob: 212c59f73b906e8d01167ebae0ccbfa209774eb9 [file] [log] [blame]
Brian O'Connor0ca538c2014-01-10 15:42:34 -08001package net.onrc.onos.ofcontroller.flowmanager;
2
3import java.util.ArrayList;
4import java.util.Collection;
Brian O'Connorc1e3c452014-01-10 16:13:42 -08005import java.util.HashSet;
6import java.util.Set;
Brian O'Connor0ca538c2014-01-10 15:42:34 -08007import java.util.concurrent.Callable;
8import java.util.concurrent.CompletionService;
9import java.util.concurrent.ExecutionException;
10import java.util.concurrent.ExecutorCompletionService;
11import java.util.concurrent.ExecutorService;
12import java.util.concurrent.Executors;
Brian O'Connorc1e3c452014-01-10 16:13:42 -080013import java.util.concurrent.Future;
Brian O'Connor0ca538c2014-01-10 15:42:34 -080014
15import org.slf4j.Logger;
16import org.slf4j.LoggerFactory;
17
Brian O'Connorc1e3c452014-01-10 16:13:42 -080018import net.onrc.onos.datagrid.IDatagridService;
Yuta HIGUCHI337e46d2014-01-10 22:49:27 -080019import net.onrc.onos.graph.DBOperation;
Brian O'Connor0ca538c2014-01-10 15:42:34 -080020import net.onrc.onos.ofcontroller.core.INetMapTopologyObjects.IFlowPath;
Brian O'Connorc1e3c452014-01-10 16:13:42 -080021import net.onrc.onos.ofcontroller.util.Dpid;
Pavlin Radoslavovbb8148a2014-01-10 17:21:12 -080022import net.onrc.onos.ofcontroller.util.FlowEntry;
23import net.onrc.onos.ofcontroller.util.FlowEntrySwitchState;
Brian O'Connor0ca538c2014-01-10 15:42:34 -080024import net.onrc.onos.ofcontroller.util.FlowId;
25import net.onrc.onos.ofcontroller.util.FlowPath;
26
Brian O'Connor7fb57862014-01-10 17:13:38 -080027/**
28 * Class for performing parallel Flow-related operations on the Database.
29 *
30 * This class is mostly a wrapper of FlowDatabaseOperation with a thread pool
31 * for parallelization.
32 *
33 * @author Brian O'Connor <brian@onlab.us>
34 */
Brian O'Connor0ca538c2014-01-10 15:42:34 -080035public class ParallelFlowDatabaseOperation extends FlowDatabaseOperation {
36 private final static Logger log = LoggerFactory.getLogger(FlowDatabaseOperation.class);
37
Brian O'Connor8e49d202014-01-10 17:17:18 -080038 private final static int numThreads = Integer.valueOf(System.getProperty("parallelFlowDatabase.numThreads", "32"));
Brian O'Connor0ca538c2014-01-10 15:42:34 -080039 private final static ExecutorService executor = Executors.newFixedThreadPool(numThreads);
40
Brian O'Connor7fb57862014-01-10 17:13:38 -080041 /**
42 * Get all installed flows by first querying the database for all FlowPaths
43 * and then populating them from the database in parallel.
44 *
45 * @param dbHandler the Graph Database handler to use.
46 * @return the Flow Paths if found, otherwise an empty list.
47 */
Yuta HIGUCHI337e46d2014-01-10 22:49:27 -080048 static ArrayList<FlowPath> getAllFlows(DBOperation dbHandler) {
Brian O'Connor0ca538c2014-01-10 15:42:34 -080049 Iterable<IFlowPath> flowPathsObj = null;
50 ArrayList<FlowPath> flowPaths = new ArrayList<FlowPath>();
51
52 try {
53 flowPathsObj = dbHandler.getAllFlowPaths();
54 } catch (Exception e) {
55 // TODO: handle exceptions
56 dbHandler.rollback();
57 log.error(":getAllFlowPaths failed");
58 return flowPaths;
59 }
60 if ((flowPathsObj == null) || (flowPathsObj.iterator().hasNext() == false)) {
61 dbHandler.commit();
62 return flowPaths; // No Flows found
63 }
64
65 CompletionService<FlowPath> tasks = new ExecutorCompletionService<>(executor);
66 int numTasks = 0;
67 for(IFlowPath flowObj : flowPathsObj) {
68 tasks.submit(new ExtractFlowTask(flowObj));
69 numTasks++;
70 }
71 for(int i = 0; i < numTasks; i++) {
72 try {
73 FlowPath flowPath = tasks.take().get();
74 if(flowPath != null) {
75 flowPaths.add(flowPath);
76 }
77 } catch (InterruptedException | ExecutionException e) {
78 log.error("Error reading FlowPath from IFlowPath object");
79 }
80 }
81 dbHandler.commit();
82 return flowPaths;
83 }
84
Brian O'Connor7fb57862014-01-10 17:13:38 -080085 /**
86 * Query the database for all flow paths that have their source switch
87 * in the provided collection
Brian O'Connorc1e3c452014-01-10 16:13:42 -080088 *
89 * Note: this function is implemented naively and inefficiently
Brian O'Connor7fb57862014-01-10 17:13:38 -080090 *
91 * @param dbHandler the Graph Database handler to use.
92 * @param switches a collection of switches whose flow paths you want
93 * @return the Flow Paths if found, otherwise an empty list.
Brian O'Connorc1e3c452014-01-10 16:13:42 -080094 */
Yuta HIGUCHI337e46d2014-01-10 22:49:27 -080095 static ArrayList<FlowPath> getFlowsForSwitches(DBOperation dbHandler, Collection<Dpid> switches) {
Brian O'Connorc1e3c452014-01-10 16:13:42 -080096 Iterable<IFlowPath> flowPathsObj = null;
97 ArrayList<FlowPath> flowPaths = new ArrayList<FlowPath>();
98
99 try {
100 flowPathsObj = dbHandler.getAllFlowPaths();
101 } catch (Exception e) {
102 // TODO: handle exceptions
103 dbHandler.rollback();
104 log.error(":getAllFlowPaths failed");
105 return flowPaths;
106 }
107 if ((flowPathsObj == null) || (flowPathsObj.iterator().hasNext() == false)) {
108 dbHandler.commit();
109 return flowPaths; // No Flows found
110 }
111
112 // convert the collection of switch dpids into a set of strings
113 Set<String> switchSet = new HashSet<>();
114 for(Dpid dpid : switches) {
115 switchSet.add(dpid.toString());
116 }
117
118 CompletionService<FlowPath> tasks = new ExecutorCompletionService<>(executor);
119 int numTasks = 0;
120 for(IFlowPath flowObj : flowPathsObj) {
121 if(switchSet.contains(flowObj.getSrcSwitch())) {
122 tasks.submit(new ExtractFlowTask(flowObj));
123 numTasks++;
124 }
125 }
126 for(int i = 0; i < numTasks; i++) {
127 try {
128 FlowPath flowPath = tasks.take().get();
129 if(flowPath != null) {
130 flowPaths.add(flowPath);
131 }
132 } catch (InterruptedException | ExecutionException e) {
133 log.error("Error reading FlowPath from IFlowPath object");
134 }
135 }
136 dbHandler.commit();
137 return flowPaths;
138 }
139
Brian O'Connor7fb57862014-01-10 17:13:38 -0800140 /**
141 * The basic parallelization unit for extracting FlowEntries from the database.
142 *
143 * This is simply a wrapper for FlowDatabaseOperation.extractFlowPath()
144 */
Brian O'Connor0ca538c2014-01-10 15:42:34 -0800145 private final static class ExtractFlowTask implements Callable<FlowPath> {
146 private final IFlowPath flowObj;
147
148 ExtractFlowTask(IFlowPath flowObj){
149 this.flowObj = flowObj;
150 }
151 @Override
152 public FlowPath call() throws Exception {
153 return extractFlowPath(flowObj);
154 }
155 }
156
Brian O'Connor7fb57862014-01-10 17:13:38 -0800157 /**
158 * Get a subset of installed flows in parallel.
159 *
160 * @param dbHandler the Graph Database handler to use.
161 * @param flowIds the collection of Flow IDs to get.
162 * @return the Flow Paths if found, otherwise an empty list.
163 */
Yuta HIGUCHI337e46d2014-01-10 22:49:27 -0800164 static ArrayList<FlowPath> getFlows(DBOperation dbHandler,
Brian O'Connorc1e3c452014-01-10 16:13:42 -0800165 Collection<FlowId> flowIds) {
Brian O'Connor0ca538c2014-01-10 15:42:34 -0800166 ArrayList<FlowPath> flowPaths = new ArrayList<FlowPath>();
167
168 CompletionService<FlowPath> tasks = new ExecutorCompletionService<>(executor);
169 int numTasks = 0;
170 for (FlowId flowId : flowIds) {
171 tasks.submit(new GetFlowTask(dbHandler, flowId));
172 numTasks++;
173 }
174 for(int i = 0; i < numTasks; i++) {
175 try {
176 FlowPath flowPath = tasks.take().get();
177 if(flowPath != null) {
178 flowPaths.add(flowPath);
179 }
180 } catch (InterruptedException | ExecutionException e) {
181 log.error("Error reading FlowPath from database");
182 }
183 }
184 // TODO: should we commit?
185 //dbHandler.commit();
186 return flowPaths;
187 }
188
Brian O'Connor7fb57862014-01-10 17:13:38 -0800189 /**
190 * The basic parallelization unit for getting FlowEntries.
191 *
192 * This is simply a wrapper for FlowDatabaseOperation.getFlow()
193 */
Brian O'Connor0ca538c2014-01-10 15:42:34 -0800194 private final static class GetFlowTask implements Callable<FlowPath> {
Yuta HIGUCHI337e46d2014-01-10 22:49:27 -0800195 private final DBOperation dbHandler;
Brian O'Connor0ca538c2014-01-10 15:42:34 -0800196 private final FlowId flowId;
Yuta HIGUCHI337e46d2014-01-10 22:49:27 -0800197
198 GetFlowTask(DBOperation dbHandler, FlowId flowId) {
Brian O'Connor0ca538c2014-01-10 15:42:34 -0800199 this.dbHandler = dbHandler;
200 this.flowId = flowId;
201 }
202 @Override
203 public FlowPath call() throws Exception{
204 return getFlow(dbHandler, flowId);
205 }
206 }
207
Brian O'Connor7fb57862014-01-10 17:13:38 -0800208 /**
209 * Add a flow by creating a database task, then waiting for the result.
210 * Mostly, a wrapper for FlowDatabaseOperation.addFlow() which overs little
211 * performance benefit.
212 *
213 * @param dbHandler the Graph Database handler to use.
214 * @param flowPath the Flow Path to install.
215 * @return true on success, otherwise false.
216 */
Yuta HIGUCHI337e46d2014-01-10 22:49:27 -0800217 static boolean addFlow(DBOperation dbHandler, FlowPath flowPath) {
Brian O'Connorc1e3c452014-01-10 16:13:42 -0800218 Future<Boolean> result = executor.submit(new AddFlowTask(dbHandler, flowPath, null));
219 // NOTE: This function is blocking
220 try {
221 return result.get();
222 } catch (InterruptedException | ExecutionException e) {
223 return false;
224 }
225 }
226
Brian O'Connor7fb57862014-01-10 17:13:38 -0800227 /**
228 * Add a flow asynchronously by creating a database task.
229 *
230 * @param dbHandler the Graph Database handler to use.
231 * @param flowPath the Flow Path to install.
232 * @param datagridService the notification service for when the task is completed
233 * @return true always
234 */
Yuta HIGUCHI337e46d2014-01-10 22:49:27 -0800235 static boolean addFlow(DBOperation dbHandler, FlowPath flowPath, IDatagridService datagridService) {
Brian O'Connorc1e3c452014-01-10 16:13:42 -0800236 executor.submit(new AddFlowTask(dbHandler, flowPath, datagridService));
Brian O'Connor0ca538c2014-01-10 15:42:34 -0800237 // TODO: If we need the results, submit returns a Future that contains
238 // the result.
239 return true;
Brian O'Connorc1e3c452014-01-10 16:13:42 -0800240
Brian O'Connor0ca538c2014-01-10 15:42:34 -0800241 }
242
Brian O'Connor7fb57862014-01-10 17:13:38 -0800243 /**
244 * The basic parallelization unit for adding FlowPaths.
245 *
246 * This is simply a wrapper for FlowDatabaseOperation.addFlow(),
247 * which also sends a notification if a datagrid services is provided
248 */
Brian O'Connor0ca538c2014-01-10 15:42:34 -0800249 private final static class AddFlowTask implements Callable<Boolean> {
Yuta HIGUCHI337e46d2014-01-10 22:49:27 -0800250 private final DBOperation dbHandler;
Brian O'Connor0ca538c2014-01-10 15:42:34 -0800251 private final FlowPath flowPath;
Brian O'Connorc1e3c452014-01-10 16:13:42 -0800252 private final IDatagridService datagridService;
Yuta HIGUCHI337e46d2014-01-10 22:49:27 -0800253
254 AddFlowTask(DBOperation dbHandler,
Brian O'Connorc1e3c452014-01-10 16:13:42 -0800255 FlowPath flowPath,
256 IDatagridService datagridService) {
Brian O'Connor0ca538c2014-01-10 15:42:34 -0800257 this.dbHandler = dbHandler;
258 this.flowPath = flowPath;
Brian O'Connorc1e3c452014-01-10 16:13:42 -0800259 this.datagridService = datagridService;
Brian O'Connor0ca538c2014-01-10 15:42:34 -0800260 }
261
262 @Override
263 public Boolean call() throws Exception {
Brian O'Connorc1e3c452014-01-10 16:13:42 -0800264 boolean success = FlowDatabaseOperation.addFlow(dbHandler, flowPath);
265 if(success) {
266 if(datagridService != null) {
Pavlin Radoslavovbb8148a2014-01-10 17:21:12 -0800267 // Send notifications for each Flow Entry
268 for (FlowEntry flowEntry : flowPath.flowEntries()) {
269 if (flowEntry.flowEntrySwitchState() !=
270 FlowEntrySwitchState.FE_SWITCH_NOT_UPDATED) {
271 continue;
272 }
273 //
274 // Write the Flow Entry to the Datagrid
275 //
276 switch (flowEntry.flowEntryUserState()) {
277 case FE_USER_ADD:
278 datagridService.notificationSendFlowEntryAdded(flowEntry);
279 break;
280 case FE_USER_MODIFY:
281 datagridService.notificationSendFlowEntryUpdated(flowEntry);
282 break;
283 case FE_USER_DELETE:
284 datagridService.notificationSendFlowEntryRemoved(flowEntry.flowEntryId());
285 break;
286 case FE_USER_UNKNOWN:
287 assert(false);
288 break;
289 }
290 }
Brian O'Connorc1e3c452014-01-10 16:13:42 -0800291 }
292 }
293 else {
294 log.error("Error adding flow path {} to database", flowPath);
295 }
296 return success;
297
Brian O'Connor0ca538c2014-01-10 15:42:34 -0800298 }
299 }
300
Brian O'Connor7fb57862014-01-10 17:13:38 -0800301 /**
302 * Delete a previously added flow by creating a database task, then waiting
303 * for the result.
304 *
305 * Mostly, a wrapper for FlowDatabaseOperation.addFlow() which overs little
306 * performance benefit.
307 *
308 * @param dbHandler the Graph Database handler to use.
309 * @param flowId the Flow ID of the flow to delete.
310 * @return true on success, otherwise false.
311 */
Yuta HIGUCHI337e46d2014-01-10 22:49:27 -0800312 static boolean deleteFlow(DBOperation dbHandler, FlowId flowId) {
Brian O'Connorc1e3c452014-01-10 16:13:42 -0800313 Future<Boolean> result = executor.submit(new DeleteFlowTask(dbHandler, flowId, null));
314 // NOTE: This function is blocking
315 try {
316 return result.get();
317 } catch (InterruptedException | ExecutionException e) {
318 return false;
319 }
320 }
321
Brian O'Connor7fb57862014-01-10 17:13:38 -0800322 /**
323 * Delete a previously added flow asynchronously by creating a database task.
324 *
325 * @param dbHandler the Graph Database handler to use.
326 * @param flowId the Flow ID of the flow to delete.
327 * @param datagridService the notification service for when the task is completed
328 * @return true always
329 */
Yuta HIGUCHI337e46d2014-01-10 22:49:27 -0800330 static boolean deleteFlow(DBOperation dbHandler, FlowId flowId, IDatagridService datagridService) {
Brian O'Connorc1e3c452014-01-10 16:13:42 -0800331 executor.submit(new DeleteFlowTask(dbHandler, flowId, datagridService));
Brian O'Connor0ca538c2014-01-10 15:42:34 -0800332 // TODO: If we need the results, submit returns a Future that contains
333 // the result.
334 return true;
335 }
336
Brian O'Connor7fb57862014-01-10 17:13:38 -0800337 /**
338 * The basic parallelization unit for deleting FlowPaths.
339 *
340 * This is simply a wrapper for FlowDatabaseOperation.deleteFlow(),
341 * which also sends a notification if a datagrid services is provided
342 */
Brian O'Connor0ca538c2014-01-10 15:42:34 -0800343 private final static class DeleteFlowTask implements Callable<Boolean> {
Yuta HIGUCHI337e46d2014-01-10 22:49:27 -0800344 private final DBOperation dbHandler;
Brian O'Connor0ca538c2014-01-10 15:42:34 -0800345 private final FlowId flowId;
Brian O'Connorc1e3c452014-01-10 16:13:42 -0800346 private final IDatagridService datagridService;
Yuta HIGUCHI337e46d2014-01-10 22:49:27 -0800347
348 DeleteFlowTask(DBOperation dbHandler, FlowId flowId, IDatagridService datagridService) {
Brian O'Connor0ca538c2014-01-10 15:42:34 -0800349 this.dbHandler = dbHandler;
350 this.flowId = flowId;
Brian O'Connorc1e3c452014-01-10 16:13:42 -0800351 this.datagridService = datagridService;
Brian O'Connor0ca538c2014-01-10 15:42:34 -0800352 }
353 @Override
354 public Boolean call() throws Exception{
Brian O'Connorc1e3c452014-01-10 16:13:42 -0800355 boolean success = FlowDatabaseOperation.deleteFlow(dbHandler, flowId);
356 if(success) {
357 if(datagridService != null) {
358 datagridService.notificationSendFlowIdRemoved(flowId);
359 }
360 }
361 else {
362 log.error("Error removing flow path {} from database", flowId);
363 }
364 return success;
Brian O'Connor0ca538c2014-01-10 15:42:34 -0800365 }
366 }
367}