blob: 2e53252c59f298d4a0ffaf6cd2ac4d57a826736c [file] [log] [blame]
Thomas Vachuska4f1a60c2014-10-28 13:39:07 -07001/*
2 * Copyright 2014 Open Networking Laboratory
3 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
tombe988312014-09-19 18:38:47 -070016package org.onlab.onos.net.flow.impl;
alshabib57044ba2014-09-16 15:58:01 -070017
Thomas Vachuska9b2da212014-11-10 19:30:25 -080018import com.google.common.collect.ArrayListMultimap;
19import com.google.common.collect.Iterables;
20import com.google.common.collect.Lists;
21import com.google.common.collect.Maps;
22import com.google.common.collect.Multimap;
23import com.google.common.collect.Sets;
Yuta HIGUCHIf1ccee82014-11-11 20:39:58 -080024
alshabib57044ba2014-09-16 15:58:01 -070025import org.apache.felix.scr.annotations.Activate;
26import org.apache.felix.scr.annotations.Component;
27import org.apache.felix.scr.annotations.Deactivate;
28import org.apache.felix.scr.annotations.Reference;
29import org.apache.felix.scr.annotations.ReferenceCardinality;
30import org.apache.felix.scr.annotations.Service;
Thomas Vachuskae0f804a2014-10-27 23:40:48 -070031import org.onlab.onos.core.ApplicationId;
alshabib57044ba2014-09-16 15:58:01 -070032import org.onlab.onos.event.AbstractListenerRegistry;
33import org.onlab.onos.event.EventDeliveryService;
34import org.onlab.onos.net.Device;
35import org.onlab.onos.net.DeviceId;
36import org.onlab.onos.net.device.DeviceService;
alshabib902d41b2014-10-07 16:52:05 -070037import org.onlab.onos.net.flow.CompletedBatchOperation;
alshabibcf369912014-10-13 14:16:42 -070038import org.onlab.onos.net.flow.DefaultFlowEntry;
alshabib1c319ff2014-10-04 20:29:09 -070039import org.onlab.onos.net.flow.FlowEntry;
alshabib57044ba2014-09-16 15:58:01 -070040import org.onlab.onos.net.flow.FlowRule;
alshabib902d41b2014-10-07 16:52:05 -070041import org.onlab.onos.net.flow.FlowRuleBatchEntry;
alshabib193525b2014-10-08 18:58:03 -070042import org.onlab.onos.net.flow.FlowRuleBatchEntry.FlowRuleOperation;
Madan Jampani117aaae2014-10-23 10:04:05 -070043import org.onlab.onos.net.flow.FlowRuleBatchEvent;
alshabib902d41b2014-10-07 16:52:05 -070044import org.onlab.onos.net.flow.FlowRuleBatchOperation;
Madan Jampani117aaae2014-10-23 10:04:05 -070045import org.onlab.onos.net.flow.FlowRuleBatchRequest;
alshabib57044ba2014-09-16 15:58:01 -070046import org.onlab.onos.net.flow.FlowRuleEvent;
47import org.onlab.onos.net.flow.FlowRuleListener;
48import org.onlab.onos.net.flow.FlowRuleProvider;
49import org.onlab.onos.net.flow.FlowRuleProviderRegistry;
50import org.onlab.onos.net.flow.FlowRuleProviderService;
51import org.onlab.onos.net.flow.FlowRuleService;
tombe988312014-09-19 18:38:47 -070052import org.onlab.onos.net.flow.FlowRuleStore;
tomc78acee2014-09-24 15:16:55 -070053import org.onlab.onos.net.flow.FlowRuleStoreDelegate;
alshabib57044ba2014-09-16 15:58:01 -070054import org.onlab.onos.net.provider.AbstractProviderRegistry;
55import org.onlab.onos.net.provider.AbstractProviderService;
56import org.slf4j.Logger;
57
Yuta HIGUCHIf1ccee82014-11-11 20:39:58 -080058import java.util.HashSet;
Thomas Vachuska9b2da212014-11-10 19:30:25 -080059import java.util.List;
60import java.util.Map;
61import java.util.Set;
62import java.util.concurrent.CancellationException;
63import java.util.concurrent.ExecutionException;
64import java.util.concurrent.ExecutorService;
65import java.util.concurrent.Executors;
66import java.util.concurrent.Future;
67import java.util.concurrent.TimeUnit;
68import java.util.concurrent.TimeoutException;
69import java.util.concurrent.atomic.AtomicReference;
70
71import static com.google.common.base.Preconditions.checkNotNull;
72import static org.onlab.util.Tools.namedThreads;
73import static org.slf4j.LoggerFactory.getLogger;
alshabiba7f7ca82014-09-22 11:41:23 -070074
tome4729872014-09-23 00:37:37 -070075/**
76 * Provides implementation of the flow NB & SB APIs.
77 */
alshabib57044ba2014-09-16 15:58:01 -070078@Component(immediate = true)
79@Service
tom202175a2014-09-19 19:00:11 -070080public class FlowRuleManager
tom9b4030d2014-10-06 10:39:03 -070081 extends AbstractProviderRegistry<FlowRuleProvider, FlowRuleProviderService>
82 implements FlowRuleService, FlowRuleProviderRegistry {
alshabib57044ba2014-09-16 15:58:01 -070083
alshabib193525b2014-10-08 18:58:03 -070084 enum BatchState { STARTED, FINISHED, CANCELLED };
85
Ayaka Koshibe08eabaa2014-09-17 14:59:25 -070086 public static final String FLOW_RULE_NULL = "FlowRule cannot be null";
alshabib57044ba2014-09-16 15:58:01 -070087 private final Logger log = getLogger(getClass());
88
89 private final AbstractListenerRegistry<FlowRuleEvent, FlowRuleListener>
tom9b4030d2014-10-06 10:39:03 -070090 listenerRegistry = new AbstractListenerRegistry<>();
alshabib57044ba2014-09-16 15:58:01 -070091
alshabibbb42cad2014-09-25 11:43:05 -070092 private final FlowRuleStoreDelegate delegate = new InternalStoreDelegate();
tomc78acee2014-09-24 15:16:55 -070093
Thomas Vachuska9b2da212014-11-10 19:30:25 -080094 private ExecutorService futureService;
Yuta HIGUCHI9def0472014-10-23 15:51:10 -070095
tombe988312014-09-19 18:38:47 -070096 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
97 protected FlowRuleStore store;
Ayaka Koshibe08eabaa2014-09-17 14:59:25 -070098
alshabib57044ba2014-09-16 15:58:01 -070099 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
Ayaka Koshibeb55524f2014-09-18 09:59:24 -0700100 protected EventDeliveryService eventDispatcher;
alshabib57044ba2014-09-16 15:58:01 -0700101
102 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
Ayaka Koshibeb55524f2014-09-18 09:59:24 -0700103 protected DeviceService deviceService;
alshabib57044ba2014-09-16 15:58:01 -0700104
105 @Activate
106 public void activate() {
Thomas Vachuska9b2da212014-11-10 19:30:25 -0800107 futureService = Executors.newCachedThreadPool(namedThreads("provider-future-listeners"));
tomc78acee2014-09-24 15:16:55 -0700108 store.setDelegate(delegate);
alshabib57044ba2014-09-16 15:58:01 -0700109 eventDispatcher.addSink(FlowRuleEvent.class, listenerRegistry);
110 log.info("Started");
111 }
112
113 @Deactivate
114 public void deactivate() {
Thomas Vachuska9b2da212014-11-10 19:30:25 -0800115 futureService.shutdownNow();
Yuta HIGUCHI9def0472014-10-23 15:51:10 -0700116
tomc78acee2014-09-24 15:16:55 -0700117 store.unsetDelegate(delegate);
alshabib57044ba2014-09-16 15:58:01 -0700118 eventDispatcher.removeSink(FlowRuleEvent.class);
119 log.info("Stopped");
120 }
121
122 @Override
tom9b4030d2014-10-06 10:39:03 -0700123 public int getFlowRuleCount() {
124 return store.getFlowRuleCount();
125 }
126
127 @Override
alshabib1c319ff2014-10-04 20:29:09 -0700128 public Iterable<FlowEntry> getFlowEntries(DeviceId deviceId) {
Ayaka Koshibe08eabaa2014-09-17 14:59:25 -0700129 return store.getFlowEntries(deviceId);
alshabib57044ba2014-09-16 15:58:01 -0700130 }
131
132 @Override
alshabib219ebaa2014-09-22 15:41:24 -0700133 public void applyFlowRules(FlowRule... flowRules) {
Madan Jampani6a456162014-10-24 11:36:17 -0700134 Set<FlowRuleBatchEntry> toAddBatchEntries = Sets.newHashSet();
alshabib57044ba2014-09-16 15:58:01 -0700135 for (int i = 0; i < flowRules.length; i++) {
Madan Jampani6a456162014-10-24 11:36:17 -0700136 toAddBatchEntries.add(new FlowRuleBatchEntry(FlowRuleOperation.ADD, flowRules[i]));
Yuta HIGUCHIf3d51bd2014-10-21 01:05:33 -0700137 }
Madan Jampani6a456162014-10-24 11:36:17 -0700138 applyBatch(new FlowRuleBatchOperation(toAddBatchEntries));
alshabib57044ba2014-09-16 15:58:01 -0700139 }
140
141 @Override
142 public void removeFlowRules(FlowRule... flowRules) {
Madan Jampani6a456162014-10-24 11:36:17 -0700143 Set<FlowRuleBatchEntry> toRemoveBatchEntries = Sets.newHashSet();
alshabib57044ba2014-09-16 15:58:01 -0700144 for (int i = 0; i < flowRules.length; i++) {
Madan Jampani6a456162014-10-24 11:36:17 -0700145 toRemoveBatchEntries.add(new FlowRuleBatchEntry(FlowRuleOperation.REMOVE, flowRules[i]));
Yuta HIGUCHIf3d51bd2014-10-21 01:05:33 -0700146 }
Madan Jampani6a456162014-10-24 11:36:17 -0700147 applyBatch(new FlowRuleBatchOperation(toRemoveBatchEntries));
alshabiba68eb962014-09-24 20:34:13 -0700148 }
alshabib57044ba2014-09-16 15:58:01 -0700149
alshabiba68eb962014-09-24 20:34:13 -0700150 @Override
151 public void removeFlowRulesById(ApplicationId id) {
Madan Jampani6a456162014-10-24 11:36:17 -0700152 removeFlowRules(Iterables.toArray(getFlowRulesById(id), FlowRule.class));
alshabiba68eb962014-09-24 20:34:13 -0700153 }
154
155 @Override
156 public Iterable<FlowRule> getFlowRulesById(ApplicationId id) {
Madan Jampani6a456162014-10-24 11:36:17 -0700157 Set<FlowRule> flowEntries = Sets.newHashSet();
158 for (Device d : deviceService.getDevices()) {
159 for (FlowEntry flowEntry : store.getFlowEntries(d.id())) {
160 if (flowEntry.appId() == id.id()) {
161 flowEntries.add(flowEntry);
162 }
163 }
164 }
165 return flowEntries;
alshabib57044ba2014-09-16 15:58:01 -0700166 }
167
168 @Override
alshabib902d41b2014-10-07 16:52:05 -0700169 public Future<CompletedBatchOperation> applyBatch(
170 FlowRuleBatchOperation batch) {
Madan Jampani117aaae2014-10-23 10:04:05 -0700171 Multimap<DeviceId, FlowRuleBatchEntry> perDeviceBatches =
alshabib902d41b2014-10-07 16:52:05 -0700172 ArrayListMultimap.create();
alshabib193525b2014-10-08 18:58:03 -0700173 List<Future<CompletedBatchOperation>> futures = Lists.newArrayList();
alshabib902d41b2014-10-07 16:52:05 -0700174 for (FlowRuleBatchEntry fbe : batch.getOperations()) {
175 final FlowRule f = fbe.getTarget();
Madan Jampani117aaae2014-10-23 10:04:05 -0700176 perDeviceBatches.put(f.deviceId(), fbe);
alshabib902d41b2014-10-07 16:52:05 -0700177 }
Madan Jampani117aaae2014-10-23 10:04:05 -0700178
179 for (DeviceId deviceId : perDeviceBatches.keySet()) {
alshabib902d41b2014-10-07 16:52:05 -0700180 FlowRuleBatchOperation b =
Madan Jampani117aaae2014-10-23 10:04:05 -0700181 new FlowRuleBatchOperation(perDeviceBatches.get(deviceId));
182 Future<CompletedBatchOperation> future = store.storeBatch(b);
alshabib902d41b2014-10-07 16:52:05 -0700183 futures.add(future);
184 }
Madan Jampani117aaae2014-10-23 10:04:05 -0700185 return new FlowRuleBatchFuture(futures, perDeviceBatches);
alshabib902d41b2014-10-07 16:52:05 -0700186 }
187
188 @Override
alshabib57044ba2014-09-16 15:58:01 -0700189 public void addListener(FlowRuleListener listener) {
190 listenerRegistry.addListener(listener);
191 }
192
193 @Override
194 public void removeListener(FlowRuleListener listener) {
195 listenerRegistry.removeListener(listener);
196 }
197
198 @Override
199 protected FlowRuleProviderService createProviderService(
200 FlowRuleProvider provider) {
201 return new InternalFlowRuleProviderService(provider);
202 }
203
204 private class InternalFlowRuleProviderService
tom9b4030d2014-10-06 10:39:03 -0700205 extends AbstractProviderService<FlowRuleProvider>
206 implements FlowRuleProviderService {
alshabib57044ba2014-09-16 15:58:01 -0700207
Yuta HIGUCHIf6f50a62014-10-19 15:58:49 -0700208 final Map<FlowEntry, Long> lastSeen = Maps.newConcurrentMap();
209
alshabib57044ba2014-09-16 15:58:01 -0700210 protected InternalFlowRuleProviderService(FlowRuleProvider provider) {
211 super(provider);
212 }
213
214 @Override
alshabib1c319ff2014-10-04 20:29:09 -0700215 public void flowRemoved(FlowEntry flowEntry) {
216 checkNotNull(flowEntry, FLOW_RULE_NULL);
Ayaka Koshibe08eabaa2014-09-17 14:59:25 -0700217 checkValidity();
Yuta HIGUCHIf6f50a62014-10-19 15:58:49 -0700218 lastSeen.remove(flowEntry);
alshabib1c319ff2014-10-04 20:29:09 -0700219 FlowEntry stored = store.getFlowEntry(flowEntry);
alshabiba68eb962014-09-24 20:34:13 -0700220 if (stored == null) {
alshabib1c319ff2014-10-04 20:29:09 -0700221 log.info("Rule already evicted from store: {}", flowEntry);
alshabiba68eb962014-09-24 20:34:13 -0700222 return;
223 }
alshabib1c319ff2014-10-04 20:29:09 -0700224 Device device = deviceService.getDevice(flowEntry.deviceId());
alshabiba68eb962014-09-24 20:34:13 -0700225 FlowRuleProvider frp = getProvider(device.providerId());
226 FlowRuleEvent event = null;
227 switch (stored.state()) {
tom9b4030d2014-10-06 10:39:03 -0700228 case ADDED:
229 case PENDING_ADD:
alshabib6eb438a2014-10-01 16:39:37 -0700230 frp.applyFlowRule(stored);
tom9b4030d2014-10-06 10:39:03 -0700231 break;
232 case PENDING_REMOVE:
233 case REMOVED:
234 event = store.removeFlowRule(stored);
235 break;
236 default:
237 break;
alshabib57044ba2014-09-16 15:58:01 -0700238
alshabiba68eb962014-09-24 20:34:13 -0700239 }
Ayaka Koshibe08eabaa2014-09-17 14:59:25 -0700240 if (event != null) {
alshabib1c319ff2014-10-04 20:29:09 -0700241 log.debug("Flow {} removed", flowEntry);
Ayaka Koshibe08eabaa2014-09-17 14:59:25 -0700242 post(event);
243 }
alshabib57044ba2014-09-16 15:58:01 -0700244 }
245
alshabibba5ac482014-10-02 17:15:20 -0700246
alshabib1c319ff2014-10-04 20:29:09 -0700247 private void flowMissing(FlowEntry flowRule) {
Ayaka Koshibe08eabaa2014-09-17 14:59:25 -0700248 checkNotNull(flowRule, FLOW_RULE_NULL);
249 checkValidity();
alshabiba68eb962014-09-24 20:34:13 -0700250 Device device = deviceService.getDevice(flowRule.deviceId());
251 FlowRuleProvider frp = getProvider(device.providerId());
alshabibbb42cad2014-09-25 11:43:05 -0700252 FlowRuleEvent event = null;
alshabiba68eb962014-09-24 20:34:13 -0700253 switch (flowRule.state()) {
tom9b4030d2014-10-06 10:39:03 -0700254 case PENDING_REMOVE:
255 case REMOVED:
256 event = store.removeFlowRule(flowRule);
257 frp.removeFlowRule(flowRule);
258 break;
259 case ADDED:
260 case PENDING_ADD:
261 frp.applyFlowRule(flowRule);
262 break;
263 default:
264 log.debug("Flow {} has not been installed.", flowRule);
alshabiba68eb962014-09-24 20:34:13 -0700265 }
266
alshabibbb42cad2014-09-25 11:43:05 -0700267 if (event != null) {
268 log.debug("Flow {} removed", flowRule);
269 post(event);
270 }
alshabib57044ba2014-09-16 15:58:01 -0700271
272 }
273
alshabibba5ac482014-10-02 17:15:20 -0700274
275 private void extraneousFlow(FlowRule flowRule) {
alshabib219ebaa2014-09-22 15:41:24 -0700276 checkNotNull(flowRule, FLOW_RULE_NULL);
277 checkValidity();
alshabib2374fc92014-10-22 11:03:23 -0700278 FlowRuleProvider frp = getProvider(flowRule.deviceId());
279 frp.removeFlowRule(flowRule);
alshabib54ce5892014-09-23 17:50:51 -0700280 log.debug("Flow {} is on switch but not in store.", flowRule);
alshabib219ebaa2014-09-22 15:41:24 -0700281 }
282
alshabibba5ac482014-10-02 17:15:20 -0700283
alshabib1c319ff2014-10-04 20:29:09 -0700284 private void flowAdded(FlowEntry flowEntry) {
285 checkNotNull(flowEntry, FLOW_RULE_NULL);
Ayaka Koshibe08eabaa2014-09-17 14:59:25 -0700286 checkValidity();
alshabib57044ba2014-09-16 15:58:01 -0700287
alshabib1c319ff2014-10-04 20:29:09 -0700288 if (checkRuleLiveness(flowEntry, store.getFlowEntry(flowEntry))) {
alshabibba5ac482014-10-02 17:15:20 -0700289
alshabib1c319ff2014-10-04 20:29:09 -0700290 FlowRuleEvent event = store.addOrUpdateFlowRule(flowEntry);
alshabibba5ac482014-10-02 17:15:20 -0700291 if (event == null) {
292 log.debug("No flow store event generated.");
293 } else {
alshabib1c319ff2014-10-04 20:29:09 -0700294 log.debug("Flow {} {}", flowEntry, event.type());
alshabibba5ac482014-10-02 17:15:20 -0700295 post(event);
296 }
Ayaka Koshibe08eabaa2014-09-17 14:59:25 -0700297 } else {
Thomas Vachuska4830d392014-11-09 17:09:56 -0800298 log.debug("Removing flow rules....");
alshabib1c319ff2014-10-04 20:29:09 -0700299 removeFlowRules(flowEntry);
Ayaka Koshibe08eabaa2014-09-17 14:59:25 -0700300 }
alshabib219ebaa2014-09-22 15:41:24 -0700301
alshabib57044ba2014-09-16 15:58:01 -0700302 }
303
alshabib1c319ff2014-10-04 20:29:09 -0700304 private boolean checkRuleLiveness(FlowEntry swRule, FlowEntry storedRule) {
305 if (storedRule == null) {
306 return false;
307 }
Jonathan Hartbc4a7932014-10-21 11:46:00 -0700308 if (storedRule.isPermanent()) {
309 return true;
310 }
311
Yuta HIGUCHIf6f50a62014-10-19 15:58:49 -0700312 final long timeout = storedRule.timeout() * 1000;
313 final long currentTime = System.currentTimeMillis();
alshabib85c41972014-10-03 13:48:39 -0700314 if (storedRule.packets() != swRule.packets()) {
Yuta HIGUCHIf6f50a62014-10-19 15:58:49 -0700315 lastSeen.put(storedRule, currentTime);
alshabib85c41972014-10-03 13:48:39 -0700316 return true;
317 }
Yuta HIGUCHIf6f50a62014-10-19 15:58:49 -0700318 if (!lastSeen.containsKey(storedRule)) {
319 // checking for the first time
320 lastSeen.put(storedRule, storedRule.lastSeen());
321 // Use following if lastSeen attr. was removed.
322 //lastSeen.put(storedRule, currentTime);
323 }
324 Long last = lastSeen.get(storedRule);
325 if (last == null) {
326 // concurrently removed? let the liveness check fail
327 return false;
328 }
alshabib85c41972014-10-03 13:48:39 -0700329
Yuta HIGUCHIf6f50a62014-10-19 15:58:49 -0700330 if ((currentTime - last) <= timeout) {
alshabibc274c902014-10-03 14:58:27 -0700331 return true;
332 }
333 return false;
alshabibba5ac482014-10-02 17:15:20 -0700334 }
335
Ayaka Koshibe08eabaa2014-09-17 14:59:25 -0700336 // Posts the specified event to the local event dispatcher.
337 private void post(FlowRuleEvent event) {
338 if (event != null) {
339 eventDispatcher.post(event);
340 }
341 }
alshabib5c370ff2014-09-18 10:12:14 -0700342
343 @Override
alshabib1c319ff2014-10-04 20:29:09 -0700344 public void pushFlowMetrics(DeviceId deviceId, Iterable<FlowEntry> flowEntries) {
345 List<FlowEntry> storedRules = Lists.newLinkedList(store.getFlowEntries(deviceId));
alshabibbb8b1282014-09-22 17:00:18 -0700346
Yuta HIGUCHIf6f50a62014-10-19 15:58:49 -0700347 for (FlowEntry rule : flowEntries) {
alshabiba7f7ca82014-09-22 11:41:23 -0700348 if (storedRules.remove(rule)) {
alshabib219ebaa2014-09-22 15:41:24 -0700349 // we both have the rule, let's update some info then.
alshabiba7f7ca82014-09-22 11:41:23 -0700350 flowAdded(rule);
351 } else {
alshabib219ebaa2014-09-22 15:41:24 -0700352 // the device has a rule the store does not have
353 extraneousFlow(rule);
alshabiba7f7ca82014-09-22 11:41:23 -0700354 }
355 }
alshabib1c319ff2014-10-04 20:29:09 -0700356 for (FlowEntry rule : storedRules) {
alshabiba7f7ca82014-09-22 11:41:23 -0700357 // there are rules in the store that aren't on the switch
358 flowMissing(rule);
alshabib54ce5892014-09-23 17:50:51 -0700359
alshabiba7f7ca82014-09-22 11:41:23 -0700360 }
alshabib5c370ff2014-09-18 10:12:14 -0700361 }
alshabib57044ba2014-09-16 15:58:01 -0700362 }
363
tomc78acee2014-09-24 15:16:55 -0700364 // Store delegate to re-post events emitted from the store.
365 private class InternalStoreDelegate implements FlowRuleStoreDelegate {
Thomas Vachuska9b2da212014-11-10 19:30:25 -0800366
367 private static final int TIMEOUT = 5000; // ms
368
Madan Jampani117aaae2014-10-23 10:04:05 -0700369 // TODO: Right now we only dispatch events at individual flowEntry level.
370 // It may be more efficient for also dispatch events as a batch.
tomc78acee2014-09-24 15:16:55 -0700371 @Override
Madan Jampani117aaae2014-10-23 10:04:05 -0700372 public void notify(FlowRuleBatchEvent event) {
373 final FlowRuleBatchRequest request = event.subject();
Yuta HIGUCHIf3d51bd2014-10-21 01:05:33 -0700374 switch (event.type()) {
Madan Jampani117aaae2014-10-23 10:04:05 -0700375 case BATCH_OPERATION_REQUESTED:
Yuta HIGUCHI2fcb40c2014-11-03 14:39:10 -0800376 // Request has been forwarded to MASTER Node, and was
377 for (FlowRule entry : request.toAdd()) {
Madan Jampani31961c12014-10-23 12:06:58 -0700378 eventDispatcher.post(new FlowRuleEvent(FlowRuleEvent.Type.RULE_ADD_REQUESTED, entry));
379 }
Yuta HIGUCHI2fcb40c2014-11-03 14:39:10 -0800380 for (FlowRule entry : request.toRemove()) {
Madan Jampani31961c12014-10-23 12:06:58 -0700381 eventDispatcher.post(new FlowRuleEvent(FlowRuleEvent.Type.RULE_REMOVE_REQUESTED, entry));
382 }
383 // FIXME: what about op.equals(FlowRuleOperation.MODIFY) ?
Yuta HIGUCHIf3d51bd2014-10-21 01:05:33 -0700384
Madan Jampani117aaae2014-10-23 10:04:05 -0700385 FlowRuleBatchOperation batchOperation = request.asBatchOperation();
Yuta HIGUCHIf3d51bd2014-10-21 01:05:33 -0700386
Madan Jampani117aaae2014-10-23 10:04:05 -0700387 FlowRuleProvider flowRuleProvider =
388 getProvider(batchOperation.getOperations().get(0).getTarget().deviceId());
Thomas Vachuska9b2da212014-11-10 19:30:25 -0800389 final Future<CompletedBatchOperation> result =
Madan Jampani117aaae2014-10-23 10:04:05 -0700390 flowRuleProvider.executeBatch(batchOperation);
Thomas Vachuska9b2da212014-11-10 19:30:25 -0800391 futureService.submit(new Runnable() {
Madan Jampani117aaae2014-10-23 10:04:05 -0700392 @Override
393 public void run() {
Yuta HIGUCHIf1ccee82014-11-11 20:39:58 -0800394 CompletedBatchOperation res;
Thomas Vachuska9b2da212014-11-10 19:30:25 -0800395 try {
396 res = result.get(TIMEOUT, TimeUnit.MILLISECONDS);
Yuta HIGUCHIf1ccee82014-11-11 20:39:58 -0800397 store.batchOperationComplete(FlowRuleBatchEvent.completed(request, res));
Thomas Vachuska9b2da212014-11-10 19:30:25 -0800398 } catch (TimeoutException | InterruptedException | ExecutionException e) {
399 log.warn("Something went wrong with the batch operation {}",
Yuta HIGUCHIf1ccee82014-11-11 20:39:58 -0800400 request.batchId(), e);
401
402 Set<FlowRule> failures = new HashSet<>(batchOperation.size());
403 for (FlowRuleBatchEntry op : batchOperation.getOperations()) {
404 failures.add(op.getTarget());
405 }
406 res = new CompletedBatchOperation(false, failures);
407 store.batchOperationComplete(FlowRuleBatchEvent.completed(request, res));
Thomas Vachuska9b2da212014-11-10 19:30:25 -0800408 }
Madan Jampani117aaae2014-10-23 10:04:05 -0700409 }
Thomas Vachuska9b2da212014-11-10 19:30:25 -0800410 });
Yuta HIGUCHI2fcb40c2014-11-03 14:39:10 -0800411 break;
Madan Jampani117aaae2014-10-23 10:04:05 -0700412
Madan Jampani117aaae2014-10-23 10:04:05 -0700413 case BATCH_OPERATION_COMPLETED:
Yuta HIGUCHI2fcb40c2014-11-03 14:39:10 -0800414 // MASTER Node has pushed the batch down to the Device
415
416 // Note: RULE_ADDED will be posted
417 // when Flow was actually confirmed by stats reply.
Yuta HIGUCHIf3d51bd2014-10-21 01:05:33 -0700418 break;
Yuta HIGUCHI2fcb40c2014-11-03 14:39:10 -0800419
Yuta HIGUCHIf3d51bd2014-10-21 01:05:33 -0700420 default:
421 break;
422 }
tomc78acee2014-09-24 15:16:55 -0700423 }
424 }
alshabib902d41b2014-10-07 16:52:05 -0700425
Madan Jampani117aaae2014-10-23 10:04:05 -0700426 private class FlowRuleBatchFuture implements Future<CompletedBatchOperation> {
alshabib902d41b2014-10-07 16:52:05 -0700427
alshabib193525b2014-10-08 18:58:03 -0700428 private final List<Future<CompletedBatchOperation>> futures;
Madan Jampani117aaae2014-10-23 10:04:05 -0700429 private final Multimap<DeviceId, FlowRuleBatchEntry> batches;
alshabib193525b2014-10-08 18:58:03 -0700430 private final AtomicReference<BatchState> state;
431 private CompletedBatchOperation overall;
alshabib902d41b2014-10-07 16:52:05 -0700432
alshabib193525b2014-10-08 18:58:03 -0700433 public FlowRuleBatchFuture(List<Future<CompletedBatchOperation>> futures,
Madan Jampani117aaae2014-10-23 10:04:05 -0700434 Multimap<DeviceId, FlowRuleBatchEntry> batches) {
alshabib902d41b2014-10-07 16:52:05 -0700435 this.futures = futures;
alshabib193525b2014-10-08 18:58:03 -0700436 this.batches = batches;
437 state = new AtomicReference<FlowRuleManager.BatchState>();
438 state.set(BatchState.STARTED);
alshabib902d41b2014-10-07 16:52:05 -0700439 }
440
441 @Override
442 public boolean cancel(boolean mayInterruptIfRunning) {
alshabib193525b2014-10-08 18:58:03 -0700443 if (state.get() == BatchState.FINISHED) {
444 return false;
445 }
446 if (!state.compareAndSet(BatchState.STARTED, BatchState.CANCELLED)) {
447 return false;
448 }
449 cleanUpBatch();
450 for (Future<CompletedBatchOperation> f : futures) {
451 f.cancel(mayInterruptIfRunning);
452 }
453 return true;
alshabib902d41b2014-10-07 16:52:05 -0700454 }
455
456 @Override
457 public boolean isCancelled() {
alshabib193525b2014-10-08 18:58:03 -0700458 return state.get() == BatchState.CANCELLED;
alshabib902d41b2014-10-07 16:52:05 -0700459 }
460
461 @Override
462 public boolean isDone() {
alshabib193525b2014-10-08 18:58:03 -0700463 return state.get() == BatchState.FINISHED;
alshabib902d41b2014-10-07 16:52:05 -0700464 }
465
alshabib193525b2014-10-08 18:58:03 -0700466
alshabib902d41b2014-10-07 16:52:05 -0700467 @Override
468 public CompletedBatchOperation get() throws InterruptedException,
alshabib193525b2014-10-08 18:58:03 -0700469 ExecutionException {
470
471 if (isDone()) {
472 return overall;
alshabib902d41b2014-10-07 16:52:05 -0700473 }
alshabib193525b2014-10-08 18:58:03 -0700474
475 boolean success = true;
Yuta HIGUCHI4b524442014-10-28 22:23:57 -0700476 Set<FlowRule> failed = Sets.newHashSet();
alshabib193525b2014-10-08 18:58:03 -0700477 CompletedBatchOperation completed;
478 for (Future<CompletedBatchOperation> future : futures) {
479 completed = future.get();
alshabib3effd042014-10-17 12:00:31 -0700480 success = validateBatchOperation(failed, completed);
alshabib193525b2014-10-08 18:58:03 -0700481 }
482
483 return finalizeBatchOperation(success, failed);
484
alshabib902d41b2014-10-07 16:52:05 -0700485 }
486
487 @Override
488 public CompletedBatchOperation get(long timeout, TimeUnit unit)
489 throws InterruptedException, ExecutionException,
490 TimeoutException {
alshabib193525b2014-10-08 18:58:03 -0700491
492 if (isDone()) {
493 return overall;
494 }
495 boolean success = true;
Yuta HIGUCHI4b524442014-10-28 22:23:57 -0700496 Set<FlowRule> failed = Sets.newHashSet();
alshabib193525b2014-10-08 18:58:03 -0700497 CompletedBatchOperation completed;
alshabib902d41b2014-10-07 16:52:05 -0700498 long start = System.nanoTime();
499 long end = start + unit.toNanos(timeout);
alshabib193525b2014-10-08 18:58:03 -0700500
501 for (Future<CompletedBatchOperation> future : futures) {
alshabib902d41b2014-10-07 16:52:05 -0700502 long now = System.nanoTime();
503 long thisTimeout = end - now;
alshabib193525b2014-10-08 18:58:03 -0700504 completed = future.get(thisTimeout, TimeUnit.NANOSECONDS);
alshabib3effd042014-10-17 12:00:31 -0700505 success = validateBatchOperation(failed, completed);
alshabib902d41b2014-10-07 16:52:05 -0700506 }
alshabib193525b2014-10-08 18:58:03 -0700507 return finalizeBatchOperation(success, failed);
alshabib902d41b2014-10-07 16:52:05 -0700508 }
509
Yuta HIGUCHI4b524442014-10-28 22:23:57 -0700510 private boolean validateBatchOperation(Set<FlowRule> failed,
alshabib3effd042014-10-17 12:00:31 -0700511 CompletedBatchOperation completed) {
alshabib193525b2014-10-08 18:58:03 -0700512
513 if (isCancelled()) {
514 throw new CancellationException();
515 }
516 if (!completed.isSuccess()) {
517 failed.addAll(completed.failedItems());
518 cleanUpBatch();
519 cancelAllSubBatches();
520 return false;
521 }
522 return true;
523 }
524
525 private void cancelAllSubBatches() {
526 for (Future<CompletedBatchOperation> f : futures) {
527 f.cancel(true);
528 }
529 }
530
531 private CompletedBatchOperation finalizeBatchOperation(boolean success,
Yuta HIGUCHI4b524442014-10-28 22:23:57 -0700532 Set<FlowRule> failed) {
alshabib26834582014-10-08 20:15:46 -0700533 synchronized (this) {
alshabib193525b2014-10-08 18:58:03 -0700534 if (!state.compareAndSet(BatchState.STARTED, BatchState.FINISHED)) {
535 if (state.get() == BatchState.FINISHED) {
536 return overall;
537 }
538 throw new CancellationException();
539 }
540 overall = new CompletedBatchOperation(success, failed);
541 return overall;
542 }
543 }
544
545 private void cleanUpBatch() {
546 for (FlowRuleBatchEntry fbe : batches.values()) {
547 if (fbe.getOperator() == FlowRuleOperation.ADD ||
548 fbe.getOperator() == FlowRuleOperation.MODIFY) {
549 store.deleteFlowRule(fbe.getTarget());
550 } else if (fbe.getOperator() == FlowRuleOperation.REMOVE) {
alshabibcf369912014-10-13 14:16:42 -0700551 store.removeFlowRule(new DefaultFlowEntry(fbe.getTarget()));
alshabib193525b2014-10-08 18:58:03 -0700552 store.storeFlowRule(fbe.getTarget());
553 }
554 }
alshabib193525b2014-10-08 18:58:03 -0700555 }
alshabib902d41b2014-10-07 16:52:05 -0700556 }
alshabib57044ba2014-09-16 15:58:01 -0700557}