blob: 7e5f049973350a78b08c113fe1871c8c6ef01e17 [file] [log] [blame]
Thomas Vachuska4f1a60c2014-10-28 13:39:07 -07001/*
2 * Copyright 2014 Open Networking Laboratory
3 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
tombe988312014-09-19 18:38:47 -070016package org.onlab.onos.net.flow.impl;
alshabib57044ba2014-09-16 15:58:01 -070017
Thomas Vachuska9b2da212014-11-10 19:30:25 -080018import com.google.common.collect.ArrayListMultimap;
19import com.google.common.collect.Iterables;
20import com.google.common.collect.Lists;
21import com.google.common.collect.Maps;
22import com.google.common.collect.Multimap;
23import com.google.common.collect.Sets;
alshabib57044ba2014-09-16 15:58:01 -070024import org.apache.felix.scr.annotations.Activate;
25import org.apache.felix.scr.annotations.Component;
26import org.apache.felix.scr.annotations.Deactivate;
27import org.apache.felix.scr.annotations.Reference;
28import org.apache.felix.scr.annotations.ReferenceCardinality;
29import org.apache.felix.scr.annotations.Service;
Thomas Vachuskae0f804a2014-10-27 23:40:48 -070030import org.onlab.onos.core.ApplicationId;
alshabib57044ba2014-09-16 15:58:01 -070031import org.onlab.onos.event.AbstractListenerRegistry;
32import org.onlab.onos.event.EventDeliveryService;
33import org.onlab.onos.net.Device;
34import org.onlab.onos.net.DeviceId;
35import org.onlab.onos.net.device.DeviceService;
alshabib902d41b2014-10-07 16:52:05 -070036import org.onlab.onos.net.flow.CompletedBatchOperation;
alshabibcf369912014-10-13 14:16:42 -070037import org.onlab.onos.net.flow.DefaultFlowEntry;
alshabib1c319ff2014-10-04 20:29:09 -070038import org.onlab.onos.net.flow.FlowEntry;
alshabib57044ba2014-09-16 15:58:01 -070039import org.onlab.onos.net.flow.FlowRule;
alshabib902d41b2014-10-07 16:52:05 -070040import org.onlab.onos.net.flow.FlowRuleBatchEntry;
alshabib193525b2014-10-08 18:58:03 -070041import org.onlab.onos.net.flow.FlowRuleBatchEntry.FlowRuleOperation;
Madan Jampani117aaae2014-10-23 10:04:05 -070042import org.onlab.onos.net.flow.FlowRuleBatchEvent;
alshabib902d41b2014-10-07 16:52:05 -070043import org.onlab.onos.net.flow.FlowRuleBatchOperation;
Madan Jampani117aaae2014-10-23 10:04:05 -070044import org.onlab.onos.net.flow.FlowRuleBatchRequest;
alshabib57044ba2014-09-16 15:58:01 -070045import org.onlab.onos.net.flow.FlowRuleEvent;
46import org.onlab.onos.net.flow.FlowRuleListener;
47import org.onlab.onos.net.flow.FlowRuleProvider;
48import org.onlab.onos.net.flow.FlowRuleProviderRegistry;
49import org.onlab.onos.net.flow.FlowRuleProviderService;
50import org.onlab.onos.net.flow.FlowRuleService;
tombe988312014-09-19 18:38:47 -070051import org.onlab.onos.net.flow.FlowRuleStore;
tomc78acee2014-09-24 15:16:55 -070052import org.onlab.onos.net.flow.FlowRuleStoreDelegate;
alshabib57044ba2014-09-16 15:58:01 -070053import org.onlab.onos.net.provider.AbstractProviderRegistry;
54import org.onlab.onos.net.provider.AbstractProviderService;
55import org.slf4j.Logger;
56
Thomas Vachuska9b2da212014-11-10 19:30:25 -080057import java.util.List;
58import java.util.Map;
59import java.util.Set;
60import java.util.concurrent.CancellationException;
61import java.util.concurrent.ExecutionException;
62import java.util.concurrent.ExecutorService;
63import java.util.concurrent.Executors;
64import java.util.concurrent.Future;
65import java.util.concurrent.TimeUnit;
66import java.util.concurrent.TimeoutException;
67import java.util.concurrent.atomic.AtomicReference;
68
69import static com.google.common.base.Preconditions.checkNotNull;
70import static org.onlab.util.Tools.namedThreads;
71import static org.slf4j.LoggerFactory.getLogger;
alshabiba7f7ca82014-09-22 11:41:23 -070072
tome4729872014-09-23 00:37:37 -070073/**
74 * Provides implementation of the flow NB & SB APIs.
75 */
alshabib57044ba2014-09-16 15:58:01 -070076@Component(immediate = true)
77@Service
tom202175a2014-09-19 19:00:11 -070078public class FlowRuleManager
tom9b4030d2014-10-06 10:39:03 -070079 extends AbstractProviderRegistry<FlowRuleProvider, FlowRuleProviderService>
80 implements FlowRuleService, FlowRuleProviderRegistry {
alshabib57044ba2014-09-16 15:58:01 -070081
alshabib193525b2014-10-08 18:58:03 -070082 enum BatchState { STARTED, FINISHED, CANCELLED };
83
Ayaka Koshibe08eabaa2014-09-17 14:59:25 -070084 public static final String FLOW_RULE_NULL = "FlowRule cannot be null";
alshabib57044ba2014-09-16 15:58:01 -070085 private final Logger log = getLogger(getClass());
86
87 private final AbstractListenerRegistry<FlowRuleEvent, FlowRuleListener>
tom9b4030d2014-10-06 10:39:03 -070088 listenerRegistry = new AbstractListenerRegistry<>();
alshabib57044ba2014-09-16 15:58:01 -070089
alshabibbb42cad2014-09-25 11:43:05 -070090 private final FlowRuleStoreDelegate delegate = new InternalStoreDelegate();
tomc78acee2014-09-24 15:16:55 -070091
Thomas Vachuska9b2da212014-11-10 19:30:25 -080092 private ExecutorService futureService;
Yuta HIGUCHI9def0472014-10-23 15:51:10 -070093
tombe988312014-09-19 18:38:47 -070094 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
95 protected FlowRuleStore store;
Ayaka Koshibe08eabaa2014-09-17 14:59:25 -070096
alshabib57044ba2014-09-16 15:58:01 -070097 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
Ayaka Koshibeb55524f2014-09-18 09:59:24 -070098 protected EventDeliveryService eventDispatcher;
alshabib57044ba2014-09-16 15:58:01 -070099
100 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
Ayaka Koshibeb55524f2014-09-18 09:59:24 -0700101 protected DeviceService deviceService;
alshabib57044ba2014-09-16 15:58:01 -0700102
103 @Activate
104 public void activate() {
Thomas Vachuska9b2da212014-11-10 19:30:25 -0800105 futureService = Executors.newCachedThreadPool(namedThreads("provider-future-listeners"));
tomc78acee2014-09-24 15:16:55 -0700106 store.setDelegate(delegate);
alshabib57044ba2014-09-16 15:58:01 -0700107 eventDispatcher.addSink(FlowRuleEvent.class, listenerRegistry);
108 log.info("Started");
109 }
110
111 @Deactivate
112 public void deactivate() {
Thomas Vachuska9b2da212014-11-10 19:30:25 -0800113 futureService.shutdownNow();
Yuta HIGUCHI9def0472014-10-23 15:51:10 -0700114
tomc78acee2014-09-24 15:16:55 -0700115 store.unsetDelegate(delegate);
alshabib57044ba2014-09-16 15:58:01 -0700116 eventDispatcher.removeSink(FlowRuleEvent.class);
117 log.info("Stopped");
118 }
119
120 @Override
tom9b4030d2014-10-06 10:39:03 -0700121 public int getFlowRuleCount() {
122 return store.getFlowRuleCount();
123 }
124
125 @Override
alshabib1c319ff2014-10-04 20:29:09 -0700126 public Iterable<FlowEntry> getFlowEntries(DeviceId deviceId) {
Ayaka Koshibe08eabaa2014-09-17 14:59:25 -0700127 return store.getFlowEntries(deviceId);
alshabib57044ba2014-09-16 15:58:01 -0700128 }
129
130 @Override
alshabib219ebaa2014-09-22 15:41:24 -0700131 public void applyFlowRules(FlowRule... flowRules) {
Madan Jampani6a456162014-10-24 11:36:17 -0700132 Set<FlowRuleBatchEntry> toAddBatchEntries = Sets.newHashSet();
alshabib57044ba2014-09-16 15:58:01 -0700133 for (int i = 0; i < flowRules.length; i++) {
Madan Jampani6a456162014-10-24 11:36:17 -0700134 toAddBatchEntries.add(new FlowRuleBatchEntry(FlowRuleOperation.ADD, flowRules[i]));
Yuta HIGUCHIf3d51bd2014-10-21 01:05:33 -0700135 }
Madan Jampani6a456162014-10-24 11:36:17 -0700136 applyBatch(new FlowRuleBatchOperation(toAddBatchEntries));
alshabib57044ba2014-09-16 15:58:01 -0700137 }
138
139 @Override
140 public void removeFlowRules(FlowRule... flowRules) {
Madan Jampani6a456162014-10-24 11:36:17 -0700141 Set<FlowRuleBatchEntry> toRemoveBatchEntries = Sets.newHashSet();
alshabib57044ba2014-09-16 15:58:01 -0700142 for (int i = 0; i < flowRules.length; i++) {
Madan Jampani6a456162014-10-24 11:36:17 -0700143 toRemoveBatchEntries.add(new FlowRuleBatchEntry(FlowRuleOperation.REMOVE, flowRules[i]));
Yuta HIGUCHIf3d51bd2014-10-21 01:05:33 -0700144 }
Madan Jampani6a456162014-10-24 11:36:17 -0700145 applyBatch(new FlowRuleBatchOperation(toRemoveBatchEntries));
alshabiba68eb962014-09-24 20:34:13 -0700146 }
alshabib57044ba2014-09-16 15:58:01 -0700147
alshabiba68eb962014-09-24 20:34:13 -0700148 @Override
149 public void removeFlowRulesById(ApplicationId id) {
Madan Jampani6a456162014-10-24 11:36:17 -0700150 removeFlowRules(Iterables.toArray(getFlowRulesById(id), FlowRule.class));
alshabiba68eb962014-09-24 20:34:13 -0700151 }
152
153 @Override
154 public Iterable<FlowRule> getFlowRulesById(ApplicationId id) {
Madan Jampani6a456162014-10-24 11:36:17 -0700155 Set<FlowRule> flowEntries = Sets.newHashSet();
156 for (Device d : deviceService.getDevices()) {
157 for (FlowEntry flowEntry : store.getFlowEntries(d.id())) {
158 if (flowEntry.appId() == id.id()) {
159 flowEntries.add(flowEntry);
160 }
161 }
162 }
163 return flowEntries;
alshabib57044ba2014-09-16 15:58:01 -0700164 }
165
166 @Override
alshabib902d41b2014-10-07 16:52:05 -0700167 public Future<CompletedBatchOperation> applyBatch(
168 FlowRuleBatchOperation batch) {
Madan Jampani117aaae2014-10-23 10:04:05 -0700169 Multimap<DeviceId, FlowRuleBatchEntry> perDeviceBatches =
alshabib902d41b2014-10-07 16:52:05 -0700170 ArrayListMultimap.create();
alshabib193525b2014-10-08 18:58:03 -0700171 List<Future<CompletedBatchOperation>> futures = Lists.newArrayList();
alshabib902d41b2014-10-07 16:52:05 -0700172 for (FlowRuleBatchEntry fbe : batch.getOperations()) {
173 final FlowRule f = fbe.getTarget();
Madan Jampani117aaae2014-10-23 10:04:05 -0700174 perDeviceBatches.put(f.deviceId(), fbe);
alshabib902d41b2014-10-07 16:52:05 -0700175 }
Madan Jampani117aaae2014-10-23 10:04:05 -0700176
177 for (DeviceId deviceId : perDeviceBatches.keySet()) {
alshabib902d41b2014-10-07 16:52:05 -0700178 FlowRuleBatchOperation b =
Madan Jampani117aaae2014-10-23 10:04:05 -0700179 new FlowRuleBatchOperation(perDeviceBatches.get(deviceId));
180 Future<CompletedBatchOperation> future = store.storeBatch(b);
alshabib902d41b2014-10-07 16:52:05 -0700181 futures.add(future);
182 }
Madan Jampani117aaae2014-10-23 10:04:05 -0700183 return new FlowRuleBatchFuture(futures, perDeviceBatches);
alshabib902d41b2014-10-07 16:52:05 -0700184 }
185
186 @Override
alshabib57044ba2014-09-16 15:58:01 -0700187 public void addListener(FlowRuleListener listener) {
188 listenerRegistry.addListener(listener);
189 }
190
191 @Override
192 public void removeListener(FlowRuleListener listener) {
193 listenerRegistry.removeListener(listener);
194 }
195
196 @Override
197 protected FlowRuleProviderService createProviderService(
198 FlowRuleProvider provider) {
199 return new InternalFlowRuleProviderService(provider);
200 }
201
202 private class InternalFlowRuleProviderService
tom9b4030d2014-10-06 10:39:03 -0700203 extends AbstractProviderService<FlowRuleProvider>
204 implements FlowRuleProviderService {
alshabib57044ba2014-09-16 15:58:01 -0700205
Yuta HIGUCHIf6f50a62014-10-19 15:58:49 -0700206 final Map<FlowEntry, Long> lastSeen = Maps.newConcurrentMap();
207
alshabib57044ba2014-09-16 15:58:01 -0700208 protected InternalFlowRuleProviderService(FlowRuleProvider provider) {
209 super(provider);
210 }
211
212 @Override
alshabib1c319ff2014-10-04 20:29:09 -0700213 public void flowRemoved(FlowEntry flowEntry) {
214 checkNotNull(flowEntry, FLOW_RULE_NULL);
Ayaka Koshibe08eabaa2014-09-17 14:59:25 -0700215 checkValidity();
Yuta HIGUCHIf6f50a62014-10-19 15:58:49 -0700216 lastSeen.remove(flowEntry);
alshabib1c319ff2014-10-04 20:29:09 -0700217 FlowEntry stored = store.getFlowEntry(flowEntry);
alshabiba68eb962014-09-24 20:34:13 -0700218 if (stored == null) {
alshabib1c319ff2014-10-04 20:29:09 -0700219 log.info("Rule already evicted from store: {}", flowEntry);
alshabiba68eb962014-09-24 20:34:13 -0700220 return;
221 }
alshabib1c319ff2014-10-04 20:29:09 -0700222 Device device = deviceService.getDevice(flowEntry.deviceId());
alshabiba68eb962014-09-24 20:34:13 -0700223 FlowRuleProvider frp = getProvider(device.providerId());
224 FlowRuleEvent event = null;
225 switch (stored.state()) {
tom9b4030d2014-10-06 10:39:03 -0700226 case ADDED:
227 case PENDING_ADD:
alshabib6eb438a2014-10-01 16:39:37 -0700228 frp.applyFlowRule(stored);
tom9b4030d2014-10-06 10:39:03 -0700229 break;
230 case PENDING_REMOVE:
231 case REMOVED:
232 event = store.removeFlowRule(stored);
233 break;
234 default:
235 break;
alshabib57044ba2014-09-16 15:58:01 -0700236
alshabiba68eb962014-09-24 20:34:13 -0700237 }
Ayaka Koshibe08eabaa2014-09-17 14:59:25 -0700238 if (event != null) {
alshabib1c319ff2014-10-04 20:29:09 -0700239 log.debug("Flow {} removed", flowEntry);
Ayaka Koshibe08eabaa2014-09-17 14:59:25 -0700240 post(event);
241 }
alshabib57044ba2014-09-16 15:58:01 -0700242 }
243
alshabibba5ac482014-10-02 17:15:20 -0700244
alshabib1c319ff2014-10-04 20:29:09 -0700245 private void flowMissing(FlowEntry flowRule) {
Ayaka Koshibe08eabaa2014-09-17 14:59:25 -0700246 checkNotNull(flowRule, FLOW_RULE_NULL);
247 checkValidity();
alshabiba68eb962014-09-24 20:34:13 -0700248 Device device = deviceService.getDevice(flowRule.deviceId());
249 FlowRuleProvider frp = getProvider(device.providerId());
alshabibbb42cad2014-09-25 11:43:05 -0700250 FlowRuleEvent event = null;
alshabiba68eb962014-09-24 20:34:13 -0700251 switch (flowRule.state()) {
tom9b4030d2014-10-06 10:39:03 -0700252 case PENDING_REMOVE:
253 case REMOVED:
254 event = store.removeFlowRule(flowRule);
255 frp.removeFlowRule(flowRule);
256 break;
257 case ADDED:
258 case PENDING_ADD:
259 frp.applyFlowRule(flowRule);
260 break;
261 default:
262 log.debug("Flow {} has not been installed.", flowRule);
alshabiba68eb962014-09-24 20:34:13 -0700263 }
264
alshabibbb42cad2014-09-25 11:43:05 -0700265 if (event != null) {
266 log.debug("Flow {} removed", flowRule);
267 post(event);
268 }
alshabib57044ba2014-09-16 15:58:01 -0700269
270 }
271
alshabibba5ac482014-10-02 17:15:20 -0700272
273 private void extraneousFlow(FlowRule flowRule) {
alshabib219ebaa2014-09-22 15:41:24 -0700274 checkNotNull(flowRule, FLOW_RULE_NULL);
275 checkValidity();
alshabib2374fc92014-10-22 11:03:23 -0700276 FlowRuleProvider frp = getProvider(flowRule.deviceId());
277 frp.removeFlowRule(flowRule);
alshabib54ce5892014-09-23 17:50:51 -0700278 log.debug("Flow {} is on switch but not in store.", flowRule);
alshabib219ebaa2014-09-22 15:41:24 -0700279 }
280
alshabibba5ac482014-10-02 17:15:20 -0700281
alshabib1c319ff2014-10-04 20:29:09 -0700282 private void flowAdded(FlowEntry flowEntry) {
283 checkNotNull(flowEntry, FLOW_RULE_NULL);
Ayaka Koshibe08eabaa2014-09-17 14:59:25 -0700284 checkValidity();
alshabib57044ba2014-09-16 15:58:01 -0700285
alshabib1c319ff2014-10-04 20:29:09 -0700286 if (checkRuleLiveness(flowEntry, store.getFlowEntry(flowEntry))) {
alshabibba5ac482014-10-02 17:15:20 -0700287
alshabib1c319ff2014-10-04 20:29:09 -0700288 FlowRuleEvent event = store.addOrUpdateFlowRule(flowEntry);
alshabibba5ac482014-10-02 17:15:20 -0700289 if (event == null) {
290 log.debug("No flow store event generated.");
291 } else {
alshabib1c319ff2014-10-04 20:29:09 -0700292 log.debug("Flow {} {}", flowEntry, event.type());
alshabibba5ac482014-10-02 17:15:20 -0700293 post(event);
294 }
Ayaka Koshibe08eabaa2014-09-17 14:59:25 -0700295 } else {
Thomas Vachuska4830d392014-11-09 17:09:56 -0800296 log.debug("Removing flow rules....");
alshabib1c319ff2014-10-04 20:29:09 -0700297 removeFlowRules(flowEntry);
Ayaka Koshibe08eabaa2014-09-17 14:59:25 -0700298 }
alshabib219ebaa2014-09-22 15:41:24 -0700299
alshabib57044ba2014-09-16 15:58:01 -0700300 }
301
alshabib1c319ff2014-10-04 20:29:09 -0700302 private boolean checkRuleLiveness(FlowEntry swRule, FlowEntry storedRule) {
303 if (storedRule == null) {
304 return false;
305 }
Jonathan Hartbc4a7932014-10-21 11:46:00 -0700306 if (storedRule.isPermanent()) {
307 return true;
308 }
309
Yuta HIGUCHIf6f50a62014-10-19 15:58:49 -0700310 final long timeout = storedRule.timeout() * 1000;
311 final long currentTime = System.currentTimeMillis();
alshabib85c41972014-10-03 13:48:39 -0700312 if (storedRule.packets() != swRule.packets()) {
Yuta HIGUCHIf6f50a62014-10-19 15:58:49 -0700313 lastSeen.put(storedRule, currentTime);
alshabib85c41972014-10-03 13:48:39 -0700314 return true;
315 }
Yuta HIGUCHIf6f50a62014-10-19 15:58:49 -0700316 if (!lastSeen.containsKey(storedRule)) {
317 // checking for the first time
318 lastSeen.put(storedRule, storedRule.lastSeen());
319 // Use following if lastSeen attr. was removed.
320 //lastSeen.put(storedRule, currentTime);
321 }
322 Long last = lastSeen.get(storedRule);
323 if (last == null) {
324 // concurrently removed? let the liveness check fail
325 return false;
326 }
alshabib85c41972014-10-03 13:48:39 -0700327
Yuta HIGUCHIf6f50a62014-10-19 15:58:49 -0700328 if ((currentTime - last) <= timeout) {
alshabibc274c902014-10-03 14:58:27 -0700329 return true;
330 }
331 return false;
alshabibba5ac482014-10-02 17:15:20 -0700332 }
333
Ayaka Koshibe08eabaa2014-09-17 14:59:25 -0700334 // Posts the specified event to the local event dispatcher.
335 private void post(FlowRuleEvent event) {
336 if (event != null) {
337 eventDispatcher.post(event);
338 }
339 }
alshabib5c370ff2014-09-18 10:12:14 -0700340
341 @Override
alshabib1c319ff2014-10-04 20:29:09 -0700342 public void pushFlowMetrics(DeviceId deviceId, Iterable<FlowEntry> flowEntries) {
343 List<FlowEntry> storedRules = Lists.newLinkedList(store.getFlowEntries(deviceId));
alshabibbb8b1282014-09-22 17:00:18 -0700344
Yuta HIGUCHIf6f50a62014-10-19 15:58:49 -0700345 for (FlowEntry rule : flowEntries) {
alshabiba7f7ca82014-09-22 11:41:23 -0700346 if (storedRules.remove(rule)) {
alshabib219ebaa2014-09-22 15:41:24 -0700347 // we both have the rule, let's update some info then.
alshabiba7f7ca82014-09-22 11:41:23 -0700348 flowAdded(rule);
349 } else {
alshabib219ebaa2014-09-22 15:41:24 -0700350 // the device has a rule the store does not have
351 extraneousFlow(rule);
alshabiba7f7ca82014-09-22 11:41:23 -0700352 }
353 }
alshabib1c319ff2014-10-04 20:29:09 -0700354 for (FlowEntry rule : storedRules) {
alshabiba7f7ca82014-09-22 11:41:23 -0700355 // there are rules in the store that aren't on the switch
356 flowMissing(rule);
alshabib54ce5892014-09-23 17:50:51 -0700357
alshabiba7f7ca82014-09-22 11:41:23 -0700358 }
alshabib5c370ff2014-09-18 10:12:14 -0700359 }
alshabib57044ba2014-09-16 15:58:01 -0700360 }
361
tomc78acee2014-09-24 15:16:55 -0700362 // Store delegate to re-post events emitted from the store.
363 private class InternalStoreDelegate implements FlowRuleStoreDelegate {
Thomas Vachuska9b2da212014-11-10 19:30:25 -0800364
365 private static final int TIMEOUT = 5000; // ms
366
Madan Jampani117aaae2014-10-23 10:04:05 -0700367 // TODO: Right now we only dispatch events at individual flowEntry level.
368 // It may be more efficient for also dispatch events as a batch.
tomc78acee2014-09-24 15:16:55 -0700369 @Override
Madan Jampani117aaae2014-10-23 10:04:05 -0700370 public void notify(FlowRuleBatchEvent event) {
371 final FlowRuleBatchRequest request = event.subject();
Yuta HIGUCHIf3d51bd2014-10-21 01:05:33 -0700372 switch (event.type()) {
Madan Jampani117aaae2014-10-23 10:04:05 -0700373 case BATCH_OPERATION_REQUESTED:
Yuta HIGUCHI2fcb40c2014-11-03 14:39:10 -0800374 // Request has been forwarded to MASTER Node, and was
375 for (FlowRule entry : request.toAdd()) {
Madan Jampani31961c12014-10-23 12:06:58 -0700376 eventDispatcher.post(new FlowRuleEvent(FlowRuleEvent.Type.RULE_ADD_REQUESTED, entry));
377 }
Yuta HIGUCHI2fcb40c2014-11-03 14:39:10 -0800378 for (FlowRule entry : request.toRemove()) {
Madan Jampani31961c12014-10-23 12:06:58 -0700379 eventDispatcher.post(new FlowRuleEvent(FlowRuleEvent.Type.RULE_REMOVE_REQUESTED, entry));
380 }
381 // FIXME: what about op.equals(FlowRuleOperation.MODIFY) ?
Yuta HIGUCHIf3d51bd2014-10-21 01:05:33 -0700382
Madan Jampani117aaae2014-10-23 10:04:05 -0700383 FlowRuleBatchOperation batchOperation = request.asBatchOperation();
Yuta HIGUCHIf3d51bd2014-10-21 01:05:33 -0700384
Madan Jampani117aaae2014-10-23 10:04:05 -0700385 FlowRuleProvider flowRuleProvider =
386 getProvider(batchOperation.getOperations().get(0).getTarget().deviceId());
Thomas Vachuska9b2da212014-11-10 19:30:25 -0800387 final Future<CompletedBatchOperation> result =
Madan Jampani117aaae2014-10-23 10:04:05 -0700388 flowRuleProvider.executeBatch(batchOperation);
Thomas Vachuska9b2da212014-11-10 19:30:25 -0800389 futureService.submit(new Runnable() {
Madan Jampani117aaae2014-10-23 10:04:05 -0700390 @Override
391 public void run() {
Thomas Vachuska9b2da212014-11-10 19:30:25 -0800392 CompletedBatchOperation res = null;
393 try {
394 res = result.get(TIMEOUT, TimeUnit.MILLISECONDS);
395 } catch (TimeoutException | InterruptedException | ExecutionException e) {
396 log.warn("Something went wrong with the batch operation {}",
397 request.batchId());
398 }
399 store.batchOperationComplete(FlowRuleBatchEvent.completed(request, res));
Madan Jampani117aaae2014-10-23 10:04:05 -0700400 }
Thomas Vachuska9b2da212014-11-10 19:30:25 -0800401 });
Yuta HIGUCHI2fcb40c2014-11-03 14:39:10 -0800402 break;
Madan Jampani117aaae2014-10-23 10:04:05 -0700403
Madan Jampani117aaae2014-10-23 10:04:05 -0700404 case BATCH_OPERATION_COMPLETED:
Yuta HIGUCHI2fcb40c2014-11-03 14:39:10 -0800405 // MASTER Node has pushed the batch down to the Device
406
407 // Note: RULE_ADDED will be posted
408 // when Flow was actually confirmed by stats reply.
Yuta HIGUCHIf3d51bd2014-10-21 01:05:33 -0700409 break;
Yuta HIGUCHI2fcb40c2014-11-03 14:39:10 -0800410
Yuta HIGUCHIf3d51bd2014-10-21 01:05:33 -0700411 default:
412 break;
413 }
tomc78acee2014-09-24 15:16:55 -0700414 }
415 }
alshabib902d41b2014-10-07 16:52:05 -0700416
Madan Jampani117aaae2014-10-23 10:04:05 -0700417 private class FlowRuleBatchFuture implements Future<CompletedBatchOperation> {
alshabib902d41b2014-10-07 16:52:05 -0700418
alshabib193525b2014-10-08 18:58:03 -0700419 private final List<Future<CompletedBatchOperation>> futures;
Madan Jampani117aaae2014-10-23 10:04:05 -0700420 private final Multimap<DeviceId, FlowRuleBatchEntry> batches;
alshabib193525b2014-10-08 18:58:03 -0700421 private final AtomicReference<BatchState> state;
422 private CompletedBatchOperation overall;
alshabib902d41b2014-10-07 16:52:05 -0700423
alshabib193525b2014-10-08 18:58:03 -0700424 public FlowRuleBatchFuture(List<Future<CompletedBatchOperation>> futures,
Madan Jampani117aaae2014-10-23 10:04:05 -0700425 Multimap<DeviceId, FlowRuleBatchEntry> batches) {
alshabib902d41b2014-10-07 16:52:05 -0700426 this.futures = futures;
alshabib193525b2014-10-08 18:58:03 -0700427 this.batches = batches;
428 state = new AtomicReference<FlowRuleManager.BatchState>();
429 state.set(BatchState.STARTED);
alshabib902d41b2014-10-07 16:52:05 -0700430 }
431
432 @Override
433 public boolean cancel(boolean mayInterruptIfRunning) {
alshabib193525b2014-10-08 18:58:03 -0700434 if (state.get() == BatchState.FINISHED) {
435 return false;
436 }
437 if (!state.compareAndSet(BatchState.STARTED, BatchState.CANCELLED)) {
438 return false;
439 }
440 cleanUpBatch();
441 for (Future<CompletedBatchOperation> f : futures) {
442 f.cancel(mayInterruptIfRunning);
443 }
444 return true;
alshabib902d41b2014-10-07 16:52:05 -0700445 }
446
447 @Override
448 public boolean isCancelled() {
alshabib193525b2014-10-08 18:58:03 -0700449 return state.get() == BatchState.CANCELLED;
alshabib902d41b2014-10-07 16:52:05 -0700450 }
451
452 @Override
453 public boolean isDone() {
alshabib193525b2014-10-08 18:58:03 -0700454 return state.get() == BatchState.FINISHED;
alshabib902d41b2014-10-07 16:52:05 -0700455 }
456
alshabib193525b2014-10-08 18:58:03 -0700457
alshabib902d41b2014-10-07 16:52:05 -0700458 @Override
459 public CompletedBatchOperation get() throws InterruptedException,
alshabib193525b2014-10-08 18:58:03 -0700460 ExecutionException {
461
462 if (isDone()) {
463 return overall;
alshabib902d41b2014-10-07 16:52:05 -0700464 }
alshabib193525b2014-10-08 18:58:03 -0700465
466 boolean success = true;
Yuta HIGUCHI4b524442014-10-28 22:23:57 -0700467 Set<FlowRule> failed = Sets.newHashSet();
alshabib193525b2014-10-08 18:58:03 -0700468 CompletedBatchOperation completed;
469 for (Future<CompletedBatchOperation> future : futures) {
470 completed = future.get();
alshabib3effd042014-10-17 12:00:31 -0700471 success = validateBatchOperation(failed, completed);
alshabib193525b2014-10-08 18:58:03 -0700472 }
473
474 return finalizeBatchOperation(success, failed);
475
alshabib902d41b2014-10-07 16:52:05 -0700476 }
477
478 @Override
479 public CompletedBatchOperation get(long timeout, TimeUnit unit)
480 throws InterruptedException, ExecutionException,
481 TimeoutException {
alshabib193525b2014-10-08 18:58:03 -0700482
483 if (isDone()) {
484 return overall;
485 }
486 boolean success = true;
Yuta HIGUCHI4b524442014-10-28 22:23:57 -0700487 Set<FlowRule> failed = Sets.newHashSet();
alshabib193525b2014-10-08 18:58:03 -0700488 CompletedBatchOperation completed;
alshabib902d41b2014-10-07 16:52:05 -0700489 long start = System.nanoTime();
490 long end = start + unit.toNanos(timeout);
alshabib193525b2014-10-08 18:58:03 -0700491
492 for (Future<CompletedBatchOperation> future : futures) {
alshabib902d41b2014-10-07 16:52:05 -0700493 long now = System.nanoTime();
494 long thisTimeout = end - now;
alshabib193525b2014-10-08 18:58:03 -0700495 completed = future.get(thisTimeout, TimeUnit.NANOSECONDS);
alshabib3effd042014-10-17 12:00:31 -0700496 success = validateBatchOperation(failed, completed);
alshabib902d41b2014-10-07 16:52:05 -0700497 }
alshabib193525b2014-10-08 18:58:03 -0700498 return finalizeBatchOperation(success, failed);
alshabib902d41b2014-10-07 16:52:05 -0700499 }
500
Yuta HIGUCHI4b524442014-10-28 22:23:57 -0700501 private boolean validateBatchOperation(Set<FlowRule> failed,
alshabib3effd042014-10-17 12:00:31 -0700502 CompletedBatchOperation completed) {
alshabib193525b2014-10-08 18:58:03 -0700503
504 if (isCancelled()) {
505 throw new CancellationException();
506 }
507 if (!completed.isSuccess()) {
508 failed.addAll(completed.failedItems());
509 cleanUpBatch();
510 cancelAllSubBatches();
511 return false;
512 }
513 return true;
514 }
515
516 private void cancelAllSubBatches() {
517 for (Future<CompletedBatchOperation> f : futures) {
518 f.cancel(true);
519 }
520 }
521
522 private CompletedBatchOperation finalizeBatchOperation(boolean success,
Yuta HIGUCHI4b524442014-10-28 22:23:57 -0700523 Set<FlowRule> failed) {
alshabib26834582014-10-08 20:15:46 -0700524 synchronized (this) {
alshabib193525b2014-10-08 18:58:03 -0700525 if (!state.compareAndSet(BatchState.STARTED, BatchState.FINISHED)) {
526 if (state.get() == BatchState.FINISHED) {
527 return overall;
528 }
529 throw new CancellationException();
530 }
531 overall = new CompletedBatchOperation(success, failed);
532 return overall;
533 }
534 }
535
536 private void cleanUpBatch() {
537 for (FlowRuleBatchEntry fbe : batches.values()) {
538 if (fbe.getOperator() == FlowRuleOperation.ADD ||
539 fbe.getOperator() == FlowRuleOperation.MODIFY) {
540 store.deleteFlowRule(fbe.getTarget());
541 } else if (fbe.getOperator() == FlowRuleOperation.REMOVE) {
alshabibcf369912014-10-13 14:16:42 -0700542 store.removeFlowRule(new DefaultFlowEntry(fbe.getTarget()));
alshabib193525b2014-10-08 18:58:03 -0700543 store.storeFlowRule(fbe.getTarget());
544 }
545 }
alshabib193525b2014-10-08 18:58:03 -0700546 }
alshabib902d41b2014-10-07 16:52:05 -0700547 }
alshabib57044ba2014-09-16 15:58:01 -0700548}