blob: 9ea99c375f216f79dda45db4d30fa79c52b5dfc7 [file] [log] [blame]
tombe988312014-09-19 18:38:47 -07001package org.onlab.onos.net.flow.impl;
alshabib57044ba2014-09-16 15:58:01 -07002
alshabibbb42cad2014-09-25 11:43:05 -07003import static com.google.common.base.Preconditions.checkNotNull;
4import static org.slf4j.LoggerFactory.getLogger;
5
alshabibbb42cad2014-09-25 11:43:05 -07006import java.util.List;
Yuta HIGUCHIf6f50a62014-10-19 15:58:49 -07007import java.util.Map;
alshabib193525b2014-10-08 18:58:03 -07008import java.util.concurrent.CancellationException;
alshabib902d41b2014-10-07 16:52:05 -07009import java.util.concurrent.ExecutionException;
10import java.util.concurrent.Future;
11import java.util.concurrent.TimeUnit;
12import java.util.concurrent.TimeoutException;
alshabib193525b2014-10-08 18:58:03 -070013import java.util.concurrent.atomic.AtomicReference;
alshabibbb42cad2014-09-25 11:43:05 -070014
alshabib57044ba2014-09-16 15:58:01 -070015import org.apache.felix.scr.annotations.Activate;
16import org.apache.felix.scr.annotations.Component;
17import org.apache.felix.scr.annotations.Deactivate;
18import org.apache.felix.scr.annotations.Reference;
19import org.apache.felix.scr.annotations.ReferenceCardinality;
20import org.apache.felix.scr.annotations.Service;
alshabiba68eb962014-09-24 20:34:13 -070021import org.onlab.onos.ApplicationId;
alshabib57044ba2014-09-16 15:58:01 -070022import org.onlab.onos.event.AbstractListenerRegistry;
23import org.onlab.onos.event.EventDeliveryService;
24import org.onlab.onos.net.Device;
25import org.onlab.onos.net.DeviceId;
26import org.onlab.onos.net.device.DeviceService;
alshabib902d41b2014-10-07 16:52:05 -070027import org.onlab.onos.net.flow.CompletedBatchOperation;
alshabibcf369912014-10-13 14:16:42 -070028import org.onlab.onos.net.flow.DefaultFlowEntry;
alshabib1c319ff2014-10-04 20:29:09 -070029import org.onlab.onos.net.flow.FlowEntry;
alshabib57044ba2014-09-16 15:58:01 -070030import org.onlab.onos.net.flow.FlowRule;
alshabib902d41b2014-10-07 16:52:05 -070031import org.onlab.onos.net.flow.FlowRuleBatchEntry;
alshabib193525b2014-10-08 18:58:03 -070032import org.onlab.onos.net.flow.FlowRuleBatchEntry.FlowRuleOperation;
alshabib902d41b2014-10-07 16:52:05 -070033import org.onlab.onos.net.flow.FlowRuleBatchOperation;
alshabib57044ba2014-09-16 15:58:01 -070034import org.onlab.onos.net.flow.FlowRuleEvent;
35import org.onlab.onos.net.flow.FlowRuleListener;
36import org.onlab.onos.net.flow.FlowRuleProvider;
37import org.onlab.onos.net.flow.FlowRuleProviderRegistry;
38import org.onlab.onos.net.flow.FlowRuleProviderService;
39import org.onlab.onos.net.flow.FlowRuleService;
tombe988312014-09-19 18:38:47 -070040import org.onlab.onos.net.flow.FlowRuleStore;
tomc78acee2014-09-24 15:16:55 -070041import org.onlab.onos.net.flow.FlowRuleStoreDelegate;
alshabib57044ba2014-09-16 15:58:01 -070042import org.onlab.onos.net.provider.AbstractProviderRegistry;
43import org.onlab.onos.net.provider.AbstractProviderService;
44import org.slf4j.Logger;
45
alshabib902d41b2014-10-07 16:52:05 -070046import com.google.common.collect.ArrayListMultimap;
alshabibbb42cad2014-09-25 11:43:05 -070047import com.google.common.collect.Lists;
Yuta HIGUCHIf6f50a62014-10-19 15:58:49 -070048import com.google.common.collect.Maps;
alshabib902d41b2014-10-07 16:52:05 -070049import com.google.common.collect.Multimap;
alshabiba7f7ca82014-09-22 11:41:23 -070050
tome4729872014-09-23 00:37:37 -070051/**
52 * Provides implementation of the flow NB & SB APIs.
53 */
alshabib57044ba2014-09-16 15:58:01 -070054@Component(immediate = true)
55@Service
tom202175a2014-09-19 19:00:11 -070056public class FlowRuleManager
tom9b4030d2014-10-06 10:39:03 -070057 extends AbstractProviderRegistry<FlowRuleProvider, FlowRuleProviderService>
58 implements FlowRuleService, FlowRuleProviderRegistry {
alshabib57044ba2014-09-16 15:58:01 -070059
/**
 * Lifecycle states of a batch future: in flight, completed, or cancelled.
 */
enum BatchState {
    STARTED,
    FINISHED,
    CANCELLED
}
61
Ayaka Koshibe08eabaa2014-09-17 14:59:25 -070062 public static final String FLOW_RULE_NULL = "FlowRule cannot be null";
alshabib57044ba2014-09-16 15:58:01 -070063 private final Logger log = getLogger(getClass());
64
65 private final AbstractListenerRegistry<FlowRuleEvent, FlowRuleListener>
tom9b4030d2014-10-06 10:39:03 -070066 listenerRegistry = new AbstractListenerRegistry<>();
alshabib57044ba2014-09-16 15:58:01 -070067
alshabibbb42cad2014-09-25 11:43:05 -070068 private final FlowRuleStoreDelegate delegate = new InternalStoreDelegate();
tomc78acee2014-09-24 15:16:55 -070069
tombe988312014-09-19 18:38:47 -070070 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
71 protected FlowRuleStore store;
Ayaka Koshibe08eabaa2014-09-17 14:59:25 -070072
alshabib57044ba2014-09-16 15:58:01 -070073 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
Ayaka Koshibeb55524f2014-09-18 09:59:24 -070074 protected EventDeliveryService eventDispatcher;
alshabib57044ba2014-09-16 15:58:01 -070075
76 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
Ayaka Koshibeb55524f2014-09-18 09:59:24 -070077 protected DeviceService deviceService;
alshabib57044ba2014-09-16 15:58:01 -070078
79 @Activate
80 public void activate() {
tomc78acee2014-09-24 15:16:55 -070081 store.setDelegate(delegate);
alshabib57044ba2014-09-16 15:58:01 -070082 eventDispatcher.addSink(FlowRuleEvent.class, listenerRegistry);
83 log.info("Started");
84 }
85
86 @Deactivate
87 public void deactivate() {
tomc78acee2014-09-24 15:16:55 -070088 store.unsetDelegate(delegate);
alshabib57044ba2014-09-16 15:58:01 -070089 eventDispatcher.removeSink(FlowRuleEvent.class);
90 log.info("Stopped");
91 }
92
93 @Override
tom9b4030d2014-10-06 10:39:03 -070094 public int getFlowRuleCount() {
95 return store.getFlowRuleCount();
96 }
97
98 @Override
alshabib1c319ff2014-10-04 20:29:09 -070099 public Iterable<FlowEntry> getFlowEntries(DeviceId deviceId) {
Ayaka Koshibe08eabaa2014-09-17 14:59:25 -0700100 return store.getFlowEntries(deviceId);
alshabib57044ba2014-09-16 15:58:01 -0700101 }
102
103 @Override
alshabib219ebaa2014-09-22 15:41:24 -0700104 public void applyFlowRules(FlowRule... flowRules) {
alshabib57044ba2014-09-16 15:58:01 -0700105 for (int i = 0; i < flowRules.length; i++) {
alshabiba68eb962014-09-24 20:34:13 -0700106 FlowRule f = flowRules[i];
alshabib57044ba2014-09-16 15:58:01 -0700107 final Device device = deviceService.getDevice(f.deviceId());
108 final FlowRuleProvider frp = getProvider(device.providerId());
alshabib219ebaa2014-09-22 15:41:24 -0700109 store.storeFlowRule(f);
alshabib57044ba2014-09-16 15:58:01 -0700110 frp.applyFlowRule(f);
111 }
alshabib57044ba2014-09-16 15:58:01 -0700112 }
113
114 @Override
115 public void removeFlowRules(FlowRule... flowRules) {
alshabibbb8b1282014-09-22 17:00:18 -0700116 FlowRule f;
alshabiba68eb962014-09-24 20:34:13 -0700117 FlowRuleProvider frp;
118 Device device;
alshabib57044ba2014-09-16 15:58:01 -0700119 for (int i = 0; i < flowRules.length; i++) {
alshabiba68eb962014-09-24 20:34:13 -0700120 f = flowRules[i];
121 device = deviceService.getDevice(f.deviceId());
alshabib219ebaa2014-09-22 15:41:24 -0700122 store.deleteFlowRule(f);
tom7951b232014-10-06 13:35:30 -0700123 if (device != null) {
124 frp = getProvider(device.providerId());
125 frp.removeFlowRule(f);
126 }
alshabib57044ba2014-09-16 15:58:01 -0700127 }
alshabiba68eb962014-09-24 20:34:13 -0700128 }
alshabib57044ba2014-09-16 15:58:01 -0700129
alshabiba68eb962014-09-24 20:34:13 -0700130 @Override
131 public void removeFlowRulesById(ApplicationId id) {
tom9b4030d2014-10-06 10:39:03 -0700132 Iterable<FlowRule> rules = getFlowRulesById(id);
alshabiba68eb962014-09-24 20:34:13 -0700133 FlowRuleProvider frp;
134 Device device;
alshabibbb42cad2014-09-25 11:43:05 -0700135
alshabiba68eb962014-09-24 20:34:13 -0700136 for (FlowRule f : rules) {
137 store.deleteFlowRule(f);
138 device = deviceService.getDevice(f.deviceId());
139 frp = getProvider(device.providerId());
140 frp.removeRulesById(id, f);
141 }
142 }
143
144 @Override
145 public Iterable<FlowRule> getFlowRulesById(ApplicationId id) {
alshabib1c319ff2014-10-04 20:29:09 -0700146 return store.getFlowRulesByAppId(id);
alshabib57044ba2014-09-16 15:58:01 -0700147 }
148
149 @Override
alshabib902d41b2014-10-07 16:52:05 -0700150 public Future<CompletedBatchOperation> applyBatch(
151 FlowRuleBatchOperation batch) {
152 Multimap<FlowRuleProvider, FlowRuleBatchEntry> batches =
153 ArrayListMultimap.create();
alshabib193525b2014-10-08 18:58:03 -0700154 List<Future<CompletedBatchOperation>> futures = Lists.newArrayList();
alshabib902d41b2014-10-07 16:52:05 -0700155 for (FlowRuleBatchEntry fbe : batch.getOperations()) {
156 final FlowRule f = fbe.getTarget();
157 final Device device = deviceService.getDevice(f.deviceId());
158 final FlowRuleProvider frp = getProvider(device.providerId());
159 batches.put(frp, fbe);
160 switch (fbe.getOperator()) {
161 case ADD:
162 store.storeFlowRule(f);
163 break;
164 case REMOVE:
165 store.deleteFlowRule(f);
166 break;
167 case MODIFY:
168 default:
169 log.error("Batch operation type {} unsupported.", fbe.getOperator());
170 }
171 }
172 for (FlowRuleProvider provider : batches.keySet()) {
173 FlowRuleBatchOperation b =
174 new FlowRuleBatchOperation(batches.get(provider));
alshabib193525b2014-10-08 18:58:03 -0700175 Future<CompletedBatchOperation> future = provider.executeBatch(b);
alshabib902d41b2014-10-07 16:52:05 -0700176 futures.add(future);
177 }
alshabib193525b2014-10-08 18:58:03 -0700178 return new FlowRuleBatchFuture(futures, batches);
alshabib902d41b2014-10-07 16:52:05 -0700179 }
180
181 @Override
alshabib57044ba2014-09-16 15:58:01 -0700182 public void addListener(FlowRuleListener listener) {
183 listenerRegistry.addListener(listener);
184 }
185
186 @Override
187 public void removeListener(FlowRuleListener listener) {
188 listenerRegistry.removeListener(listener);
189 }
190
191 @Override
192 protected FlowRuleProviderService createProviderService(
193 FlowRuleProvider provider) {
194 return new InternalFlowRuleProviderService(provider);
195 }
196
197 private class InternalFlowRuleProviderService
tom9b4030d2014-10-06 10:39:03 -0700198 extends AbstractProviderService<FlowRuleProvider>
199 implements FlowRuleProviderService {
alshabib57044ba2014-09-16 15:58:01 -0700200
Yuta HIGUCHIf6f50a62014-10-19 15:58:49 -0700201 final Map<FlowEntry, Long> lastSeen = Maps.newConcurrentMap();
202
/**
 * Creates a provider service for the given provider.
 *
 * @param provider flow rule provider, handed to the superclass constructor
 */
protected InternalFlowRuleProviderService(FlowRuleProvider provider) {
    super(provider);
}
206
207 @Override
alshabib1c319ff2014-10-04 20:29:09 -0700208 public void flowRemoved(FlowEntry flowEntry) {
209 checkNotNull(flowEntry, FLOW_RULE_NULL);
Ayaka Koshibe08eabaa2014-09-17 14:59:25 -0700210 checkValidity();
Yuta HIGUCHIf6f50a62014-10-19 15:58:49 -0700211 lastSeen.remove(flowEntry);
alshabib1c319ff2014-10-04 20:29:09 -0700212 FlowEntry stored = store.getFlowEntry(flowEntry);
alshabiba68eb962014-09-24 20:34:13 -0700213 if (stored == null) {
alshabib1c319ff2014-10-04 20:29:09 -0700214 log.info("Rule already evicted from store: {}", flowEntry);
alshabiba68eb962014-09-24 20:34:13 -0700215 return;
216 }
alshabib1c319ff2014-10-04 20:29:09 -0700217 Device device = deviceService.getDevice(flowEntry.deviceId());
alshabiba68eb962014-09-24 20:34:13 -0700218 FlowRuleProvider frp = getProvider(device.providerId());
219 FlowRuleEvent event = null;
220 switch (stored.state()) {
tom9b4030d2014-10-06 10:39:03 -0700221 case ADDED:
222 case PENDING_ADD:
alshabib6eb438a2014-10-01 16:39:37 -0700223 frp.applyFlowRule(stored);
tom9b4030d2014-10-06 10:39:03 -0700224 break;
225 case PENDING_REMOVE:
226 case REMOVED:
227 event = store.removeFlowRule(stored);
228 break;
229 default:
230 break;
alshabib57044ba2014-09-16 15:58:01 -0700231
alshabiba68eb962014-09-24 20:34:13 -0700232 }
Ayaka Koshibe08eabaa2014-09-17 14:59:25 -0700233 if (event != null) {
alshabib1c319ff2014-10-04 20:29:09 -0700234 log.debug("Flow {} removed", flowEntry);
Ayaka Koshibe08eabaa2014-09-17 14:59:25 -0700235 post(event);
236 }
alshabib57044ba2014-09-16 15:58:01 -0700237 }
238
alshabibba5ac482014-10-02 17:15:20 -0700239
alshabib1c319ff2014-10-04 20:29:09 -0700240 private void flowMissing(FlowEntry flowRule) {
Ayaka Koshibe08eabaa2014-09-17 14:59:25 -0700241 checkNotNull(flowRule, FLOW_RULE_NULL);
242 checkValidity();
alshabiba68eb962014-09-24 20:34:13 -0700243 Device device = deviceService.getDevice(flowRule.deviceId());
244 FlowRuleProvider frp = getProvider(device.providerId());
alshabibbb42cad2014-09-25 11:43:05 -0700245 FlowRuleEvent event = null;
alshabiba68eb962014-09-24 20:34:13 -0700246 switch (flowRule.state()) {
tom9b4030d2014-10-06 10:39:03 -0700247 case PENDING_REMOVE:
248 case REMOVED:
249 event = store.removeFlowRule(flowRule);
250 frp.removeFlowRule(flowRule);
251 break;
252 case ADDED:
253 case PENDING_ADD:
254 frp.applyFlowRule(flowRule);
255 break;
256 default:
257 log.debug("Flow {} has not been installed.", flowRule);
alshabiba68eb962014-09-24 20:34:13 -0700258 }
259
alshabibbb42cad2014-09-25 11:43:05 -0700260 if (event != null) {
261 log.debug("Flow {} removed", flowRule);
262 post(event);
263 }
alshabib57044ba2014-09-16 15:58:01 -0700264
265 }
266
alshabibba5ac482014-10-02 17:15:20 -0700267
268 private void extraneousFlow(FlowRule flowRule) {
alshabib219ebaa2014-09-22 15:41:24 -0700269 checkNotNull(flowRule, FLOW_RULE_NULL);
270 checkValidity();
alshabiba68eb962014-09-24 20:34:13 -0700271 removeFlowRules(flowRule);
alshabib54ce5892014-09-23 17:50:51 -0700272 log.debug("Flow {} is on switch but not in store.", flowRule);
alshabib219ebaa2014-09-22 15:41:24 -0700273 }
274
alshabibba5ac482014-10-02 17:15:20 -0700275
alshabib1c319ff2014-10-04 20:29:09 -0700276 private void flowAdded(FlowEntry flowEntry) {
277 checkNotNull(flowEntry, FLOW_RULE_NULL);
Ayaka Koshibe08eabaa2014-09-17 14:59:25 -0700278 checkValidity();
alshabib57044ba2014-09-16 15:58:01 -0700279
alshabib1c319ff2014-10-04 20:29:09 -0700280 if (checkRuleLiveness(flowEntry, store.getFlowEntry(flowEntry))) {
alshabibba5ac482014-10-02 17:15:20 -0700281
alshabib1c319ff2014-10-04 20:29:09 -0700282 FlowRuleEvent event = store.addOrUpdateFlowRule(flowEntry);
alshabibba5ac482014-10-02 17:15:20 -0700283 if (event == null) {
284 log.debug("No flow store event generated.");
285 } else {
alshabib1c319ff2014-10-04 20:29:09 -0700286 log.debug("Flow {} {}", flowEntry, event.type());
alshabibba5ac482014-10-02 17:15:20 -0700287 post(event);
288 }
Ayaka Koshibe08eabaa2014-09-17 14:59:25 -0700289 } else {
alshabib1c319ff2014-10-04 20:29:09 -0700290 removeFlowRules(flowEntry);
Ayaka Koshibe08eabaa2014-09-17 14:59:25 -0700291 }
alshabib219ebaa2014-09-22 15:41:24 -0700292
alshabib57044ba2014-09-16 15:58:01 -0700293 }
294
alshabib1c319ff2014-10-04 20:29:09 -0700295 private boolean checkRuleLiveness(FlowEntry swRule, FlowEntry storedRule) {
296 if (storedRule == null) {
297 return false;
298 }
Yuta HIGUCHIf6f50a62014-10-19 15:58:49 -0700299 final long timeout = storedRule.timeout() * 1000;
300 final long currentTime = System.currentTimeMillis();
alshabib85c41972014-10-03 13:48:39 -0700301 if (storedRule.packets() != swRule.packets()) {
Yuta HIGUCHIf6f50a62014-10-19 15:58:49 -0700302 lastSeen.put(storedRule, currentTime);
alshabib85c41972014-10-03 13:48:39 -0700303 return true;
304 }
Yuta HIGUCHIf6f50a62014-10-19 15:58:49 -0700305 if (!lastSeen.containsKey(storedRule)) {
306 // checking for the first time
307 lastSeen.put(storedRule, storedRule.lastSeen());
308 // Use following if lastSeen attr. was removed.
309 //lastSeen.put(storedRule, currentTime);
310 }
311 Long last = lastSeen.get(storedRule);
312 if (last == null) {
313 // concurrently removed? let the liveness check fail
314 return false;
315 }
alshabib85c41972014-10-03 13:48:39 -0700316
Yuta HIGUCHIf6f50a62014-10-19 15:58:49 -0700317 if ((currentTime - last) <= timeout) {
alshabibc274c902014-10-03 14:58:27 -0700318 return true;
319 }
320 return false;
alshabibba5ac482014-10-02 17:15:20 -0700321 }
322
Ayaka Koshibe08eabaa2014-09-17 14:59:25 -0700323 // Posts the specified event to the local event dispatcher.
324 private void post(FlowRuleEvent event) {
325 if (event != null) {
326 eventDispatcher.post(event);
327 }
328 }
alshabib5c370ff2014-09-18 10:12:14 -0700329
330 @Override
alshabib1c319ff2014-10-04 20:29:09 -0700331 public void pushFlowMetrics(DeviceId deviceId, Iterable<FlowEntry> flowEntries) {
332 List<FlowEntry> storedRules = Lists.newLinkedList(store.getFlowEntries(deviceId));
alshabibbb8b1282014-09-22 17:00:18 -0700333
Yuta HIGUCHIf6f50a62014-10-19 15:58:49 -0700334 for (FlowEntry rule : flowEntries) {
alshabiba7f7ca82014-09-22 11:41:23 -0700335 if (storedRules.remove(rule)) {
alshabib219ebaa2014-09-22 15:41:24 -0700336 // we both have the rule, let's update some info then.
alshabiba7f7ca82014-09-22 11:41:23 -0700337 flowAdded(rule);
338 } else {
alshabib219ebaa2014-09-22 15:41:24 -0700339 // the device has a rule the store does not have
340 extraneousFlow(rule);
alshabiba7f7ca82014-09-22 11:41:23 -0700341 }
342 }
alshabib1c319ff2014-10-04 20:29:09 -0700343 for (FlowEntry rule : storedRules) {
alshabiba7f7ca82014-09-22 11:41:23 -0700344 // there are rules in the store that aren't on the switch
345 flowMissing(rule);
alshabib54ce5892014-09-23 17:50:51 -0700346
alshabiba7f7ca82014-09-22 11:41:23 -0700347 }
alshabib5c370ff2014-09-18 10:12:14 -0700348 }
alshabib57044ba2014-09-16 15:58:01 -0700349 }
350
tomc78acee2014-09-24 15:16:55 -0700351 // Store delegate to re-post events emitted from the store.
352 private class InternalStoreDelegate implements FlowRuleStoreDelegate {
353 @Override
354 public void notify(FlowRuleEvent event) {
355 eventDispatcher.post(event);
356 }
357 }
alshabib902d41b2014-10-07 16:52:05 -0700358
359 private class FlowRuleBatchFuture
360 implements Future<CompletedBatchOperation> {
361
alshabib193525b2014-10-08 18:58:03 -0700362 private final List<Future<CompletedBatchOperation>> futures;
363 private final Multimap<FlowRuleProvider, FlowRuleBatchEntry> batches;
364 private final AtomicReference<BatchState> state;
365 private CompletedBatchOperation overall;
alshabib902d41b2014-10-07 16:52:05 -0700366
alshabib193525b2014-10-08 18:58:03 -0700367
368
369 public FlowRuleBatchFuture(List<Future<CompletedBatchOperation>> futures,
370 Multimap<FlowRuleProvider, FlowRuleBatchEntry> batches) {
alshabib902d41b2014-10-07 16:52:05 -0700371 this.futures = futures;
alshabib193525b2014-10-08 18:58:03 -0700372 this.batches = batches;
373 state = new AtomicReference<FlowRuleManager.BatchState>();
374 state.set(BatchState.STARTED);
alshabib902d41b2014-10-07 16:52:05 -0700375 }
376
377 @Override
378 public boolean cancel(boolean mayInterruptIfRunning) {
alshabib193525b2014-10-08 18:58:03 -0700379 if (state.get() == BatchState.FINISHED) {
380 return false;
381 }
382 if (!state.compareAndSet(BatchState.STARTED, BatchState.CANCELLED)) {
383 return false;
384 }
385 cleanUpBatch();
386 for (Future<CompletedBatchOperation> f : futures) {
387 f.cancel(mayInterruptIfRunning);
388 }
389 return true;
alshabib902d41b2014-10-07 16:52:05 -0700390 }
391
392 @Override
393 public boolean isCancelled() {
alshabib193525b2014-10-08 18:58:03 -0700394 return state.get() == BatchState.CANCELLED;
alshabib902d41b2014-10-07 16:52:05 -0700395 }
396
397 @Override
398 public boolean isDone() {
alshabib193525b2014-10-08 18:58:03 -0700399 return state.get() == BatchState.FINISHED;
alshabib902d41b2014-10-07 16:52:05 -0700400 }
401
alshabib193525b2014-10-08 18:58:03 -0700402
alshabib902d41b2014-10-07 16:52:05 -0700403 @Override
404 public CompletedBatchOperation get() throws InterruptedException,
alshabib193525b2014-10-08 18:58:03 -0700405 ExecutionException {
406
407 if (isDone()) {
408 return overall;
alshabib902d41b2014-10-07 16:52:05 -0700409 }
alshabib193525b2014-10-08 18:58:03 -0700410
411 boolean success = true;
412 List<FlowEntry> failed = Lists.newLinkedList();
413 CompletedBatchOperation completed;
414 for (Future<CompletedBatchOperation> future : futures) {
415 completed = future.get();
alshabib3effd042014-10-17 12:00:31 -0700416 success = validateBatchOperation(failed, completed);
alshabib193525b2014-10-08 18:58:03 -0700417 }
418
419 return finalizeBatchOperation(success, failed);
420
alshabib902d41b2014-10-07 16:52:05 -0700421 }
422
423 @Override
424 public CompletedBatchOperation get(long timeout, TimeUnit unit)
425 throws InterruptedException, ExecutionException,
426 TimeoutException {
alshabib193525b2014-10-08 18:58:03 -0700427
428 if (isDone()) {
429 return overall;
430 }
431 boolean success = true;
432 List<FlowEntry> failed = Lists.newLinkedList();
433 CompletedBatchOperation completed;
alshabib902d41b2014-10-07 16:52:05 -0700434 long start = System.nanoTime();
435 long end = start + unit.toNanos(timeout);
alshabib193525b2014-10-08 18:58:03 -0700436
437 for (Future<CompletedBatchOperation> future : futures) {
alshabib902d41b2014-10-07 16:52:05 -0700438 long now = System.nanoTime();
439 long thisTimeout = end - now;
alshabib193525b2014-10-08 18:58:03 -0700440 completed = future.get(thisTimeout, TimeUnit.NANOSECONDS);
alshabib3effd042014-10-17 12:00:31 -0700441 success = validateBatchOperation(failed, completed);
alshabib902d41b2014-10-07 16:52:05 -0700442 }
alshabib193525b2014-10-08 18:58:03 -0700443 return finalizeBatchOperation(success, failed);
alshabib902d41b2014-10-07 16:52:05 -0700444 }
445
alshabib193525b2014-10-08 18:58:03 -0700446 private boolean validateBatchOperation(List<FlowEntry> failed,
alshabib3effd042014-10-17 12:00:31 -0700447 CompletedBatchOperation completed) {
alshabib193525b2014-10-08 18:58:03 -0700448
449 if (isCancelled()) {
450 throw new CancellationException();
451 }
452 if (!completed.isSuccess()) {
453 failed.addAll(completed.failedItems());
454 cleanUpBatch();
455 cancelAllSubBatches();
456 return false;
457 }
458 return true;
459 }
460
461 private void cancelAllSubBatches() {
462 for (Future<CompletedBatchOperation> f : futures) {
463 f.cancel(true);
464 }
465 }
466
467 private CompletedBatchOperation finalizeBatchOperation(boolean success,
468 List<FlowEntry> failed) {
alshabib26834582014-10-08 20:15:46 -0700469 synchronized (this) {
alshabib193525b2014-10-08 18:58:03 -0700470 if (!state.compareAndSet(BatchState.STARTED, BatchState.FINISHED)) {
471 if (state.get() == BatchState.FINISHED) {
472 return overall;
473 }
474 throw new CancellationException();
475 }
476 overall = new CompletedBatchOperation(success, failed);
477 return overall;
478 }
479 }
480
481 private void cleanUpBatch() {
482 for (FlowRuleBatchEntry fbe : batches.values()) {
483 if (fbe.getOperator() == FlowRuleOperation.ADD ||
484 fbe.getOperator() == FlowRuleOperation.MODIFY) {
485 store.deleteFlowRule(fbe.getTarget());
486 } else if (fbe.getOperator() == FlowRuleOperation.REMOVE) {
alshabibcf369912014-10-13 14:16:42 -0700487 store.removeFlowRule(new DefaultFlowEntry(fbe.getTarget()));
alshabib193525b2014-10-08 18:58:03 -0700488 store.storeFlowRule(fbe.getTarget());
489 }
490 }
491
492 }
alshabib902d41b2014-10-07 16:52:05 -0700493 }
494
495
alshabib193525b2014-10-08 18:58:03 -0700496
497
alshabib57044ba2014-09-16 15:58:01 -0700498}