blob: aef583101312dcbea6d690096efab2fe33b52553 [file] [log] [blame]
tombe988312014-09-19 18:38:47 -07001package org.onlab.onos.net.flow.impl;
alshabib57044ba2014-09-16 15:58:01 -07002
alshabibbb42cad2014-09-25 11:43:05 -07003import static com.google.common.base.Preconditions.checkNotNull;
4import static org.slf4j.LoggerFactory.getLogger;
Yuta HIGUCHI9def0472014-10-23 15:51:10 -07005import static org.onlab.util.Tools.namedThreads;
alshabibbb42cad2014-09-25 11:43:05 -07006
alshabibbb42cad2014-09-25 11:43:05 -07007import java.util.List;
Yuta HIGUCHIf6f50a62014-10-19 15:58:49 -07008import java.util.Map;
Madan Jampani117aaae2014-10-23 10:04:05 -07009import java.util.Set;
alshabib193525b2014-10-08 18:58:03 -070010import java.util.concurrent.CancellationException;
alshabib902d41b2014-10-07 16:52:05 -070011import java.util.concurrent.ExecutionException;
Yuta HIGUCHI9def0472014-10-23 15:51:10 -070012import java.util.concurrent.ExecutorService;
Madan Jampani117aaae2014-10-23 10:04:05 -070013import java.util.concurrent.Executors;
alshabib902d41b2014-10-07 16:52:05 -070014import java.util.concurrent.Future;
15import java.util.concurrent.TimeUnit;
16import java.util.concurrent.TimeoutException;
alshabib193525b2014-10-08 18:58:03 -070017import java.util.concurrent.atomic.AtomicReference;
alshabibbb42cad2014-09-25 11:43:05 -070018
alshabib57044ba2014-09-16 15:58:01 -070019import org.apache.felix.scr.annotations.Activate;
20import org.apache.felix.scr.annotations.Component;
21import org.apache.felix.scr.annotations.Deactivate;
22import org.apache.felix.scr.annotations.Reference;
23import org.apache.felix.scr.annotations.ReferenceCardinality;
24import org.apache.felix.scr.annotations.Service;
Thomas Vachuskae0f804a2014-10-27 23:40:48 -070025import org.onlab.onos.core.ApplicationId;
alshabib57044ba2014-09-16 15:58:01 -070026import org.onlab.onos.event.AbstractListenerRegistry;
27import org.onlab.onos.event.EventDeliveryService;
28import org.onlab.onos.net.Device;
29import org.onlab.onos.net.DeviceId;
30import org.onlab.onos.net.device.DeviceService;
alshabib902d41b2014-10-07 16:52:05 -070031import org.onlab.onos.net.flow.CompletedBatchOperation;
alshabibcf369912014-10-13 14:16:42 -070032import org.onlab.onos.net.flow.DefaultFlowEntry;
alshabib1c319ff2014-10-04 20:29:09 -070033import org.onlab.onos.net.flow.FlowEntry;
alshabib57044ba2014-09-16 15:58:01 -070034import org.onlab.onos.net.flow.FlowRule;
alshabib902d41b2014-10-07 16:52:05 -070035import org.onlab.onos.net.flow.FlowRuleBatchEntry;
alshabib193525b2014-10-08 18:58:03 -070036import org.onlab.onos.net.flow.FlowRuleBatchEntry.FlowRuleOperation;
Madan Jampani117aaae2014-10-23 10:04:05 -070037import org.onlab.onos.net.flow.FlowRuleBatchEvent;
alshabib902d41b2014-10-07 16:52:05 -070038import org.onlab.onos.net.flow.FlowRuleBatchOperation;
Madan Jampani117aaae2014-10-23 10:04:05 -070039import org.onlab.onos.net.flow.FlowRuleBatchRequest;
alshabib57044ba2014-09-16 15:58:01 -070040import org.onlab.onos.net.flow.FlowRuleEvent;
41import org.onlab.onos.net.flow.FlowRuleListener;
42import org.onlab.onos.net.flow.FlowRuleProvider;
43import org.onlab.onos.net.flow.FlowRuleProviderRegistry;
44import org.onlab.onos.net.flow.FlowRuleProviderService;
45import org.onlab.onos.net.flow.FlowRuleService;
tombe988312014-09-19 18:38:47 -070046import org.onlab.onos.net.flow.FlowRuleStore;
tomc78acee2014-09-24 15:16:55 -070047import org.onlab.onos.net.flow.FlowRuleStoreDelegate;
alshabib57044ba2014-09-16 15:58:01 -070048import org.onlab.onos.net.provider.AbstractProviderRegistry;
49import org.onlab.onos.net.provider.AbstractProviderService;
50import org.slf4j.Logger;
51
alshabib902d41b2014-10-07 16:52:05 -070052import com.google.common.collect.ArrayListMultimap;
Madan Jampani6a456162014-10-24 11:36:17 -070053import com.google.common.collect.Iterables;
alshabibbb42cad2014-09-25 11:43:05 -070054import com.google.common.collect.Lists;
Yuta HIGUCHIf6f50a62014-10-19 15:58:49 -070055import com.google.common.collect.Maps;
alshabib902d41b2014-10-07 16:52:05 -070056import com.google.common.collect.Multimap;
Madan Jampani117aaae2014-10-23 10:04:05 -070057import com.google.common.collect.Sets;
58import com.google.common.util.concurrent.Futures;
59import com.google.common.util.concurrent.ListenableFuture;
alshabiba7f7ca82014-09-22 11:41:23 -070060
tome4729872014-09-23 00:37:37 -070061/**
62 * Provides implementation of the flow NB & SB APIs.
63 */
alshabib57044ba2014-09-16 15:58:01 -070064@Component(immediate = true)
65@Service
tom202175a2014-09-19 19:00:11 -070066public class FlowRuleManager
tom9b4030d2014-10-06 10:39:03 -070067 extends AbstractProviderRegistry<FlowRuleProvider, FlowRuleProviderService>
68 implements FlowRuleService, FlowRuleProviderRegistry {
alshabib57044ba2014-09-16 15:58:01 -070069
alshabib193525b2014-10-08 18:58:03 -070070 enum BatchState { STARTED, FINISHED, CANCELLED };
71
Ayaka Koshibe08eabaa2014-09-17 14:59:25 -070072 public static final String FLOW_RULE_NULL = "FlowRule cannot be null";
alshabib57044ba2014-09-16 15:58:01 -070073 private final Logger log = getLogger(getClass());
74
75 private final AbstractListenerRegistry<FlowRuleEvent, FlowRuleListener>
tom9b4030d2014-10-06 10:39:03 -070076 listenerRegistry = new AbstractListenerRegistry<>();
alshabib57044ba2014-09-16 15:58:01 -070077
alshabibbb42cad2014-09-25 11:43:05 -070078 private final FlowRuleStoreDelegate delegate = new InternalStoreDelegate();
tomc78acee2014-09-24 15:16:55 -070079
Thomas Vachuska8ac922d2014-10-23 16:17:03 -070080 private final ExecutorService futureListeners =
81 Executors.newCachedThreadPool(namedThreads("provider-future-listeners"));
Yuta HIGUCHI9def0472014-10-23 15:51:10 -070082
tombe988312014-09-19 18:38:47 -070083 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
84 protected FlowRuleStore store;
Ayaka Koshibe08eabaa2014-09-17 14:59:25 -070085
alshabib57044ba2014-09-16 15:58:01 -070086 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
Ayaka Koshibeb55524f2014-09-18 09:59:24 -070087 protected EventDeliveryService eventDispatcher;
alshabib57044ba2014-09-16 15:58:01 -070088
89 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
Ayaka Koshibeb55524f2014-09-18 09:59:24 -070090 protected DeviceService deviceService;
alshabib57044ba2014-09-16 15:58:01 -070091
92 @Activate
93 public void activate() {
tomc78acee2014-09-24 15:16:55 -070094 store.setDelegate(delegate);
alshabib57044ba2014-09-16 15:58:01 -070095 eventDispatcher.addSink(FlowRuleEvent.class, listenerRegistry);
96 log.info("Started");
97 }
98
99 @Deactivate
100 public void deactivate() {
Yuta HIGUCHI9def0472014-10-23 15:51:10 -0700101 futureListeners.shutdownNow();
102
tomc78acee2014-09-24 15:16:55 -0700103 store.unsetDelegate(delegate);
alshabib57044ba2014-09-16 15:58:01 -0700104 eventDispatcher.removeSink(FlowRuleEvent.class);
105 log.info("Stopped");
106 }
107
108 @Override
tom9b4030d2014-10-06 10:39:03 -0700109 public int getFlowRuleCount() {
110 return store.getFlowRuleCount();
111 }
112
113 @Override
alshabib1c319ff2014-10-04 20:29:09 -0700114 public Iterable<FlowEntry> getFlowEntries(DeviceId deviceId) {
Ayaka Koshibe08eabaa2014-09-17 14:59:25 -0700115 return store.getFlowEntries(deviceId);
alshabib57044ba2014-09-16 15:58:01 -0700116 }
117
118 @Override
alshabib219ebaa2014-09-22 15:41:24 -0700119 public void applyFlowRules(FlowRule... flowRules) {
Madan Jampani6a456162014-10-24 11:36:17 -0700120 Set<FlowRuleBatchEntry> toAddBatchEntries = Sets.newHashSet();
alshabib57044ba2014-09-16 15:58:01 -0700121 for (int i = 0; i < flowRules.length; i++) {
Madan Jampani6a456162014-10-24 11:36:17 -0700122 toAddBatchEntries.add(new FlowRuleBatchEntry(FlowRuleOperation.ADD, flowRules[i]));
Yuta HIGUCHIf3d51bd2014-10-21 01:05:33 -0700123 }
Madan Jampani6a456162014-10-24 11:36:17 -0700124 applyBatch(new FlowRuleBatchOperation(toAddBatchEntries));
alshabib57044ba2014-09-16 15:58:01 -0700125 }
126
127 @Override
128 public void removeFlowRules(FlowRule... flowRules) {
Madan Jampani6a456162014-10-24 11:36:17 -0700129 Set<FlowRuleBatchEntry> toRemoveBatchEntries = Sets.newHashSet();
alshabib57044ba2014-09-16 15:58:01 -0700130 for (int i = 0; i < flowRules.length; i++) {
Madan Jampani6a456162014-10-24 11:36:17 -0700131 toRemoveBatchEntries.add(new FlowRuleBatchEntry(FlowRuleOperation.REMOVE, flowRules[i]));
Yuta HIGUCHIf3d51bd2014-10-21 01:05:33 -0700132 }
Madan Jampani6a456162014-10-24 11:36:17 -0700133 applyBatch(new FlowRuleBatchOperation(toRemoveBatchEntries));
alshabiba68eb962014-09-24 20:34:13 -0700134 }
alshabib57044ba2014-09-16 15:58:01 -0700135
alshabiba68eb962014-09-24 20:34:13 -0700136 @Override
137 public void removeFlowRulesById(ApplicationId id) {
Madan Jampani6a456162014-10-24 11:36:17 -0700138 removeFlowRules(Iterables.toArray(getFlowRulesById(id), FlowRule.class));
alshabiba68eb962014-09-24 20:34:13 -0700139 }
140
141 @Override
142 public Iterable<FlowRule> getFlowRulesById(ApplicationId id) {
Madan Jampani6a456162014-10-24 11:36:17 -0700143 Set<FlowRule> flowEntries = Sets.newHashSet();
144 for (Device d : deviceService.getDevices()) {
145 for (FlowEntry flowEntry : store.getFlowEntries(d.id())) {
146 if (flowEntry.appId() == id.id()) {
147 flowEntries.add(flowEntry);
148 }
149 }
150 }
151 return flowEntries;
alshabib57044ba2014-09-16 15:58:01 -0700152 }
153
154 @Override
alshabib902d41b2014-10-07 16:52:05 -0700155 public Future<CompletedBatchOperation> applyBatch(
156 FlowRuleBatchOperation batch) {
Madan Jampani117aaae2014-10-23 10:04:05 -0700157 Multimap<DeviceId, FlowRuleBatchEntry> perDeviceBatches =
alshabib902d41b2014-10-07 16:52:05 -0700158 ArrayListMultimap.create();
alshabib193525b2014-10-08 18:58:03 -0700159 List<Future<CompletedBatchOperation>> futures = Lists.newArrayList();
alshabib902d41b2014-10-07 16:52:05 -0700160 for (FlowRuleBatchEntry fbe : batch.getOperations()) {
161 final FlowRule f = fbe.getTarget();
Madan Jampani117aaae2014-10-23 10:04:05 -0700162 perDeviceBatches.put(f.deviceId(), fbe);
alshabib902d41b2014-10-07 16:52:05 -0700163 }
Madan Jampani117aaae2014-10-23 10:04:05 -0700164
165 for (DeviceId deviceId : perDeviceBatches.keySet()) {
alshabib902d41b2014-10-07 16:52:05 -0700166 FlowRuleBatchOperation b =
Madan Jampani117aaae2014-10-23 10:04:05 -0700167 new FlowRuleBatchOperation(perDeviceBatches.get(deviceId));
168 Future<CompletedBatchOperation> future = store.storeBatch(b);
alshabib902d41b2014-10-07 16:52:05 -0700169 futures.add(future);
170 }
Madan Jampani117aaae2014-10-23 10:04:05 -0700171 return new FlowRuleBatchFuture(futures, perDeviceBatches);
alshabib902d41b2014-10-07 16:52:05 -0700172 }
173
174 @Override
alshabib57044ba2014-09-16 15:58:01 -0700175 public void addListener(FlowRuleListener listener) {
176 listenerRegistry.addListener(listener);
177 }
178
179 @Override
180 public void removeListener(FlowRuleListener listener) {
181 listenerRegistry.removeListener(listener);
182 }
183
184 @Override
185 protected FlowRuleProviderService createProviderService(
186 FlowRuleProvider provider) {
187 return new InternalFlowRuleProviderService(provider);
188 }
189
190 private class InternalFlowRuleProviderService
tom9b4030d2014-10-06 10:39:03 -0700191 extends AbstractProviderService<FlowRuleProvider>
192 implements FlowRuleProviderService {
alshabib57044ba2014-09-16 15:58:01 -0700193
Yuta HIGUCHIf6f50a62014-10-19 15:58:49 -0700194 final Map<FlowEntry, Long> lastSeen = Maps.newConcurrentMap();
195
alshabib57044ba2014-09-16 15:58:01 -0700196 protected InternalFlowRuleProviderService(FlowRuleProvider provider) {
197 super(provider);
198 }
199
200 @Override
alshabib1c319ff2014-10-04 20:29:09 -0700201 public void flowRemoved(FlowEntry flowEntry) {
202 checkNotNull(flowEntry, FLOW_RULE_NULL);
Ayaka Koshibe08eabaa2014-09-17 14:59:25 -0700203 checkValidity();
Yuta HIGUCHIf6f50a62014-10-19 15:58:49 -0700204 lastSeen.remove(flowEntry);
alshabib1c319ff2014-10-04 20:29:09 -0700205 FlowEntry stored = store.getFlowEntry(flowEntry);
alshabiba68eb962014-09-24 20:34:13 -0700206 if (stored == null) {
alshabib1c319ff2014-10-04 20:29:09 -0700207 log.info("Rule already evicted from store: {}", flowEntry);
alshabiba68eb962014-09-24 20:34:13 -0700208 return;
209 }
alshabib1c319ff2014-10-04 20:29:09 -0700210 Device device = deviceService.getDevice(flowEntry.deviceId());
alshabiba68eb962014-09-24 20:34:13 -0700211 FlowRuleProvider frp = getProvider(device.providerId());
212 FlowRuleEvent event = null;
213 switch (stored.state()) {
tom9b4030d2014-10-06 10:39:03 -0700214 case ADDED:
215 case PENDING_ADD:
alshabib6eb438a2014-10-01 16:39:37 -0700216 frp.applyFlowRule(stored);
tom9b4030d2014-10-06 10:39:03 -0700217 break;
218 case PENDING_REMOVE:
219 case REMOVED:
220 event = store.removeFlowRule(stored);
221 break;
222 default:
223 break;
alshabib57044ba2014-09-16 15:58:01 -0700224
alshabiba68eb962014-09-24 20:34:13 -0700225 }
Ayaka Koshibe08eabaa2014-09-17 14:59:25 -0700226 if (event != null) {
alshabib1c319ff2014-10-04 20:29:09 -0700227 log.debug("Flow {} removed", flowEntry);
Ayaka Koshibe08eabaa2014-09-17 14:59:25 -0700228 post(event);
229 }
alshabib57044ba2014-09-16 15:58:01 -0700230 }
231
alshabibba5ac482014-10-02 17:15:20 -0700232
alshabib1c319ff2014-10-04 20:29:09 -0700233 private void flowMissing(FlowEntry flowRule) {
Ayaka Koshibe08eabaa2014-09-17 14:59:25 -0700234 checkNotNull(flowRule, FLOW_RULE_NULL);
235 checkValidity();
alshabiba68eb962014-09-24 20:34:13 -0700236 Device device = deviceService.getDevice(flowRule.deviceId());
237 FlowRuleProvider frp = getProvider(device.providerId());
alshabibbb42cad2014-09-25 11:43:05 -0700238 FlowRuleEvent event = null;
alshabiba68eb962014-09-24 20:34:13 -0700239 switch (flowRule.state()) {
tom9b4030d2014-10-06 10:39:03 -0700240 case PENDING_REMOVE:
241 case REMOVED:
242 event = store.removeFlowRule(flowRule);
243 frp.removeFlowRule(flowRule);
244 break;
245 case ADDED:
246 case PENDING_ADD:
247 frp.applyFlowRule(flowRule);
248 break;
249 default:
250 log.debug("Flow {} has not been installed.", flowRule);
alshabiba68eb962014-09-24 20:34:13 -0700251 }
252
alshabibbb42cad2014-09-25 11:43:05 -0700253 if (event != null) {
254 log.debug("Flow {} removed", flowRule);
255 post(event);
256 }
alshabib57044ba2014-09-16 15:58:01 -0700257
258 }
259
alshabibba5ac482014-10-02 17:15:20 -0700260
261 private void extraneousFlow(FlowRule flowRule) {
alshabib219ebaa2014-09-22 15:41:24 -0700262 checkNotNull(flowRule, FLOW_RULE_NULL);
263 checkValidity();
alshabib2374fc92014-10-22 11:03:23 -0700264 FlowRuleProvider frp = getProvider(flowRule.deviceId());
265 frp.removeFlowRule(flowRule);
alshabib54ce5892014-09-23 17:50:51 -0700266 log.debug("Flow {} is on switch but not in store.", flowRule);
alshabib219ebaa2014-09-22 15:41:24 -0700267 }
268
alshabibba5ac482014-10-02 17:15:20 -0700269
alshabib1c319ff2014-10-04 20:29:09 -0700270 private void flowAdded(FlowEntry flowEntry) {
271 checkNotNull(flowEntry, FLOW_RULE_NULL);
Ayaka Koshibe08eabaa2014-09-17 14:59:25 -0700272 checkValidity();
alshabib57044ba2014-09-16 15:58:01 -0700273
alshabib1c319ff2014-10-04 20:29:09 -0700274 if (checkRuleLiveness(flowEntry, store.getFlowEntry(flowEntry))) {
alshabibba5ac482014-10-02 17:15:20 -0700275
alshabib1c319ff2014-10-04 20:29:09 -0700276 FlowRuleEvent event = store.addOrUpdateFlowRule(flowEntry);
alshabibba5ac482014-10-02 17:15:20 -0700277 if (event == null) {
278 log.debug("No flow store event generated.");
279 } else {
alshabib1c319ff2014-10-04 20:29:09 -0700280 log.debug("Flow {} {}", flowEntry, event.type());
alshabibba5ac482014-10-02 17:15:20 -0700281 post(event);
282 }
Ayaka Koshibe08eabaa2014-09-17 14:59:25 -0700283 } else {
Madan Jampani117aaae2014-10-23 10:04:05 -0700284 log.info("Removing flow rules....");
alshabib1c319ff2014-10-04 20:29:09 -0700285 removeFlowRules(flowEntry);
Ayaka Koshibe08eabaa2014-09-17 14:59:25 -0700286 }
alshabib219ebaa2014-09-22 15:41:24 -0700287
alshabib57044ba2014-09-16 15:58:01 -0700288 }
289
alshabib1c319ff2014-10-04 20:29:09 -0700290 private boolean checkRuleLiveness(FlowEntry swRule, FlowEntry storedRule) {
291 if (storedRule == null) {
292 return false;
293 }
Jonathan Hartbc4a7932014-10-21 11:46:00 -0700294 if (storedRule.isPermanent()) {
295 return true;
296 }
297
Yuta HIGUCHIf6f50a62014-10-19 15:58:49 -0700298 final long timeout = storedRule.timeout() * 1000;
299 final long currentTime = System.currentTimeMillis();
alshabib85c41972014-10-03 13:48:39 -0700300 if (storedRule.packets() != swRule.packets()) {
Yuta HIGUCHIf6f50a62014-10-19 15:58:49 -0700301 lastSeen.put(storedRule, currentTime);
alshabib85c41972014-10-03 13:48:39 -0700302 return true;
303 }
Yuta HIGUCHIf6f50a62014-10-19 15:58:49 -0700304 if (!lastSeen.containsKey(storedRule)) {
305 // checking for the first time
306 lastSeen.put(storedRule, storedRule.lastSeen());
307 // Use following if lastSeen attr. was removed.
308 //lastSeen.put(storedRule, currentTime);
309 }
310 Long last = lastSeen.get(storedRule);
311 if (last == null) {
312 // concurrently removed? let the liveness check fail
313 return false;
314 }
alshabib85c41972014-10-03 13:48:39 -0700315
Yuta HIGUCHIf6f50a62014-10-19 15:58:49 -0700316 if ((currentTime - last) <= timeout) {
alshabibc274c902014-10-03 14:58:27 -0700317 return true;
318 }
319 return false;
alshabibba5ac482014-10-02 17:15:20 -0700320 }
321
Ayaka Koshibe08eabaa2014-09-17 14:59:25 -0700322 // Posts the specified event to the local event dispatcher.
323 private void post(FlowRuleEvent event) {
324 if (event != null) {
325 eventDispatcher.post(event);
326 }
327 }
alshabib5c370ff2014-09-18 10:12:14 -0700328
329 @Override
alshabib1c319ff2014-10-04 20:29:09 -0700330 public void pushFlowMetrics(DeviceId deviceId, Iterable<FlowEntry> flowEntries) {
331 List<FlowEntry> storedRules = Lists.newLinkedList(store.getFlowEntries(deviceId));
alshabibbb8b1282014-09-22 17:00:18 -0700332
Yuta HIGUCHIf6f50a62014-10-19 15:58:49 -0700333 for (FlowEntry rule : flowEntries) {
alshabiba7f7ca82014-09-22 11:41:23 -0700334 if (storedRules.remove(rule)) {
alshabib219ebaa2014-09-22 15:41:24 -0700335 // we both have the rule, let's update some info then.
alshabiba7f7ca82014-09-22 11:41:23 -0700336 flowAdded(rule);
337 } else {
alshabib219ebaa2014-09-22 15:41:24 -0700338 // the device has a rule the store does not have
339 extraneousFlow(rule);
alshabiba7f7ca82014-09-22 11:41:23 -0700340 }
341 }
alshabib1c319ff2014-10-04 20:29:09 -0700342 for (FlowEntry rule : storedRules) {
alshabiba7f7ca82014-09-22 11:41:23 -0700343 // there are rules in the store that aren't on the switch
344 flowMissing(rule);
alshabib54ce5892014-09-23 17:50:51 -0700345
alshabiba7f7ca82014-09-22 11:41:23 -0700346 }
alshabib5c370ff2014-09-18 10:12:14 -0700347 }
alshabib57044ba2014-09-16 15:58:01 -0700348 }
349
tomc78acee2014-09-24 15:16:55 -0700350 // Store delegate to re-post events emitted from the store.
351 private class InternalStoreDelegate implements FlowRuleStoreDelegate {
Madan Jampani117aaae2014-10-23 10:04:05 -0700352 // TODO: Right now we only dispatch events at individual flowEntry level.
353 // It may be more efficient for also dispatch events as a batch.
tomc78acee2014-09-24 15:16:55 -0700354 @Override
Madan Jampani117aaae2014-10-23 10:04:05 -0700355 public void notify(FlowRuleBatchEvent event) {
356 final FlowRuleBatchRequest request = event.subject();
Yuta HIGUCHIf3d51bd2014-10-21 01:05:33 -0700357 switch (event.type()) {
Madan Jampani117aaae2014-10-23 10:04:05 -0700358 case BATCH_OPERATION_REQUESTED:
Madan Jampani31961c12014-10-23 12:06:58 -0700359 for (FlowEntry entry : request.toAdd()) {
360 eventDispatcher.post(new FlowRuleEvent(FlowRuleEvent.Type.RULE_ADD_REQUESTED, entry));
361 }
362 for (FlowEntry entry : request.toRemove()) {
363 eventDispatcher.post(new FlowRuleEvent(FlowRuleEvent.Type.RULE_REMOVE_REQUESTED, entry));
364 }
365 // FIXME: what about op.equals(FlowRuleOperation.MODIFY) ?
Yuta HIGUCHIf3d51bd2014-10-21 01:05:33 -0700366
Madan Jampani117aaae2014-10-23 10:04:05 -0700367 FlowRuleBatchOperation batchOperation = request.asBatchOperation();
Yuta HIGUCHIf3d51bd2014-10-21 01:05:33 -0700368
Madan Jampani117aaae2014-10-23 10:04:05 -0700369 FlowRuleProvider flowRuleProvider =
370 getProvider(batchOperation.getOperations().get(0).getTarget().deviceId());
371 final ListenableFuture<CompletedBatchOperation> result =
372 flowRuleProvider.executeBatch(batchOperation);
373 result.addListener(new Runnable() {
374 @Override
375 public void run() {
Thomas Vachuska8ac922d2014-10-23 16:17:03 -0700376 store.batchOperationComplete(FlowRuleBatchEvent.completed(request,
377 Futures.getUnchecked(result)));
Madan Jampani117aaae2014-10-23 10:04:05 -0700378 }
Yuta HIGUCHI9def0472014-10-23 15:51:10 -0700379 }, futureListeners);
Madan Jampani117aaae2014-10-23 10:04:05 -0700380
381 break;
382 case BATCH_OPERATION_COMPLETED:
383 Set<FlowEntry> failedItems = event.result().failedItems();
384 for (FlowEntry entry : request.toAdd()) {
385 if (!failedItems.contains(entry)) {
386 eventDispatcher.post(new FlowRuleEvent(FlowRuleEvent.Type.RULE_ADDED, entry));
387 }
388 }
389 for (FlowEntry entry : request.toRemove()) {
390 if (!failedItems.contains(entry)) {
391 eventDispatcher.post(new FlowRuleEvent(FlowRuleEvent.Type.RULE_REMOVED, entry));
392 }
393 }
Yuta HIGUCHIf3d51bd2014-10-21 01:05:33 -0700394 break;
395 default:
396 break;
397 }
tomc78acee2014-09-24 15:16:55 -0700398 }
399 }
alshabib902d41b2014-10-07 16:52:05 -0700400
Madan Jampani117aaae2014-10-23 10:04:05 -0700401 private class FlowRuleBatchFuture implements Future<CompletedBatchOperation> {
alshabib902d41b2014-10-07 16:52:05 -0700402
alshabib193525b2014-10-08 18:58:03 -0700403 private final List<Future<CompletedBatchOperation>> futures;
Madan Jampani117aaae2014-10-23 10:04:05 -0700404 private final Multimap<DeviceId, FlowRuleBatchEntry> batches;
alshabib193525b2014-10-08 18:58:03 -0700405 private final AtomicReference<BatchState> state;
406 private CompletedBatchOperation overall;
alshabib902d41b2014-10-07 16:52:05 -0700407
alshabib193525b2014-10-08 18:58:03 -0700408 public FlowRuleBatchFuture(List<Future<CompletedBatchOperation>> futures,
Madan Jampani117aaae2014-10-23 10:04:05 -0700409 Multimap<DeviceId, FlowRuleBatchEntry> batches) {
alshabib902d41b2014-10-07 16:52:05 -0700410 this.futures = futures;
alshabib193525b2014-10-08 18:58:03 -0700411 this.batches = batches;
412 state = new AtomicReference<FlowRuleManager.BatchState>();
413 state.set(BatchState.STARTED);
alshabib902d41b2014-10-07 16:52:05 -0700414 }
415
416 @Override
417 public boolean cancel(boolean mayInterruptIfRunning) {
alshabib193525b2014-10-08 18:58:03 -0700418 if (state.get() == BatchState.FINISHED) {
419 return false;
420 }
421 if (!state.compareAndSet(BatchState.STARTED, BatchState.CANCELLED)) {
422 return false;
423 }
424 cleanUpBatch();
425 for (Future<CompletedBatchOperation> f : futures) {
426 f.cancel(mayInterruptIfRunning);
427 }
428 return true;
alshabib902d41b2014-10-07 16:52:05 -0700429 }
430
431 @Override
432 public boolean isCancelled() {
alshabib193525b2014-10-08 18:58:03 -0700433 return state.get() == BatchState.CANCELLED;
alshabib902d41b2014-10-07 16:52:05 -0700434 }
435
436 @Override
437 public boolean isDone() {
alshabib193525b2014-10-08 18:58:03 -0700438 return state.get() == BatchState.FINISHED;
alshabib902d41b2014-10-07 16:52:05 -0700439 }
440
alshabib193525b2014-10-08 18:58:03 -0700441
alshabib902d41b2014-10-07 16:52:05 -0700442 @Override
443 public CompletedBatchOperation get() throws InterruptedException,
alshabib193525b2014-10-08 18:58:03 -0700444 ExecutionException {
445
446 if (isDone()) {
447 return overall;
alshabib902d41b2014-10-07 16:52:05 -0700448 }
alshabib193525b2014-10-08 18:58:03 -0700449
450 boolean success = true;
Madan Jampani117aaae2014-10-23 10:04:05 -0700451 Set<FlowEntry> failed = Sets.newHashSet();
alshabib193525b2014-10-08 18:58:03 -0700452 CompletedBatchOperation completed;
453 for (Future<CompletedBatchOperation> future : futures) {
454 completed = future.get();
alshabib3effd042014-10-17 12:00:31 -0700455 success = validateBatchOperation(failed, completed);
alshabib193525b2014-10-08 18:58:03 -0700456 }
457
458 return finalizeBatchOperation(success, failed);
459
alshabib902d41b2014-10-07 16:52:05 -0700460 }
461
462 @Override
463 public CompletedBatchOperation get(long timeout, TimeUnit unit)
464 throws InterruptedException, ExecutionException,
465 TimeoutException {
alshabib193525b2014-10-08 18:58:03 -0700466
467 if (isDone()) {
468 return overall;
469 }
470 boolean success = true;
Madan Jampani117aaae2014-10-23 10:04:05 -0700471 Set<FlowEntry> failed = Sets.newHashSet();
alshabib193525b2014-10-08 18:58:03 -0700472 CompletedBatchOperation completed;
alshabib902d41b2014-10-07 16:52:05 -0700473 long start = System.nanoTime();
474 long end = start + unit.toNanos(timeout);
alshabib193525b2014-10-08 18:58:03 -0700475
476 for (Future<CompletedBatchOperation> future : futures) {
alshabib902d41b2014-10-07 16:52:05 -0700477 long now = System.nanoTime();
478 long thisTimeout = end - now;
alshabib193525b2014-10-08 18:58:03 -0700479 completed = future.get(thisTimeout, TimeUnit.NANOSECONDS);
alshabib3effd042014-10-17 12:00:31 -0700480 success = validateBatchOperation(failed, completed);
alshabib902d41b2014-10-07 16:52:05 -0700481 }
alshabib193525b2014-10-08 18:58:03 -0700482 return finalizeBatchOperation(success, failed);
alshabib902d41b2014-10-07 16:52:05 -0700483 }
484
Madan Jampani117aaae2014-10-23 10:04:05 -0700485 private boolean validateBatchOperation(Set<FlowEntry> failed,
alshabib3effd042014-10-17 12:00:31 -0700486 CompletedBatchOperation completed) {
alshabib193525b2014-10-08 18:58:03 -0700487
488 if (isCancelled()) {
489 throw new CancellationException();
490 }
491 if (!completed.isSuccess()) {
492 failed.addAll(completed.failedItems());
493 cleanUpBatch();
494 cancelAllSubBatches();
495 return false;
496 }
497 return true;
498 }
499
500 private void cancelAllSubBatches() {
501 for (Future<CompletedBatchOperation> f : futures) {
502 f.cancel(true);
503 }
504 }
505
506 private CompletedBatchOperation finalizeBatchOperation(boolean success,
Madan Jampani117aaae2014-10-23 10:04:05 -0700507 Set<FlowEntry> failed) {
alshabib26834582014-10-08 20:15:46 -0700508 synchronized (this) {
alshabib193525b2014-10-08 18:58:03 -0700509 if (!state.compareAndSet(BatchState.STARTED, BatchState.FINISHED)) {
510 if (state.get() == BatchState.FINISHED) {
511 return overall;
512 }
513 throw new CancellationException();
514 }
515 overall = new CompletedBatchOperation(success, failed);
516 return overall;
517 }
518 }
519
520 private void cleanUpBatch() {
521 for (FlowRuleBatchEntry fbe : batches.values()) {
522 if (fbe.getOperator() == FlowRuleOperation.ADD ||
523 fbe.getOperator() == FlowRuleOperation.MODIFY) {
524 store.deleteFlowRule(fbe.getTarget());
525 } else if (fbe.getOperator() == FlowRuleOperation.REMOVE) {
alshabibcf369912014-10-13 14:16:42 -0700526 store.removeFlowRule(new DefaultFlowEntry(fbe.getTarget()));
alshabib193525b2014-10-08 18:58:03 -0700527 store.storeFlowRule(fbe.getTarget());
528 }
529 }
alshabib193525b2014-10-08 18:58:03 -0700530 }
alshabib902d41b2014-10-07 16:52:05 -0700531 }
alshabib57044ba2014-09-16 15:58:01 -0700532}