blob: 5a64b1dfc8252d0a51d59df7137ed4d40921971e [file] [log] [blame]
Hongtao Yin142b7582015-01-21 14:41:30 -08001/*
2 * Copyright 2015 Open Networking Laboratory
3 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
16package org.onosproject.net.flowext.impl;
17
18import com.google.common.collect.ArrayListMultimap;
19import com.google.common.collect.Lists;
20import com.google.common.collect.Multimap;
21import com.google.common.collect.Sets;
22import org.apache.felix.scr.annotations.Activate;
23import org.apache.felix.scr.annotations.Component;
24import org.apache.felix.scr.annotations.Deactivate;
25import org.apache.felix.scr.annotations.Reference;
26import org.apache.felix.scr.annotations.ReferenceCardinality;
27import org.apache.felix.scr.annotations.Service;
28import org.onosproject.event.AbstractListenerRegistry;
29import org.onosproject.event.EventDeliveryService;
30import org.onosproject.net.DeviceId;
31import org.onosproject.net.device.DeviceService;
32import org.onosproject.net.flow.FlowRule;
33import org.onosproject.net.flow.FlowRuleBatchEntry;
34import org.onosproject.net.flow.FlowRuleBatchEvent;
35import org.onosproject.net.flow.FlowRuleBatchRequest;
36import org.onosproject.net.flow.FlowRuleEvent;
37import org.onosproject.net.flow.FlowRuleListener;
38import org.onosproject.net.flow.FlowRuleProvider;
39import org.onosproject.net.flow.impl.FlowRuleManager;
40import org.onosproject.net.flowext.FlowExtCompletedOperation;
41import org.onosproject.net.flowext.FlowRuleExtRouter;
42import org.onosproject.net.flowext.FlowRuleExtRouterListener;
43import org.onosproject.net.flowext.FlowRuleExtService;
44import org.slf4j.Logger;
45
46import java.util.Collection;
47import java.util.Collections;
48import java.util.List;
49import java.util.Set;
50import java.util.concurrent.CancellationException;
51import java.util.concurrent.ExecutionException;
52import java.util.concurrent.ExecutorService;
53import java.util.concurrent.Executors;
54import java.util.concurrent.Future;
55import java.util.concurrent.TimeUnit;
56import java.util.concurrent.TimeoutException;
57import java.util.concurrent.atomic.AtomicReference;
58
59import static org.onlab.util.Tools.namedThreads;
60import static org.slf4j.LoggerFactory.getLogger;
61
62/**
63 * Experimental extension to the flow rule subsystem; still under development.
64 */
alshabiba66a0562015-02-17 15:50:54 -080065@Component(immediate = false)
Hongtao Yin142b7582015-01-21 14:41:30 -080066@Service
67public class FlowRuleExtManager extends FlowRuleManager
68 implements FlowRuleExtService {
69
70 enum BatchState {
71 STARTED, FINISHED, CANCELLED
72 }
73
74 public static final String FLOW_RULE_NULL = "FlowRule cannot be null";
75 private final Logger log = getLogger(getClass());
76
77 private final AbstractListenerRegistry<FlowRuleEvent, FlowRuleListener>
78 listenerRegistry = new AbstractListenerRegistry<>();
79
80 private ExecutorService futureService;
81
82 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
83 protected FlowRuleExtRouter router;
84
85 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
86 protected EventDeliveryService eventDispatcher;
87
88 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
89 protected DeviceService deviceService;
90
91 InternalFlowRuleExtRouterListener routerListener = new InternalFlowRuleExtRouterListener();
92
93 @Activate
94 public void activate() {
95 futureService = Executors.newFixedThreadPool(
96 32, namedThreads("provider-future-listeners-%d"));
97 eventDispatcher.addSink(FlowRuleEvent.class, listenerRegistry);
98 router.addListener(routerListener);
99 log.info("Started");
100 }
101
102 @Deactivate
103 public void deactivate() {
104 futureService.shutdownNow();
105 eventDispatcher.removeSink(FlowRuleEvent.class);
106 router.removeListener(routerListener);
107 log.info("Stopped");
108 }
109
110 /**
111 * Applies a batch operation of FlowRules.
112 * this batch can be divided into many sub-batch by deviceId
113 *
114 * @param batch batch operation to apply
115 * @return future indicating the state of the batch operation
116 */
117 @Override
118 public Future<FlowExtCompletedOperation> applyBatch(FlowRuleBatchRequest batch) {
119 // TODO group the Collection into sub-Collection by deviceId
120 Multimap<DeviceId, FlowRuleBatchEntry> perDeviceBatches = ArrayListMultimap
121 .create();
122 for (FlowRuleBatchEntry fbe : batch.ops()) {
123 FlowRule flowRule = fbe.target();
124 perDeviceBatches.put(flowRule.deviceId(), fbe);
125 }
126
127 List<Future<FlowExtCompletedOperation>> futures = Lists.newArrayList();
128 for (DeviceId deviceId : perDeviceBatches.keySet()) {
129 Collection<FlowRuleBatchEntry> flows = perDeviceBatches.get(deviceId);
130 //FIXME if there can be collisions, than converting the collection to a set will drop flow rules
131 FlowRuleBatchRequest subBatch = new FlowRuleBatchRequest(batch.batchId(), Sets.newHashSet(flows));
132 Future<FlowExtCompletedOperation> future = router.applySubBatch(subBatch);
133 futures.add(future);
134 }
135 return new FlowRuleBatchFuture(batch.batchId(), futures);
136 }
137
138 /**
139 * Batch futures include all flow extension entries in one batch.
140 * Using for transaction and will use in next-step.
141 */
142 private class FlowRuleBatchFuture
143 implements Future<FlowExtCompletedOperation> {
144
145 private final List<Future<FlowExtCompletedOperation>> futures;
146 private final long batchId;
147 private final AtomicReference<BatchState> state;
148 private FlowExtCompletedOperation overall;
149
150 public FlowRuleBatchFuture(long batchId, List<Future<FlowExtCompletedOperation>> futures) {
151 this.futures = futures;
152 this.batchId = batchId;
153 state = new AtomicReference<FlowRuleExtManager.BatchState>();
154 state.set(BatchState.STARTED);
155 }
156
157 /**
158 * Attempts to cancel execution of this task.
159 *
160 * @param mayInterruptIfRunning {@code true} if the thread executing this
161 * task should be interrupted; otherwise, in-progress tasks are allowed
162 * to complete
163 * @return {@code false} if the task could not be cancelled,
164 * typically because it has already completed normally;
165 * {@code true} otherwise
166 */
167 @Override
168 public boolean cancel(boolean mayInterruptIfRunning) {
169 if (state.get() == BatchState.FINISHED) {
170 return false;
171 }
172 if (log.isDebugEnabled()) {
173 log.debug("Cancelling FlowRuleBatchFuture",
174 new RuntimeException("Just printing backtrace"));
175 }
176 if (!state.compareAndSet(BatchState.STARTED, BatchState.CANCELLED)) {
177 return false;
178 }
179 cleanUpBatch();
180 for (Future<FlowExtCompletedOperation> f : futures) {
181 f.cancel(mayInterruptIfRunning);
182 }
183 return true;
184 }
185
186 /**
187 * Judge whether the task cancelled completely.
188 *
189 * @return {@code true} if this task was cancelled before it completed
190 */
191 @Override
192 public boolean isCancelled() {
193 return state.get() == BatchState.CANCELLED;
194 }
195
196 /**
197 * Judge whether the task finished completely.
198 *
199 * @return {@code true} if this task completed
200 */
201 @Override
202 public boolean isDone() {
203 return state.get() == BatchState.FINISHED;
204 }
205
206 /**
207 * Get the result of apply flow extension rules.
208 * If the task isn't finished, the thread block here.
209 */
210 @Override
211 public FlowExtCompletedOperation get()
212 throws InterruptedException, ExecutionException {
213
214 if (isDone()) {
215 return overall;
216 }
217 boolean success = true;
218 Set<FlowRule> failed = Sets.newHashSet();
219 FlowExtCompletedOperation completed;
220 for (Future<FlowExtCompletedOperation> future : futures) {
221 completed = future.get();
222 success = validateBatchOperation(failed, completed);
223 }
224 return finalizeBatchOperation(success, failed);
225 }
226
227 /**
228 * Waits if necessary for at most the given time for the computation
229 * to complete, and then retrieves its result, if available. In here,
230 * the maximum of time out is sum of given time for every computation.
231 *
232 * @param timeout the maximum time to wait
233 * @param unit the time unit of the timeout argument
234 * @return the computed result
235 * @throws CancellationException if the computation was cancelled
236 * @throws ExecutionException if the computation threw an
237 * exception
238 * @throws InterruptedException if the current thread was interrupted
239 * while waiting
240 * @throws TimeoutException if the wait timed out
241 */
242 @Override
243 public FlowExtCompletedOperation get(long timeout, TimeUnit unit)
244 throws InterruptedException, ExecutionException,
245 TimeoutException {
246
247 if (isDone()) {
248 return overall;
249 }
250 boolean success = true;
251 Set<FlowRule> failed = Sets.newHashSet();
252 FlowExtCompletedOperation completed;
253 for (Future<FlowExtCompletedOperation> future : futures) {
254 completed = future.get(timeout, unit);
255 success = validateBatchOperation(failed, completed);
256 }
257 return finalizeBatchOperation(success, failed);
258 }
259
260 /**
261 * Confirm whether the batch operation success.
262 *
263 * @param failed using to populate failed entries
264 * @param completed the result of apply flow extension entries
265 * @return {@code true} if all entries applies successful
266 */
267 private boolean validateBatchOperation(Set<FlowRule> failed,
268 FlowExtCompletedOperation completed) {
269
270 if (isCancelled()) {
271 throw new CancellationException();
272 }
273 if (!completed.isSuccess()) {
274 log.warn("FlowRuleBatch failed: {}", completed);
275 failed.addAll(completed.failedItems());
276 cleanUpBatch();
277 cancelAllSubBatches();
278 return false;
279 }
280 return true;
281 }
282
283 /**
284 * Once one subBatch failed, cancel the rest of them.
285 */
286 private void cancelAllSubBatches() {
287 for (Future<FlowExtCompletedOperation> f : futures) {
288 f.cancel(true);
289 }
290 }
291
292 /**
293 * Construct the result of batch operation.
294 *
295 * @param success the result of batch operation
296 * @param failed the failed entries of batch operation
297 * @return FlowExtCompletedOperation of batch operation
298 */
299 private FlowExtCompletedOperation finalizeBatchOperation(boolean success,
300 Set<FlowRule> failed) {
301 synchronized (this) {
302 if (!state.compareAndSet(BatchState.STARTED,
303 BatchState.FINISHED)) {
304 if (state.get() == BatchState.FINISHED) {
305 return overall;
306 }
307 throw new CancellationException();
308 }
309 overall = new FlowExtCompletedOperation(batchId, success, failed);
310 return overall;
311 }
312 }
313
314 private void cleanUpBatch() {
315 }
316 }
317
318 /**
319 * South Bound API to south plug-in.
320 */
321 private class InternalFlowRuleExtRouterListener
322 implements FlowRuleExtRouterListener {
323 @Override
324 public void notify(FlowRuleBatchEvent event) {
325 // Request has been forwarded to MASTER Node
326 for (FlowRuleBatchEntry entry : event.subject().ops()) {
327 switch (entry.operator()) {
328 case ADD:
329 eventDispatcher
330 .post(new FlowRuleEvent(FlowRuleEvent.Type.RULE_ADD_REQUESTED,
331 entry.target()));
332 break;
333 // FALLTHROUGH
334 case REMOVE:
335 case MODIFY:
336 default:
337 // TODO not implemented
338 break;
339 }
340 }
341 // send it
342 FlowRuleProvider flowRuleProvider = getProvider(event.subject().ops()
343 .iterator().next().target().deviceId());
344 // TODO we may want to specify a deviceId
345 flowRuleProvider.executeBatch(event.subject().asBatchOperation(null));
346 // do not have transaction, assume it install success
347 // temporarily
348 FlowExtCompletedOperation result = new FlowExtCompletedOperation(
349 event.subject().batchId(), true, Collections.emptySet());
350 futureService.submit(() -> {
351 router.batchOperationComplete(FlowRuleBatchEvent
352 .completed(event.subject(), result));
353 });
354 }
355 }
356}