/*
 * Copyright 2014 Open Networking Laboratory
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
Brian O'Connorabafb502014-12-02 22:26:20 -080016package org.onosproject.net.flow.impl;
alshabib57044ba2014-09-16 15:58:01 -070017
Thomas Vachuska9b2da212014-11-10 19:30:25 -080018import com.google.common.collect.ArrayListMultimap;
19import com.google.common.collect.Iterables;
20import com.google.common.collect.Lists;
21import com.google.common.collect.Maps;
22import com.google.common.collect.Multimap;
23import com.google.common.collect.Sets;
Yuta HIGUCHIf1ccee82014-11-11 20:39:58 -080024
alshabib57044ba2014-09-16 15:58:01 -070025import org.apache.felix.scr.annotations.Activate;
26import org.apache.felix.scr.annotations.Component;
27import org.apache.felix.scr.annotations.Deactivate;
28import org.apache.felix.scr.annotations.Reference;
29import org.apache.felix.scr.annotations.ReferenceCardinality;
30import org.apache.felix.scr.annotations.Service;
Brian O'Connorabafb502014-12-02 22:26:20 -080031import org.onosproject.core.ApplicationId;
32import org.onosproject.event.AbstractListenerRegistry;
33import org.onosproject.event.EventDeliveryService;
34import org.onosproject.net.Device;
35import org.onosproject.net.DeviceId;
36import org.onosproject.net.device.DeviceService;
37import org.onosproject.net.flow.CompletedBatchOperation;
38import org.onosproject.net.flow.DefaultFlowEntry;
39import org.onosproject.net.flow.FlowEntry;
40import org.onosproject.net.flow.FlowRule;
41import org.onosproject.net.flow.FlowRuleBatchEntry;
42import org.onosproject.net.flow.FlowRuleBatchEntry.FlowRuleOperation;
43import org.onosproject.net.flow.FlowRuleBatchEvent;
44import org.onosproject.net.flow.FlowRuleBatchOperation;
45import org.onosproject.net.flow.FlowRuleBatchRequest;
46import org.onosproject.net.flow.FlowRuleEvent;
47import org.onosproject.net.flow.FlowRuleListener;
48import org.onosproject.net.flow.FlowRuleProvider;
49import org.onosproject.net.flow.FlowRuleProviderRegistry;
50import org.onosproject.net.flow.FlowRuleProviderService;
51import org.onosproject.net.flow.FlowRuleService;
52import org.onosproject.net.flow.FlowRuleStore;
53import org.onosproject.net.flow.FlowRuleStoreDelegate;
54import org.onosproject.net.provider.AbstractProviderRegistry;
55import org.onosproject.net.provider.AbstractProviderService;
alshabib57044ba2014-09-16 15:58:01 -070056import org.slf4j.Logger;
57
Yuta HIGUCHIf1ccee82014-11-11 20:39:58 -080058import java.util.HashSet;
Thomas Vachuska9b2da212014-11-10 19:30:25 -080059import java.util.List;
60import java.util.Map;
61import java.util.Set;
62import java.util.concurrent.CancellationException;
63import java.util.concurrent.ExecutionException;
64import java.util.concurrent.ExecutorService;
65import java.util.concurrent.Executors;
66import java.util.concurrent.Future;
67import java.util.concurrent.TimeUnit;
68import java.util.concurrent.TimeoutException;
69import java.util.concurrent.atomic.AtomicReference;
Thomas Vachuska9b2da212014-11-10 19:30:25 -080070import static com.google.common.base.Preconditions.checkNotNull;
71import static org.onlab.util.Tools.namedThreads;
72import static org.slf4j.LoggerFactory.getLogger;
alshabiba7f7ca82014-09-22 11:41:23 -070073
/**
 * Provides the implementation of the flow rule northbound (NB) and
 * southbound (SB) APIs.
 */
