blob: c1ba8af55f48e92f1748fc7f558c293ff9d61c4f [file] [log] [blame]
Thomas Vachuska4f1a60c2014-10-28 13:39:07 -07001/*
2 * Copyright 2014 Open Networking Laboratory
3 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
tombe988312014-09-19 18:38:47 -070016package org.onlab.onos.net.flow.impl;
alshabib57044ba2014-09-16 15:58:01 -070017
Thomas Vachuska9b2da212014-11-10 19:30:25 -080018import com.google.common.collect.ArrayListMultimap;
19import com.google.common.collect.Iterables;
20import com.google.common.collect.Lists;
21import com.google.common.collect.Maps;
22import com.google.common.collect.Multimap;
23import com.google.common.collect.Sets;
Yuta HIGUCHIf1ccee82014-11-11 20:39:58 -080024
alshabib57044ba2014-09-16 15:58:01 -070025import org.apache.felix.scr.annotations.Activate;
26import org.apache.felix.scr.annotations.Component;
27import org.apache.felix.scr.annotations.Deactivate;
28import org.apache.felix.scr.annotations.Reference;
29import org.apache.felix.scr.annotations.ReferenceCardinality;
30import org.apache.felix.scr.annotations.Service;
Thomas Vachuskae0f804a2014-10-27 23:40:48 -070031import org.onlab.onos.core.ApplicationId;
alshabib57044ba2014-09-16 15:58:01 -070032import org.onlab.onos.event.AbstractListenerRegistry;
33import org.onlab.onos.event.EventDeliveryService;
34import org.onlab.onos.net.Device;
35import org.onlab.onos.net.DeviceId;
36import org.onlab.onos.net.device.DeviceService;
alshabib902d41b2014-10-07 16:52:05 -070037import org.onlab.onos.net.flow.CompletedBatchOperation;
alshabibcf369912014-10-13 14:16:42 -070038import org.onlab.onos.net.flow.DefaultFlowEntry;
alshabib1c319ff2014-10-04 20:29:09 -070039import org.onlab.onos.net.flow.FlowEntry;
alshabib57044ba2014-09-16 15:58:01 -070040import org.onlab.onos.net.flow.FlowRule;
alshabib902d41b2014-10-07 16:52:05 -070041import org.onlab.onos.net.flow.FlowRuleBatchEntry;
alshabib193525b2014-10-08 18:58:03 -070042import org.onlab.onos.net.flow.FlowRuleBatchEntry.FlowRuleOperation;
Madan Jampani117aaae2014-10-23 10:04:05 -070043import org.onlab.onos.net.flow.FlowRuleBatchEvent;
alshabib902d41b2014-10-07 16:52:05 -070044import org.onlab.onos.net.flow.FlowRuleBatchOperation;
Madan Jampani117aaae2014-10-23 10:04:05 -070045import org.onlab.onos.net.flow.FlowRuleBatchRequest;
alshabib57044ba2014-09-16 15:58:01 -070046import org.onlab.onos.net.flow.FlowRuleEvent;
47import org.onlab.onos.net.flow.FlowRuleListener;
48import org.onlab.onos.net.flow.FlowRuleProvider;
49import org.onlab.onos.net.flow.FlowRuleProviderRegistry;
50import org.onlab.onos.net.flow.FlowRuleProviderService;
51import org.onlab.onos.net.flow.FlowRuleService;
tombe988312014-09-19 18:38:47 -070052import org.onlab.onos.net.flow.FlowRuleStore;
tomc78acee2014-09-24 15:16:55 -070053import org.onlab.onos.net.flow.FlowRuleStoreDelegate;
alshabib57044ba2014-09-16 15:58:01 -070054import org.onlab.onos.net.provider.AbstractProviderRegistry;
55import org.onlab.onos.net.provider.AbstractProviderService;
56import org.slf4j.Logger;
57
Yuta HIGUCHIf1ccee82014-11-11 20:39:58 -080058import java.util.HashSet;
Thomas Vachuska9b2da212014-11-10 19:30:25 -080059import java.util.List;
60import java.util.Map;
61import java.util.Set;
62import java.util.concurrent.CancellationException;
63import java.util.concurrent.ExecutionException;
64import java.util.concurrent.ExecutorService;
65import java.util.concurrent.Executors;
66import java.util.concurrent.Future;
67import java.util.concurrent.TimeUnit;
68import java.util.concurrent.TimeoutException;
69import java.util.concurrent.atomic.AtomicReference;
70
71import static com.google.common.base.Preconditions.checkNotNull;
72import static org.onlab.util.Tools.namedThreads;
73import static org.slf4j.LoggerFactory.getLogger;
alshabiba7f7ca82014-09-22 11:41:23 -070074
tome4729872014-09-23 00:37:37 -070075/**
76 * Provides implementation of the flow NB & SB APIs.
77 */
alshabib57044ba2014-09-16 15:58:01 -070078@Component(immediate = true)
79@Service
tom202175a2014-09-19 19:00:11 -070080public class FlowRuleManager
tom9b4030d2014-10-06 10:39:03 -070081 extends AbstractProviderRegistry<FlowRuleProvider, FlowRuleProviderService>
82 implements FlowRuleService, FlowRuleProviderRegistry {
alshabib57044ba2014-09-16 15:58:01 -070083
alshabib193525b2014-10-08 18:58:03 -070084 enum BatchState { STARTED, FINISHED, CANCELLED };
85
Ayaka Koshibe08eabaa2014-09-17 14:59:25 -070086 public static final String FLOW_RULE_NULL = "FlowRule cannot be null";
alshabib57044ba2014-09-16 15:58:01 -070087 private final Logger log = getLogger(getClass());
88
89 private final AbstractListenerRegistry<FlowRuleEvent, FlowRuleListener>
tom9b4030d2014-10-06 10:39:03 -070090 listenerRegistry = new AbstractListenerRegistry<>();
alshabib57044ba2014-09-16 15:58:01 -070091
alshabibbb42cad2014-09-25 11:43:05 -070092 private final FlowRuleStoreDelegate delegate = new InternalStoreDelegate();
tomc78acee2014-09-24 15:16:55 -070093
Thomas Vachuska9b2da212014-11-10 19:30:25 -080094 private ExecutorService futureService;
Yuta HIGUCHI9def0472014-10-23 15:51:10 -070095
tombe988312014-09-19 18:38:47 -070096 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
97 protected FlowRuleStore store;
Ayaka Koshibe08eabaa2014-09-17 14:59:25 -070098
alshabib57044ba2014-09-16 15:58:01 -070099 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
Ayaka Koshibeb55524f2014-09-18 09:59:24 -0700100 protected EventDeliveryService eventDispatcher;
alshabib57044ba2014-09-16 15:58:01 -0700101
102 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
Ayaka Koshibeb55524f2014-09-18 09:59:24 -0700103 protected DeviceService deviceService;
alshabib57044ba2014-09-16 15:58:01 -0700104
105 @Activate
106 public void activate() {
Thomas Vachuska9b2da212014-11-10 19:30:25 -0800107 futureService = Executors.newCachedThreadPool(namedThreads("provider-future-listeners"));
tomc78acee2014-09-24 15:16:55 -0700108 store.setDelegate(delegate);
alshabib57044ba2014-09-16 15:58:01 -0700109 eventDispatcher.addSink(FlowRuleEvent.class, listenerRegistry);
110 log.info("Started");
111 }
112
113 @Deactivate
114 public void deactivate() {
Thomas Vachuska9b2da212014-11-10 19:30:25 -0800115 futureService.shutdownNow();
Yuta HIGUCHI9def0472014-10-23 15:51:10 -0700116
tomc78acee2014-09-24 15:16:55 -0700117 store.unsetDelegate(delegate);
alshabib57044ba2014-09-16 15:58:01 -0700118 eventDispatcher.removeSink(FlowRuleEvent.class);
119 log.info("Stopped");
120 }
121
122 @Override
tom9b4030d2014-10-06 10:39:03 -0700123 public int getFlowRuleCount() {
124 return store.getFlowRuleCount();
125 }
126
127 @Override
alshabib1c319ff2014-10-04 20:29:09 -0700128 public Iterable<FlowEntry> getFlowEntries(DeviceId deviceId) {
Ayaka Koshibe08eabaa2014-09-17 14:59:25 -0700129 return store.getFlowEntries(deviceId);
alshabib57044ba2014-09-16 15:58:01 -0700130 }
131
132 @Override
alshabib219ebaa2014-09-22 15:41:24 -0700133 public void applyFlowRules(FlowRule... flowRules) {
Madan Jampani6a456162014-10-24 11:36:17 -0700134 Set<FlowRuleBatchEntry> toAddBatchEntries = Sets.newHashSet();
alshabib57044ba2014-09-16 15:58:01 -0700135 for (int i = 0; i < flowRules.length; i++) {
Madan Jampani6a456162014-10-24 11:36:17 -0700136 toAddBatchEntries.add(new FlowRuleBatchEntry(FlowRuleOperation.ADD, flowRules[i]));
Yuta HIGUCHIf3d51bd2014-10-21 01:05:33 -0700137 }
Madan Jampani6a456162014-10-24 11:36:17 -0700138 applyBatch(new FlowRuleBatchOperation(toAddBatchEntries));
alshabib57044ba2014-09-16 15:58:01 -0700139 }
140
141 @Override
142 public void removeFlowRules(FlowRule... flowRules) {
Madan Jampani6a456162014-10-24 11:36:17 -0700143 Set<FlowRuleBatchEntry> toRemoveBatchEntries = Sets.newHashSet();
alshabib57044ba2014-09-16 15:58:01 -0700144 for (int i = 0; i < flowRules.length; i++) {
Madan Jampani6a456162014-10-24 11:36:17 -0700145 toRemoveBatchEntries.add(new FlowRuleBatchEntry(FlowRuleOperation.REMOVE, flowRules[i]));
Yuta HIGUCHIf3d51bd2014-10-21 01:05:33 -0700146 }
Madan Jampani6a456162014-10-24 11:36:17 -0700147 applyBatch(new FlowRuleBatchOperation(toRemoveBatchEntries));
alshabiba68eb962014-09-24 20:34:13 -0700148 }
alshabib57044ba2014-09-16 15:58:01 -0700149
alshabiba68eb962014-09-24 20:34:13 -0700150 @Override
151 public void removeFlowRulesById(ApplicationId id) {
Madan Jampani6a456162014-10-24 11:36:17 -0700152 removeFlowRules(Iterables.toArray(getFlowRulesById(id), FlowRule.class));
alshabiba68eb962014-09-24 20:34:13 -0700153 }
154
155 @Override
156 public Iterable<FlowRule> getFlowRulesById(ApplicationId id) {
Madan Jampani6a456162014-10-24 11:36:17 -0700157 Set<FlowRule> flowEntries = Sets.newHashSet();
158 for (Device d : deviceService.getDevices()) {
159 for (FlowEntry flowEntry : store.getFlowEntries(d.id())) {
160 if (flowEntry.appId() == id.id()) {
161 flowEntries.add(flowEntry);
162 }
163 }
164 }
165 return flowEntries;
alshabib57044ba2014-09-16 15:58:01 -0700166 }
167
168 @Override
alshabibaa7e7de2014-11-12 19:20:44 -0800169 public Iterable<FlowRule> getFlowRulesByGroupId(ApplicationId appId, short groupId) {
170 Set<FlowRule> matches = Sets.newHashSet();
171 long toLookUp = ((long) appId.id() << 16) | groupId;
172 for (Device d : deviceService.getDevices()) {
173 for (FlowEntry flowEntry : store.getFlowEntries(d.id())) {
174 if ((flowEntry.id().value() >>> 32) == toLookUp) {
175 matches.add(flowEntry);
176 }
177 }
178 }
179 return matches;
180 }
181
182 @Override
alshabib902d41b2014-10-07 16:52:05 -0700183 public Future<CompletedBatchOperation> applyBatch(
184 FlowRuleBatchOperation batch) {
Madan Jampani117aaae2014-10-23 10:04:05 -0700185 Multimap<DeviceId, FlowRuleBatchEntry> perDeviceBatches =
alshabib902d41b2014-10-07 16:52:05 -0700186 ArrayListMultimap.create();
alshabib193525b2014-10-08 18:58:03 -0700187 List<Future<CompletedBatchOperation>> futures = Lists.newArrayList();
alshabib902d41b2014-10-07 16:52:05 -0700188 for (FlowRuleBatchEntry fbe : batch.getOperations()) {
189 final FlowRule f = fbe.getTarget();
Madan Jampani117aaae2014-10-23 10:04:05 -0700190 perDeviceBatches.put(f.deviceId(), fbe);
alshabib902d41b2014-10-07 16:52:05 -0700191 }
Madan Jampani117aaae2014-10-23 10:04:05 -0700192
193 for (DeviceId deviceId : perDeviceBatches.keySet()) {
alshabib902d41b2014-10-07 16:52:05 -0700194 FlowRuleBatchOperation b =
Madan Jampani117aaae2014-10-23 10:04:05 -0700195 new FlowRuleBatchOperation(perDeviceBatches.get(deviceId));
196 Future<CompletedBatchOperation> future = store.storeBatch(b);
alshabib902d41b2014-10-07 16:52:05 -0700197 futures.add(future);
198 }
Madan Jampani117aaae2014-10-23 10:04:05 -0700199 return new FlowRuleBatchFuture(futures, perDeviceBatches);
alshabib902d41b2014-10-07 16:52:05 -0700200 }
201
202 @Override
alshabib57044ba2014-09-16 15:58:01 -0700203 public void addListener(FlowRuleListener listener) {
204 listenerRegistry.addListener(listener);
205 }
206
207 @Override
208 public void removeListener(FlowRuleListener listener) {
209 listenerRegistry.removeListener(listener);
210 }
211
212 @Override
213 protected FlowRuleProviderService createProviderService(
214 FlowRuleProvider provider) {
215 return new InternalFlowRuleProviderService(provider);
216 }
217
218 private class InternalFlowRuleProviderService
tom9b4030d2014-10-06 10:39:03 -0700219 extends AbstractProviderService<FlowRuleProvider>
220 implements FlowRuleProviderService {
alshabib57044ba2014-09-16 15:58:01 -0700221
Yuta HIGUCHIf6f50a62014-10-19 15:58:49 -0700222 final Map<FlowEntry, Long> lastSeen = Maps.newConcurrentMap();
223
alshabib57044ba2014-09-16 15:58:01 -0700224 protected InternalFlowRuleProviderService(FlowRuleProvider provider) {
225 super(provider);
226 }
227
228 @Override
alshabib1c319ff2014-10-04 20:29:09 -0700229 public void flowRemoved(FlowEntry flowEntry) {
230 checkNotNull(flowEntry, FLOW_RULE_NULL);
Ayaka Koshibe08eabaa2014-09-17 14:59:25 -0700231 checkValidity();
Yuta HIGUCHIf6f50a62014-10-19 15:58:49 -0700232 lastSeen.remove(flowEntry);
alshabib1c319ff2014-10-04 20:29:09 -0700233 FlowEntry stored = store.getFlowEntry(flowEntry);
alshabiba68eb962014-09-24 20:34:13 -0700234 if (stored == null) {
alshabib1c319ff2014-10-04 20:29:09 -0700235 log.info("Rule already evicted from store: {}", flowEntry);
alshabiba68eb962014-09-24 20:34:13 -0700236 return;
237 }
alshabib1c319ff2014-10-04 20:29:09 -0700238 Device device = deviceService.getDevice(flowEntry.deviceId());
alshabiba68eb962014-09-24 20:34:13 -0700239 FlowRuleProvider frp = getProvider(device.providerId());
240 FlowRuleEvent event = null;
241 switch (stored.state()) {
tom9b4030d2014-10-06 10:39:03 -0700242 case ADDED:
243 case PENDING_ADD:
alshabib6eb438a2014-10-01 16:39:37 -0700244 frp.applyFlowRule(stored);
tom9b4030d2014-10-06 10:39:03 -0700245 break;
246 case PENDING_REMOVE:
247 case REMOVED:
248 event = store.removeFlowRule(stored);
249 break;
250 default:
251 break;
alshabib57044ba2014-09-16 15:58:01 -0700252
alshabiba68eb962014-09-24 20:34:13 -0700253 }
Ayaka Koshibe08eabaa2014-09-17 14:59:25 -0700254 if (event != null) {
alshabib1c319ff2014-10-04 20:29:09 -0700255 log.debug("Flow {} removed", flowEntry);
Ayaka Koshibe08eabaa2014-09-17 14:59:25 -0700256 post(event);
257 }
alshabib57044ba2014-09-16 15:58:01 -0700258 }
259
alshabibba5ac482014-10-02 17:15:20 -0700260
alshabib1c319ff2014-10-04 20:29:09 -0700261 private void flowMissing(FlowEntry flowRule) {
Ayaka Koshibe08eabaa2014-09-17 14:59:25 -0700262 checkNotNull(flowRule, FLOW_RULE_NULL);
263 checkValidity();
alshabiba68eb962014-09-24 20:34:13 -0700264 Device device = deviceService.getDevice(flowRule.deviceId());
265 FlowRuleProvider frp = getProvider(device.providerId());
alshabibbb42cad2014-09-25 11:43:05 -0700266 FlowRuleEvent event = null;
alshabiba68eb962014-09-24 20:34:13 -0700267 switch (flowRule.state()) {
tom9b4030d2014-10-06 10:39:03 -0700268 case PENDING_REMOVE:
269 case REMOVED:
270 event = store.removeFlowRule(flowRule);
271 frp.removeFlowRule(flowRule);
272 break;
273 case ADDED:
274 case PENDING_ADD:
275 frp.applyFlowRule(flowRule);
276 break;
277 default:
278 log.debug("Flow {} has not been installed.", flowRule);
alshabiba68eb962014-09-24 20:34:13 -0700279 }
280
alshabibbb42cad2014-09-25 11:43:05 -0700281 if (event != null) {
282 log.debug("Flow {} removed", flowRule);
283 post(event);
284 }
alshabib57044ba2014-09-16 15:58:01 -0700285
286 }
287
alshabibba5ac482014-10-02 17:15:20 -0700288
289 private void extraneousFlow(FlowRule flowRule) {
alshabib219ebaa2014-09-22 15:41:24 -0700290 checkNotNull(flowRule, FLOW_RULE_NULL);
291 checkValidity();
alshabib2374fc92014-10-22 11:03:23 -0700292 FlowRuleProvider frp = getProvider(flowRule.deviceId());
293 frp.removeFlowRule(flowRule);
alshabib54ce5892014-09-23 17:50:51 -0700294 log.debug("Flow {} is on switch but not in store.", flowRule);
alshabib219ebaa2014-09-22 15:41:24 -0700295 }
296
alshabibba5ac482014-10-02 17:15:20 -0700297
alshabib1c319ff2014-10-04 20:29:09 -0700298 private void flowAdded(FlowEntry flowEntry) {
299 checkNotNull(flowEntry, FLOW_RULE_NULL);
Ayaka Koshibe08eabaa2014-09-17 14:59:25 -0700300 checkValidity();
alshabib57044ba2014-09-16 15:58:01 -0700301
alshabib1c319ff2014-10-04 20:29:09 -0700302 if (checkRuleLiveness(flowEntry, store.getFlowEntry(flowEntry))) {
alshabibba5ac482014-10-02 17:15:20 -0700303
alshabib1c319ff2014-10-04 20:29:09 -0700304 FlowRuleEvent event = store.addOrUpdateFlowRule(flowEntry);
alshabibba5ac482014-10-02 17:15:20 -0700305 if (event == null) {
306 log.debug("No flow store event generated.");
307 } else {
alshabib1c319ff2014-10-04 20:29:09 -0700308 log.debug("Flow {} {}", flowEntry, event.type());
alshabibba5ac482014-10-02 17:15:20 -0700309 post(event);
310 }
Ayaka Koshibe08eabaa2014-09-17 14:59:25 -0700311 } else {
Thomas Vachuska4830d392014-11-09 17:09:56 -0800312 log.debug("Removing flow rules....");
alshabib1c319ff2014-10-04 20:29:09 -0700313 removeFlowRules(flowEntry);
Ayaka Koshibe08eabaa2014-09-17 14:59:25 -0700314 }
alshabib219ebaa2014-09-22 15:41:24 -0700315
alshabib57044ba2014-09-16 15:58:01 -0700316 }
317
alshabib1c319ff2014-10-04 20:29:09 -0700318 private boolean checkRuleLiveness(FlowEntry swRule, FlowEntry storedRule) {
319 if (storedRule == null) {
320 return false;
321 }
Jonathan Hartbc4a7932014-10-21 11:46:00 -0700322 if (storedRule.isPermanent()) {
323 return true;
324 }
325
Yuta HIGUCHIf6f50a62014-10-19 15:58:49 -0700326 final long timeout = storedRule.timeout() * 1000;
327 final long currentTime = System.currentTimeMillis();
alshabib85c41972014-10-03 13:48:39 -0700328 if (storedRule.packets() != swRule.packets()) {
Yuta HIGUCHIf6f50a62014-10-19 15:58:49 -0700329 lastSeen.put(storedRule, currentTime);
alshabib85c41972014-10-03 13:48:39 -0700330 return true;
331 }
Yuta HIGUCHIf6f50a62014-10-19 15:58:49 -0700332 if (!lastSeen.containsKey(storedRule)) {
333 // checking for the first time
334 lastSeen.put(storedRule, storedRule.lastSeen());
335 // Use following if lastSeen attr. was removed.
336 //lastSeen.put(storedRule, currentTime);
337 }
338 Long last = lastSeen.get(storedRule);
339 if (last == null) {
340 // concurrently removed? let the liveness check fail
341 return false;
342 }
alshabib85c41972014-10-03 13:48:39 -0700343
Yuta HIGUCHIf6f50a62014-10-19 15:58:49 -0700344 if ((currentTime - last) <= timeout) {
alshabibc274c902014-10-03 14:58:27 -0700345 return true;
346 }
347 return false;
alshabibba5ac482014-10-02 17:15:20 -0700348 }
349
Ayaka Koshibe08eabaa2014-09-17 14:59:25 -0700350 // Posts the specified event to the local event dispatcher.
351 private void post(FlowRuleEvent event) {
352 if (event != null) {
353 eventDispatcher.post(event);
354 }
355 }
alshabib5c370ff2014-09-18 10:12:14 -0700356
357 @Override
alshabib1c319ff2014-10-04 20:29:09 -0700358 public void pushFlowMetrics(DeviceId deviceId, Iterable<FlowEntry> flowEntries) {
359 List<FlowEntry> storedRules = Lists.newLinkedList(store.getFlowEntries(deviceId));
alshabibbb8b1282014-09-22 17:00:18 -0700360
Yuta HIGUCHIf6f50a62014-10-19 15:58:49 -0700361 for (FlowEntry rule : flowEntries) {
alshabiba7f7ca82014-09-22 11:41:23 -0700362 if (storedRules.remove(rule)) {
alshabib219ebaa2014-09-22 15:41:24 -0700363 // we both have the rule, let's update some info then.
alshabiba7f7ca82014-09-22 11:41:23 -0700364 flowAdded(rule);
365 } else {
alshabib219ebaa2014-09-22 15:41:24 -0700366 // the device has a rule the store does not have
367 extraneousFlow(rule);
alshabiba7f7ca82014-09-22 11:41:23 -0700368 }
369 }
alshabib1c319ff2014-10-04 20:29:09 -0700370 for (FlowEntry rule : storedRules) {
alshabiba7f7ca82014-09-22 11:41:23 -0700371 // there are rules in the store that aren't on the switch
372 flowMissing(rule);
alshabib54ce5892014-09-23 17:50:51 -0700373
alshabiba7f7ca82014-09-22 11:41:23 -0700374 }
alshabib5c370ff2014-09-18 10:12:14 -0700375 }
alshabib57044ba2014-09-16 15:58:01 -0700376 }
377
tomc78acee2014-09-24 15:16:55 -0700378 // Store delegate to re-post events emitted from the store.
379 private class InternalStoreDelegate implements FlowRuleStoreDelegate {
Thomas Vachuska9b2da212014-11-10 19:30:25 -0800380
381 private static final int TIMEOUT = 5000; // ms
382
Madan Jampani117aaae2014-10-23 10:04:05 -0700383 // TODO: Right now we only dispatch events at individual flowEntry level.
384 // It may be more efficient for also dispatch events as a batch.
tomc78acee2014-09-24 15:16:55 -0700385 @Override
Madan Jampani117aaae2014-10-23 10:04:05 -0700386 public void notify(FlowRuleBatchEvent event) {
387 final FlowRuleBatchRequest request = event.subject();
Yuta HIGUCHIf3d51bd2014-10-21 01:05:33 -0700388 switch (event.type()) {
Madan Jampani117aaae2014-10-23 10:04:05 -0700389 case BATCH_OPERATION_REQUESTED:
Yuta HIGUCHI2fcb40c2014-11-03 14:39:10 -0800390 // Request has been forwarded to MASTER Node, and was
391 for (FlowRule entry : request.toAdd()) {
Madan Jampani31961c12014-10-23 12:06:58 -0700392 eventDispatcher.post(new FlowRuleEvent(FlowRuleEvent.Type.RULE_ADD_REQUESTED, entry));
393 }
Yuta HIGUCHI2fcb40c2014-11-03 14:39:10 -0800394 for (FlowRule entry : request.toRemove()) {
Madan Jampani31961c12014-10-23 12:06:58 -0700395 eventDispatcher.post(new FlowRuleEvent(FlowRuleEvent.Type.RULE_REMOVE_REQUESTED, entry));
396 }
397 // FIXME: what about op.equals(FlowRuleOperation.MODIFY) ?
Yuta HIGUCHIf3d51bd2014-10-21 01:05:33 -0700398
Madan Jampani117aaae2014-10-23 10:04:05 -0700399 FlowRuleBatchOperation batchOperation = request.asBatchOperation();
Yuta HIGUCHIf3d51bd2014-10-21 01:05:33 -0700400
Madan Jampani117aaae2014-10-23 10:04:05 -0700401 FlowRuleProvider flowRuleProvider =
402 getProvider(batchOperation.getOperations().get(0).getTarget().deviceId());
Thomas Vachuska9b2da212014-11-10 19:30:25 -0800403 final Future<CompletedBatchOperation> result =
Madan Jampani117aaae2014-10-23 10:04:05 -0700404 flowRuleProvider.executeBatch(batchOperation);
Thomas Vachuska9b2da212014-11-10 19:30:25 -0800405 futureService.submit(new Runnable() {
Madan Jampani117aaae2014-10-23 10:04:05 -0700406 @Override
407 public void run() {
Yuta HIGUCHIf1ccee82014-11-11 20:39:58 -0800408 CompletedBatchOperation res;
Thomas Vachuska9b2da212014-11-10 19:30:25 -0800409 try {
410 res = result.get(TIMEOUT, TimeUnit.MILLISECONDS);
Yuta HIGUCHIf1ccee82014-11-11 20:39:58 -0800411 store.batchOperationComplete(FlowRuleBatchEvent.completed(request, res));
Thomas Vachuska9b2da212014-11-10 19:30:25 -0800412 } catch (TimeoutException | InterruptedException | ExecutionException e) {
413 log.warn("Something went wrong with the batch operation {}",
Yuta HIGUCHIf1ccee82014-11-11 20:39:58 -0800414 request.batchId(), e);
415
416 Set<FlowRule> failures = new HashSet<>(batchOperation.size());
417 for (FlowRuleBatchEntry op : batchOperation.getOperations()) {
418 failures.add(op.getTarget());
419 }
420 res = new CompletedBatchOperation(false, failures);
421 store.batchOperationComplete(FlowRuleBatchEvent.completed(request, res));
Thomas Vachuska9b2da212014-11-10 19:30:25 -0800422 }
Madan Jampani117aaae2014-10-23 10:04:05 -0700423 }
Thomas Vachuska9b2da212014-11-10 19:30:25 -0800424 });
Yuta HIGUCHI2fcb40c2014-11-03 14:39:10 -0800425 break;
Madan Jampani117aaae2014-10-23 10:04:05 -0700426
Madan Jampani117aaae2014-10-23 10:04:05 -0700427 case BATCH_OPERATION_COMPLETED:
Yuta HIGUCHI2fcb40c2014-11-03 14:39:10 -0800428 // MASTER Node has pushed the batch down to the Device
429
430 // Note: RULE_ADDED will be posted
431 // when Flow was actually confirmed by stats reply.
Yuta HIGUCHIf3d51bd2014-10-21 01:05:33 -0700432 break;
Yuta HIGUCHI2fcb40c2014-11-03 14:39:10 -0800433
Yuta HIGUCHIf3d51bd2014-10-21 01:05:33 -0700434 default:
435 break;
436 }
tomc78acee2014-09-24 15:16:55 -0700437 }
438 }
alshabib902d41b2014-10-07 16:52:05 -0700439
Madan Jampani117aaae2014-10-23 10:04:05 -0700440 private class FlowRuleBatchFuture implements Future<CompletedBatchOperation> {
alshabib902d41b2014-10-07 16:52:05 -0700441
alshabib193525b2014-10-08 18:58:03 -0700442 private final List<Future<CompletedBatchOperation>> futures;
Madan Jampani117aaae2014-10-23 10:04:05 -0700443 private final Multimap<DeviceId, FlowRuleBatchEntry> batches;
alshabib193525b2014-10-08 18:58:03 -0700444 private final AtomicReference<BatchState> state;
445 private CompletedBatchOperation overall;
alshabib902d41b2014-10-07 16:52:05 -0700446
alshabib193525b2014-10-08 18:58:03 -0700447 public FlowRuleBatchFuture(List<Future<CompletedBatchOperation>> futures,
Madan Jampani117aaae2014-10-23 10:04:05 -0700448 Multimap<DeviceId, FlowRuleBatchEntry> batches) {
alshabib902d41b2014-10-07 16:52:05 -0700449 this.futures = futures;
alshabib193525b2014-10-08 18:58:03 -0700450 this.batches = batches;
451 state = new AtomicReference<FlowRuleManager.BatchState>();
452 state.set(BatchState.STARTED);
alshabib902d41b2014-10-07 16:52:05 -0700453 }
454
455 @Override
456 public boolean cancel(boolean mayInterruptIfRunning) {
alshabib193525b2014-10-08 18:58:03 -0700457 if (state.get() == BatchState.FINISHED) {
458 return false;
459 }
460 if (!state.compareAndSet(BatchState.STARTED, BatchState.CANCELLED)) {
461 return false;
462 }
463 cleanUpBatch();
464 for (Future<CompletedBatchOperation> f : futures) {
465 f.cancel(mayInterruptIfRunning);
466 }
467 return true;
alshabib902d41b2014-10-07 16:52:05 -0700468 }
469
470 @Override
471 public boolean isCancelled() {
alshabib193525b2014-10-08 18:58:03 -0700472 return state.get() == BatchState.CANCELLED;
alshabib902d41b2014-10-07 16:52:05 -0700473 }
474
475 @Override
476 public boolean isDone() {
alshabib193525b2014-10-08 18:58:03 -0700477 return state.get() == BatchState.FINISHED;
alshabib902d41b2014-10-07 16:52:05 -0700478 }
479
alshabib193525b2014-10-08 18:58:03 -0700480
alshabib902d41b2014-10-07 16:52:05 -0700481 @Override
482 public CompletedBatchOperation get() throws InterruptedException,
alshabib193525b2014-10-08 18:58:03 -0700483 ExecutionException {
484
485 if (isDone()) {
486 return overall;
alshabib902d41b2014-10-07 16:52:05 -0700487 }
alshabib193525b2014-10-08 18:58:03 -0700488
489 boolean success = true;
Yuta HIGUCHI4b524442014-10-28 22:23:57 -0700490 Set<FlowRule> failed = Sets.newHashSet();
alshabib193525b2014-10-08 18:58:03 -0700491 CompletedBatchOperation completed;
492 for (Future<CompletedBatchOperation> future : futures) {
493 completed = future.get();
alshabib3effd042014-10-17 12:00:31 -0700494 success = validateBatchOperation(failed, completed);
alshabib193525b2014-10-08 18:58:03 -0700495 }
496
497 return finalizeBatchOperation(success, failed);
498
alshabib902d41b2014-10-07 16:52:05 -0700499 }
500
501 @Override
502 public CompletedBatchOperation get(long timeout, TimeUnit unit)
503 throws InterruptedException, ExecutionException,
504 TimeoutException {
alshabib193525b2014-10-08 18:58:03 -0700505
506 if (isDone()) {
507 return overall;
508 }
509 boolean success = true;
Yuta HIGUCHI4b524442014-10-28 22:23:57 -0700510 Set<FlowRule> failed = Sets.newHashSet();
alshabib193525b2014-10-08 18:58:03 -0700511 CompletedBatchOperation completed;
alshabib902d41b2014-10-07 16:52:05 -0700512 long start = System.nanoTime();
513 long end = start + unit.toNanos(timeout);
alshabib193525b2014-10-08 18:58:03 -0700514
515 for (Future<CompletedBatchOperation> future : futures) {
alshabib902d41b2014-10-07 16:52:05 -0700516 long now = System.nanoTime();
517 long thisTimeout = end - now;
alshabib193525b2014-10-08 18:58:03 -0700518 completed = future.get(thisTimeout, TimeUnit.NANOSECONDS);
alshabib3effd042014-10-17 12:00:31 -0700519 success = validateBatchOperation(failed, completed);
alshabib902d41b2014-10-07 16:52:05 -0700520 }
alshabib193525b2014-10-08 18:58:03 -0700521 return finalizeBatchOperation(success, failed);
alshabib902d41b2014-10-07 16:52:05 -0700522 }
523
Yuta HIGUCHI4b524442014-10-28 22:23:57 -0700524 private boolean validateBatchOperation(Set<FlowRule> failed,
alshabib3effd042014-10-17 12:00:31 -0700525 CompletedBatchOperation completed) {
alshabib193525b2014-10-08 18:58:03 -0700526
527 if (isCancelled()) {
528 throw new CancellationException();
529 }
530 if (!completed.isSuccess()) {
531 failed.addAll(completed.failedItems());
532 cleanUpBatch();
533 cancelAllSubBatches();
534 return false;
535 }
536 return true;
537 }
538
539 private void cancelAllSubBatches() {
540 for (Future<CompletedBatchOperation> f : futures) {
541 f.cancel(true);
542 }
543 }
544
545 private CompletedBatchOperation finalizeBatchOperation(boolean success,
Yuta HIGUCHI4b524442014-10-28 22:23:57 -0700546 Set<FlowRule> failed) {
alshabib26834582014-10-08 20:15:46 -0700547 synchronized (this) {
alshabib193525b2014-10-08 18:58:03 -0700548 if (!state.compareAndSet(BatchState.STARTED, BatchState.FINISHED)) {
549 if (state.get() == BatchState.FINISHED) {
550 return overall;
551 }
552 throw new CancellationException();
553 }
554 overall = new CompletedBatchOperation(success, failed);
555 return overall;
556 }
557 }
558
559 private void cleanUpBatch() {
560 for (FlowRuleBatchEntry fbe : batches.values()) {
561 if (fbe.getOperator() == FlowRuleOperation.ADD ||
562 fbe.getOperator() == FlowRuleOperation.MODIFY) {
563 store.deleteFlowRule(fbe.getTarget());
564 } else if (fbe.getOperator() == FlowRuleOperation.REMOVE) {
alshabibcf369912014-10-13 14:16:42 -0700565 store.removeFlowRule(new DefaultFlowEntry(fbe.getTarget()));
alshabib193525b2014-10-08 18:58:03 -0700566 store.storeFlowRule(fbe.getTarget());
567 }
568 }
alshabib193525b2014-10-08 18:58:03 -0700569 }
alshabib902d41b2014-10-07 16:52:05 -0700570 }
alshabib57044ba2014-09-16 15:58:01 -0700571}