blob: f80d0147fc295146993416f423aac15099861430 [file] [log] [blame]
Brian O'Connor0ca538c2014-01-10 15:42:34 -08001package net.onrc.onos.ofcontroller.flowmanager;
2
3import java.util.ArrayList;
4import java.util.Collection;
Brian O'Connorc1e3c452014-01-10 16:13:42 -08005import java.util.HashSet;
6import java.util.Set;
Brian O'Connor0ca538c2014-01-10 15:42:34 -08007import java.util.concurrent.Callable;
8import java.util.concurrent.CompletionService;
9import java.util.concurrent.ExecutionException;
10import java.util.concurrent.ExecutorCompletionService;
11import java.util.concurrent.ExecutorService;
12import java.util.concurrent.Executors;
Brian O'Connorc1e3c452014-01-10 16:13:42 -080013import java.util.concurrent.Future;
Brian O'Connor0ca538c2014-01-10 15:42:34 -080014
15import org.slf4j.Logger;
16import org.slf4j.LoggerFactory;
17
Brian O'Connorc1e3c452014-01-10 16:13:42 -080018import net.onrc.onos.datagrid.IDatagridService;
Brian O'Connor0ca538c2014-01-10 15:42:34 -080019import net.onrc.onos.graph.GraphDBOperation;
20import net.onrc.onos.ofcontroller.core.INetMapTopologyObjects.IFlowPath;
Brian O'Connorc1e3c452014-01-10 16:13:42 -080021import net.onrc.onos.ofcontroller.util.Dpid;
Brian O'Connor0ca538c2014-01-10 15:42:34 -080022import net.onrc.onos.ofcontroller.util.FlowId;
23import net.onrc.onos.ofcontroller.util.FlowPath;
24
Brian O'Connor7fb57862014-01-10 17:13:38 -080025/**
26 * Class for performing parallel Flow-related operations on the Database.
27 *
28 * This class is mostly a wrapper of FlowDatabaseOperation with a thread pool
29 * for parallelization.
30 *
31 * @author Brian O'Connor <brian@onlab.us>
32 */
Brian O'Connor0ca538c2014-01-10 15:42:34 -080033public class ParallelFlowDatabaseOperation extends FlowDatabaseOperation {
34 private final static Logger log = LoggerFactory.getLogger(FlowDatabaseOperation.class);
35
Brian O'Connor8e49d202014-01-10 17:17:18 -080036 private final static int numThreads = Integer.valueOf(System.getProperty("parallelFlowDatabase.numThreads", "32"));
Brian O'Connor0ca538c2014-01-10 15:42:34 -080037 private final static ExecutorService executor = Executors.newFixedThreadPool(numThreads);
38
Brian O'Connor7fb57862014-01-10 17:13:38 -080039 /**
40 * Get all installed flows by first querying the database for all FlowPaths
41 * and then populating them from the database in parallel.
42 *
43 * @param dbHandler the Graph Database handler to use.
44 * @return the Flow Paths if found, otherwise an empty list.
45 */
Brian O'Connor0ca538c2014-01-10 15:42:34 -080046 static ArrayList<FlowPath> getAllFlows(GraphDBOperation dbHandler) {
47 Iterable<IFlowPath> flowPathsObj = null;
48 ArrayList<FlowPath> flowPaths = new ArrayList<FlowPath>();
49
50 try {
51 flowPathsObj = dbHandler.getAllFlowPaths();
52 } catch (Exception e) {
53 // TODO: handle exceptions
54 dbHandler.rollback();
55 log.error(":getAllFlowPaths failed");
56 return flowPaths;
57 }
58 if ((flowPathsObj == null) || (flowPathsObj.iterator().hasNext() == false)) {
59 dbHandler.commit();
60 return flowPaths; // No Flows found
61 }
62
63 CompletionService<FlowPath> tasks = new ExecutorCompletionService<>(executor);
64 int numTasks = 0;
65 for(IFlowPath flowObj : flowPathsObj) {
66 tasks.submit(new ExtractFlowTask(flowObj));
67 numTasks++;
68 }
69 for(int i = 0; i < numTasks; i++) {
70 try {
71 FlowPath flowPath = tasks.take().get();
72 if(flowPath != null) {
73 flowPaths.add(flowPath);
74 }
75 } catch (InterruptedException | ExecutionException e) {
76 log.error("Error reading FlowPath from IFlowPath object");
77 }
78 }
79 dbHandler.commit();
80 return flowPaths;
81 }
82
Brian O'Connor7fb57862014-01-10 17:13:38 -080083 /**
84 * Query the database for all flow paths that have their source switch
85 * in the provided collection
Brian O'Connorc1e3c452014-01-10 16:13:42 -080086 *
87 * Note: this function is implemented naively and inefficiently
Brian O'Connor7fb57862014-01-10 17:13:38 -080088 *
89 * @param dbHandler the Graph Database handler to use.
90 * @param switches a collection of switches whose flow paths you want
91 * @return the Flow Paths if found, otherwise an empty list.
Brian O'Connorc1e3c452014-01-10 16:13:42 -080092 */
93 static ArrayList<FlowPath> getFlowsForSwitches(GraphDBOperation dbHandler, Collection<Dpid> switches) {
94 Iterable<IFlowPath> flowPathsObj = null;
95 ArrayList<FlowPath> flowPaths = new ArrayList<FlowPath>();
96
97 try {
98 flowPathsObj = dbHandler.getAllFlowPaths();
99 } catch (Exception e) {
100 // TODO: handle exceptions
101 dbHandler.rollback();
102 log.error(":getAllFlowPaths failed");
103 return flowPaths;
104 }
105 if ((flowPathsObj == null) || (flowPathsObj.iterator().hasNext() == false)) {
106 dbHandler.commit();
107 return flowPaths; // No Flows found
108 }
109
110 // convert the collection of switch dpids into a set of strings
111 Set<String> switchSet = new HashSet<>();
112 for(Dpid dpid : switches) {
113 switchSet.add(dpid.toString());
114 }
115
116 CompletionService<FlowPath> tasks = new ExecutorCompletionService<>(executor);
117 int numTasks = 0;
118 for(IFlowPath flowObj : flowPathsObj) {
119 if(switchSet.contains(flowObj.getSrcSwitch())) {
120 tasks.submit(new ExtractFlowTask(flowObj));
121 numTasks++;
122 }
123 }
124 for(int i = 0; i < numTasks; i++) {
125 try {
126 FlowPath flowPath = tasks.take().get();
127 if(flowPath != null) {
128 flowPaths.add(flowPath);
129 }
130 } catch (InterruptedException | ExecutionException e) {
131 log.error("Error reading FlowPath from IFlowPath object");
132 }
133 }
134 dbHandler.commit();
135 return flowPaths;
136 }
137
Brian O'Connor7fb57862014-01-10 17:13:38 -0800138 /**
139 * The basic parallelization unit for extracting FlowEntries from the database.
140 *
141 * This is simply a wrapper for FlowDatabaseOperation.extractFlowPath()
142 */
Brian O'Connor0ca538c2014-01-10 15:42:34 -0800143 private final static class ExtractFlowTask implements Callable<FlowPath> {
144 private final IFlowPath flowObj;
145
146 ExtractFlowTask(IFlowPath flowObj){
147 this.flowObj = flowObj;
148 }
149 @Override
150 public FlowPath call() throws Exception {
151 return extractFlowPath(flowObj);
152 }
153 }
154
Brian O'Connor7fb57862014-01-10 17:13:38 -0800155 /**
156 * Get a subset of installed flows in parallel.
157 *
158 * @param dbHandler the Graph Database handler to use.
159 * @param flowIds the collection of Flow IDs to get.
160 * @return the Flow Paths if found, otherwise an empty list.
161 */
Brian O'Connor0ca538c2014-01-10 15:42:34 -0800162 static ArrayList<FlowPath> getFlows(GraphDBOperation dbHandler,
Brian O'Connorc1e3c452014-01-10 16:13:42 -0800163 Collection<FlowId> flowIds) {
Brian O'Connor0ca538c2014-01-10 15:42:34 -0800164 ArrayList<FlowPath> flowPaths = new ArrayList<FlowPath>();
165
166 CompletionService<FlowPath> tasks = new ExecutorCompletionService<>(executor);
167 int numTasks = 0;
168 for (FlowId flowId : flowIds) {
169 tasks.submit(new GetFlowTask(dbHandler, flowId));
170 numTasks++;
171 }
172 for(int i = 0; i < numTasks; i++) {
173 try {
174 FlowPath flowPath = tasks.take().get();
175 if(flowPath != null) {
176 flowPaths.add(flowPath);
177 }
178 } catch (InterruptedException | ExecutionException e) {
179 log.error("Error reading FlowPath from database");
180 }
181 }
182 // TODO: should we commit?
183 //dbHandler.commit();
184 return flowPaths;
185 }
186
Brian O'Connor7fb57862014-01-10 17:13:38 -0800187 /**
188 * The basic parallelization unit for getting FlowEntries.
189 *
190 * This is simply a wrapper for FlowDatabaseOperation.getFlow()
191 */
Brian O'Connor0ca538c2014-01-10 15:42:34 -0800192 private final static class GetFlowTask implements Callable<FlowPath> {
193 private final GraphDBOperation dbHandler;
194 private final FlowId flowId;
195
196 GetFlowTask(GraphDBOperation dbHandler, FlowId flowId) {
197 this.dbHandler = dbHandler;
198 this.flowId = flowId;
199 }
200 @Override
201 public FlowPath call() throws Exception{
202 return getFlow(dbHandler, flowId);
203 }
204 }
205
Brian O'Connor7fb57862014-01-10 17:13:38 -0800206 /**
207 * Add a flow by creating a database task, then waiting for the result.
208 * Mostly, a wrapper for FlowDatabaseOperation.addFlow() which overs little
209 * performance benefit.
210 *
211 * @param dbHandler the Graph Database handler to use.
212 * @param flowPath the Flow Path to install.
213 * @return true on success, otherwise false.
214 */
Brian O'Connor0ca538c2014-01-10 15:42:34 -0800215 static boolean addFlow(GraphDBOperation dbHandler, FlowPath flowPath) {
Brian O'Connorc1e3c452014-01-10 16:13:42 -0800216 Future<Boolean> result = executor.submit(new AddFlowTask(dbHandler, flowPath, null));
217 // NOTE: This function is blocking
218 try {
219 return result.get();
220 } catch (InterruptedException | ExecutionException e) {
221 return false;
222 }
223 }
224
Brian O'Connor7fb57862014-01-10 17:13:38 -0800225 /**
226 * Add a flow asynchronously by creating a database task.
227 *
228 * @param dbHandler the Graph Database handler to use.
229 * @param flowPath the Flow Path to install.
230 * @param datagridService the notification service for when the task is completed
231 * @return true always
232 */
Brian O'Connorc1e3c452014-01-10 16:13:42 -0800233 static boolean addFlow(GraphDBOperation dbHandler, FlowPath flowPath, IDatagridService datagridService) {
234 executor.submit(new AddFlowTask(dbHandler, flowPath, datagridService));
Brian O'Connor0ca538c2014-01-10 15:42:34 -0800235 // TODO: If we need the results, submit returns a Future that contains
236 // the result.
237 return true;
Brian O'Connorc1e3c452014-01-10 16:13:42 -0800238
Brian O'Connor0ca538c2014-01-10 15:42:34 -0800239 }
240
Brian O'Connor7fb57862014-01-10 17:13:38 -0800241 /**
242 * The basic parallelization unit for adding FlowPaths.
243 *
244 * This is simply a wrapper for FlowDatabaseOperation.addFlow(),
245 * which also sends a notification if a datagrid services is provided
246 */
Brian O'Connor0ca538c2014-01-10 15:42:34 -0800247 private final static class AddFlowTask implements Callable<Boolean> {
248 private final GraphDBOperation dbHandler;
249 private final FlowPath flowPath;
Brian O'Connorc1e3c452014-01-10 16:13:42 -0800250 private final IDatagridService datagridService;
Brian O'Connor0ca538c2014-01-10 15:42:34 -0800251
Brian O'Connorc1e3c452014-01-10 16:13:42 -0800252 AddFlowTask(GraphDBOperation dbHandler,
253 FlowPath flowPath,
254 IDatagridService datagridService) {
Brian O'Connor0ca538c2014-01-10 15:42:34 -0800255 this.dbHandler = dbHandler;
256 this.flowPath = flowPath;
Brian O'Connorc1e3c452014-01-10 16:13:42 -0800257 this.datagridService = datagridService;
Brian O'Connor0ca538c2014-01-10 15:42:34 -0800258 }
259
260 @Override
261 public Boolean call() throws Exception {
Brian O'Connorc1e3c452014-01-10 16:13:42 -0800262 boolean success = FlowDatabaseOperation.addFlow(dbHandler, flowPath);
263 if(success) {
264 if(datagridService != null) {
265 datagridService.notificationSendFlowIdAdded(flowPath.flowId(),
266 flowPath.dataPath().srcPort().dpid());
267 }
268 }
269 else {
270 log.error("Error adding flow path {} to database", flowPath);
271 }
272 return success;
273
Brian O'Connor0ca538c2014-01-10 15:42:34 -0800274 }
275 }
276
Brian O'Connor7fb57862014-01-10 17:13:38 -0800277 /**
278 * Delete a previously added flow by creating a database task, then waiting
279 * for the result.
280 *
281 * Mostly, a wrapper for FlowDatabaseOperation.addFlow() which overs little
282 * performance benefit.
283 *
284 * @param dbHandler the Graph Database handler to use.
285 * @param flowId the Flow ID of the flow to delete.
286 * @return true on success, otherwise false.
287 */
Brian O'Connor0ca538c2014-01-10 15:42:34 -0800288 static boolean deleteFlow(GraphDBOperation dbHandler, FlowId flowId) {
Brian O'Connorc1e3c452014-01-10 16:13:42 -0800289 Future<Boolean> result = executor.submit(new DeleteFlowTask(dbHandler, flowId, null));
290 // NOTE: This function is blocking
291 try {
292 return result.get();
293 } catch (InterruptedException | ExecutionException e) {
294 return false;
295 }
296 }
297
Brian O'Connor7fb57862014-01-10 17:13:38 -0800298 /**
299 * Delete a previously added flow asynchronously by creating a database task.
300 *
301 * @param dbHandler the Graph Database handler to use.
302 * @param flowId the Flow ID of the flow to delete.
303 * @param datagridService the notification service for when the task is completed
304 * @return true always
305 */
Brian O'Connorc1e3c452014-01-10 16:13:42 -0800306 static boolean deleteFlow(GraphDBOperation dbHandler, FlowId flowId, IDatagridService datagridService) {
307 executor.submit(new DeleteFlowTask(dbHandler, flowId, datagridService));
Brian O'Connor0ca538c2014-01-10 15:42:34 -0800308 // TODO: If we need the results, submit returns a Future that contains
309 // the result.
310 return true;
311 }
312
Brian O'Connor7fb57862014-01-10 17:13:38 -0800313 /**
314 * The basic parallelization unit for deleting FlowPaths.
315 *
316 * This is simply a wrapper for FlowDatabaseOperation.deleteFlow(),
317 * which also sends a notification if a datagrid services is provided
318 */
Brian O'Connor0ca538c2014-01-10 15:42:34 -0800319 private final static class DeleteFlowTask implements Callable<Boolean> {
320 private final GraphDBOperation dbHandler;
321 private final FlowId flowId;
Brian O'Connorc1e3c452014-01-10 16:13:42 -0800322 private final IDatagridService datagridService;
Brian O'Connor0ca538c2014-01-10 15:42:34 -0800323
Brian O'Connorc1e3c452014-01-10 16:13:42 -0800324 DeleteFlowTask(GraphDBOperation dbHandler, FlowId flowId, IDatagridService datagridService) {
Brian O'Connor0ca538c2014-01-10 15:42:34 -0800325 this.dbHandler = dbHandler;
326 this.flowId = flowId;
Brian O'Connorc1e3c452014-01-10 16:13:42 -0800327 this.datagridService = datagridService;
Brian O'Connor0ca538c2014-01-10 15:42:34 -0800328 }
329 @Override
330 public Boolean call() throws Exception{
Brian O'Connorc1e3c452014-01-10 16:13:42 -0800331 boolean success = FlowDatabaseOperation.deleteFlow(dbHandler, flowId);
332 if(success) {
333 if(datagridService != null) {
334 datagridService.notificationSendFlowIdRemoved(flowId);
335 }
336 }
337 else {
338 log.error("Error removing flow path {} from database", flowId);
339 }
340 return success;
Brian O'Connor0ca538c2014-01-10 15:42:34 -0800341 }
342 }
343}