blob: a897cbb132eaacb1a44498040b6c126abaa70916 [file] [log] [blame]
tombe988312014-09-19 18:38:47 -07001package org.onlab.onos.net.flow.impl;
alshabib57044ba2014-09-16 15:58:01 -07002
alshabibbb42cad2014-09-25 11:43:05 -07003import static com.google.common.base.Preconditions.checkNotNull;
4import static org.slf4j.LoggerFactory.getLogger;
5
6import java.util.Iterator;
7import java.util.List;
alshabib193525b2014-10-08 18:58:03 -07008import java.util.concurrent.CancellationException;
alshabib902d41b2014-10-07 16:52:05 -07009import java.util.concurrent.ExecutionException;
10import java.util.concurrent.Future;
11import java.util.concurrent.TimeUnit;
12import java.util.concurrent.TimeoutException;
alshabib193525b2014-10-08 18:58:03 -070013import java.util.concurrent.atomic.AtomicReference;
alshabibbb42cad2014-09-25 11:43:05 -070014
alshabib57044ba2014-09-16 15:58:01 -070015import org.apache.felix.scr.annotations.Activate;
16import org.apache.felix.scr.annotations.Component;
17import org.apache.felix.scr.annotations.Deactivate;
18import org.apache.felix.scr.annotations.Reference;
19import org.apache.felix.scr.annotations.ReferenceCardinality;
20import org.apache.felix.scr.annotations.Service;
alshabiba68eb962014-09-24 20:34:13 -070021import org.onlab.onos.ApplicationId;
alshabib57044ba2014-09-16 15:58:01 -070022import org.onlab.onos.event.AbstractListenerRegistry;
23import org.onlab.onos.event.EventDeliveryService;
24import org.onlab.onos.net.Device;
25import org.onlab.onos.net.DeviceId;
26import org.onlab.onos.net.device.DeviceService;
alshabib902d41b2014-10-07 16:52:05 -070027import org.onlab.onos.net.flow.CompletedBatchOperation;
alshabib1c319ff2014-10-04 20:29:09 -070028import org.onlab.onos.net.flow.FlowEntry;
alshabib57044ba2014-09-16 15:58:01 -070029import org.onlab.onos.net.flow.FlowRule;
alshabib902d41b2014-10-07 16:52:05 -070030import org.onlab.onos.net.flow.FlowRuleBatchEntry;
alshabib193525b2014-10-08 18:58:03 -070031import org.onlab.onos.net.flow.FlowRuleBatchEntry.FlowRuleOperation;
alshabib902d41b2014-10-07 16:52:05 -070032import org.onlab.onos.net.flow.FlowRuleBatchOperation;
alshabib57044ba2014-09-16 15:58:01 -070033import org.onlab.onos.net.flow.FlowRuleEvent;
34import org.onlab.onos.net.flow.FlowRuleListener;
35import org.onlab.onos.net.flow.FlowRuleProvider;
36import org.onlab.onos.net.flow.FlowRuleProviderRegistry;
37import org.onlab.onos.net.flow.FlowRuleProviderService;
38import org.onlab.onos.net.flow.FlowRuleService;
tombe988312014-09-19 18:38:47 -070039import org.onlab.onos.net.flow.FlowRuleStore;
tomc78acee2014-09-24 15:16:55 -070040import org.onlab.onos.net.flow.FlowRuleStoreDelegate;
alshabib57044ba2014-09-16 15:58:01 -070041import org.onlab.onos.net.provider.AbstractProviderRegistry;
42import org.onlab.onos.net.provider.AbstractProviderService;
43import org.slf4j.Logger;
44
alshabib902d41b2014-10-07 16:52:05 -070045import com.google.common.collect.ArrayListMultimap;
alshabibbb42cad2014-09-25 11:43:05 -070046import com.google.common.collect.Lists;
alshabib902d41b2014-10-07 16:52:05 -070047import com.google.common.collect.Multimap;
alshabiba7f7ca82014-09-22 11:41:23 -070048
/**
 * Provides an implementation of the flow northbound (NB) and southbound (SB) APIs.
 */
