blob: fa8e947f5510f2f281a04b82475b3e03bc2d5642 [file] [log] [blame]
tombe988312014-09-19 18:38:47 -07001package org.onlab.onos.net.flow.impl;
alshabib57044ba2014-09-16 15:58:01 -07002
alshabibbb42cad2014-09-25 11:43:05 -07003import static com.google.common.base.Preconditions.checkNotNull;
4import static org.slf4j.LoggerFactory.getLogger;
5
6import java.util.Iterator;
7import java.util.List;
alshabib193525b2014-10-08 18:58:03 -07008import java.util.concurrent.CancellationException;
alshabib902d41b2014-10-07 16:52:05 -07009import java.util.concurrent.ExecutionException;
10import java.util.concurrent.Future;
11import java.util.concurrent.TimeUnit;
12import java.util.concurrent.TimeoutException;
alshabib193525b2014-10-08 18:58:03 -070013import java.util.concurrent.atomic.AtomicReference;
alshabibbb42cad2014-09-25 11:43:05 -070014
alshabib57044ba2014-09-16 15:58:01 -070015import org.apache.felix.scr.annotations.Activate;
16import org.apache.felix.scr.annotations.Component;
17import org.apache.felix.scr.annotations.Deactivate;
18import org.apache.felix.scr.annotations.Reference;
19import org.apache.felix.scr.annotations.ReferenceCardinality;
20import org.apache.felix.scr.annotations.Service;
alshabiba68eb962014-09-24 20:34:13 -070021import org.onlab.onos.ApplicationId;
alshabib57044ba2014-09-16 15:58:01 -070022import org.onlab.onos.event.AbstractListenerRegistry;
23import org.onlab.onos.event.EventDeliveryService;
24import org.onlab.onos.net.Device;
25import org.onlab.onos.net.DeviceId;
26import org.onlab.onos.net.device.DeviceService;
alshabib902d41b2014-10-07 16:52:05 -070027import org.onlab.onos.net.flow.CompletedBatchOperation;
alshabibcf369912014-10-13 14:16:42 -070028import org.onlab.onos.net.flow.DefaultFlowEntry;
alshabib1c319ff2014-10-04 20:29:09 -070029import org.onlab.onos.net.flow.FlowEntry;
alshabib57044ba2014-09-16 15:58:01 -070030import org.onlab.onos.net.flow.FlowRule;
alshabib902d41b2014-10-07 16:52:05 -070031import org.onlab.onos.net.flow.FlowRuleBatchEntry;
alshabib193525b2014-10-08 18:58:03 -070032import org.onlab.onos.net.flow.FlowRuleBatchEntry.FlowRuleOperation;
alshabib902d41b2014-10-07 16:52:05 -070033import org.onlab.onos.net.flow.FlowRuleBatchOperation;
alshabib57044ba2014-09-16 15:58:01 -070034import org.onlab.onos.net.flow.FlowRuleEvent;
35import org.onlab.onos.net.flow.FlowRuleListener;
36import org.onlab.onos.net.flow.FlowRuleProvider;
37import org.onlab.onos.net.flow.FlowRuleProviderRegistry;
38import org.onlab.onos.net.flow.FlowRuleProviderService;
39import org.onlab.onos.net.flow.FlowRuleService;
tombe988312014-09-19 18:38:47 -070040import org.onlab.onos.net.flow.FlowRuleStore;
tomc78acee2014-09-24 15:16:55 -070041import org.onlab.onos.net.flow.FlowRuleStoreDelegate;
alshabib57044ba2014-09-16 15:58:01 -070042import org.onlab.onos.net.provider.AbstractProviderRegistry;
43import org.onlab.onos.net.provider.AbstractProviderService;
44import org.slf4j.Logger;
45
alshabib902d41b2014-10-07 16:52:05 -070046import com.google.common.collect.ArrayListMultimap;
alshabibbb42cad2014-09-25 11:43:05 -070047import com.google.common.collect.Lists;
alshabib902d41b2014-10-07 16:52:05 -070048import com.google.common.collect.Multimap;
alshabiba7f7ca82014-09-22 11:41:23 -070049
/**
 * Provides implementation of the flow northbound (NB) and southbound (SB) APIs.
 */
