blob: d8f89ae7b03cf6ba4f45584d3d0550c6e44ed064 [file] [log] [blame]
tombe988312014-09-19 18:38:47 -07001package org.onlab.onos.net.flow.impl;
alshabib57044ba2014-09-16 15:58:01 -07002
alshabibbb42cad2014-09-25 11:43:05 -07003import static com.google.common.base.Preconditions.checkNotNull;
4import static org.slf4j.LoggerFactory.getLogger;
5
alshabibbb42cad2014-09-25 11:43:05 -07006import java.util.List;
Yuta HIGUCHIf6f50a62014-10-19 15:58:49 -07007import java.util.Map;
alshabib193525b2014-10-08 18:58:03 -07008import java.util.concurrent.CancellationException;
alshabib902d41b2014-10-07 16:52:05 -07009import java.util.concurrent.ExecutionException;
10import java.util.concurrent.Future;
11import java.util.concurrent.TimeUnit;
12import java.util.concurrent.TimeoutException;
alshabib193525b2014-10-08 18:58:03 -070013import java.util.concurrent.atomic.AtomicReference;
alshabibbb42cad2014-09-25 11:43:05 -070014
alshabib57044ba2014-09-16 15:58:01 -070015import org.apache.felix.scr.annotations.Activate;
16import org.apache.felix.scr.annotations.Component;
17import org.apache.felix.scr.annotations.Deactivate;
18import org.apache.felix.scr.annotations.Reference;
19import org.apache.felix.scr.annotations.ReferenceCardinality;
20import org.apache.felix.scr.annotations.Service;
alshabiba68eb962014-09-24 20:34:13 -070021import org.onlab.onos.ApplicationId;
alshabib57044ba2014-09-16 15:58:01 -070022import org.onlab.onos.event.AbstractListenerRegistry;
23import org.onlab.onos.event.EventDeliveryService;
24import org.onlab.onos.net.Device;
25import org.onlab.onos.net.DeviceId;
26import org.onlab.onos.net.device.DeviceService;
alshabib902d41b2014-10-07 16:52:05 -070027import org.onlab.onos.net.flow.CompletedBatchOperation;
alshabibcf369912014-10-13 14:16:42 -070028import org.onlab.onos.net.flow.DefaultFlowEntry;
alshabib1c319ff2014-10-04 20:29:09 -070029import org.onlab.onos.net.flow.FlowEntry;
alshabib57044ba2014-09-16 15:58:01 -070030import org.onlab.onos.net.flow.FlowRule;
alshabib902d41b2014-10-07 16:52:05 -070031import org.onlab.onos.net.flow.FlowRuleBatchEntry;
alshabib193525b2014-10-08 18:58:03 -070032import org.onlab.onos.net.flow.FlowRuleBatchEntry.FlowRuleOperation;
alshabib902d41b2014-10-07 16:52:05 -070033import org.onlab.onos.net.flow.FlowRuleBatchOperation;
alshabib57044ba2014-09-16 15:58:01 -070034import org.onlab.onos.net.flow.FlowRuleEvent;
35import org.onlab.onos.net.flow.FlowRuleListener;
36import org.onlab.onos.net.flow.FlowRuleProvider;
37import org.onlab.onos.net.flow.FlowRuleProviderRegistry;
38import org.onlab.onos.net.flow.FlowRuleProviderService;
39import org.onlab.onos.net.flow.FlowRuleService;
tombe988312014-09-19 18:38:47 -070040import org.onlab.onos.net.flow.FlowRuleStore;
tomc78acee2014-09-24 15:16:55 -070041import org.onlab.onos.net.flow.FlowRuleStoreDelegate;
alshabib57044ba2014-09-16 15:58:01 -070042import org.onlab.onos.net.provider.AbstractProviderRegistry;
43import org.onlab.onos.net.provider.AbstractProviderService;
44import org.slf4j.Logger;
45
alshabib902d41b2014-10-07 16:52:05 -070046import com.google.common.collect.ArrayListMultimap;
alshabibbb42cad2014-09-25 11:43:05 -070047import com.google.common.collect.Lists;
Yuta HIGUCHIf6f50a62014-10-19 15:58:49 -070048import com.google.common.collect.Maps;
alshabib902d41b2014-10-07 16:52:05 -070049import com.google.common.collect.Multimap;
alshabiba7f7ca82014-09-22 11:41:23 -070050
tome4729872014-09-23 00:37:37 -070051/**
52 * Provides implementation of the flow NB & SB APIs.
53 */
alshabib57044ba2014-09-16 15:58:01 -070054@Component(immediate = true)
55@Service
tom202175a2014-09-19 19:00:11 -070056public class FlowRuleManager
tom9b4030d2014-10-06 10:39:03 -070057 extends AbstractProviderRegistry<FlowRuleProvider, FlowRuleProviderService>
58 implements FlowRuleService, FlowRuleProviderRegistry {
alshabib57044ba2014-09-16 15:58:01 -070059
alshabib193525b2014-10-08 18:58:03 -070060 enum BatchState { STARTED, FINISHED, CANCELLED };
61
Ayaka Koshibe08eabaa2014-09-17 14:59:25 -070062 public static final String FLOW_RULE_NULL = "FlowRule cannot be null";
alshabib57044ba2014-09-16 15:58:01 -070063 private final Logger log = getLogger(getClass());
64
65 private final AbstractListenerRegistry<FlowRuleEvent, FlowRuleListener>
tom9b4030d2014-10-06 10:39:03 -070066 listenerRegistry = new AbstractListenerRegistry<>();
alshabib57044ba2014-09-16 15:58:01 -070067
alshabibbb42cad2014-09-25 11:43:05 -070068 private final FlowRuleStoreDelegate delegate = new InternalStoreDelegate();
tomc78acee2014-09-24 15:16:55 -070069
tombe988312014-09-19 18:38:47 -070070 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
71 protected FlowRuleStore store;
Ayaka Koshibe08eabaa2014-09-17 14:59:25 -070072
alshabib57044ba2014-09-16 15:58:01 -070073 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
Ayaka Koshibeb55524f2014-09-18 09:59:24 -070074 protected EventDeliveryService eventDispatcher;
alshabib57044ba2014-09-16 15:58:01 -070075
76 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
Ayaka Koshibeb55524f2014-09-18 09:59:24 -070077 protected DeviceService deviceService;
alshabib57044ba2014-09-16 15:58:01 -070078
79 @Activate
80 public void activate() {
tomc78acee2014-09-24 15:16:55 -070081 store.setDelegate(delegate);
alshabib57044ba2014-09-16 15:58:01 -070082 eventDispatcher.addSink(FlowRuleEvent.class, listenerRegistry);
83 log.info("Started");
84 }
85
86 @Deactivate
87 public void deactivate() {
tomc78acee2014-09-24 15:16:55 -070088 store.unsetDelegate(delegate);
alshabib57044ba2014-09-16 15:58:01 -070089 eventDispatcher.removeSink(FlowRuleEvent.class);
90 log.info("Stopped");
91 }
92
93 @Override
tom9b4030d2014-10-06 10:39:03 -070094 public int getFlowRuleCount() {
95 return store.getFlowRuleCount();
96 }
97
98 @Override
alshabib1c319ff2014-10-04 20:29:09 -070099 public Iterable<FlowEntry> getFlowEntries(DeviceId deviceId) {
Ayaka Koshibe08eabaa2014-09-17 14:59:25 -0700100 return store.getFlowEntries(deviceId);
alshabib57044ba2014-09-16 15:58:01 -0700101 }
102
103 @Override
alshabib219ebaa2014-09-22 15:41:24 -0700104 public void applyFlowRules(FlowRule... flowRules) {
alshabib57044ba2014-09-16 15:58:01 -0700105 for (int i = 0; i < flowRules.length; i++) {
alshabiba68eb962014-09-24 20:34:13 -0700106 FlowRule f = flowRules[i];
Yuta HIGUCHIf3d51bd2014-10-21 01:05:33 -0700107 boolean local = store.storeFlowRule(f);
108 if (local) {
109 // TODO: aggregate all local rules and push down once?
110 applyFlowRulesToProviders(f);
alshabib3d643ec2014-10-22 18:33:00 -0700111 eventDispatcher.post(
112 new FlowRuleEvent(FlowRuleEvent.Type.RULE_ADD_REQUESTED, f));
113
Yuta HIGUCHIf3d51bd2014-10-21 01:05:33 -0700114 }
115 }
116 }
117
118 private void applyFlowRulesToProviders(FlowRule... flowRules) {
119 DeviceId did = null;
120 FlowRuleProvider frp = null;
121 for (FlowRule f : flowRules) {
122 if (!f.deviceId().equals(did)) {
123 did = f.deviceId();
124 final Device device = deviceService.getDevice(did);
125 frp = getProvider(device.providerId());
126 }
127 if (frp != null) {
128 frp.applyFlowRule(f);
129 }
alshabib57044ba2014-09-16 15:58:01 -0700130 }
alshabib57044ba2014-09-16 15:58:01 -0700131 }
132
133 @Override
134 public void removeFlowRules(FlowRule... flowRules) {
alshabibbb8b1282014-09-22 17:00:18 -0700135 FlowRule f;
alshabib57044ba2014-09-16 15:58:01 -0700136 for (int i = 0; i < flowRules.length; i++) {
alshabiba68eb962014-09-24 20:34:13 -0700137 f = flowRules[i];
Yuta HIGUCHIf3d51bd2014-10-21 01:05:33 -0700138 boolean local = store.deleteFlowRule(f);
139 if (local) {
140 // TODO: aggregate all local rules and push down once?
141 removeFlowRulesFromProviders(f);
alshabib3d643ec2014-10-22 18:33:00 -0700142 eventDispatcher.post(
143 new FlowRuleEvent(FlowRuleEvent.Type.RULE_REMOVE_REQUESTED, f));
Yuta HIGUCHIf3d51bd2014-10-21 01:05:33 -0700144 }
145 }
146 }
147
148 private void removeFlowRulesFromProviders(FlowRule... flowRules) {
149 DeviceId did = null;
150 FlowRuleProvider frp = null;
151 for (FlowRule f : flowRules) {
152 if (!f.deviceId().equals(did)) {
153 did = f.deviceId();
154 final Device device = deviceService.getDevice(did);
tom7951b232014-10-06 13:35:30 -0700155 frp = getProvider(device.providerId());
Yuta HIGUCHIf3d51bd2014-10-21 01:05:33 -0700156 }
157 if (frp != null) {
tom7951b232014-10-06 13:35:30 -0700158 frp.removeFlowRule(f);
159 }
alshabib57044ba2014-09-16 15:58:01 -0700160 }
alshabiba68eb962014-09-24 20:34:13 -0700161 }
alshabib57044ba2014-09-16 15:58:01 -0700162
alshabiba68eb962014-09-24 20:34:13 -0700163 @Override
164 public void removeFlowRulesById(ApplicationId id) {
tom9b4030d2014-10-06 10:39:03 -0700165 Iterable<FlowRule> rules = getFlowRulesById(id);
alshabiba68eb962014-09-24 20:34:13 -0700166 FlowRuleProvider frp;
167 Device device;
alshabibbb42cad2014-09-25 11:43:05 -0700168
alshabiba68eb962014-09-24 20:34:13 -0700169 for (FlowRule f : rules) {
170 store.deleteFlowRule(f);
Yuta HIGUCHIf3d51bd2014-10-21 01:05:33 -0700171 // FIXME: only accept request and push to provider on internal event
alshabiba68eb962014-09-24 20:34:13 -0700172 device = deviceService.getDevice(f.deviceId());
173 frp = getProvider(device.providerId());
Yuta HIGUCHIf3d51bd2014-10-21 01:05:33 -0700174 // FIXME: flows removed from store and flows removed from might diverge
175 // get rid of #removeRulesById?
alshabiba68eb962014-09-24 20:34:13 -0700176 frp.removeRulesById(id, f);
177 }
178 }
179
180 @Override
181 public Iterable<FlowRule> getFlowRulesById(ApplicationId id) {
alshabib1c319ff2014-10-04 20:29:09 -0700182 return store.getFlowRulesByAppId(id);
alshabib57044ba2014-09-16 15:58:01 -0700183 }
184
185 @Override
alshabib902d41b2014-10-07 16:52:05 -0700186 public Future<CompletedBatchOperation> applyBatch(
187 FlowRuleBatchOperation batch) {
188 Multimap<FlowRuleProvider, FlowRuleBatchEntry> batches =
189 ArrayListMultimap.create();
alshabib193525b2014-10-08 18:58:03 -0700190 List<Future<CompletedBatchOperation>> futures = Lists.newArrayList();
alshabib902d41b2014-10-07 16:52:05 -0700191 for (FlowRuleBatchEntry fbe : batch.getOperations()) {
192 final FlowRule f = fbe.getTarget();
193 final Device device = deviceService.getDevice(f.deviceId());
194 final FlowRuleProvider frp = getProvider(device.providerId());
195 batches.put(frp, fbe);
196 switch (fbe.getOperator()) {
197 case ADD:
198 store.storeFlowRule(f);
199 break;
200 case REMOVE:
201 store.deleteFlowRule(f);
202 break;
203 case MODIFY:
204 default:
205 log.error("Batch operation type {} unsupported.", fbe.getOperator());
206 }
207 }
208 for (FlowRuleProvider provider : batches.keySet()) {
209 FlowRuleBatchOperation b =
210 new FlowRuleBatchOperation(batches.get(provider));
alshabib193525b2014-10-08 18:58:03 -0700211 Future<CompletedBatchOperation> future = provider.executeBatch(b);
alshabib902d41b2014-10-07 16:52:05 -0700212 futures.add(future);
213 }
alshabib193525b2014-10-08 18:58:03 -0700214 return new FlowRuleBatchFuture(futures, batches);
alshabib902d41b2014-10-07 16:52:05 -0700215 }
216
217 @Override
alshabib57044ba2014-09-16 15:58:01 -0700218 public void addListener(FlowRuleListener listener) {
219 listenerRegistry.addListener(listener);
220 }
221
222 @Override
223 public void removeListener(FlowRuleListener listener) {
224 listenerRegistry.removeListener(listener);
225 }
226
227 @Override
228 protected FlowRuleProviderService createProviderService(
229 FlowRuleProvider provider) {
230 return new InternalFlowRuleProviderService(provider);
231 }
232
233 private class InternalFlowRuleProviderService
tom9b4030d2014-10-06 10:39:03 -0700234 extends AbstractProviderService<FlowRuleProvider>
235 implements FlowRuleProviderService {
alshabib57044ba2014-09-16 15:58:01 -0700236
Yuta HIGUCHIf6f50a62014-10-19 15:58:49 -0700237 final Map<FlowEntry, Long> lastSeen = Maps.newConcurrentMap();
238
alshabib57044ba2014-09-16 15:58:01 -0700239 protected InternalFlowRuleProviderService(FlowRuleProvider provider) {
240 super(provider);
241 }
242
243 @Override
alshabib1c319ff2014-10-04 20:29:09 -0700244 public void flowRemoved(FlowEntry flowEntry) {
245 checkNotNull(flowEntry, FLOW_RULE_NULL);
Ayaka Koshibe08eabaa2014-09-17 14:59:25 -0700246 checkValidity();
Yuta HIGUCHIf6f50a62014-10-19 15:58:49 -0700247 lastSeen.remove(flowEntry);
alshabib1c319ff2014-10-04 20:29:09 -0700248 FlowEntry stored = store.getFlowEntry(flowEntry);
alshabiba68eb962014-09-24 20:34:13 -0700249 if (stored == null) {
alshabib1c319ff2014-10-04 20:29:09 -0700250 log.info("Rule already evicted from store: {}", flowEntry);
alshabiba68eb962014-09-24 20:34:13 -0700251 return;
252 }
alshabib1c319ff2014-10-04 20:29:09 -0700253 Device device = deviceService.getDevice(flowEntry.deviceId());
alshabiba68eb962014-09-24 20:34:13 -0700254 FlowRuleProvider frp = getProvider(device.providerId());
255 FlowRuleEvent event = null;
256 switch (stored.state()) {
tom9b4030d2014-10-06 10:39:03 -0700257 case ADDED:
258 case PENDING_ADD:
alshabib6eb438a2014-10-01 16:39:37 -0700259 frp.applyFlowRule(stored);
tom9b4030d2014-10-06 10:39:03 -0700260 break;
261 case PENDING_REMOVE:
262 case REMOVED:
263 event = store.removeFlowRule(stored);
264 break;
265 default:
266 break;
alshabib57044ba2014-09-16 15:58:01 -0700267
alshabiba68eb962014-09-24 20:34:13 -0700268 }
Ayaka Koshibe08eabaa2014-09-17 14:59:25 -0700269 if (event != null) {
alshabib1c319ff2014-10-04 20:29:09 -0700270 log.debug("Flow {} removed", flowEntry);
Ayaka Koshibe08eabaa2014-09-17 14:59:25 -0700271 post(event);
272 }
alshabib57044ba2014-09-16 15:58:01 -0700273 }
274
alshabibba5ac482014-10-02 17:15:20 -0700275
alshabib1c319ff2014-10-04 20:29:09 -0700276 private void flowMissing(FlowEntry flowRule) {
Ayaka Koshibe08eabaa2014-09-17 14:59:25 -0700277 checkNotNull(flowRule, FLOW_RULE_NULL);
278 checkValidity();
alshabiba68eb962014-09-24 20:34:13 -0700279 Device device = deviceService.getDevice(flowRule.deviceId());
280 FlowRuleProvider frp = getProvider(device.providerId());
alshabibbb42cad2014-09-25 11:43:05 -0700281 FlowRuleEvent event = null;
alshabiba68eb962014-09-24 20:34:13 -0700282 switch (flowRule.state()) {
tom9b4030d2014-10-06 10:39:03 -0700283 case PENDING_REMOVE:
284 case REMOVED:
285 event = store.removeFlowRule(flowRule);
286 frp.removeFlowRule(flowRule);
287 break;
288 case ADDED:
289 case PENDING_ADD:
290 frp.applyFlowRule(flowRule);
291 break;
292 default:
293 log.debug("Flow {} has not been installed.", flowRule);
alshabiba68eb962014-09-24 20:34:13 -0700294 }
295
alshabibbb42cad2014-09-25 11:43:05 -0700296 if (event != null) {
297 log.debug("Flow {} removed", flowRule);
298 post(event);
299 }
alshabib57044ba2014-09-16 15:58:01 -0700300
301 }
302
alshabibba5ac482014-10-02 17:15:20 -0700303
304 private void extraneousFlow(FlowRule flowRule) {
alshabib219ebaa2014-09-22 15:41:24 -0700305 checkNotNull(flowRule, FLOW_RULE_NULL);
306 checkValidity();
alshabib2374fc92014-10-22 11:03:23 -0700307 FlowRuleProvider frp = getProvider(flowRule.deviceId());
308 frp.removeFlowRule(flowRule);
alshabib54ce5892014-09-23 17:50:51 -0700309 log.debug("Flow {} is on switch but not in store.", flowRule);
alshabib219ebaa2014-09-22 15:41:24 -0700310 }
311
alshabibba5ac482014-10-02 17:15:20 -0700312
alshabib1c319ff2014-10-04 20:29:09 -0700313 private void flowAdded(FlowEntry flowEntry) {
314 checkNotNull(flowEntry, FLOW_RULE_NULL);
Ayaka Koshibe08eabaa2014-09-17 14:59:25 -0700315 checkValidity();
alshabib57044ba2014-09-16 15:58:01 -0700316
alshabib1c319ff2014-10-04 20:29:09 -0700317 if (checkRuleLiveness(flowEntry, store.getFlowEntry(flowEntry))) {
alshabibba5ac482014-10-02 17:15:20 -0700318
alshabib1c319ff2014-10-04 20:29:09 -0700319 FlowRuleEvent event = store.addOrUpdateFlowRule(flowEntry);
alshabibba5ac482014-10-02 17:15:20 -0700320 if (event == null) {
321 log.debug("No flow store event generated.");
322 } else {
alshabib1c319ff2014-10-04 20:29:09 -0700323 log.debug("Flow {} {}", flowEntry, event.type());
alshabibba5ac482014-10-02 17:15:20 -0700324 post(event);
325 }
Ayaka Koshibe08eabaa2014-09-17 14:59:25 -0700326 } else {
alshabib1c319ff2014-10-04 20:29:09 -0700327 removeFlowRules(flowEntry);
Ayaka Koshibe08eabaa2014-09-17 14:59:25 -0700328 }
alshabib219ebaa2014-09-22 15:41:24 -0700329
alshabib57044ba2014-09-16 15:58:01 -0700330 }
331
alshabib1c319ff2014-10-04 20:29:09 -0700332 private boolean checkRuleLiveness(FlowEntry swRule, FlowEntry storedRule) {
333 if (storedRule == null) {
334 return false;
335 }
Jonathan Hartbc4a7932014-10-21 11:46:00 -0700336 if (storedRule.isPermanent()) {
337 return true;
338 }
339
Yuta HIGUCHIf6f50a62014-10-19 15:58:49 -0700340 final long timeout = storedRule.timeout() * 1000;
341 final long currentTime = System.currentTimeMillis();
alshabib85c41972014-10-03 13:48:39 -0700342 if (storedRule.packets() != swRule.packets()) {
Yuta HIGUCHIf6f50a62014-10-19 15:58:49 -0700343 lastSeen.put(storedRule, currentTime);
alshabib85c41972014-10-03 13:48:39 -0700344 return true;
345 }
Yuta HIGUCHIf6f50a62014-10-19 15:58:49 -0700346 if (!lastSeen.containsKey(storedRule)) {
347 // checking for the first time
348 lastSeen.put(storedRule, storedRule.lastSeen());
349 // Use following if lastSeen attr. was removed.
350 //lastSeen.put(storedRule, currentTime);
351 }
352 Long last = lastSeen.get(storedRule);
353 if (last == null) {
354 // concurrently removed? let the liveness check fail
355 return false;
356 }
alshabib85c41972014-10-03 13:48:39 -0700357
Yuta HIGUCHIf6f50a62014-10-19 15:58:49 -0700358 if ((currentTime - last) <= timeout) {
alshabibc274c902014-10-03 14:58:27 -0700359 return true;
360 }
361 return false;
alshabibba5ac482014-10-02 17:15:20 -0700362 }
363
Ayaka Koshibe08eabaa2014-09-17 14:59:25 -0700364 // Posts the specified event to the local event dispatcher.
365 private void post(FlowRuleEvent event) {
366 if (event != null) {
367 eventDispatcher.post(event);
368 }
369 }
alshabib5c370ff2014-09-18 10:12:14 -0700370
371 @Override
alshabib1c319ff2014-10-04 20:29:09 -0700372 public void pushFlowMetrics(DeviceId deviceId, Iterable<FlowEntry> flowEntries) {
373 List<FlowEntry> storedRules = Lists.newLinkedList(store.getFlowEntries(deviceId));
alshabibbb8b1282014-09-22 17:00:18 -0700374
Yuta HIGUCHIf6f50a62014-10-19 15:58:49 -0700375 for (FlowEntry rule : flowEntries) {
alshabiba7f7ca82014-09-22 11:41:23 -0700376 if (storedRules.remove(rule)) {
alshabib219ebaa2014-09-22 15:41:24 -0700377 // we both have the rule, let's update some info then.
alshabiba7f7ca82014-09-22 11:41:23 -0700378 flowAdded(rule);
379 } else {
alshabib219ebaa2014-09-22 15:41:24 -0700380 // the device has a rule the store does not have
381 extraneousFlow(rule);
alshabiba7f7ca82014-09-22 11:41:23 -0700382 }
383 }
alshabib1c319ff2014-10-04 20:29:09 -0700384 for (FlowEntry rule : storedRules) {
alshabiba7f7ca82014-09-22 11:41:23 -0700385 // there are rules in the store that aren't on the switch
386 flowMissing(rule);
alshabib54ce5892014-09-23 17:50:51 -0700387
alshabiba7f7ca82014-09-22 11:41:23 -0700388 }
alshabib5c370ff2014-09-18 10:12:14 -0700389 }
alshabib57044ba2014-09-16 15:58:01 -0700390 }
391
tomc78acee2014-09-24 15:16:55 -0700392 // Store delegate to re-post events emitted from the store.
393 private class InternalStoreDelegate implements FlowRuleStoreDelegate {
394 @Override
395 public void notify(FlowRuleEvent event) {
Yuta HIGUCHIf3d51bd2014-10-21 01:05:33 -0700396 switch (event.type()) {
397 case RULE_ADD_REQUESTED:
398 applyFlowRulesToProviders(event.subject());
399 break;
400 case RULE_REMOVE_REQUESTED:
401 removeFlowRulesFromProviders(event.subject());
402 break;
403
404 case RULE_ADDED:
405 case RULE_REMOVED:
406 case RULE_UPDATED:
407 // only dispatch events related to switch
408 eventDispatcher.post(event);
409 break;
410 default:
411 break;
412 }
tomc78acee2014-09-24 15:16:55 -0700413 }
414 }
alshabib902d41b2014-10-07 16:52:05 -0700415
416 private class FlowRuleBatchFuture
417 implements Future<CompletedBatchOperation> {
418
alshabib193525b2014-10-08 18:58:03 -0700419 private final List<Future<CompletedBatchOperation>> futures;
420 private final Multimap<FlowRuleProvider, FlowRuleBatchEntry> batches;
421 private final AtomicReference<BatchState> state;
422 private CompletedBatchOperation overall;
alshabib902d41b2014-10-07 16:52:05 -0700423
alshabib193525b2014-10-08 18:58:03 -0700424
425
426 public FlowRuleBatchFuture(List<Future<CompletedBatchOperation>> futures,
427 Multimap<FlowRuleProvider, FlowRuleBatchEntry> batches) {
alshabib902d41b2014-10-07 16:52:05 -0700428 this.futures = futures;
alshabib193525b2014-10-08 18:58:03 -0700429 this.batches = batches;
430 state = new AtomicReference<FlowRuleManager.BatchState>();
431 state.set(BatchState.STARTED);
alshabib902d41b2014-10-07 16:52:05 -0700432 }
433
434 @Override
435 public boolean cancel(boolean mayInterruptIfRunning) {
alshabib193525b2014-10-08 18:58:03 -0700436 if (state.get() == BatchState.FINISHED) {
437 return false;
438 }
439 if (!state.compareAndSet(BatchState.STARTED, BatchState.CANCELLED)) {
440 return false;
441 }
442 cleanUpBatch();
443 for (Future<CompletedBatchOperation> f : futures) {
444 f.cancel(mayInterruptIfRunning);
445 }
446 return true;
alshabib902d41b2014-10-07 16:52:05 -0700447 }
448
449 @Override
450 public boolean isCancelled() {
alshabib193525b2014-10-08 18:58:03 -0700451 return state.get() == BatchState.CANCELLED;
alshabib902d41b2014-10-07 16:52:05 -0700452 }
453
454 @Override
455 public boolean isDone() {
alshabib193525b2014-10-08 18:58:03 -0700456 return state.get() == BatchState.FINISHED;
alshabib902d41b2014-10-07 16:52:05 -0700457 }
458
alshabib193525b2014-10-08 18:58:03 -0700459
alshabib902d41b2014-10-07 16:52:05 -0700460 @Override
461 public CompletedBatchOperation get() throws InterruptedException,
alshabib193525b2014-10-08 18:58:03 -0700462 ExecutionException {
463
464 if (isDone()) {
465 return overall;
alshabib902d41b2014-10-07 16:52:05 -0700466 }
alshabib193525b2014-10-08 18:58:03 -0700467
468 boolean success = true;
469 List<FlowEntry> failed = Lists.newLinkedList();
470 CompletedBatchOperation completed;
471 for (Future<CompletedBatchOperation> future : futures) {
472 completed = future.get();
alshabib3effd042014-10-17 12:00:31 -0700473 success = validateBatchOperation(failed, completed);
alshabib193525b2014-10-08 18:58:03 -0700474 }
475
476 return finalizeBatchOperation(success, failed);
477
alshabib902d41b2014-10-07 16:52:05 -0700478 }
479
480 @Override
481 public CompletedBatchOperation get(long timeout, TimeUnit unit)
482 throws InterruptedException, ExecutionException,
483 TimeoutException {
alshabib193525b2014-10-08 18:58:03 -0700484
485 if (isDone()) {
486 return overall;
487 }
488 boolean success = true;
489 List<FlowEntry> failed = Lists.newLinkedList();
490 CompletedBatchOperation completed;
alshabib902d41b2014-10-07 16:52:05 -0700491 long start = System.nanoTime();
492 long end = start + unit.toNanos(timeout);
alshabib193525b2014-10-08 18:58:03 -0700493
494 for (Future<CompletedBatchOperation> future : futures) {
alshabib902d41b2014-10-07 16:52:05 -0700495 long now = System.nanoTime();
496 long thisTimeout = end - now;
alshabib193525b2014-10-08 18:58:03 -0700497 completed = future.get(thisTimeout, TimeUnit.NANOSECONDS);
alshabib3effd042014-10-17 12:00:31 -0700498 success = validateBatchOperation(failed, completed);
alshabib902d41b2014-10-07 16:52:05 -0700499 }
alshabib193525b2014-10-08 18:58:03 -0700500 return finalizeBatchOperation(success, failed);
alshabib902d41b2014-10-07 16:52:05 -0700501 }
502
alshabib193525b2014-10-08 18:58:03 -0700503 private boolean validateBatchOperation(List<FlowEntry> failed,
alshabib3effd042014-10-17 12:00:31 -0700504 CompletedBatchOperation completed) {
alshabib193525b2014-10-08 18:58:03 -0700505
506 if (isCancelled()) {
507 throw new CancellationException();
508 }
509 if (!completed.isSuccess()) {
510 failed.addAll(completed.failedItems());
511 cleanUpBatch();
512 cancelAllSubBatches();
513 return false;
514 }
515 return true;
516 }
517
518 private void cancelAllSubBatches() {
519 for (Future<CompletedBatchOperation> f : futures) {
520 f.cancel(true);
521 }
522 }
523
524 private CompletedBatchOperation finalizeBatchOperation(boolean success,
525 List<FlowEntry> failed) {
alshabib26834582014-10-08 20:15:46 -0700526 synchronized (this) {
alshabib193525b2014-10-08 18:58:03 -0700527 if (!state.compareAndSet(BatchState.STARTED, BatchState.FINISHED)) {
528 if (state.get() == BatchState.FINISHED) {
529 return overall;
530 }
531 throw new CancellationException();
532 }
533 overall = new CompletedBatchOperation(success, failed);
534 return overall;
535 }
536 }
537
538 private void cleanUpBatch() {
539 for (FlowRuleBatchEntry fbe : batches.values()) {
540 if (fbe.getOperator() == FlowRuleOperation.ADD ||
541 fbe.getOperator() == FlowRuleOperation.MODIFY) {
542 store.deleteFlowRule(fbe.getTarget());
543 } else if (fbe.getOperator() == FlowRuleOperation.REMOVE) {
alshabibcf369912014-10-13 14:16:42 -0700544 store.removeFlowRule(new DefaultFlowEntry(fbe.getTarget()));
alshabib193525b2014-10-08 18:58:03 -0700545 store.storeFlowRule(fbe.getTarget());
546 }
547 }
548
549 }
alshabib902d41b2014-10-07 16:52:05 -0700550 }
551
552
alshabib193525b2014-10-08 18:58:03 -0700553
554
alshabib57044ba2014-09-16 15:58:01 -0700555}