alshabib57044ba2014-09-16 15:58:01 -070052@Component(immediate = true)
53@Service
tom202175a2014-09-19 19:00:11 -070054public class FlowRuleManager
tom9b4030d2014-10-06 10:39:03 -070055 extends AbstractProviderRegistry<FlowRuleProvider, FlowRuleProviderService>
56 implements FlowRuleService, FlowRuleProviderRegistry {
alshabib57044ba2014-09-16 15:58:01 -070057
alshabib193525b2014-10-08 18:58:03 -070058 enum BatchState { STARTED, FINISHED, CANCELLED };
59
Ayaka Koshibe08eabaa2014-09-17 14:59:25 -070060 public static final String FLOW_RULE_NULL = "FlowRule cannot be null";
alshabib57044ba2014-09-16 15:58:01 -070061 private final Logger log = getLogger(getClass());
62
63 private final AbstractListenerRegistry<FlowRuleEvent, FlowRuleListener>
tom9b4030d2014-10-06 10:39:03 -070064 listenerRegistry = new AbstractListenerRegistry<>();
alshabib57044ba2014-09-16 15:58:01 -070065
alshabibbb42cad2014-09-25 11:43:05 -070066 private final FlowRuleStoreDelegate delegate = new InternalStoreDelegate();
tomc78acee2014-09-24 15:16:55 -070067
tombe988312014-09-19 18:38:47 -070068 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
69 protected FlowRuleStore store;
Ayaka Koshibe08eabaa2014-09-17 14:59:25 -070070
alshabib57044ba2014-09-16 15:58:01 -070071 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
Ayaka Koshibeb55524f2014-09-18 09:59:24 -070072 protected EventDeliveryService eventDispatcher;
alshabib57044ba2014-09-16 15:58:01 -070073
74 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
Ayaka Koshibeb55524f2014-09-18 09:59:24 -070075 protected DeviceService deviceService;
alshabib57044ba2014-09-16 15:58:01 -070076
77 @Activate
78 public void activate() {
tomc78acee2014-09-24 15:16:55 -070079 store.setDelegate(delegate);
alshabib57044ba2014-09-16 15:58:01 -070080 eventDispatcher.addSink(FlowRuleEvent.class, listenerRegistry);
81 log.info("Started");
82 }
83
84 @Deactivate
85 public void deactivate() {
tomc78acee2014-09-24 15:16:55 -070086 store.unsetDelegate(delegate);
alshabib57044ba2014-09-16 15:58:01 -070087 eventDispatcher.removeSink(FlowRuleEvent.class);
88 log.info("Stopped");
89 }
90
91 @Override
tom9b4030d2014-10-06 10:39:03 -070092 public int getFlowRuleCount() {
93 return store.getFlowRuleCount();
94 }
95
96 @Override
alshabib1c319ff2014-10-04 20:29:09 -070097 public Iterable<FlowEntry> getFlowEntries(DeviceId deviceId) {
Ayaka Koshibe08eabaa2014-09-17 14:59:25 -070098 return store.getFlowEntries(deviceId);
alshabib57044ba2014-09-16 15:58:01 -070099 }
100
101 @Override
alshabib219ebaa2014-09-22 15:41:24 -0700102 public void applyFlowRules(FlowRule... flowRules) {
alshabib57044ba2014-09-16 15:58:01 -0700103 for (int i = 0; i < flowRules.length; i++) {
alshabiba68eb962014-09-24 20:34:13 -0700104 FlowRule f = flowRules[i];
alshabib57044ba2014-09-16 15:58:01 -0700105 final Device device = deviceService.getDevice(f.deviceId());
106 final FlowRuleProvider frp = getProvider(device.providerId());
alshabib219ebaa2014-09-22 15:41:24 -0700107 store.storeFlowRule(f);
alshabib57044ba2014-09-16 15:58:01 -0700108 frp.applyFlowRule(f);
109 }
alshabib57044ba2014-09-16 15:58:01 -0700110 }
111
112 @Override
113 public void removeFlowRules(FlowRule... flowRules) {
alshabibbb8b1282014-09-22 17:00:18 -0700114 FlowRule f;
alshabiba68eb962014-09-24 20:34:13 -0700115 FlowRuleProvider frp;
116 Device device;
alshabib57044ba2014-09-16 15:58:01 -0700117 for (int i = 0; i < flowRules.length; i++) {
alshabiba68eb962014-09-24 20:34:13 -0700118 f = flowRules[i];
119 device = deviceService.getDevice(f.deviceId());
alshabib219ebaa2014-09-22 15:41:24 -0700120 store.deleteFlowRule(f);
tom7951b232014-10-06 13:35:30 -0700121 if (device != null) {
122 frp = getProvider(device.providerId());
123 frp.removeFlowRule(f);
124 }
alshabib57044ba2014-09-16 15:58:01 -0700125 }
alshabiba68eb962014-09-24 20:34:13 -0700126 }
alshabib57044ba2014-09-16 15:58:01 -0700127
alshabiba68eb962014-09-24 20:34:13 -0700128 @Override
129 public void removeFlowRulesById(ApplicationId id) {
tom9b4030d2014-10-06 10:39:03 -0700130 Iterable<FlowRule> rules = getFlowRulesById(id);
alshabiba68eb962014-09-24 20:34:13 -0700131 FlowRuleProvider frp;
132 Device device;
alshabibbb42cad2014-09-25 11:43:05 -0700133
alshabiba68eb962014-09-24 20:34:13 -0700134 for (FlowRule f : rules) {
135 store.deleteFlowRule(f);
136 device = deviceService.getDevice(f.deviceId());
137 frp = getProvider(device.providerId());
138 frp.removeRulesById(id, f);
139 }
140 }
141
142 @Override
143 public Iterable<FlowRule> getFlowRulesById(ApplicationId id) {
alshabib1c319ff2014-10-04 20:29:09 -0700144 return store.getFlowRulesByAppId(id);
alshabib57044ba2014-09-16 15:58:01 -0700145 }
146
147 @Override
alshabib902d41b2014-10-07 16:52:05 -0700148 public Future<CompletedBatchOperation> applyBatch(
149 FlowRuleBatchOperation batch) {
150 Multimap<FlowRuleProvider, FlowRuleBatchEntry> batches =
151 ArrayListMultimap.create();
alshabib193525b2014-10-08 18:58:03 -0700152 List<Future<CompletedBatchOperation>> futures = Lists.newArrayList();
alshabib902d41b2014-10-07 16:52:05 -0700153 for (FlowRuleBatchEntry fbe : batch.getOperations()) {
154 final FlowRule f = fbe.getTarget();
155 final Device device = deviceService.getDevice(f.deviceId());
156 final FlowRuleProvider frp = getProvider(device.providerId());
157 batches.put(frp, fbe);
158 switch (fbe.getOperator()) {
159 case ADD:
160 store.storeFlowRule(f);
161 break;
162 case REMOVE:
163 store.deleteFlowRule(f);
164 break;
165 case MODIFY:
166 default:
167 log.error("Batch operation type {} unsupported.", fbe.getOperator());
168 }
169 }
170 for (FlowRuleProvider provider : batches.keySet()) {
171 FlowRuleBatchOperation b =
172 new FlowRuleBatchOperation(batches.get(provider));
alshabib193525b2014-10-08 18:58:03 -0700173 Future<CompletedBatchOperation> future = provider.executeBatch(b);
alshabib902d41b2014-10-07 16:52:05 -0700174 futures.add(future);
175 }
alshabib193525b2014-10-08 18:58:03 -0700176 return new FlowRuleBatchFuture(futures, batches);
alshabib902d41b2014-10-07 16:52:05 -0700177 }
178
179 @Override
alshabib57044ba2014-09-16 15:58:01 -0700180 public void addListener(FlowRuleListener listener) {
181 listenerRegistry.addListener(listener);
182 }
183
184 @Override
185 public void removeListener(FlowRuleListener listener) {
186 listenerRegistry.removeListener(listener);
187 }
188
189 @Override
190 protected FlowRuleProviderService createProviderService(
191 FlowRuleProvider provider) {
192 return new InternalFlowRuleProviderService(provider);
193 }
194
195 private class InternalFlowRuleProviderService
tom9b4030d2014-10-06 10:39:03 -0700196 extends AbstractProviderService<FlowRuleProvider>
197 implements FlowRuleProviderService {
alshabib57044ba2014-09-16 15:58:01 -0700198
199 protected InternalFlowRuleProviderService(FlowRuleProvider provider) {
200 super(provider);
201 }
202
203 @Override
alshabib1c319ff2014-10-04 20:29:09 -0700204 public void flowRemoved(FlowEntry flowEntry) {
205 checkNotNull(flowEntry, FLOW_RULE_NULL);
Ayaka Koshibe08eabaa2014-09-17 14:59:25 -0700206 checkValidity();
alshabib1c319ff2014-10-04 20:29:09 -0700207 FlowEntry stored = store.getFlowEntry(flowEntry);
alshabiba68eb962014-09-24 20:34:13 -0700208 if (stored == null) {
alshabib1c319ff2014-10-04 20:29:09 -0700209 log.info("Rule already evicted from store: {}", flowEntry);
alshabiba68eb962014-09-24 20:34:13 -0700210 return;
211 }
alshabib1c319ff2014-10-04 20:29:09 -0700212 Device device = deviceService.getDevice(flowEntry.deviceId());
alshabiba68eb962014-09-24 20:34:13 -0700213 FlowRuleProvider frp = getProvider(device.providerId());
214 FlowRuleEvent event = null;
215 switch (stored.state()) {
tom9b4030d2014-10-06 10:39:03 -0700216 case ADDED:
217 case PENDING_ADD:
alshabib6eb438a2014-10-01 16:39:37 -0700218 frp.applyFlowRule(stored);
tom9b4030d2014-10-06 10:39:03 -0700219 break;
220 case PENDING_REMOVE:
221 case REMOVED:
222 event = store.removeFlowRule(stored);
223 break;
224 default:
225 break;
alshabib57044ba2014-09-16 15:58:01 -0700226
alshabiba68eb962014-09-24 20:34:13 -0700227 }
Ayaka Koshibe08eabaa2014-09-17 14:59:25 -0700228 if (event != null) {
alshabib1c319ff2014-10-04 20:29:09 -0700229 log.debug("Flow {} removed", flowEntry);
Ayaka Koshibe08eabaa2014-09-17 14:59:25 -0700230 post(event);
231 }
alshabib57044ba2014-09-16 15:58:01 -0700232 }
233
alshabibba5ac482014-10-02 17:15:20 -0700234
alshabib1c319ff2014-10-04 20:29:09 -0700235 private void flowMissing(FlowEntry flowRule) {
Ayaka Koshibe08eabaa2014-09-17 14:59:25 -0700236 checkNotNull(flowRule, FLOW_RULE_NULL);
237 checkValidity();
alshabiba68eb962014-09-24 20:34:13 -0700238 Device device = deviceService.getDevice(flowRule.deviceId());
239 FlowRuleProvider frp = getProvider(device.providerId());
alshabibbb42cad2014-09-25 11:43:05 -0700240 FlowRuleEvent event = null;
alshabiba68eb962014-09-24 20:34:13 -0700241 switch (flowRule.state()) {
tom9b4030d2014-10-06 10:39:03 -0700242 case PENDING_REMOVE:
243 case REMOVED:
244 event = store.removeFlowRule(flowRule);
245 frp.removeFlowRule(flowRule);
246 break;
247 case ADDED:
248 case PENDING_ADD:
249 frp.applyFlowRule(flowRule);
250 break;
251 default:
252 log.debug("Flow {} has not been installed.", flowRule);
alshabiba68eb962014-09-24 20:34:13 -0700253 }
254
alshabibbb42cad2014-09-25 11:43:05 -0700255 if (event != null) {
256 log.debug("Flow {} removed", flowRule);
257 post(event);
258 }
alshabib57044ba2014-09-16 15:58:01 -0700259
260 }
261
alshabibba5ac482014-10-02 17:15:20 -0700262
263 private void extraneousFlow(FlowRule flowRule) {
alshabib219ebaa2014-09-22 15:41:24 -0700264 checkNotNull(flowRule, FLOW_RULE_NULL);
265 checkValidity();
alshabiba68eb962014-09-24 20:34:13 -0700266 removeFlowRules(flowRule);
alshabib54ce5892014-09-23 17:50:51 -0700267 log.debug("Flow {} is on switch but not in store.", flowRule);
alshabib219ebaa2014-09-22 15:41:24 -0700268 }
269
alshabibba5ac482014-10-02 17:15:20 -0700270
alshabib1c319ff2014-10-04 20:29:09 -0700271 private void flowAdded(FlowEntry flowEntry) {
272 checkNotNull(flowEntry, FLOW_RULE_NULL);
Ayaka Koshibe08eabaa2014-09-17 14:59:25 -0700273 checkValidity();
alshabib57044ba2014-09-16 15:58:01 -0700274
alshabib1c319ff2014-10-04 20:29:09 -0700275 if (checkRuleLiveness(flowEntry, store.getFlowEntry(flowEntry))) {
alshabibba5ac482014-10-02 17:15:20 -0700276
alshabib1c319ff2014-10-04 20:29:09 -0700277 FlowRuleEvent event = store.addOrUpdateFlowRule(flowEntry);
alshabibba5ac482014-10-02 17:15:20 -0700278 if (event == null) {
279 log.debug("No flow store event generated.");
280 } else {
alshabib1c319ff2014-10-04 20:29:09 -0700281 log.debug("Flow {} {}", flowEntry, event.type());
alshabibba5ac482014-10-02 17:15:20 -0700282 post(event);
283 }
Ayaka Koshibe08eabaa2014-09-17 14:59:25 -0700284 } else {
alshabib1c319ff2014-10-04 20:29:09 -0700285 removeFlowRules(flowEntry);
Ayaka Koshibe08eabaa2014-09-17 14:59:25 -0700286 }
alshabib219ebaa2014-09-22 15:41:24 -0700287
alshabib57044ba2014-09-16 15:58:01 -0700288 }
289
alshabib1c319ff2014-10-04 20:29:09 -0700290 private boolean checkRuleLiveness(FlowEntry swRule, FlowEntry storedRule) {
291 if (storedRule == null) {
292 return false;
293 }
alshabib85c41972014-10-03 13:48:39 -0700294 long timeout = storedRule.timeout() * 1000;
295 Long currentTime = System.currentTimeMillis();
296 if (storedRule.packets() != swRule.packets()) {
alshabib1c319ff2014-10-04 20:29:09 -0700297 storedRule.setLastSeen();
alshabib85c41972014-10-03 13:48:39 -0700298 return true;
299 }
alshabib85c41972014-10-03 13:48:39 -0700300
alshabib1c319ff2014-10-04 20:29:09 -0700301 if ((currentTime - storedRule.lastSeen()) <= timeout) {
alshabibc274c902014-10-03 14:58:27 -0700302 return true;
303 }
304 return false;
alshabibba5ac482014-10-02 17:15:20 -0700305 }
306
Ayaka Koshibe08eabaa2014-09-17 14:59:25 -0700307 // Posts the specified event to the local event dispatcher.
308 private void post(FlowRuleEvent event) {
309 if (event != null) {
310 eventDispatcher.post(event);
311 }
312 }
alshabib5c370ff2014-09-18 10:12:14 -0700313
314 @Override
alshabib1c319ff2014-10-04 20:29:09 -0700315 public void pushFlowMetrics(DeviceId deviceId, Iterable<FlowEntry> flowEntries) {
316 List<FlowEntry> storedRules = Lists.newLinkedList(store.getFlowEntries(deviceId));
alshabibbb8b1282014-09-22 17:00:18 -0700317
alshabib1c319ff2014-10-04 20:29:09 -0700318 Iterator<FlowEntry> switchRulesIterator = flowEntries.iterator();
alshabiba7f7ca82014-09-22 11:41:23 -0700319
320 while (switchRulesIterator.hasNext()) {
alshabib1c319ff2014-10-04 20:29:09 -0700321 FlowEntry rule = switchRulesIterator.next();
alshabiba7f7ca82014-09-22 11:41:23 -0700322 if (storedRules.remove(rule)) {
alshabib219ebaa2014-09-22 15:41:24 -0700323 // we both have the rule, let's update some info then.
alshabiba7f7ca82014-09-22 11:41:23 -0700324 flowAdded(rule);
325 } else {
alshabib219ebaa2014-09-22 15:41:24 -0700326 // the device has a rule the store does not have
327 extraneousFlow(rule);
alshabiba7f7ca82014-09-22 11:41:23 -0700328 }
329 }
alshabib1c319ff2014-10-04 20:29:09 -0700330 for (FlowEntry rule : storedRules) {
alshabiba7f7ca82014-09-22 11:41:23 -0700331 // there are rules in the store that aren't on the switch
332 flowMissing(rule);
alshabib54ce5892014-09-23 17:50:51 -0700333
alshabiba7f7ca82014-09-22 11:41:23 -0700334 }
alshabib5c370ff2014-09-18 10:12:14 -0700335 }
alshabib57044ba2014-09-16 15:58:01 -0700336 }
337
tomc78acee2014-09-24 15:16:55 -0700338 // Store delegate to re-post events emitted from the store.
339 private class InternalStoreDelegate implements FlowRuleStoreDelegate {
340 @Override
341 public void notify(FlowRuleEvent event) {
342 eventDispatcher.post(event);
343 }
344 }
alshabib902d41b2014-10-07 16:52:05 -0700345
346 private class FlowRuleBatchFuture
347 implements Future<CompletedBatchOperation> {
348
alshabib193525b2014-10-08 18:58:03 -0700349 private final List<Future<CompletedBatchOperation>> futures;
350 private final Multimap<FlowRuleProvider, FlowRuleBatchEntry> batches;
351 private final AtomicReference<BatchState> state;
352 private CompletedBatchOperation overall;
alshabib902d41b2014-10-07 16:52:05 -0700353
alshabib193525b2014-10-08 18:58:03 -0700354
355
356 public FlowRuleBatchFuture(List<Future<CompletedBatchOperation>> futures,
357 Multimap<FlowRuleProvider, FlowRuleBatchEntry> batches) {
alshabib902d41b2014-10-07 16:52:05 -0700358 this.futures = futures;
alshabib193525b2014-10-08 18:58:03 -0700359 this.batches = batches;
360 state = new AtomicReference<FlowRuleManager.BatchState>();
361 state.set(BatchState.STARTED);
alshabib902d41b2014-10-07 16:52:05 -0700362 }
363
364 @Override
365 public boolean cancel(boolean mayInterruptIfRunning) {
alshabib193525b2014-10-08 18:58:03 -0700366 if (state.get() == BatchState.FINISHED) {
367 return false;
368 }
369 if (!state.compareAndSet(BatchState.STARTED, BatchState.CANCELLED)) {
370 return false;
371 }
372 cleanUpBatch();
373 for (Future<CompletedBatchOperation> f : futures) {
374 f.cancel(mayInterruptIfRunning);
375 }
376 return true;
alshabib902d41b2014-10-07 16:52:05 -0700377 }
378
379 @Override
380 public boolean isCancelled() {
alshabib193525b2014-10-08 18:58:03 -0700381 return state.get() == BatchState.CANCELLED;
alshabib902d41b2014-10-07 16:52:05 -0700382 }
383
384 @Override
385 public boolean isDone() {
alshabib193525b2014-10-08 18:58:03 -0700386 return state.get() == BatchState.FINISHED;
alshabib902d41b2014-10-07 16:52:05 -0700387 }
388
alshabib193525b2014-10-08 18:58:03 -0700389
alshabib902d41b2014-10-07 16:52:05 -0700390 @Override
391 public CompletedBatchOperation get() throws InterruptedException,
alshabib193525b2014-10-08 18:58:03 -0700392 ExecutionException {
393
394 if (isDone()) {
395 return overall;
alshabib902d41b2014-10-07 16:52:05 -0700396 }
alshabib193525b2014-10-08 18:58:03 -0700397
398 boolean success = true;
399 List<FlowEntry> failed = Lists.newLinkedList();
400 CompletedBatchOperation completed;
401 for (Future<CompletedBatchOperation> future : futures) {
402 completed = future.get();
403 success = validateBatchOperation(failed, completed, future);
404 }
405
406 return finalizeBatchOperation(success, failed);
407
alshabib902d41b2014-10-07 16:52:05 -0700408 }
409
410 @Override
411 public CompletedBatchOperation get(long timeout, TimeUnit unit)
412 throws InterruptedException, ExecutionException,
413 TimeoutException {
alshabib193525b2014-10-08 18:58:03 -0700414
415 if (isDone()) {
416 return overall;
417 }
418 boolean success = true;
419 List<FlowEntry> failed = Lists.newLinkedList();
420 CompletedBatchOperation completed;
alshabib902d41b2014-10-07 16:52:05 -0700421 long start = System.nanoTime();
422 long end = start + unit.toNanos(timeout);
alshabib193525b2014-10-08 18:58:03 -0700423
424 for (Future<CompletedBatchOperation> future : futures) {
alshabib902d41b2014-10-07 16:52:05 -0700425 long now = System.nanoTime();
426 long thisTimeout = end - now;
alshabib193525b2014-10-08 18:58:03 -0700427 completed = future.get(thisTimeout, TimeUnit.NANOSECONDS);
428 success = validateBatchOperation(failed, completed, future);
alshabib902d41b2014-10-07 16:52:05 -0700429 }
alshabib193525b2014-10-08 18:58:03 -0700430 return finalizeBatchOperation(success, failed);
alshabib902d41b2014-10-07 16:52:05 -0700431 }
432
alshabib193525b2014-10-08 18:58:03 -0700433 private boolean validateBatchOperation(List<FlowEntry> failed,
434 CompletedBatchOperation completed,
435 Future<CompletedBatchOperation> future) {
436
437 if (isCancelled()) {
438 throw new CancellationException();
439 }
440 if (!completed.isSuccess()) {
441 failed.addAll(completed.failedItems());
442 cleanUpBatch();
443 cancelAllSubBatches();
444 return false;
445 }
446 return true;
447 }
448
449 private void cancelAllSubBatches() {
450 for (Future<CompletedBatchOperation> f : futures) {
451 f.cancel(true);
452 }
453 }
454
455 private CompletedBatchOperation finalizeBatchOperation(boolean success,
456 List<FlowEntry> failed) {
457 synchronized (overall) {
458 if (!state.compareAndSet(BatchState.STARTED, BatchState.FINISHED)) {
459 if (state.get() == BatchState.FINISHED) {
460 return overall;
461 }
462 throw new CancellationException();
463 }
464 overall = new CompletedBatchOperation(success, failed);
465 return overall;
466 }
467 }
468
469 private void cleanUpBatch() {
470 for (FlowRuleBatchEntry fbe : batches.values()) {
471 if (fbe.getOperator() == FlowRuleOperation.ADD ||
472 fbe.getOperator() == FlowRuleOperation.MODIFY) {
473 store.deleteFlowRule(fbe.getTarget());
474 } else if (fbe.getOperator() == FlowRuleOperation.REMOVE) {
475 store.storeFlowRule(fbe.getTarget());
476 }
477 }
478
479 }
alshabib902d41b2014-10-07 16:52:05 -0700480 }
481
482
alshabib193525b2014-10-08 18:58:03 -0700483
484
alshabib57044ba2014-09-16 15:58:01 -0700485}