alshabib57044ba2014-09-16 15:58:01 -070053@Component(immediate = true)
54@Service
tom202175a2014-09-19 19:00:11 -070055public class FlowRuleManager
tom9b4030d2014-10-06 10:39:03 -070056 extends AbstractProviderRegistry<FlowRuleProvider, FlowRuleProviderService>
57 implements FlowRuleService, FlowRuleProviderRegistry {
alshabib57044ba2014-09-16 15:58:01 -070058
/**
 * Lifecycle states tracked by {@code FlowRuleBatchFuture}: a batch starts
 * in {@code STARTED} and moves to either {@code FINISHED} or
 * {@code CANCELLED}.
 */
enum BatchState {
    STARTED,
    FINISHED,
    CANCELLED
}
60
Ayaka Koshibe08eabaa2014-09-17 14:59:25 -070061 public static final String FLOW_RULE_NULL = "FlowRule cannot be null";
alshabib57044ba2014-09-16 15:58:01 -070062 private final Logger log = getLogger(getClass());
63
64 private final AbstractListenerRegistry<FlowRuleEvent, FlowRuleListener>
tom9b4030d2014-10-06 10:39:03 -070065 listenerRegistry = new AbstractListenerRegistry<>();
alshabib57044ba2014-09-16 15:58:01 -070066
alshabibbb42cad2014-09-25 11:43:05 -070067 private final FlowRuleStoreDelegate delegate = new InternalStoreDelegate();
tomc78acee2014-09-24 15:16:55 -070068
tombe988312014-09-19 18:38:47 -070069 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
70 protected FlowRuleStore store;
Ayaka Koshibe08eabaa2014-09-17 14:59:25 -070071
alshabib57044ba2014-09-16 15:58:01 -070072 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
Ayaka Koshibeb55524f2014-09-18 09:59:24 -070073 protected EventDeliveryService eventDispatcher;
alshabib57044ba2014-09-16 15:58:01 -070074
75 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
Ayaka Koshibeb55524f2014-09-18 09:59:24 -070076 protected DeviceService deviceService;
alshabib57044ba2014-09-16 15:58:01 -070077
78 @Activate
79 public void activate() {
tomc78acee2014-09-24 15:16:55 -070080 store.setDelegate(delegate);
alshabib57044ba2014-09-16 15:58:01 -070081 eventDispatcher.addSink(FlowRuleEvent.class, listenerRegistry);
82 log.info("Started");
83 }
84
85 @Deactivate
86 public void deactivate() {
tomc78acee2014-09-24 15:16:55 -070087 store.unsetDelegate(delegate);
alshabib57044ba2014-09-16 15:58:01 -070088 eventDispatcher.removeSink(FlowRuleEvent.class);
89 log.info("Stopped");
90 }
91
92 @Override
tom9b4030d2014-10-06 10:39:03 -070093 public int getFlowRuleCount() {
94 return store.getFlowRuleCount();
95 }
96
97 @Override
alshabib1c319ff2014-10-04 20:29:09 -070098 public Iterable<FlowEntry> getFlowEntries(DeviceId deviceId) {
Ayaka Koshibe08eabaa2014-09-17 14:59:25 -070099 return store.getFlowEntries(deviceId);
alshabib57044ba2014-09-16 15:58:01 -0700100 }
101
102 @Override
alshabib219ebaa2014-09-22 15:41:24 -0700103 public void applyFlowRules(FlowRule... flowRules) {
alshabib57044ba2014-09-16 15:58:01 -0700104 for (int i = 0; i < flowRules.length; i++) {
alshabiba68eb962014-09-24 20:34:13 -0700105 FlowRule f = flowRules[i];
alshabib57044ba2014-09-16 15:58:01 -0700106 final Device device = deviceService.getDevice(f.deviceId());
107 final FlowRuleProvider frp = getProvider(device.providerId());
alshabib219ebaa2014-09-22 15:41:24 -0700108 store.storeFlowRule(f);
alshabib57044ba2014-09-16 15:58:01 -0700109 frp.applyFlowRule(f);
110 }
alshabib57044ba2014-09-16 15:58:01 -0700111 }
112
113 @Override
114 public void removeFlowRules(FlowRule... flowRules) {
alshabibbb8b1282014-09-22 17:00:18 -0700115 FlowRule f;
alshabiba68eb962014-09-24 20:34:13 -0700116 FlowRuleProvider frp;
117 Device device;
alshabib57044ba2014-09-16 15:58:01 -0700118 for (int i = 0; i < flowRules.length; i++) {
alshabiba68eb962014-09-24 20:34:13 -0700119 f = flowRules[i];
120 device = deviceService.getDevice(f.deviceId());
alshabib219ebaa2014-09-22 15:41:24 -0700121 store.deleteFlowRule(f);
tom7951b232014-10-06 13:35:30 -0700122 if (device != null) {
123 frp = getProvider(device.providerId());
124 frp.removeFlowRule(f);
125 }
alshabib57044ba2014-09-16 15:58:01 -0700126 }
alshabiba68eb962014-09-24 20:34:13 -0700127 }
alshabib57044ba2014-09-16 15:58:01 -0700128
alshabiba68eb962014-09-24 20:34:13 -0700129 @Override
130 public void removeFlowRulesById(ApplicationId id) {
tom9b4030d2014-10-06 10:39:03 -0700131 Iterable<FlowRule> rules = getFlowRulesById(id);
alshabiba68eb962014-09-24 20:34:13 -0700132 FlowRuleProvider frp;
133 Device device;
alshabibbb42cad2014-09-25 11:43:05 -0700134
alshabiba68eb962014-09-24 20:34:13 -0700135 for (FlowRule f : rules) {
136 store.deleteFlowRule(f);
137 device = deviceService.getDevice(f.deviceId());
138 frp = getProvider(device.providerId());
139 frp.removeRulesById(id, f);
140 }
141 }
142
143 @Override
144 public Iterable<FlowRule> getFlowRulesById(ApplicationId id) {
alshabib1c319ff2014-10-04 20:29:09 -0700145 return store.getFlowRulesByAppId(id);
alshabib57044ba2014-09-16 15:58:01 -0700146 }
147
148 @Override
alshabib902d41b2014-10-07 16:52:05 -0700149 public Future<CompletedBatchOperation> applyBatch(
150 FlowRuleBatchOperation batch) {
151 Multimap<FlowRuleProvider, FlowRuleBatchEntry> batches =
152 ArrayListMultimap.create();
alshabib193525b2014-10-08 18:58:03 -0700153 List<Future<CompletedBatchOperation>> futures = Lists.newArrayList();
alshabib902d41b2014-10-07 16:52:05 -0700154 for (FlowRuleBatchEntry fbe : batch.getOperations()) {
155 final FlowRule f = fbe.getTarget();
156 final Device device = deviceService.getDevice(f.deviceId());
157 final FlowRuleProvider frp = getProvider(device.providerId());
158 batches.put(frp, fbe);
159 switch (fbe.getOperator()) {
160 case ADD:
161 store.storeFlowRule(f);
162 break;
163 case REMOVE:
164 store.deleteFlowRule(f);
165 break;
166 case MODIFY:
167 default:
168 log.error("Batch operation type {} unsupported.", fbe.getOperator());
169 }
170 }
171 for (FlowRuleProvider provider : batches.keySet()) {
172 FlowRuleBatchOperation b =
173 new FlowRuleBatchOperation(batches.get(provider));
alshabib193525b2014-10-08 18:58:03 -0700174 Future<CompletedBatchOperation> future = provider.executeBatch(b);
alshabib902d41b2014-10-07 16:52:05 -0700175 futures.add(future);
176 }
alshabib193525b2014-10-08 18:58:03 -0700177 return new FlowRuleBatchFuture(futures, batches);
alshabib902d41b2014-10-07 16:52:05 -0700178 }
179
180 @Override
alshabib57044ba2014-09-16 15:58:01 -0700181 public void addListener(FlowRuleListener listener) {
182 listenerRegistry.addListener(listener);
183 }
184
185 @Override
186 public void removeListener(FlowRuleListener listener) {
187 listenerRegistry.removeListener(listener);
188 }
189
190 @Override
191 protected FlowRuleProviderService createProviderService(
192 FlowRuleProvider provider) {
193 return new InternalFlowRuleProviderService(provider);
194 }
195
196 private class InternalFlowRuleProviderService
tom9b4030d2014-10-06 10:39:03 -0700197 extends AbstractProviderService<FlowRuleProvider>
198 implements FlowRuleProviderService {
alshabib57044ba2014-09-16 15:58:01 -0700199
/**
 * Creates a provider service bound to the given provider.
 *
 * @param provider provider handed to the superclass constructor
 */
protected InternalFlowRuleProviderService(FlowRuleProvider provider) {
    super(provider);
}
203
204 @Override
alshabib1c319ff2014-10-04 20:29:09 -0700205 public void flowRemoved(FlowEntry flowEntry) {
206 checkNotNull(flowEntry, FLOW_RULE_NULL);
Ayaka Koshibe08eabaa2014-09-17 14:59:25 -0700207 checkValidity();
alshabib1c319ff2014-10-04 20:29:09 -0700208 FlowEntry stored = store.getFlowEntry(flowEntry);
alshabiba68eb962014-09-24 20:34:13 -0700209 if (stored == null) {
alshabib1c319ff2014-10-04 20:29:09 -0700210 log.info("Rule already evicted from store: {}", flowEntry);
alshabiba68eb962014-09-24 20:34:13 -0700211 return;
212 }
alshabib1c319ff2014-10-04 20:29:09 -0700213 Device device = deviceService.getDevice(flowEntry.deviceId());
alshabiba68eb962014-09-24 20:34:13 -0700214 FlowRuleProvider frp = getProvider(device.providerId());
215 FlowRuleEvent event = null;
216 switch (stored.state()) {
tom9b4030d2014-10-06 10:39:03 -0700217 case ADDED:
218 case PENDING_ADD:
alshabib6eb438a2014-10-01 16:39:37 -0700219 frp.applyFlowRule(stored);
tom9b4030d2014-10-06 10:39:03 -0700220 break;
221 case PENDING_REMOVE:
222 case REMOVED:
223 event = store.removeFlowRule(stored);
224 break;
225 default:
226 break;
alshabib57044ba2014-09-16 15:58:01 -0700227
alshabiba68eb962014-09-24 20:34:13 -0700228 }
Ayaka Koshibe08eabaa2014-09-17 14:59:25 -0700229 if (event != null) {
alshabib1c319ff2014-10-04 20:29:09 -0700230 log.debug("Flow {} removed", flowEntry);
Ayaka Koshibe08eabaa2014-09-17 14:59:25 -0700231 post(event);
232 }
alshabib57044ba2014-09-16 15:58:01 -0700233 }
234
alshabibba5ac482014-10-02 17:15:20 -0700235
alshabib1c319ff2014-10-04 20:29:09 -0700236 private void flowMissing(FlowEntry flowRule) {
Ayaka Koshibe08eabaa2014-09-17 14:59:25 -0700237 checkNotNull(flowRule, FLOW_RULE_NULL);
238 checkValidity();
alshabiba68eb962014-09-24 20:34:13 -0700239 Device device = deviceService.getDevice(flowRule.deviceId());
240 FlowRuleProvider frp = getProvider(device.providerId());
alshabibbb42cad2014-09-25 11:43:05 -0700241 FlowRuleEvent event = null;
alshabiba68eb962014-09-24 20:34:13 -0700242 switch (flowRule.state()) {
tom9b4030d2014-10-06 10:39:03 -0700243 case PENDING_REMOVE:
244 case REMOVED:
245 event = store.removeFlowRule(flowRule);
246 frp.removeFlowRule(flowRule);
247 break;
248 case ADDED:
249 case PENDING_ADD:
250 frp.applyFlowRule(flowRule);
251 break;
252 default:
253 log.debug("Flow {} has not been installed.", flowRule);
alshabiba68eb962014-09-24 20:34:13 -0700254 }
255
alshabibbb42cad2014-09-25 11:43:05 -0700256 if (event != null) {
257 log.debug("Flow {} removed", flowRule);
258 post(event);
259 }
alshabib57044ba2014-09-16 15:58:01 -0700260
261 }
262
alshabibba5ac482014-10-02 17:15:20 -0700263
264 private void extraneousFlow(FlowRule flowRule) {
alshabib219ebaa2014-09-22 15:41:24 -0700265 checkNotNull(flowRule, FLOW_RULE_NULL);
266 checkValidity();
alshabiba68eb962014-09-24 20:34:13 -0700267 removeFlowRules(flowRule);
alshabib54ce5892014-09-23 17:50:51 -0700268 log.debug("Flow {} is on switch but not in store.", flowRule);
alshabib219ebaa2014-09-22 15:41:24 -0700269 }
270
alshabibba5ac482014-10-02 17:15:20 -0700271
alshabib1c319ff2014-10-04 20:29:09 -0700272 private void flowAdded(FlowEntry flowEntry) {
273 checkNotNull(flowEntry, FLOW_RULE_NULL);
Ayaka Koshibe08eabaa2014-09-17 14:59:25 -0700274 checkValidity();
alshabib57044ba2014-09-16 15:58:01 -0700275
alshabib1c319ff2014-10-04 20:29:09 -0700276 if (checkRuleLiveness(flowEntry, store.getFlowEntry(flowEntry))) {
alshabibba5ac482014-10-02 17:15:20 -0700277
alshabib1c319ff2014-10-04 20:29:09 -0700278 FlowRuleEvent event = store.addOrUpdateFlowRule(flowEntry);
alshabibba5ac482014-10-02 17:15:20 -0700279 if (event == null) {
280 log.debug("No flow store event generated.");
281 } else {
alshabib1c319ff2014-10-04 20:29:09 -0700282 log.debug("Flow {} {}", flowEntry, event.type());
alshabibba5ac482014-10-02 17:15:20 -0700283 post(event);
284 }
Ayaka Koshibe08eabaa2014-09-17 14:59:25 -0700285 } else {
alshabib1c319ff2014-10-04 20:29:09 -0700286 removeFlowRules(flowEntry);
Ayaka Koshibe08eabaa2014-09-17 14:59:25 -0700287 }
alshabib219ebaa2014-09-22 15:41:24 -0700288
alshabib57044ba2014-09-16 15:58:01 -0700289 }
290
alshabib1c319ff2014-10-04 20:29:09 -0700291 private boolean checkRuleLiveness(FlowEntry swRule, FlowEntry storedRule) {
292 if (storedRule == null) {
293 return false;
294 }
alshabib85c41972014-10-03 13:48:39 -0700295 long timeout = storedRule.timeout() * 1000;
296 Long currentTime = System.currentTimeMillis();
297 if (storedRule.packets() != swRule.packets()) {
alshabib1c319ff2014-10-04 20:29:09 -0700298 storedRule.setLastSeen();
alshabib85c41972014-10-03 13:48:39 -0700299 return true;
300 }
alshabib85c41972014-10-03 13:48:39 -0700301
alshabib1c319ff2014-10-04 20:29:09 -0700302 if ((currentTime - storedRule.lastSeen()) <= timeout) {
alshabibc274c902014-10-03 14:58:27 -0700303 return true;
304 }
305 return false;
alshabibba5ac482014-10-02 17:15:20 -0700306 }
307
Ayaka Koshibe08eabaa2014-09-17 14:59:25 -0700308 // Posts the specified event to the local event dispatcher.
309 private void post(FlowRuleEvent event) {
310 if (event != null) {
311 eventDispatcher.post(event);
312 }
313 }
alshabib5c370ff2014-09-18 10:12:14 -0700314
315 @Override
alshabib1c319ff2014-10-04 20:29:09 -0700316 public void pushFlowMetrics(DeviceId deviceId, Iterable<FlowEntry> flowEntries) {
317 List<FlowEntry> storedRules = Lists.newLinkedList(store.getFlowEntries(deviceId));
alshabibbb8b1282014-09-22 17:00:18 -0700318
alshabib1c319ff2014-10-04 20:29:09 -0700319 Iterator<FlowEntry> switchRulesIterator = flowEntries.iterator();
alshabiba7f7ca82014-09-22 11:41:23 -0700320
321 while (switchRulesIterator.hasNext()) {
alshabib1c319ff2014-10-04 20:29:09 -0700322 FlowEntry rule = switchRulesIterator.next();
alshabiba7f7ca82014-09-22 11:41:23 -0700323 if (storedRules.remove(rule)) {
alshabib219ebaa2014-09-22 15:41:24 -0700324 // we both have the rule, let's update some info then.
alshabiba7f7ca82014-09-22 11:41:23 -0700325 flowAdded(rule);
326 } else {
alshabib219ebaa2014-09-22 15:41:24 -0700327 // the device has a rule the store does not have
328 extraneousFlow(rule);
alshabiba7f7ca82014-09-22 11:41:23 -0700329 }
330 }
alshabib1c319ff2014-10-04 20:29:09 -0700331 for (FlowEntry rule : storedRules) {
alshabiba7f7ca82014-09-22 11:41:23 -0700332 // there are rules in the store that aren't on the switch
333 flowMissing(rule);
alshabib54ce5892014-09-23 17:50:51 -0700334
alshabiba7f7ca82014-09-22 11:41:23 -0700335 }
alshabib5c370ff2014-09-18 10:12:14 -0700336 }
alshabib57044ba2014-09-16 15:58:01 -0700337 }
338
tomc78acee2014-09-24 15:16:55 -0700339 // Store delegate to re-post events emitted from the store.
340 private class InternalStoreDelegate implements FlowRuleStoreDelegate {
341 @Override
342 public void notify(FlowRuleEvent event) {
343 eventDispatcher.post(event);
344 }
345 }
alshabib902d41b2014-10-07 16:52:05 -0700346
347 private class FlowRuleBatchFuture
348 implements Future<CompletedBatchOperation> {
349
alshabib193525b2014-10-08 18:58:03 -0700350 private final List<Future<CompletedBatchOperation>> futures;
351 private final Multimap<FlowRuleProvider, FlowRuleBatchEntry> batches;
352 private final AtomicReference<BatchState> state;
353 private CompletedBatchOperation overall;
alshabib902d41b2014-10-07 16:52:05 -0700354
alshabib193525b2014-10-08 18:58:03 -0700355
356
357 public FlowRuleBatchFuture(List<Future<CompletedBatchOperation>> futures,
358 Multimap<FlowRuleProvider, FlowRuleBatchEntry> batches) {
alshabib902d41b2014-10-07 16:52:05 -0700359 this.futures = futures;
alshabib193525b2014-10-08 18:58:03 -0700360 this.batches = batches;
361 state = new AtomicReference<FlowRuleManager.BatchState>();
362 state.set(BatchState.STARTED);
alshabib902d41b2014-10-07 16:52:05 -0700363 }
364
365 @Override
366 public boolean cancel(boolean mayInterruptIfRunning) {
alshabib193525b2014-10-08 18:58:03 -0700367 if (state.get() == BatchState.FINISHED) {
368 return false;
369 }
370 if (!state.compareAndSet(BatchState.STARTED, BatchState.CANCELLED)) {
371 return false;
372 }
373 cleanUpBatch();
374 for (Future<CompletedBatchOperation> f : futures) {
375 f.cancel(mayInterruptIfRunning);
376 }
377 return true;
alshabib902d41b2014-10-07 16:52:05 -0700378 }
379
380 @Override
381 public boolean isCancelled() {
alshabib193525b2014-10-08 18:58:03 -0700382 return state.get() == BatchState.CANCELLED;
alshabib902d41b2014-10-07 16:52:05 -0700383 }
384
385 @Override
386 public boolean isDone() {
alshabib193525b2014-10-08 18:58:03 -0700387 return state.get() == BatchState.FINISHED;
alshabib902d41b2014-10-07 16:52:05 -0700388 }
389
alshabib193525b2014-10-08 18:58:03 -0700390
alshabib902d41b2014-10-07 16:52:05 -0700391 @Override
392 public CompletedBatchOperation get() throws InterruptedException,
alshabib193525b2014-10-08 18:58:03 -0700393 ExecutionException {
394
395 if (isDone()) {
396 return overall;
alshabib902d41b2014-10-07 16:52:05 -0700397 }
alshabib193525b2014-10-08 18:58:03 -0700398
399 boolean success = true;
400 List<FlowEntry> failed = Lists.newLinkedList();
401 CompletedBatchOperation completed;
402 for (Future<CompletedBatchOperation> future : futures) {
403 completed = future.get();
404 success = validateBatchOperation(failed, completed, future);
405 }
406
407 return finalizeBatchOperation(success, failed);
408
alshabib902d41b2014-10-07 16:52:05 -0700409 }
410
411 @Override
412 public CompletedBatchOperation get(long timeout, TimeUnit unit)
413 throws InterruptedException, ExecutionException,
414 TimeoutException {
alshabib193525b2014-10-08 18:58:03 -0700415
416 if (isDone()) {
417 return overall;
418 }
419 boolean success = true;
420 List<FlowEntry> failed = Lists.newLinkedList();
421 CompletedBatchOperation completed;
alshabib902d41b2014-10-07 16:52:05 -0700422 long start = System.nanoTime();
423 long end = start + unit.toNanos(timeout);
alshabib193525b2014-10-08 18:58:03 -0700424
425 for (Future<CompletedBatchOperation> future : futures) {
alshabib902d41b2014-10-07 16:52:05 -0700426 long now = System.nanoTime();
427 long thisTimeout = end - now;
alshabib193525b2014-10-08 18:58:03 -0700428 completed = future.get(thisTimeout, TimeUnit.NANOSECONDS);
429 success = validateBatchOperation(failed, completed, future);
alshabib902d41b2014-10-07 16:52:05 -0700430 }
alshabib193525b2014-10-08 18:58:03 -0700431 return finalizeBatchOperation(success, failed);
alshabib902d41b2014-10-07 16:52:05 -0700432 }
433
alshabib193525b2014-10-08 18:58:03 -0700434 private boolean validateBatchOperation(List<FlowEntry> failed,
435 CompletedBatchOperation completed,
436 Future<CompletedBatchOperation> future) {
437
438 if (isCancelled()) {
439 throw new CancellationException();
440 }
441 if (!completed.isSuccess()) {
442 failed.addAll(completed.failedItems());
443 cleanUpBatch();
444 cancelAllSubBatches();
445 return false;
446 }
447 return true;
448 }
449
450 private void cancelAllSubBatches() {
451 for (Future<CompletedBatchOperation> f : futures) {
452 f.cancel(true);
453 }
454 }
455
456 private CompletedBatchOperation finalizeBatchOperation(boolean success,
457 List<FlowEntry> failed) {
alshabib26834582014-10-08 20:15:46 -0700458 synchronized (this) {
alshabib193525b2014-10-08 18:58:03 -0700459 if (!state.compareAndSet(BatchState.STARTED, BatchState.FINISHED)) {
460 if (state.get() == BatchState.FINISHED) {
461 return overall;
462 }
463 throw new CancellationException();
464 }
465 overall = new CompletedBatchOperation(success, failed);
466 return overall;
467 }
468 }
469
470 private void cleanUpBatch() {
471 for (FlowRuleBatchEntry fbe : batches.values()) {
472 if (fbe.getOperator() == FlowRuleOperation.ADD ||
473 fbe.getOperator() == FlowRuleOperation.MODIFY) {
474 store.deleteFlowRule(fbe.getTarget());
475 } else if (fbe.getOperator() == FlowRuleOperation.REMOVE) {
alshabibcf369912014-10-13 14:16:42 -0700476 store.removeFlowRule(new DefaultFlowEntry(fbe.getTarget()));
alshabib193525b2014-10-08 18:58:03 -0700477 store.storeFlowRule(fbe.getTarget());
478 }
479 }
480
481 }
alshabib902d41b2014-10-07 16:52:05 -0700482 }
483
484
alshabib193525b2014-10-08 18:58:03 -0700485
486
alshabib57044ba2014-09-16 15:58:01 -0700487}