blob: ba37d227193d1f32de2cfd89140a4744079741eb [file] [log] [blame]
tombe988312014-09-19 18:38:47 -07001package org.onlab.onos.net.flow.impl;
alshabib57044ba2014-09-16 15:58:01 -07002
import static com.google.common.base.Preconditions.checkNotNull;
import static org.slf4j.LoggerFactory.getLogger;

import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.CancellationException;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicReference;

import org.apache.felix.scr.annotations.Activate;
import org.apache.felix.scr.annotations.Component;
import org.apache.felix.scr.annotations.Deactivate;
import org.apache.felix.scr.annotations.Reference;
import org.apache.felix.scr.annotations.ReferenceCardinality;
import org.apache.felix.scr.annotations.Service;
import org.onlab.onos.ApplicationId;
import org.onlab.onos.event.AbstractListenerRegistry;
import org.onlab.onos.event.EventDeliveryService;
import org.onlab.onos.net.Device;
import org.onlab.onos.net.DeviceId;
import org.onlab.onos.net.device.DeviceService;
import org.onlab.onos.net.flow.CompletedBatchOperation;
import org.onlab.onos.net.flow.DefaultFlowEntry;
import org.onlab.onos.net.flow.FlowEntry;
import org.onlab.onos.net.flow.FlowRule;
import org.onlab.onos.net.flow.FlowRuleBatchEntry;
import org.onlab.onos.net.flow.FlowRuleBatchEntry.FlowRuleOperation;
import org.onlab.onos.net.flow.FlowRuleBatchEvent;
import org.onlab.onos.net.flow.FlowRuleBatchOperation;
import org.onlab.onos.net.flow.FlowRuleBatchRequest;
import org.onlab.onos.net.flow.FlowRuleEvent;
import org.onlab.onos.net.flow.FlowRuleListener;
import org.onlab.onos.net.flow.FlowRuleProvider;
import org.onlab.onos.net.flow.FlowRuleProviderRegistry;
import org.onlab.onos.net.flow.FlowRuleProviderService;
import org.onlab.onos.net.flow.FlowRuleService;
import org.onlab.onos.net.flow.FlowRuleStore;
import org.onlab.onos.net.flow.FlowRuleStoreDelegate;
import org.onlab.onos.net.provider.AbstractProviderRegistry;
import org.onlab.onos.net.provider.AbstractProviderService;
import org.slf4j.Logger;