alshabib57044ba2014-09-16 15:58:01 -070077@Component(immediate = true)
78@Service
tom202175a2014-09-19 19:00:11 -070079public class FlowRuleManager
tom9b4030d2014-10-06 10:39:03 -070080 extends AbstractProviderRegistry<FlowRuleProvider, FlowRuleProviderService>
81 implements FlowRuleService, FlowRuleProviderRegistry {
alshabib57044ba2014-09-16 15:58:01 -070082
Sho SHIMIZU183b12fd2015-01-20 14:56:39 -080083 enum BatchState { STARTED, FINISHED, CANCELLED }
alshabib193525b2014-10-08 18:58:03 -070084
Ayaka Koshibe08eabaa2014-09-17 14:59:25 -070085 public static final String FLOW_RULE_NULL = "FlowRule cannot be null";
alshabib57044ba2014-09-16 15:58:01 -070086 private final Logger log = getLogger(getClass());
87
88 private final AbstractListenerRegistry<FlowRuleEvent, FlowRuleListener>
tom9b4030d2014-10-06 10:39:03 -070089 listenerRegistry = new AbstractListenerRegistry<>();
alshabib57044ba2014-09-16 15:58:01 -070090
alshabibbb42cad2014-09-25 11:43:05 -070091 private final FlowRuleStoreDelegate delegate = new InternalStoreDelegate();
tomc78acee2014-09-24 15:16:55 -070092
Thomas Vachuska9b2da212014-11-10 19:30:25 -080093 private ExecutorService futureService;
Yuta HIGUCHI9def0472014-10-23 15:51:10 -070094
tombe988312014-09-19 18:38:47 -070095 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
96 protected FlowRuleStore store;
Ayaka Koshibe08eabaa2014-09-17 14:59:25 -070097
alshabib57044ba2014-09-16 15:58:01 -070098 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
Ayaka Koshibeb55524f2014-09-18 09:59:24 -070099 protected EventDeliveryService eventDispatcher;
alshabib57044ba2014-09-16 15:58:01 -0700100
101 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
Ayaka Koshibeb55524f2014-09-18 09:59:24 -0700102 protected DeviceService deviceService;
alshabib57044ba2014-09-16 15:58:01 -0700103
104 @Activate
105 public void activate() {
Pavlin Radoslavov369c6432014-12-03 16:25:14 -0800106 futureService =
Thomas Vachuska9ea3e6f2015-01-23 16:34:22 -0800107 Executors.newFixedThreadPool(32, namedThreads("onos-provider-future-listeners-%d"));
tomc78acee2014-09-24 15:16:55 -0700108 store.setDelegate(delegate);
alshabib57044ba2014-09-16 15:58:01 -0700109 eventDispatcher.addSink(FlowRuleEvent.class, listenerRegistry);
110 log.info("Started");
111 }
112
113 @Deactivate
114 public void deactivate() {
Thomas Vachuska9b2da212014-11-10 19:30:25 -0800115 futureService.shutdownNow();
Yuta HIGUCHI9def0472014-10-23 15:51:10 -0700116
tomc78acee2014-09-24 15:16:55 -0700117 store.unsetDelegate(delegate);
alshabib57044ba2014-09-16 15:58:01 -0700118 eventDispatcher.removeSink(FlowRuleEvent.class);
119 log.info("Stopped");
120 }
121
122 @Override
tom9b4030d2014-10-06 10:39:03 -0700123 public int getFlowRuleCount() {
124 return store.getFlowRuleCount();
125 }
126
127 @Override
alshabib1c319ff2014-10-04 20:29:09 -0700128 public Iterable<FlowEntry> getFlowEntries(DeviceId deviceId) {
Ayaka Koshibe08eabaa2014-09-17 14:59:25 -0700129 return store.getFlowEntries(deviceId);
alshabib57044ba2014-09-16 15:58:01 -0700130 }
131
132 @Override
alshabib219ebaa2014-09-22 15:41:24 -0700133 public void applyFlowRules(FlowRule... flowRules) {
Madan Jampani6a456162014-10-24 11:36:17 -0700134 Set<FlowRuleBatchEntry> toAddBatchEntries = Sets.newHashSet();
alshabib57044ba2014-09-16 15:58:01 -0700135 for (int i = 0; i < flowRules.length; i++) {
Madan Jampani6a456162014-10-24 11:36:17 -0700136 toAddBatchEntries.add(new FlowRuleBatchEntry(FlowRuleOperation.ADD, flowRules[i]));
Yuta HIGUCHIf3d51bd2014-10-21 01:05:33 -0700137 }
Madan Jampani6a456162014-10-24 11:36:17 -0700138 applyBatch(new FlowRuleBatchOperation(toAddBatchEntries));
alshabib57044ba2014-09-16 15:58:01 -0700139 }
140
141 @Override
142 public void removeFlowRules(FlowRule... flowRules) {
Madan Jampani6a456162014-10-24 11:36:17 -0700143 Set<FlowRuleBatchEntry> toRemoveBatchEntries = Sets.newHashSet();
alshabib57044ba2014-09-16 15:58:01 -0700144 for (int i = 0; i < flowRules.length; i++) {
Madan Jampani6a456162014-10-24 11:36:17 -0700145 toRemoveBatchEntries.add(new FlowRuleBatchEntry(FlowRuleOperation.REMOVE, flowRules[i]));
Yuta HIGUCHIf3d51bd2014-10-21 01:05:33 -0700146 }
Madan Jampani6a456162014-10-24 11:36:17 -0700147 applyBatch(new FlowRuleBatchOperation(toRemoveBatchEntries));
alshabiba68eb962014-09-24 20:34:13 -0700148 }
alshabib57044ba2014-09-16 15:58:01 -0700149
alshabiba68eb962014-09-24 20:34:13 -0700150 @Override
151 public void removeFlowRulesById(ApplicationId id) {
Madan Jampani6a456162014-10-24 11:36:17 -0700152 removeFlowRules(Iterables.toArray(getFlowRulesById(id), FlowRule.class));
alshabiba68eb962014-09-24 20:34:13 -0700153 }
154
155 @Override
156 public Iterable<FlowRule> getFlowRulesById(ApplicationId id) {
Madan Jampani6a456162014-10-24 11:36:17 -0700157 Set<FlowRule> flowEntries = Sets.newHashSet();
158 for (Device d : deviceService.getDevices()) {
159 for (FlowEntry flowEntry : store.getFlowEntries(d.id())) {
160 if (flowEntry.appId() == id.id()) {
161 flowEntries.add(flowEntry);
162 }
163 }
164 }
165 return flowEntries;
alshabib57044ba2014-09-16 15:58:01 -0700166 }
167
168 @Override
alshabibaa7e7de2014-11-12 19:20:44 -0800169 public Iterable<FlowRule> getFlowRulesByGroupId(ApplicationId appId, short groupId) {
170 Set<FlowRule> matches = Sets.newHashSet();
171 long toLookUp = ((long) appId.id() << 16) | groupId;
172 for (Device d : deviceService.getDevices()) {
173 for (FlowEntry flowEntry : store.getFlowEntries(d.id())) {
174 if ((flowEntry.id().value() >>> 32) == toLookUp) {
175 matches.add(flowEntry);
176 }
177 }
178 }
179 return matches;
180 }
181
182 @Override
alshabib902d41b2014-10-07 16:52:05 -0700183 public Future<CompletedBatchOperation> applyBatch(
184 FlowRuleBatchOperation batch) {
Madan Jampani117aaae2014-10-23 10:04:05 -0700185 Multimap<DeviceId, FlowRuleBatchEntry> perDeviceBatches =
alshabib902d41b2014-10-07 16:52:05 -0700186 ArrayListMultimap.create();
alshabib193525b2014-10-08 18:58:03 -0700187 List<Future<CompletedBatchOperation>> futures = Lists.newArrayList();
alshabib902d41b2014-10-07 16:52:05 -0700188 for (FlowRuleBatchEntry fbe : batch.getOperations()) {
Sho SHIMIZUaba9d002015-01-29 14:51:04 -0800189 final FlowRule f = fbe.target();
Madan Jampani117aaae2014-10-23 10:04:05 -0700190 perDeviceBatches.put(f.deviceId(), fbe);
alshabib902d41b2014-10-07 16:52:05 -0700191 }
Madan Jampani117aaae2014-10-23 10:04:05 -0700192
193 for (DeviceId deviceId : perDeviceBatches.keySet()) {
alshabib902d41b2014-10-07 16:52:05 -0700194 FlowRuleBatchOperation b =
Madan Jampani117aaae2014-10-23 10:04:05 -0700195 new FlowRuleBatchOperation(perDeviceBatches.get(deviceId));
196 Future<CompletedBatchOperation> future = store.storeBatch(b);
alshabib902d41b2014-10-07 16:52:05 -0700197 futures.add(future);
198 }
Madan Jampani117aaae2014-10-23 10:04:05 -0700199 return new FlowRuleBatchFuture(futures, perDeviceBatches);
alshabib902d41b2014-10-07 16:52:05 -0700200 }
201
202 @Override
alshabib57044ba2014-09-16 15:58:01 -0700203 public void addListener(FlowRuleListener listener) {
204 listenerRegistry.addListener(listener);
205 }
206
207 @Override
208 public void removeListener(FlowRuleListener listener) {
209 listenerRegistry.removeListener(listener);
210 }
211
212 @Override
213 protected FlowRuleProviderService createProviderService(
214 FlowRuleProvider provider) {
215 return new InternalFlowRuleProviderService(provider);
216 }
217
218 private class InternalFlowRuleProviderService
tom9b4030d2014-10-06 10:39:03 -0700219 extends AbstractProviderService<FlowRuleProvider>
220 implements FlowRuleProviderService {
alshabib57044ba2014-09-16 15:58:01 -0700221
Yuta HIGUCHIf6f50a62014-10-19 15:58:49 -0700222 final Map<FlowEntry, Long> lastSeen = Maps.newConcurrentMap();
223
alshabib57044ba2014-09-16 15:58:01 -0700224 protected InternalFlowRuleProviderService(FlowRuleProvider provider) {
225 super(provider);
226 }
227
228 @Override
alshabib1c319ff2014-10-04 20:29:09 -0700229 public void flowRemoved(FlowEntry flowEntry) {
230 checkNotNull(flowEntry, FLOW_RULE_NULL);
Ayaka Koshibe08eabaa2014-09-17 14:59:25 -0700231 checkValidity();
Yuta HIGUCHIf6f50a62014-10-19 15:58:49 -0700232 lastSeen.remove(flowEntry);
alshabib1c319ff2014-10-04 20:29:09 -0700233 FlowEntry stored = store.getFlowEntry(flowEntry);
alshabiba68eb962014-09-24 20:34:13 -0700234 if (stored == null) {
Yuta HIGUCHI82e53262014-11-27 10:28:51 -0800235 log.debug("Rule already evicted from store: {}", flowEntry);
alshabiba68eb962014-09-24 20:34:13 -0700236 return;
237 }
alshabib1c319ff2014-10-04 20:29:09 -0700238 Device device = deviceService.getDevice(flowEntry.deviceId());
alshabiba68eb962014-09-24 20:34:13 -0700239 FlowRuleProvider frp = getProvider(device.providerId());
240 FlowRuleEvent event = null;
241 switch (stored.state()) {
tom9b4030d2014-10-06 10:39:03 -0700242 case ADDED:
243 case PENDING_ADD:
alshabib6eb438a2014-10-01 16:39:37 -0700244 frp.applyFlowRule(stored);
tom9b4030d2014-10-06 10:39:03 -0700245 break;
246 case PENDING_REMOVE:
247 case REMOVED:
248 event = store.removeFlowRule(stored);
249 break;
250 default:
251 break;
alshabib57044ba2014-09-16 15:58:01 -0700252
alshabiba68eb962014-09-24 20:34:13 -0700253 }
Ayaka Koshibe08eabaa2014-09-17 14:59:25 -0700254 if (event != null) {
alshabib1c319ff2014-10-04 20:29:09 -0700255 log.debug("Flow {} removed", flowEntry);
Ayaka Koshibe08eabaa2014-09-17 14:59:25 -0700256 post(event);
257 }
alshabib57044ba2014-09-16 15:58:01 -0700258 }
259
alshabibba5ac482014-10-02 17:15:20 -0700260
alshabib1c319ff2014-10-04 20:29:09 -0700261 private void flowMissing(FlowEntry flowRule) {
Ayaka Koshibe08eabaa2014-09-17 14:59:25 -0700262 checkNotNull(flowRule, FLOW_RULE_NULL);
263 checkValidity();
alshabiba68eb962014-09-24 20:34:13 -0700264 Device device = deviceService.getDevice(flowRule.deviceId());
265 FlowRuleProvider frp = getProvider(device.providerId());
alshabibbb42cad2014-09-25 11:43:05 -0700266 FlowRuleEvent event = null;
alshabiba68eb962014-09-24 20:34:13 -0700267 switch (flowRule.state()) {
tom9b4030d2014-10-06 10:39:03 -0700268 case PENDING_REMOVE:
269 case REMOVED:
270 event = store.removeFlowRule(flowRule);
271 frp.removeFlowRule(flowRule);
272 break;
273 case ADDED:
274 case PENDING_ADD:
275 frp.applyFlowRule(flowRule);
276 break;
277 default:
278 log.debug("Flow {} has not been installed.", flowRule);
alshabiba68eb962014-09-24 20:34:13 -0700279 }
280
alshabibbb42cad2014-09-25 11:43:05 -0700281 if (event != null) {
282 log.debug("Flow {} removed", flowRule);
283 post(event);
284 }
alshabib57044ba2014-09-16 15:58:01 -0700285
286 }
287
alshabibba5ac482014-10-02 17:15:20 -0700288
289 private void extraneousFlow(FlowRule flowRule) {
alshabib219ebaa2014-09-22 15:41:24 -0700290 checkNotNull(flowRule, FLOW_RULE_NULL);
291 checkValidity();
alshabib2374fc92014-10-22 11:03:23 -0700292 FlowRuleProvider frp = getProvider(flowRule.deviceId());
293 frp.removeFlowRule(flowRule);
alshabib54ce5892014-09-23 17:50:51 -0700294 log.debug("Flow {} is on switch but not in store.", flowRule);
alshabib219ebaa2014-09-22 15:41:24 -0700295 }
296
alshabibba5ac482014-10-02 17:15:20 -0700297
alshabib1c319ff2014-10-04 20:29:09 -0700298 private void flowAdded(FlowEntry flowEntry) {
299 checkNotNull(flowEntry, FLOW_RULE_NULL);
Ayaka Koshibe08eabaa2014-09-17 14:59:25 -0700300 checkValidity();
alshabib57044ba2014-09-16 15:58:01 -0700301
alshabib1c319ff2014-10-04 20:29:09 -0700302 if (checkRuleLiveness(flowEntry, store.getFlowEntry(flowEntry))) {
alshabibba5ac482014-10-02 17:15:20 -0700303
alshabib1c319ff2014-10-04 20:29:09 -0700304 FlowRuleEvent event = store.addOrUpdateFlowRule(flowEntry);
alshabibba5ac482014-10-02 17:15:20 -0700305 if (event == null) {
306 log.debug("No flow store event generated.");
307 } else {
Jonathan Hart58682dd2014-11-24 20:11:16 -0800308 log.trace("Flow {} {}", flowEntry, event.type());
alshabibba5ac482014-10-02 17:15:20 -0700309 post(event);
310 }
Ayaka Koshibe08eabaa2014-09-17 14:59:25 -0700311 } else {
Thomas Vachuska4830d392014-11-09 17:09:56 -0800312 log.debug("Removing flow rules....");
alshabib1c319ff2014-10-04 20:29:09 -0700313 removeFlowRules(flowEntry);
Ayaka Koshibe08eabaa2014-09-17 14:59:25 -0700314 }
alshabib219ebaa2014-09-22 15:41:24 -0700315
alshabib57044ba2014-09-16 15:58:01 -0700316 }
317
alshabib1c319ff2014-10-04 20:29:09 -0700318 private boolean checkRuleLiveness(FlowEntry swRule, FlowEntry storedRule) {
319 if (storedRule == null) {
320 return false;
321 }
Jonathan Hartbc4a7932014-10-21 11:46:00 -0700322 if (storedRule.isPermanent()) {
323 return true;
324 }
325
Yuta HIGUCHIf6f50a62014-10-19 15:58:49 -0700326 final long timeout = storedRule.timeout() * 1000;
327 final long currentTime = System.currentTimeMillis();
alshabib85c41972014-10-03 13:48:39 -0700328 if (storedRule.packets() != swRule.packets()) {
Yuta HIGUCHIf6f50a62014-10-19 15:58:49 -0700329 lastSeen.put(storedRule, currentTime);
alshabib85c41972014-10-03 13:48:39 -0700330 return true;
331 }
Yuta HIGUCHIf6f50a62014-10-19 15:58:49 -0700332 if (!lastSeen.containsKey(storedRule)) {
333 // checking for the first time
334 lastSeen.put(storedRule, storedRule.lastSeen());
335 // Use following if lastSeen attr. was removed.
336 //lastSeen.put(storedRule, currentTime);
337 }
338 Long last = lastSeen.get(storedRule);
339 if (last == null) {
340 // concurrently removed? let the liveness check fail
341 return false;
342 }
alshabib85c41972014-10-03 13:48:39 -0700343
Yuta HIGUCHIf6f50a62014-10-19 15:58:49 -0700344 if ((currentTime - last) <= timeout) {
alshabibc274c902014-10-03 14:58:27 -0700345 return true;
346 }
347 return false;
alshabibba5ac482014-10-02 17:15:20 -0700348 }
349
Ayaka Koshibe08eabaa2014-09-17 14:59:25 -0700350 // Posts the specified event to the local event dispatcher.
351 private void post(FlowRuleEvent event) {
352 if (event != null) {
353 eventDispatcher.post(event);
354 }
355 }
alshabib5c370ff2014-09-18 10:12:14 -0700356
357 @Override
alshabib1c319ff2014-10-04 20:29:09 -0700358 public void pushFlowMetrics(DeviceId deviceId, Iterable<FlowEntry> flowEntries) {
alshabib64def642014-12-02 23:27:37 -0800359 Set<FlowEntry> storedRules = Sets.newHashSet(store.getFlowEntries(deviceId));
alshabibbb8b1282014-09-22 17:00:18 -0700360
Yuta HIGUCHIf6f50a62014-10-19 15:58:49 -0700361 for (FlowEntry rule : flowEntries) {
alshabiba7f7ca82014-09-22 11:41:23 -0700362 if (storedRules.remove(rule)) {
alshabib219ebaa2014-09-22 15:41:24 -0700363 // we both have the rule, let's update some info then.
alshabiba7f7ca82014-09-22 11:41:23 -0700364 flowAdded(rule);
365 } else {
alshabib219ebaa2014-09-22 15:41:24 -0700366 // the device has a rule the store does not have
367 extraneousFlow(rule);
alshabiba7f7ca82014-09-22 11:41:23 -0700368 }
369 }
alshabib1c319ff2014-10-04 20:29:09 -0700370 for (FlowEntry rule : storedRules) {
alshabiba7f7ca82014-09-22 11:41:23 -0700371 // there are rules in the store that aren't on the switch
372 flowMissing(rule);
alshabib54ce5892014-09-23 17:50:51 -0700373
alshabiba7f7ca82014-09-22 11:41:23 -0700374 }
alshabib5c370ff2014-09-18 10:12:14 -0700375 }
alshabib57044ba2014-09-16 15:58:01 -0700376 }
377
tomc78acee2014-09-24 15:16:55 -0700378 // Store delegate to re-post events emitted from the store.
379 private class InternalStoreDelegate implements FlowRuleStoreDelegate {
Thomas Vachuska9b2da212014-11-10 19:30:25 -0800380
Yuta HIGUCHI82e53262014-11-27 10:28:51 -0800381 // FIXME set appropriate default and make it configurable
382 private static final int TIMEOUT_PER_OP = 500; // ms
Thomas Vachuska9b2da212014-11-10 19:30:25 -0800383
Madan Jampani117aaae2014-10-23 10:04:05 -0700384 // TODO: Right now we only dispatch events at individual flowEntry level.
385 // It may be more efficient for also dispatch events as a batch.
tomc78acee2014-09-24 15:16:55 -0700386 @Override
Madan Jampani117aaae2014-10-23 10:04:05 -0700387 public void notify(FlowRuleBatchEvent event) {
388 final FlowRuleBatchRequest request = event.subject();
Yuta HIGUCHIf3d51bd2014-10-21 01:05:33 -0700389 switch (event.type()) {
Madan Jampani117aaae2014-10-23 10:04:05 -0700390 case BATCH_OPERATION_REQUESTED:
Yuta HIGUCHI2fcb40c2014-11-03 14:39:10 -0800391 // Request has been forwarded to MASTER Node, and was
392 for (FlowRule entry : request.toAdd()) {
Madan Jampani31961c12014-10-23 12:06:58 -0700393 eventDispatcher.post(new FlowRuleEvent(FlowRuleEvent.Type.RULE_ADD_REQUESTED, entry));
394 }
Yuta HIGUCHI2fcb40c2014-11-03 14:39:10 -0800395 for (FlowRule entry : request.toRemove()) {
Madan Jampani31961c12014-10-23 12:06:58 -0700396 eventDispatcher.post(new FlowRuleEvent(FlowRuleEvent.Type.RULE_REMOVE_REQUESTED, entry));
397 }
398 // FIXME: what about op.equals(FlowRuleOperation.MODIFY) ?
Yuta HIGUCHIf3d51bd2014-10-21 01:05:33 -0700399
Madan Jampani117aaae2014-10-23 10:04:05 -0700400 FlowRuleBatchOperation batchOperation = request.asBatchOperation();
Yuta HIGUCHIf3d51bd2014-10-21 01:05:33 -0700401
Madan Jampani117aaae2014-10-23 10:04:05 -0700402 FlowRuleProvider flowRuleProvider =
Sho SHIMIZUaba9d002015-01-29 14:51:04 -0800403 getProvider(batchOperation.getOperations().get(0).target().deviceId());
Thomas Vachuska9b2da212014-11-10 19:30:25 -0800404 final Future<CompletedBatchOperation> result =
Madan Jampani117aaae2014-10-23 10:04:05 -0700405 flowRuleProvider.executeBatch(batchOperation);
Thomas Vachuska9b2da212014-11-10 19:30:25 -0800406 futureService.submit(new Runnable() {
Madan Jampani117aaae2014-10-23 10:04:05 -0700407 @Override
408 public void run() {
Yuta HIGUCHIf1ccee82014-11-11 20:39:58 -0800409 CompletedBatchOperation res;
Thomas Vachuska9b2da212014-11-10 19:30:25 -0800410 try {
Yuta HIGUCHI82e53262014-11-27 10:28:51 -0800411 res = result.get(TIMEOUT_PER_OP * batchOperation.size(), TimeUnit.MILLISECONDS);
Yuta HIGUCHIf1ccee82014-11-11 20:39:58 -0800412 store.batchOperationComplete(FlowRuleBatchEvent.completed(request, res));
Thomas Vachuska9b2da212014-11-10 19:30:25 -0800413 } catch (TimeoutException | InterruptedException | ExecutionException e) {
414 log.warn("Something went wrong with the batch operation {}",
Yuta HIGUCHIf1ccee82014-11-11 20:39:58 -0800415 request.batchId(), e);
416
417 Set<FlowRule> failures = new HashSet<>(batchOperation.size());
418 for (FlowRuleBatchEntry op : batchOperation.getOperations()) {
Sho SHIMIZUaba9d002015-01-29 14:51:04 -0800419 failures.add(op.target());
Yuta HIGUCHIf1ccee82014-11-11 20:39:58 -0800420 }
421 res = new CompletedBatchOperation(false, failures);
422 store.batchOperationComplete(FlowRuleBatchEvent.completed(request, res));
Thomas Vachuska9b2da212014-11-10 19:30:25 -0800423 }
Madan Jampani117aaae2014-10-23 10:04:05 -0700424 }
Thomas Vachuska9b2da212014-11-10 19:30:25 -0800425 });
Yuta HIGUCHI2fcb40c2014-11-03 14:39:10 -0800426 break;
Madan Jampani117aaae2014-10-23 10:04:05 -0700427
Madan Jampani117aaae2014-10-23 10:04:05 -0700428 case BATCH_OPERATION_COMPLETED:
Yuta HIGUCHI2fcb40c2014-11-03 14:39:10 -0800429 // MASTER Node has pushed the batch down to the Device
430
431 // Note: RULE_ADDED will be posted
432 // when Flow was actually confirmed by stats reply.
Yuta HIGUCHIf3d51bd2014-10-21 01:05:33 -0700433 break;
Yuta HIGUCHI2fcb40c2014-11-03 14:39:10 -0800434
Yuta HIGUCHIf3d51bd2014-10-21 01:05:33 -0700435 default:
436 break;
437 }
tomc78acee2014-09-24 15:16:55 -0700438 }
439 }
alshabib902d41b2014-10-07 16:52:05 -0700440
Madan Jampani117aaae2014-10-23 10:04:05 -0700441 private class FlowRuleBatchFuture implements Future<CompletedBatchOperation> {
alshabib902d41b2014-10-07 16:52:05 -0700442
alshabib193525b2014-10-08 18:58:03 -0700443 private final List<Future<CompletedBatchOperation>> futures;
Madan Jampani117aaae2014-10-23 10:04:05 -0700444 private final Multimap<DeviceId, FlowRuleBatchEntry> batches;
alshabib193525b2014-10-08 18:58:03 -0700445 private final AtomicReference<BatchState> state;
446 private CompletedBatchOperation overall;
alshabib902d41b2014-10-07 16:52:05 -0700447
alshabib193525b2014-10-08 18:58:03 -0700448 public FlowRuleBatchFuture(List<Future<CompletedBatchOperation>> futures,
Madan Jampani117aaae2014-10-23 10:04:05 -0700449 Multimap<DeviceId, FlowRuleBatchEntry> batches) {
alshabib902d41b2014-10-07 16:52:05 -0700450 this.futures = futures;
alshabib193525b2014-10-08 18:58:03 -0700451 this.batches = batches;
Sho SHIMIZUd58c5b72015-01-20 19:25:03 -0800452 this.state = new AtomicReference<>(BatchState.STARTED);
alshabib902d41b2014-10-07 16:52:05 -0700453 }
454
455 @Override
456 public boolean cancel(boolean mayInterruptIfRunning) {
alshabib193525b2014-10-08 18:58:03 -0700457 if (state.get() == BatchState.FINISHED) {
458 return false;
459 }
Yuta HIGUCHI82e53262014-11-27 10:28:51 -0800460 if (log.isDebugEnabled()) {
461 log.debug("Cancelling FlowRuleBatchFuture",
462 new RuntimeException("Just printing backtrace"));
463 }
alshabib193525b2014-10-08 18:58:03 -0700464 if (!state.compareAndSet(BatchState.STARTED, BatchState.CANCELLED)) {
465 return false;
466 }
467 cleanUpBatch();
468 for (Future<CompletedBatchOperation> f : futures) {
469 f.cancel(mayInterruptIfRunning);
470 }
471 return true;
alshabib902d41b2014-10-07 16:52:05 -0700472 }
473
474 @Override
475 public boolean isCancelled() {
alshabib193525b2014-10-08 18:58:03 -0700476 return state.get() == BatchState.CANCELLED;
alshabib902d41b2014-10-07 16:52:05 -0700477 }
478
479 @Override
480 public boolean isDone() {
alshabib193525b2014-10-08 18:58:03 -0700481 return state.get() == BatchState.FINISHED;
alshabib902d41b2014-10-07 16:52:05 -0700482 }
483
alshabib193525b2014-10-08 18:58:03 -0700484
alshabib902d41b2014-10-07 16:52:05 -0700485 @Override
486 public CompletedBatchOperation get() throws InterruptedException,
alshabib193525b2014-10-08 18:58:03 -0700487 ExecutionException {
488
489 if (isDone()) {
490 return overall;
alshabib902d41b2014-10-07 16:52:05 -0700491 }
alshabib193525b2014-10-08 18:58:03 -0700492
493 boolean success = true;
Yuta HIGUCHI4b524442014-10-28 22:23:57 -0700494 Set<FlowRule> failed = Sets.newHashSet();
Brian O'Connor427a1762014-11-19 18:40:32 -0800495 Set<Long> failedIds = Sets.newHashSet();
alshabib193525b2014-10-08 18:58:03 -0700496 CompletedBatchOperation completed;
497 for (Future<CompletedBatchOperation> future : futures) {
498 completed = future.get();
Brian O'Connor427a1762014-11-19 18:40:32 -0800499 success = validateBatchOperation(failed, failedIds, completed);
alshabib193525b2014-10-08 18:58:03 -0700500 }
501
Brian O'Connor427a1762014-11-19 18:40:32 -0800502 return finalizeBatchOperation(success, failed, failedIds);
alshabib193525b2014-10-08 18:58:03 -0700503
alshabib902d41b2014-10-07 16:52:05 -0700504 }
505
506 @Override
507 public CompletedBatchOperation get(long timeout, TimeUnit unit)
508 throws InterruptedException, ExecutionException,
509 TimeoutException {
alshabib193525b2014-10-08 18:58:03 -0700510
511 if (isDone()) {
512 return overall;
513 }
514 boolean success = true;
Yuta HIGUCHI4b524442014-10-28 22:23:57 -0700515 Set<FlowRule> failed = Sets.newHashSet();
Brian O'Connor427a1762014-11-19 18:40:32 -0800516 Set<Long> failedIds = Sets.newHashSet();
alshabib193525b2014-10-08 18:58:03 -0700517 CompletedBatchOperation completed;
alshabib193525b2014-10-08 18:58:03 -0700518 for (Future<CompletedBatchOperation> future : futures) {
Brian O'Connorfa81eae2014-10-30 13:20:05 -0700519 completed = future.get(timeout, unit);
Brian O'Connor427a1762014-11-19 18:40:32 -0800520 success = validateBatchOperation(failed, failedIds, completed);
alshabib902d41b2014-10-07 16:52:05 -0700521 }
Brian O'Connor427a1762014-11-19 18:40:32 -0800522 return finalizeBatchOperation(success, failed, failedIds);
alshabib902d41b2014-10-07 16:52:05 -0700523 }
524
Yuta HIGUCHI4b524442014-10-28 22:23:57 -0700525 private boolean validateBatchOperation(Set<FlowRule> failed,
Brian O'Connor427a1762014-11-19 18:40:32 -0800526 Set<Long> failedIds,
527 CompletedBatchOperation completed) {
alshabib193525b2014-10-08 18:58:03 -0700528
529 if (isCancelled()) {
530 throw new CancellationException();
531 }
532 if (!completed.isSuccess()) {
Yuta HIGUCHI82e53262014-11-27 10:28:51 -0800533 log.warn("FlowRuleBatch failed: {}", completed);
alshabib193525b2014-10-08 18:58:03 -0700534 failed.addAll(completed.failedItems());
Brian O'Connor427a1762014-11-19 18:40:32 -0800535 failedIds.addAll(completed.failedIds());
alshabib193525b2014-10-08 18:58:03 -0700536 cleanUpBatch();
537 cancelAllSubBatches();
538 return false;
539 }
540 return true;
541 }
542
543 private void cancelAllSubBatches() {
544 for (Future<CompletedBatchOperation> f : futures) {
545 f.cancel(true);
546 }
547 }
548
549 private CompletedBatchOperation finalizeBatchOperation(boolean success,
Brian O'Connor427a1762014-11-19 18:40:32 -0800550 Set<FlowRule> failed,
551 Set<Long> failedIds) {
alshabib26834582014-10-08 20:15:46 -0700552 synchronized (this) {
alshabib193525b2014-10-08 18:58:03 -0700553 if (!state.compareAndSet(BatchState.STARTED, BatchState.FINISHED)) {
554 if (state.get() == BatchState.FINISHED) {
555 return overall;
556 }
557 throw new CancellationException();
558 }
Brian O'Connor427a1762014-11-19 18:40:32 -0800559 overall = new CompletedBatchOperation(success, failed, failedIds);
alshabib193525b2014-10-08 18:58:03 -0700560 return overall;
561 }
562 }
563
564 private void cleanUpBatch() {
Yuta HIGUCHI82e53262014-11-27 10:28:51 -0800565 log.debug("cleaning up batch");
566 // TODO convert these into a batch?
alshabib193525b2014-10-08 18:58:03 -0700567 for (FlowRuleBatchEntry fbe : batches.values()) {
Sho SHIMIZUaba9d002015-01-29 14:51:04 -0800568 if (fbe.operator() == FlowRuleOperation.ADD ||
569 fbe.operator() == FlowRuleOperation.MODIFY) {
570 store.deleteFlowRule(fbe.target());
571 } else if (fbe.operator() == FlowRuleOperation.REMOVE) {
572 store.removeFlowRule(new DefaultFlowEntry(fbe.target()));
573 store.storeFlowRule(fbe.target());
alshabib193525b2014-10-08 18:58:03 -0700574 }
575 }
alshabib193525b2014-10-08 18:58:03 -0700576 }
alshabib902d41b2014-10-07 16:52:05 -0700577 }
alshabib57044ba2014-09-16 15:58:01 -0700578}