import com.google.common.collect.ArrayListMultimap;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import com.google.common.collect.Multimap;
import com.google.common.collect.Sets;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
alshabiba7f7ca82014-09-22 11:41:23 -070057
tome4729872014-09-23 00:37:37 -070058/**
59 * Provides implementation of the flow NB & SB APIs.
60 */
alshabib57044ba2014-09-16 15:58:01 -070061@Component(immediate = true)
62@Service
tom202175a2014-09-19 19:00:11 -070063public class FlowRuleManager
tom9b4030d2014-10-06 10:39:03 -070064 extends AbstractProviderRegistry<FlowRuleProvider, FlowRuleProviderService>
65 implements FlowRuleService, FlowRuleProviderRegistry {
alshabib57044ba2014-09-16 15:58:01 -070066
/**
 * Life-cycle states of an aggregated batch future: it starts out as
 * {@code STARTED} and moves exactly once to either {@code FINISHED} or
 * {@code CANCELLED}.
 */
enum BatchState {
    /** The batch has been submitted and no final outcome is recorded yet. */
    STARTED,
    /** An overall result has been computed and recorded. */
    FINISHED,
    /** The batch was cancelled before an overall result was recorded. */
    CANCELLED
}
68
Ayaka Koshibe08eabaa2014-09-17 14:59:25 -070069 public static final String FLOW_RULE_NULL = "FlowRule cannot be null";
alshabib57044ba2014-09-16 15:58:01 -070070 private final Logger log = getLogger(getClass());
71
72 private final AbstractListenerRegistry<FlowRuleEvent, FlowRuleListener>
tom9b4030d2014-10-06 10:39:03 -070073 listenerRegistry = new AbstractListenerRegistry<>();
alshabib57044ba2014-09-16 15:58:01 -070074
alshabibbb42cad2014-09-25 11:43:05 -070075 private final FlowRuleStoreDelegate delegate = new InternalStoreDelegate();
tomc78acee2014-09-24 15:16:55 -070076
tombe988312014-09-19 18:38:47 -070077 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
78 protected FlowRuleStore store;
Ayaka Koshibe08eabaa2014-09-17 14:59:25 -070079
alshabib57044ba2014-09-16 15:58:01 -070080 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
Ayaka Koshibeb55524f2014-09-18 09:59:24 -070081 protected EventDeliveryService eventDispatcher;
alshabib57044ba2014-09-16 15:58:01 -070082
83 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
Ayaka Koshibeb55524f2014-09-18 09:59:24 -070084 protected DeviceService deviceService;
alshabib57044ba2014-09-16 15:58:01 -070085
86 @Activate
87 public void activate() {
tomc78acee2014-09-24 15:16:55 -070088 store.setDelegate(delegate);
alshabib57044ba2014-09-16 15:58:01 -070089 eventDispatcher.addSink(FlowRuleEvent.class, listenerRegistry);
90 log.info("Started");
91 }
92
93 @Deactivate
94 public void deactivate() {
tomc78acee2014-09-24 15:16:55 -070095 store.unsetDelegate(delegate);
alshabib57044ba2014-09-16 15:58:01 -070096 eventDispatcher.removeSink(FlowRuleEvent.class);
97 log.info("Stopped");
98 }
99
100 @Override
tom9b4030d2014-10-06 10:39:03 -0700101 public int getFlowRuleCount() {
102 return store.getFlowRuleCount();
103 }
104
105 @Override
alshabib1c319ff2014-10-04 20:29:09 -0700106 public Iterable<FlowEntry> getFlowEntries(DeviceId deviceId) {
Ayaka Koshibe08eabaa2014-09-17 14:59:25 -0700107 return store.getFlowEntries(deviceId);
alshabib57044ba2014-09-16 15:58:01 -0700108 }
109
110 @Override
alshabib219ebaa2014-09-22 15:41:24 -0700111 public void applyFlowRules(FlowRule... flowRules) {
alshabib57044ba2014-09-16 15:58:01 -0700112 for (int i = 0; i < flowRules.length; i++) {
alshabiba68eb962014-09-24 20:34:13 -0700113 FlowRule f = flowRules[i];
Madan Jampani117aaae2014-10-23 10:04:05 -0700114 store.storeFlowRule(f);
Yuta HIGUCHIf3d51bd2014-10-21 01:05:33 -0700115 }
116 }
117
118 private void applyFlowRulesToProviders(FlowRule... flowRules) {
119 DeviceId did = null;
120 FlowRuleProvider frp = null;
121 for (FlowRule f : flowRules) {
122 if (!f.deviceId().equals(did)) {
123 did = f.deviceId();
124 final Device device = deviceService.getDevice(did);
125 frp = getProvider(device.providerId());
126 }
127 if (frp != null) {
128 frp.applyFlowRule(f);
129 }
alshabib57044ba2014-09-16 15:58:01 -0700130 }
alshabib57044ba2014-09-16 15:58:01 -0700131 }
132
133 @Override
134 public void removeFlowRules(FlowRule... flowRules) {
alshabibbb8b1282014-09-22 17:00:18 -0700135 FlowRule f;
alshabib57044ba2014-09-16 15:58:01 -0700136 for (int i = 0; i < flowRules.length; i++) {
alshabiba68eb962014-09-24 20:34:13 -0700137 f = flowRules[i];
Madan Jampani117aaae2014-10-23 10:04:05 -0700138 store.deleteFlowRule(f);
Yuta HIGUCHIf3d51bd2014-10-21 01:05:33 -0700139 }
140 }
141
142 private void removeFlowRulesFromProviders(FlowRule... flowRules) {
143 DeviceId did = null;
144 FlowRuleProvider frp = null;
145 for (FlowRule f : flowRules) {
146 if (!f.deviceId().equals(did)) {
147 did = f.deviceId();
148 final Device device = deviceService.getDevice(did);
tom7951b232014-10-06 13:35:30 -0700149 frp = getProvider(device.providerId());
Yuta HIGUCHIf3d51bd2014-10-21 01:05:33 -0700150 }
151 if (frp != null) {
tom7951b232014-10-06 13:35:30 -0700152 frp.removeFlowRule(f);
153 }
alshabib57044ba2014-09-16 15:58:01 -0700154 }
alshabiba68eb962014-09-24 20:34:13 -0700155 }
alshabib57044ba2014-09-16 15:58:01 -0700156
alshabiba68eb962014-09-24 20:34:13 -0700157 @Override
158 public void removeFlowRulesById(ApplicationId id) {
tom9b4030d2014-10-06 10:39:03 -0700159 Iterable<FlowRule> rules = getFlowRulesById(id);
alshabiba68eb962014-09-24 20:34:13 -0700160 FlowRuleProvider frp;
161 Device device;
alshabibbb42cad2014-09-25 11:43:05 -0700162
alshabiba68eb962014-09-24 20:34:13 -0700163 for (FlowRule f : rules) {
164 store.deleteFlowRule(f);
Yuta HIGUCHIf3d51bd2014-10-21 01:05:33 -0700165 // FIXME: only accept request and push to provider on internal event
alshabiba68eb962014-09-24 20:34:13 -0700166 device = deviceService.getDevice(f.deviceId());
167 frp = getProvider(device.providerId());
Yuta HIGUCHIf3d51bd2014-10-21 01:05:33 -0700168 // FIXME: flows removed from store and flows removed from might diverge
169 // get rid of #removeRulesById?
alshabiba68eb962014-09-24 20:34:13 -0700170 frp.removeRulesById(id, f);
171 }
172 }
173
174 @Override
175 public Iterable<FlowRule> getFlowRulesById(ApplicationId id) {
alshabib1c319ff2014-10-04 20:29:09 -0700176 return store.getFlowRulesByAppId(id);
alshabib57044ba2014-09-16 15:58:01 -0700177 }
178
179 @Override
alshabib902d41b2014-10-07 16:52:05 -0700180 public Future<CompletedBatchOperation> applyBatch(
181 FlowRuleBatchOperation batch) {
Madan Jampani117aaae2014-10-23 10:04:05 -0700182 Multimap<DeviceId, FlowRuleBatchEntry> perDeviceBatches =
alshabib902d41b2014-10-07 16:52:05 -0700183 ArrayListMultimap.create();
alshabib193525b2014-10-08 18:58:03 -0700184 List<Future<CompletedBatchOperation>> futures = Lists.newArrayList();
alshabib902d41b2014-10-07 16:52:05 -0700185 for (FlowRuleBatchEntry fbe : batch.getOperations()) {
186 final FlowRule f = fbe.getTarget();
Madan Jampani117aaae2014-10-23 10:04:05 -0700187 perDeviceBatches.put(f.deviceId(), fbe);
alshabib902d41b2014-10-07 16:52:05 -0700188 }
Madan Jampani117aaae2014-10-23 10:04:05 -0700189
190 for (DeviceId deviceId : perDeviceBatches.keySet()) {
alshabib902d41b2014-10-07 16:52:05 -0700191 FlowRuleBatchOperation b =
Madan Jampani117aaae2014-10-23 10:04:05 -0700192 new FlowRuleBatchOperation(perDeviceBatches.get(deviceId));
193 Future<CompletedBatchOperation> future = store.storeBatch(b);
alshabib902d41b2014-10-07 16:52:05 -0700194 futures.add(future);
195 }
Madan Jampani117aaae2014-10-23 10:04:05 -0700196 return new FlowRuleBatchFuture(futures, perDeviceBatches);
alshabib902d41b2014-10-07 16:52:05 -0700197 }
198
199 @Override
alshabib57044ba2014-09-16 15:58:01 -0700200 public void addListener(FlowRuleListener listener) {
201 listenerRegistry.addListener(listener);
202 }
203
204 @Override
205 public void removeListener(FlowRuleListener listener) {
206 listenerRegistry.removeListener(listener);
207 }
208
209 @Override
210 protected FlowRuleProviderService createProviderService(
211 FlowRuleProvider provider) {
212 return new InternalFlowRuleProviderService(provider);
213 }
214
215 private class InternalFlowRuleProviderService
tom9b4030d2014-10-06 10:39:03 -0700216 extends AbstractProviderService<FlowRuleProvider>
217 implements FlowRuleProviderService {
alshabib57044ba2014-09-16 15:58:01 -0700218
Yuta HIGUCHIf6f50a62014-10-19 15:58:49 -0700219 final Map<FlowEntry, Long> lastSeen = Maps.newConcurrentMap();
220
alshabib57044ba2014-09-16 15:58:01 -0700221 protected InternalFlowRuleProviderService(FlowRuleProvider provider) {
222 super(provider);
223 }
224
225 @Override
alshabib1c319ff2014-10-04 20:29:09 -0700226 public void flowRemoved(FlowEntry flowEntry) {
227 checkNotNull(flowEntry, FLOW_RULE_NULL);
Ayaka Koshibe08eabaa2014-09-17 14:59:25 -0700228 checkValidity();
Yuta HIGUCHIf6f50a62014-10-19 15:58:49 -0700229 lastSeen.remove(flowEntry);
alshabib1c319ff2014-10-04 20:29:09 -0700230 FlowEntry stored = store.getFlowEntry(flowEntry);
alshabiba68eb962014-09-24 20:34:13 -0700231 if (stored == null) {
alshabib1c319ff2014-10-04 20:29:09 -0700232 log.info("Rule already evicted from store: {}", flowEntry);
alshabiba68eb962014-09-24 20:34:13 -0700233 return;
234 }
alshabib1c319ff2014-10-04 20:29:09 -0700235 Device device = deviceService.getDevice(flowEntry.deviceId());
alshabiba68eb962014-09-24 20:34:13 -0700236 FlowRuleProvider frp = getProvider(device.providerId());
237 FlowRuleEvent event = null;
238 switch (stored.state()) {
tom9b4030d2014-10-06 10:39:03 -0700239 case ADDED:
240 case PENDING_ADD:
alshabib6eb438a2014-10-01 16:39:37 -0700241 frp.applyFlowRule(stored);
tom9b4030d2014-10-06 10:39:03 -0700242 break;
243 case PENDING_REMOVE:
244 case REMOVED:
245 event = store.removeFlowRule(stored);
246 break;
247 default:
248 break;
alshabib57044ba2014-09-16 15:58:01 -0700249
alshabiba68eb962014-09-24 20:34:13 -0700250 }
Ayaka Koshibe08eabaa2014-09-17 14:59:25 -0700251 if (event != null) {
alshabib1c319ff2014-10-04 20:29:09 -0700252 log.debug("Flow {} removed", flowEntry);
Ayaka Koshibe08eabaa2014-09-17 14:59:25 -0700253 post(event);
254 }
alshabib57044ba2014-09-16 15:58:01 -0700255 }
256
alshabibba5ac482014-10-02 17:15:20 -0700257
alshabib1c319ff2014-10-04 20:29:09 -0700258 private void flowMissing(FlowEntry flowRule) {
Ayaka Koshibe08eabaa2014-09-17 14:59:25 -0700259 checkNotNull(flowRule, FLOW_RULE_NULL);
260 checkValidity();
alshabiba68eb962014-09-24 20:34:13 -0700261 Device device = deviceService.getDevice(flowRule.deviceId());
262 FlowRuleProvider frp = getProvider(device.providerId());
alshabibbb42cad2014-09-25 11:43:05 -0700263 FlowRuleEvent event = null;
alshabiba68eb962014-09-24 20:34:13 -0700264 switch (flowRule.state()) {
tom9b4030d2014-10-06 10:39:03 -0700265 case PENDING_REMOVE:
266 case REMOVED:
267 event = store.removeFlowRule(flowRule);
268 frp.removeFlowRule(flowRule);
269 break;
270 case ADDED:
271 case PENDING_ADD:
272 frp.applyFlowRule(flowRule);
273 break;
274 default:
275 log.debug("Flow {} has not been installed.", flowRule);
alshabiba68eb962014-09-24 20:34:13 -0700276 }
277
alshabibbb42cad2014-09-25 11:43:05 -0700278 if (event != null) {
279 log.debug("Flow {} removed", flowRule);
280 post(event);
281 }
alshabib57044ba2014-09-16 15:58:01 -0700282
283 }
284
alshabibba5ac482014-10-02 17:15:20 -0700285
286 private void extraneousFlow(FlowRule flowRule) {
alshabib219ebaa2014-09-22 15:41:24 -0700287 checkNotNull(flowRule, FLOW_RULE_NULL);
288 checkValidity();
alshabiba68eb962014-09-24 20:34:13 -0700289 removeFlowRules(flowRule);
alshabib54ce5892014-09-23 17:50:51 -0700290 log.debug("Flow {} is on switch but not in store.", flowRule);
alshabib219ebaa2014-09-22 15:41:24 -0700291 }
292
alshabibba5ac482014-10-02 17:15:20 -0700293
alshabib1c319ff2014-10-04 20:29:09 -0700294 private void flowAdded(FlowEntry flowEntry) {
295 checkNotNull(flowEntry, FLOW_RULE_NULL);
Ayaka Koshibe08eabaa2014-09-17 14:59:25 -0700296 checkValidity();
alshabib57044ba2014-09-16 15:58:01 -0700297
alshabib1c319ff2014-10-04 20:29:09 -0700298 if (checkRuleLiveness(flowEntry, store.getFlowEntry(flowEntry))) {
alshabibba5ac482014-10-02 17:15:20 -0700299
alshabib1c319ff2014-10-04 20:29:09 -0700300 FlowRuleEvent event = store.addOrUpdateFlowRule(flowEntry);
alshabibba5ac482014-10-02 17:15:20 -0700301 if (event == null) {
302 log.debug("No flow store event generated.");
303 } else {
alshabib1c319ff2014-10-04 20:29:09 -0700304 log.debug("Flow {} {}", flowEntry, event.type());
alshabibba5ac482014-10-02 17:15:20 -0700305 post(event);
306 }
Ayaka Koshibe08eabaa2014-09-17 14:59:25 -0700307 } else {
Madan Jampani117aaae2014-10-23 10:04:05 -0700308 log.info("Removing flow rules....");
alshabib1c319ff2014-10-04 20:29:09 -0700309 removeFlowRules(flowEntry);
Ayaka Koshibe08eabaa2014-09-17 14:59:25 -0700310 }
alshabib219ebaa2014-09-22 15:41:24 -0700311
alshabib57044ba2014-09-16 15:58:01 -0700312 }
313
alshabib1c319ff2014-10-04 20:29:09 -0700314 private boolean checkRuleLiveness(FlowEntry swRule, FlowEntry storedRule) {
315 if (storedRule == null) {
316 return false;
317 }
Jonathan Hartbc4a7932014-10-21 11:46:00 -0700318 if (storedRule.isPermanent()) {
319 return true;
320 }
321
Yuta HIGUCHIf6f50a62014-10-19 15:58:49 -0700322 final long timeout = storedRule.timeout() * 1000;
323 final long currentTime = System.currentTimeMillis();
alshabib85c41972014-10-03 13:48:39 -0700324 if (storedRule.packets() != swRule.packets()) {
Yuta HIGUCHIf6f50a62014-10-19 15:58:49 -0700325 lastSeen.put(storedRule, currentTime);
alshabib85c41972014-10-03 13:48:39 -0700326 return true;
327 }
Yuta HIGUCHIf6f50a62014-10-19 15:58:49 -0700328 if (!lastSeen.containsKey(storedRule)) {
329 // checking for the first time
330 lastSeen.put(storedRule, storedRule.lastSeen());
331 // Use following if lastSeen attr. was removed.
332 //lastSeen.put(storedRule, currentTime);
333 }
334 Long last = lastSeen.get(storedRule);
335 if (last == null) {
336 // concurrently removed? let the liveness check fail
337 return false;
338 }
alshabib85c41972014-10-03 13:48:39 -0700339
Yuta HIGUCHIf6f50a62014-10-19 15:58:49 -0700340 if ((currentTime - last) <= timeout) {
alshabibc274c902014-10-03 14:58:27 -0700341 return true;
342 }
343 return false;
alshabibba5ac482014-10-02 17:15:20 -0700344 }
345
Ayaka Koshibe08eabaa2014-09-17 14:59:25 -0700346 // Posts the specified event to the local event dispatcher.
347 private void post(FlowRuleEvent event) {
348 if (event != null) {
349 eventDispatcher.post(event);
350 }
351 }
alshabib5c370ff2014-09-18 10:12:14 -0700352
353 @Override
alshabib1c319ff2014-10-04 20:29:09 -0700354 public void pushFlowMetrics(DeviceId deviceId, Iterable<FlowEntry> flowEntries) {
355 List<FlowEntry> storedRules = Lists.newLinkedList(store.getFlowEntries(deviceId));
alshabibbb8b1282014-09-22 17:00:18 -0700356
Yuta HIGUCHIf6f50a62014-10-19 15:58:49 -0700357 for (FlowEntry rule : flowEntries) {
alshabiba7f7ca82014-09-22 11:41:23 -0700358 if (storedRules.remove(rule)) {
alshabib219ebaa2014-09-22 15:41:24 -0700359 // we both have the rule, let's update some info then.
alshabiba7f7ca82014-09-22 11:41:23 -0700360 flowAdded(rule);
361 } else {
alshabib219ebaa2014-09-22 15:41:24 -0700362 // the device has a rule the store does not have
363 extraneousFlow(rule);
alshabiba7f7ca82014-09-22 11:41:23 -0700364 }
365 }
alshabib1c319ff2014-10-04 20:29:09 -0700366 for (FlowEntry rule : storedRules) {
alshabiba7f7ca82014-09-22 11:41:23 -0700367 // there are rules in the store that aren't on the switch
368 flowMissing(rule);
alshabib54ce5892014-09-23 17:50:51 -0700369
alshabiba7f7ca82014-09-22 11:41:23 -0700370 }
alshabib5c370ff2014-09-18 10:12:14 -0700371 }
alshabib57044ba2014-09-16 15:58:01 -0700372 }
373
tomc78acee2014-09-24 15:16:55 -0700374 // Store delegate to re-post events emitted from the store.
375 private class InternalStoreDelegate implements FlowRuleStoreDelegate {
Madan Jampani117aaae2014-10-23 10:04:05 -0700376 // TODO: Right now we only dispatch events at individual flowEntry level.
377 // It may be more efficient for also dispatch events as a batch.
tomc78acee2014-09-24 15:16:55 -0700378 @Override
Madan Jampani117aaae2014-10-23 10:04:05 -0700379 public void notify(FlowRuleBatchEvent event) {
380 final FlowRuleBatchRequest request = event.subject();
Yuta HIGUCHIf3d51bd2014-10-21 01:05:33 -0700381 switch (event.type()) {
Madan Jampani117aaae2014-10-23 10:04:05 -0700382 case BATCH_OPERATION_REQUESTED:
383// for (FlowEntry entry : request.toAdd()) {
384// //eventDispatcher.post(new FlowRuleEvent(FlowRuleEvent.Type.RULE_ADD_REQUESTED, entry));
385// }
386// for (FlowEntry entry : request.toRemove()) {
387// //eventDispatcher.post(new FlowRuleEvent(FlowRuleEvent.Type.RULE_REMOVE_REQUESTED, entry));
388// }
389// // FIXME: what about op.equals(FlowRuleOperation.MODIFY) ?
390//
391 FlowRuleBatchOperation batchOperation = request.asBatchOperation();
Yuta HIGUCHIf3d51bd2014-10-21 01:05:33 -0700392
Madan Jampani117aaae2014-10-23 10:04:05 -0700393 FlowRuleProvider flowRuleProvider =
394 getProvider(batchOperation.getOperations().get(0).getTarget().deviceId());
395 final ListenableFuture<CompletedBatchOperation> result =
396 flowRuleProvider.executeBatch(batchOperation);
397 result.addListener(new Runnable() {
398 @Override
399 public void run() {
400 store.batchOperationComplete(FlowRuleBatchEvent.create(request, Futures.getUnchecked(result)));
401 }
402 }, Executors.newCachedThreadPool());
403
404 break;
405 case BATCH_OPERATION_COMPLETED:
406 Set<FlowEntry> failedItems = event.result().failedItems();
407 for (FlowEntry entry : request.toAdd()) {
408 if (!failedItems.contains(entry)) {
409 eventDispatcher.post(new FlowRuleEvent(FlowRuleEvent.Type.RULE_ADDED, entry));
410 }
411 }
412 for (FlowEntry entry : request.toRemove()) {
413 if (!failedItems.contains(entry)) {
414 eventDispatcher.post(new FlowRuleEvent(FlowRuleEvent.Type.RULE_REMOVED, entry));
415 }
416 }
Yuta HIGUCHIf3d51bd2014-10-21 01:05:33 -0700417 break;
418 default:
419 break;
420 }
tomc78acee2014-09-24 15:16:55 -0700421 }
422 }
alshabib902d41b2014-10-07 16:52:05 -0700423
Madan Jampani117aaae2014-10-23 10:04:05 -0700424 private class FlowRuleBatchFuture implements Future<CompletedBatchOperation> {
alshabib902d41b2014-10-07 16:52:05 -0700425
alshabib193525b2014-10-08 18:58:03 -0700426 private final List<Future<CompletedBatchOperation>> futures;
Madan Jampani117aaae2014-10-23 10:04:05 -0700427 private final Multimap<DeviceId, FlowRuleBatchEntry> batches;
alshabib193525b2014-10-08 18:58:03 -0700428 private final AtomicReference<BatchState> state;
429 private CompletedBatchOperation overall;
alshabib902d41b2014-10-07 16:52:05 -0700430
alshabib193525b2014-10-08 18:58:03 -0700431 public FlowRuleBatchFuture(List<Future<CompletedBatchOperation>> futures,
Madan Jampani117aaae2014-10-23 10:04:05 -0700432 Multimap<DeviceId, FlowRuleBatchEntry> batches) {
alshabib902d41b2014-10-07 16:52:05 -0700433 this.futures = futures;
alshabib193525b2014-10-08 18:58:03 -0700434 this.batches = batches;
435 state = new AtomicReference<FlowRuleManager.BatchState>();
436 state.set(BatchState.STARTED);
alshabib902d41b2014-10-07 16:52:05 -0700437 }
438
439 @Override
440 public boolean cancel(boolean mayInterruptIfRunning) {
alshabib193525b2014-10-08 18:58:03 -0700441 if (state.get() == BatchState.FINISHED) {
442 return false;
443 }
444 if (!state.compareAndSet(BatchState.STARTED, BatchState.CANCELLED)) {
445 return false;
446 }
447 cleanUpBatch();
448 for (Future<CompletedBatchOperation> f : futures) {
449 f.cancel(mayInterruptIfRunning);
450 }
451 return true;
alshabib902d41b2014-10-07 16:52:05 -0700452 }
453
454 @Override
455 public boolean isCancelled() {
alshabib193525b2014-10-08 18:58:03 -0700456 return state.get() == BatchState.CANCELLED;
alshabib902d41b2014-10-07 16:52:05 -0700457 }
458
459 @Override
460 public boolean isDone() {
alshabib193525b2014-10-08 18:58:03 -0700461 return state.get() == BatchState.FINISHED;
alshabib902d41b2014-10-07 16:52:05 -0700462 }
463
alshabib193525b2014-10-08 18:58:03 -0700464
alshabib902d41b2014-10-07 16:52:05 -0700465 @Override
466 public CompletedBatchOperation get() throws InterruptedException,
alshabib193525b2014-10-08 18:58:03 -0700467 ExecutionException {
468
469 if (isDone()) {
470 return overall;
alshabib902d41b2014-10-07 16:52:05 -0700471 }
alshabib193525b2014-10-08 18:58:03 -0700472
473 boolean success = true;
Madan Jampani117aaae2014-10-23 10:04:05 -0700474 Set<FlowEntry> failed = Sets.newHashSet();
alshabib193525b2014-10-08 18:58:03 -0700475 CompletedBatchOperation completed;
476 for (Future<CompletedBatchOperation> future : futures) {
477 completed = future.get();
alshabib3effd042014-10-17 12:00:31 -0700478 success = validateBatchOperation(failed, completed);
alshabib193525b2014-10-08 18:58:03 -0700479 }
480
481 return finalizeBatchOperation(success, failed);
482
alshabib902d41b2014-10-07 16:52:05 -0700483 }
484
485 @Override
486 public CompletedBatchOperation get(long timeout, TimeUnit unit)
487 throws InterruptedException, ExecutionException,
488 TimeoutException {
alshabib193525b2014-10-08 18:58:03 -0700489
490 if (isDone()) {
491 return overall;
492 }
493 boolean success = true;
Madan Jampani117aaae2014-10-23 10:04:05 -0700494 Set<FlowEntry> failed = Sets.newHashSet();
alshabib193525b2014-10-08 18:58:03 -0700495 CompletedBatchOperation completed;
alshabib902d41b2014-10-07 16:52:05 -0700496 long start = System.nanoTime();
497 long end = start + unit.toNanos(timeout);
alshabib193525b2014-10-08 18:58:03 -0700498
499 for (Future<CompletedBatchOperation> future : futures) {
alshabib902d41b2014-10-07 16:52:05 -0700500 long now = System.nanoTime();
501 long thisTimeout = end - now;
alshabib193525b2014-10-08 18:58:03 -0700502 completed = future.get(thisTimeout, TimeUnit.NANOSECONDS);
alshabib3effd042014-10-17 12:00:31 -0700503 success = validateBatchOperation(failed, completed);
alshabib902d41b2014-10-07 16:52:05 -0700504 }
alshabib193525b2014-10-08 18:58:03 -0700505 return finalizeBatchOperation(success, failed);
alshabib902d41b2014-10-07 16:52:05 -0700506 }
507
Madan Jampani117aaae2014-10-23 10:04:05 -0700508 private boolean validateBatchOperation(Set<FlowEntry> failed,
alshabib3effd042014-10-17 12:00:31 -0700509 CompletedBatchOperation completed) {
alshabib193525b2014-10-08 18:58:03 -0700510
511 if (isCancelled()) {
512 throw new CancellationException();
513 }
514 if (!completed.isSuccess()) {
515 failed.addAll(completed.failedItems());
516 cleanUpBatch();
517 cancelAllSubBatches();
518 return false;
519 }
520 return true;
521 }
522
523 private void cancelAllSubBatches() {
524 for (Future<CompletedBatchOperation> f : futures) {
525 f.cancel(true);
526 }
527 }
528
529 private CompletedBatchOperation finalizeBatchOperation(boolean success,
Madan Jampani117aaae2014-10-23 10:04:05 -0700530 Set<FlowEntry> failed) {
alshabib26834582014-10-08 20:15:46 -0700531 synchronized (this) {
alshabib193525b2014-10-08 18:58:03 -0700532 if (!state.compareAndSet(BatchState.STARTED, BatchState.FINISHED)) {
533 if (state.get() == BatchState.FINISHED) {
534 return overall;
535 }
536 throw new CancellationException();
537 }
538 overall = new CompletedBatchOperation(success, failed);
539 return overall;
540 }
541 }
542
543 private void cleanUpBatch() {
544 for (FlowRuleBatchEntry fbe : batches.values()) {
545 if (fbe.getOperator() == FlowRuleOperation.ADD ||
546 fbe.getOperator() == FlowRuleOperation.MODIFY) {
547 store.deleteFlowRule(fbe.getTarget());
548 } else if (fbe.getOperator() == FlowRuleOperation.REMOVE) {
alshabibcf369912014-10-13 14:16:42 -0700549 store.removeFlowRule(new DefaultFlowEntry(fbe.getTarget()));
alshabib193525b2014-10-08 18:58:03 -0700550 store.storeFlowRule(fbe.getTarget());
551 }
552 }
alshabib193525b2014-10-08 18:58:03 -0700553 }
alshabib902d41b2014-10-07 16:52:05 -0700554 }
alshabib57044ba2014-09-16 15:58:01 -0700555}