blob: 3f37c29df60a3ef8cf6a4573c50750f8fd66608e [file] [log] [blame]
Thomas Vachuska4f1a60c2014-10-28 13:39:07 -07001/*
2 * Copyright 2014 Open Networking Laboratory
3 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
tombe988312014-09-19 18:38:47 -070016package org.onlab.onos.net.flow.impl;
alshabib57044ba2014-09-16 15:58:01 -070017
Thomas Vachuska9b2da212014-11-10 19:30:25 -080018import com.google.common.collect.ArrayListMultimap;
19import com.google.common.collect.Iterables;
20import com.google.common.collect.Lists;
21import com.google.common.collect.Maps;
22import com.google.common.collect.Multimap;
23import com.google.common.collect.Sets;
Yuta HIGUCHIf1ccee82014-11-11 20:39:58 -080024
alshabib57044ba2014-09-16 15:58:01 -070025import org.apache.felix.scr.annotations.Activate;
26import org.apache.felix.scr.annotations.Component;
27import org.apache.felix.scr.annotations.Deactivate;
28import org.apache.felix.scr.annotations.Reference;
29import org.apache.felix.scr.annotations.ReferenceCardinality;
30import org.apache.felix.scr.annotations.Service;
Thomas Vachuskae0f804a2014-10-27 23:40:48 -070031import org.onlab.onos.core.ApplicationId;
alshabib57044ba2014-09-16 15:58:01 -070032import org.onlab.onos.event.AbstractListenerRegistry;
33import org.onlab.onos.event.EventDeliveryService;
34import org.onlab.onos.net.Device;
35import org.onlab.onos.net.DeviceId;
36import org.onlab.onos.net.device.DeviceService;
alshabib902d41b2014-10-07 16:52:05 -070037import org.onlab.onos.net.flow.CompletedBatchOperation;
alshabibcf369912014-10-13 14:16:42 -070038import org.onlab.onos.net.flow.DefaultFlowEntry;
alshabib1c319ff2014-10-04 20:29:09 -070039import org.onlab.onos.net.flow.FlowEntry;
alshabib57044ba2014-09-16 15:58:01 -070040import org.onlab.onos.net.flow.FlowRule;
alshabib902d41b2014-10-07 16:52:05 -070041import org.onlab.onos.net.flow.FlowRuleBatchEntry;
alshabib193525b2014-10-08 18:58:03 -070042import org.onlab.onos.net.flow.FlowRuleBatchEntry.FlowRuleOperation;
Madan Jampani117aaae2014-10-23 10:04:05 -070043import org.onlab.onos.net.flow.FlowRuleBatchEvent;
alshabib902d41b2014-10-07 16:52:05 -070044import org.onlab.onos.net.flow.FlowRuleBatchOperation;
Madan Jampani117aaae2014-10-23 10:04:05 -070045import org.onlab.onos.net.flow.FlowRuleBatchRequest;
alshabib57044ba2014-09-16 15:58:01 -070046import org.onlab.onos.net.flow.FlowRuleEvent;
47import org.onlab.onos.net.flow.FlowRuleListener;
48import org.onlab.onos.net.flow.FlowRuleProvider;
49import org.onlab.onos.net.flow.FlowRuleProviderRegistry;
50import org.onlab.onos.net.flow.FlowRuleProviderService;
51import org.onlab.onos.net.flow.FlowRuleService;
tombe988312014-09-19 18:38:47 -070052import org.onlab.onos.net.flow.FlowRuleStore;
tomc78acee2014-09-24 15:16:55 -070053import org.onlab.onos.net.flow.FlowRuleStoreDelegate;
alshabib57044ba2014-09-16 15:58:01 -070054import org.onlab.onos.net.provider.AbstractProviderRegistry;
55import org.onlab.onos.net.provider.AbstractProviderService;
56import org.slf4j.Logger;
57
Yuta HIGUCHIf1ccee82014-11-11 20:39:58 -080058import java.util.HashSet;
Thomas Vachuska9b2da212014-11-10 19:30:25 -080059import java.util.List;
60import java.util.Map;
61import java.util.Set;
62import java.util.concurrent.CancellationException;
63import java.util.concurrent.ExecutionException;
64import java.util.concurrent.ExecutorService;
65import java.util.concurrent.Executors;
66import java.util.concurrent.Future;
67import java.util.concurrent.TimeUnit;
68import java.util.concurrent.TimeoutException;
69import java.util.concurrent.atomic.AtomicReference;
Thomas Vachuska9b2da212014-11-10 19:30:25 -080070import static com.google.common.base.Preconditions.checkNotNull;
71import static org.onlab.util.Tools.namedThreads;
72import static org.slf4j.LoggerFactory.getLogger;
alshabiba7f7ca82014-09-22 11:41:23 -070073
tome4729872014-09-23 00:37:37 -070074/**
75 * Provides implementation of the flow NB & SB APIs.
76 */
alshabib57044ba2014-09-16 15:58:01 -070077@Component(immediate = true)
78@Service
tom202175a2014-09-19 19:00:11 -070079public class FlowRuleManager
tom9b4030d2014-10-06 10:39:03 -070080 extends AbstractProviderRegistry<FlowRuleProvider, FlowRuleProviderService>
81 implements FlowRuleService, FlowRuleProviderRegistry {
alshabib57044ba2014-09-16 15:58:01 -070082
// Lifecycle states tracked by FlowRuleBatchFuture for an aggregate batch.
alshabib193525b2014-10-08 18:58:03 -070083 enum BatchState { STARTED, FINISHED, CANCELLED };
84
// Message passed to checkNotNull when a provider reports a null flow rule/entry.
Ayaka Koshibe08eabaa2014-09-17 14:59:25 -070085 public static final String FLOW_RULE_NULL = "FlowRule cannot be null";
alshabib57044ba2014-09-16 15:58:01 -070086 private final Logger log = getLogger(getClass());
87
// Listener registry; installed as the FlowRuleEvent sink of the event dispatcher in activate().
88 private final AbstractListenerRegistry<FlowRuleEvent, FlowRuleListener>
tom9b4030d2014-10-06 10:39:03 -070089 listenerRegistry = new AbstractListenerRegistry<>();
alshabib57044ba2014-09-16 15:58:01 -070090
// Delegate registered with the store in activate() and unregistered in deactivate().
alshabibbb42cad2014-09-25 11:43:05 -070091 private final FlowRuleStoreDelegate delegate = new InternalStoreDelegate();
tomc78acee2014-09-24 15:16:55 -070092
// Executor created in activate(); InternalStoreDelegate submits tasks to it that
// wait for provider batch futures to complete.
Thomas Vachuska9b2da212014-11-10 19:30:25 -080093 private ExecutorService futureService;
Yuta HIGUCHI9def0472014-10-23 15:51:10 -070094
// Injected services (mandatory references).
tombe988312014-09-19 18:38:47 -070095 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
96 protected FlowRuleStore store;
Ayaka Koshibe08eabaa2014-09-17 14:59:25 -070097
alshabib57044ba2014-09-16 15:58:01 -070098 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
Ayaka Koshibeb55524f2014-09-18 09:59:24 -070099 protected EventDeliveryService eventDispatcher;
alshabib57044ba2014-09-16 15:58:01 -0700100
101 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
Ayaka Koshibeb55524f2014-09-18 09:59:24 -0700102 protected DeviceService deviceService;
alshabib57044ba2014-09-16 15:58:01 -0700103
104 @Activate
105 public void activate() {
Thomas Vachuska9b2da212014-11-10 19:30:25 -0800106 futureService = Executors.newCachedThreadPool(namedThreads("provider-future-listeners"));
tomc78acee2014-09-24 15:16:55 -0700107 store.setDelegate(delegate);
alshabib57044ba2014-09-16 15:58:01 -0700108 eventDispatcher.addSink(FlowRuleEvent.class, listenerRegistry);
109 log.info("Started");
110 }
111
112 @Deactivate
113 public void deactivate() {
Thomas Vachuska9b2da212014-11-10 19:30:25 -0800114 futureService.shutdownNow();
Yuta HIGUCHI9def0472014-10-23 15:51:10 -0700115
tomc78acee2014-09-24 15:16:55 -0700116 store.unsetDelegate(delegate);
alshabib57044ba2014-09-16 15:58:01 -0700117 eventDispatcher.removeSink(FlowRuleEvent.class);
118 log.info("Stopped");
119 }
120
121 @Override
tom9b4030d2014-10-06 10:39:03 -0700122 public int getFlowRuleCount() {
123 return store.getFlowRuleCount();
124 }
125
126 @Override
alshabib1c319ff2014-10-04 20:29:09 -0700127 public Iterable<FlowEntry> getFlowEntries(DeviceId deviceId) {
Ayaka Koshibe08eabaa2014-09-17 14:59:25 -0700128 return store.getFlowEntries(deviceId);
alshabib57044ba2014-09-16 15:58:01 -0700129 }
130
131 @Override
alshabib219ebaa2014-09-22 15:41:24 -0700132 public void applyFlowRules(FlowRule... flowRules) {
Madan Jampani6a456162014-10-24 11:36:17 -0700133 Set<FlowRuleBatchEntry> toAddBatchEntries = Sets.newHashSet();
alshabib57044ba2014-09-16 15:58:01 -0700134 for (int i = 0; i < flowRules.length; i++) {
Madan Jampani6a456162014-10-24 11:36:17 -0700135 toAddBatchEntries.add(new FlowRuleBatchEntry(FlowRuleOperation.ADD, flowRules[i]));
Yuta HIGUCHIf3d51bd2014-10-21 01:05:33 -0700136 }
Madan Jampani6a456162014-10-24 11:36:17 -0700137 applyBatch(new FlowRuleBatchOperation(toAddBatchEntries));
alshabib57044ba2014-09-16 15:58:01 -0700138 }
139
140 @Override
141 public void removeFlowRules(FlowRule... flowRules) {
Madan Jampani6a456162014-10-24 11:36:17 -0700142 Set<FlowRuleBatchEntry> toRemoveBatchEntries = Sets.newHashSet();
alshabib57044ba2014-09-16 15:58:01 -0700143 for (int i = 0; i < flowRules.length; i++) {
Madan Jampani6a456162014-10-24 11:36:17 -0700144 toRemoveBatchEntries.add(new FlowRuleBatchEntry(FlowRuleOperation.REMOVE, flowRules[i]));
Yuta HIGUCHIf3d51bd2014-10-21 01:05:33 -0700145 }
Madan Jampani6a456162014-10-24 11:36:17 -0700146 applyBatch(new FlowRuleBatchOperation(toRemoveBatchEntries));
alshabiba68eb962014-09-24 20:34:13 -0700147 }
alshabib57044ba2014-09-16 15:58:01 -0700148
alshabiba68eb962014-09-24 20:34:13 -0700149 @Override
150 public void removeFlowRulesById(ApplicationId id) {
Madan Jampani6a456162014-10-24 11:36:17 -0700151 removeFlowRules(Iterables.toArray(getFlowRulesById(id), FlowRule.class));
alshabiba68eb962014-09-24 20:34:13 -0700152 }
153
154 @Override
155 public Iterable<FlowRule> getFlowRulesById(ApplicationId id) {
Madan Jampani6a456162014-10-24 11:36:17 -0700156 Set<FlowRule> flowEntries = Sets.newHashSet();
157 for (Device d : deviceService.getDevices()) {
158 for (FlowEntry flowEntry : store.getFlowEntries(d.id())) {
159 if (flowEntry.appId() == id.id()) {
160 flowEntries.add(flowEntry);
161 }
162 }
163 }
164 return flowEntries;
alshabib57044ba2014-09-16 15:58:01 -0700165 }
166
167 @Override
alshabibaa7e7de2014-11-12 19:20:44 -0800168 public Iterable<FlowRule> getFlowRulesByGroupId(ApplicationId appId, short groupId) {
169 Set<FlowRule> matches = Sets.newHashSet();
170 long toLookUp = ((long) appId.id() << 16) | groupId;
171 for (Device d : deviceService.getDevices()) {
172 for (FlowEntry flowEntry : store.getFlowEntries(d.id())) {
173 if ((flowEntry.id().value() >>> 32) == toLookUp) {
174 matches.add(flowEntry);
175 }
176 }
177 }
178 return matches;
179 }
180
181 @Override
alshabib902d41b2014-10-07 16:52:05 -0700182 public Future<CompletedBatchOperation> applyBatch(
183 FlowRuleBatchOperation batch) {
Madan Jampani117aaae2014-10-23 10:04:05 -0700184 Multimap<DeviceId, FlowRuleBatchEntry> perDeviceBatches =
alshabib902d41b2014-10-07 16:52:05 -0700185 ArrayListMultimap.create();
alshabib193525b2014-10-08 18:58:03 -0700186 List<Future<CompletedBatchOperation>> futures = Lists.newArrayList();
alshabib902d41b2014-10-07 16:52:05 -0700187 for (FlowRuleBatchEntry fbe : batch.getOperations()) {
188 final FlowRule f = fbe.getTarget();
Madan Jampani117aaae2014-10-23 10:04:05 -0700189 perDeviceBatches.put(f.deviceId(), fbe);
alshabib902d41b2014-10-07 16:52:05 -0700190 }
Madan Jampani117aaae2014-10-23 10:04:05 -0700191
192 for (DeviceId deviceId : perDeviceBatches.keySet()) {
alshabib902d41b2014-10-07 16:52:05 -0700193 FlowRuleBatchOperation b =
Madan Jampani117aaae2014-10-23 10:04:05 -0700194 new FlowRuleBatchOperation(perDeviceBatches.get(deviceId));
195 Future<CompletedBatchOperation> future = store.storeBatch(b);
alshabib902d41b2014-10-07 16:52:05 -0700196 futures.add(future);
197 }
Madan Jampani117aaae2014-10-23 10:04:05 -0700198 return new FlowRuleBatchFuture(futures, perDeviceBatches);
alshabib902d41b2014-10-07 16:52:05 -0700199 }
200
201 @Override
alshabib57044ba2014-09-16 15:58:01 -0700202 public void addListener(FlowRuleListener listener) {
203 listenerRegistry.addListener(listener);
204 }
205
206 @Override
207 public void removeListener(FlowRuleListener listener) {
208 listenerRegistry.removeListener(listener);
209 }
210
211 @Override
212 protected FlowRuleProviderService createProviderService(
213 FlowRuleProvider provider) {
214 return new InternalFlowRuleProviderService(provider);
215 }
216
217 private class InternalFlowRuleProviderService
tom9b4030d2014-10-06 10:39:03 -0700218 extends AbstractProviderService<FlowRuleProvider>
219 implements FlowRuleProviderService {
alshabib57044ba2014-09-16 15:58:01 -0700220
Yuta HIGUCHIf6f50a62014-10-19 15:58:49 -0700221 final Map<FlowEntry, Long> lastSeen = Maps.newConcurrentMap();
222
alshabib57044ba2014-09-16 15:58:01 -0700223 protected InternalFlowRuleProviderService(FlowRuleProvider provider) {
224 super(provider);
225 }
226
227 @Override
alshabib1c319ff2014-10-04 20:29:09 -0700228 public void flowRemoved(FlowEntry flowEntry) {
229 checkNotNull(flowEntry, FLOW_RULE_NULL);
Ayaka Koshibe08eabaa2014-09-17 14:59:25 -0700230 checkValidity();
Yuta HIGUCHIf6f50a62014-10-19 15:58:49 -0700231 lastSeen.remove(flowEntry);
alshabib1c319ff2014-10-04 20:29:09 -0700232 FlowEntry stored = store.getFlowEntry(flowEntry);
alshabiba68eb962014-09-24 20:34:13 -0700233 if (stored == null) {
Yuta HIGUCHI82e53262014-11-27 10:28:51 -0800234 log.debug("Rule already evicted from store: {}", flowEntry);
alshabiba68eb962014-09-24 20:34:13 -0700235 return;
236 }
alshabib1c319ff2014-10-04 20:29:09 -0700237 Device device = deviceService.getDevice(flowEntry.deviceId());
alshabiba68eb962014-09-24 20:34:13 -0700238 FlowRuleProvider frp = getProvider(device.providerId());
239 FlowRuleEvent event = null;
240 switch (stored.state()) {
tom9b4030d2014-10-06 10:39:03 -0700241 case ADDED:
242 case PENDING_ADD:
alshabib6eb438a2014-10-01 16:39:37 -0700243 frp.applyFlowRule(stored);
tom9b4030d2014-10-06 10:39:03 -0700244 break;
245 case PENDING_REMOVE:
246 case REMOVED:
247 event = store.removeFlowRule(stored);
248 break;
249 default:
250 break;
alshabib57044ba2014-09-16 15:58:01 -0700251
alshabiba68eb962014-09-24 20:34:13 -0700252 }
Ayaka Koshibe08eabaa2014-09-17 14:59:25 -0700253 if (event != null) {
alshabib1c319ff2014-10-04 20:29:09 -0700254 log.debug("Flow {} removed", flowEntry);
Ayaka Koshibe08eabaa2014-09-17 14:59:25 -0700255 post(event);
256 }
alshabib57044ba2014-09-16 15:58:01 -0700257 }
258
alshabibba5ac482014-10-02 17:15:20 -0700259
alshabib1c319ff2014-10-04 20:29:09 -0700260 private void flowMissing(FlowEntry flowRule) {
Ayaka Koshibe08eabaa2014-09-17 14:59:25 -0700261 checkNotNull(flowRule, FLOW_RULE_NULL);
262 checkValidity();
alshabiba68eb962014-09-24 20:34:13 -0700263 Device device = deviceService.getDevice(flowRule.deviceId());
264 FlowRuleProvider frp = getProvider(device.providerId());
alshabibbb42cad2014-09-25 11:43:05 -0700265 FlowRuleEvent event = null;
alshabiba68eb962014-09-24 20:34:13 -0700266 switch (flowRule.state()) {
tom9b4030d2014-10-06 10:39:03 -0700267 case PENDING_REMOVE:
268 case REMOVED:
269 event = store.removeFlowRule(flowRule);
270 frp.removeFlowRule(flowRule);
271 break;
272 case ADDED:
273 case PENDING_ADD:
274 frp.applyFlowRule(flowRule);
275 break;
276 default:
277 log.debug("Flow {} has not been installed.", flowRule);
alshabiba68eb962014-09-24 20:34:13 -0700278 }
279
alshabibbb42cad2014-09-25 11:43:05 -0700280 if (event != null) {
281 log.debug("Flow {} removed", flowRule);
282 post(event);
283 }
alshabib57044ba2014-09-16 15:58:01 -0700284
285 }
286
alshabibba5ac482014-10-02 17:15:20 -0700287
288 private void extraneousFlow(FlowRule flowRule) {
alshabib219ebaa2014-09-22 15:41:24 -0700289 checkNotNull(flowRule, FLOW_RULE_NULL);
290 checkValidity();
alshabib2374fc92014-10-22 11:03:23 -0700291 FlowRuleProvider frp = getProvider(flowRule.deviceId());
292 frp.removeFlowRule(flowRule);
alshabib54ce5892014-09-23 17:50:51 -0700293 log.debug("Flow {} is on switch but not in store.", flowRule);
alshabib219ebaa2014-09-22 15:41:24 -0700294 }
295
alshabibba5ac482014-10-02 17:15:20 -0700296
alshabib1c319ff2014-10-04 20:29:09 -0700297 private void flowAdded(FlowEntry flowEntry) {
298 checkNotNull(flowEntry, FLOW_RULE_NULL);
Ayaka Koshibe08eabaa2014-09-17 14:59:25 -0700299 checkValidity();
alshabib57044ba2014-09-16 15:58:01 -0700300
alshabib1c319ff2014-10-04 20:29:09 -0700301 if (checkRuleLiveness(flowEntry, store.getFlowEntry(flowEntry))) {
alshabibba5ac482014-10-02 17:15:20 -0700302
alshabib1c319ff2014-10-04 20:29:09 -0700303 FlowRuleEvent event = store.addOrUpdateFlowRule(flowEntry);
alshabibba5ac482014-10-02 17:15:20 -0700304 if (event == null) {
305 log.debug("No flow store event generated.");
306 } else {
Jonathan Hart58682dd2014-11-24 20:11:16 -0800307 log.trace("Flow {} {}", flowEntry, event.type());
alshabibba5ac482014-10-02 17:15:20 -0700308 post(event);
309 }
Ayaka Koshibe08eabaa2014-09-17 14:59:25 -0700310 } else {
Thomas Vachuska4830d392014-11-09 17:09:56 -0800311 log.debug("Removing flow rules....");
alshabib1c319ff2014-10-04 20:29:09 -0700312 removeFlowRules(flowEntry);
Ayaka Koshibe08eabaa2014-09-17 14:59:25 -0700313 }
alshabib219ebaa2014-09-22 15:41:24 -0700314
alshabib57044ba2014-09-16 15:58:01 -0700315 }
316
alshabib1c319ff2014-10-04 20:29:09 -0700317 private boolean checkRuleLiveness(FlowEntry swRule, FlowEntry storedRule) {
318 if (storedRule == null) {
319 return false;
320 }
Jonathan Hartbc4a7932014-10-21 11:46:00 -0700321 if (storedRule.isPermanent()) {
322 return true;
323 }
324
Yuta HIGUCHIf6f50a62014-10-19 15:58:49 -0700325 final long timeout = storedRule.timeout() * 1000;
326 final long currentTime = System.currentTimeMillis();
alshabib85c41972014-10-03 13:48:39 -0700327 if (storedRule.packets() != swRule.packets()) {
Yuta HIGUCHIf6f50a62014-10-19 15:58:49 -0700328 lastSeen.put(storedRule, currentTime);
alshabib85c41972014-10-03 13:48:39 -0700329 return true;
330 }
Yuta HIGUCHIf6f50a62014-10-19 15:58:49 -0700331 if (!lastSeen.containsKey(storedRule)) {
332 // checking for the first time
333 lastSeen.put(storedRule, storedRule.lastSeen());
334 // Use following if lastSeen attr. was removed.
335 //lastSeen.put(storedRule, currentTime);
336 }
337 Long last = lastSeen.get(storedRule);
338 if (last == null) {
339 // concurrently removed? let the liveness check fail
340 return false;
341 }
alshabib85c41972014-10-03 13:48:39 -0700342
Yuta HIGUCHIf6f50a62014-10-19 15:58:49 -0700343 if ((currentTime - last) <= timeout) {
alshabibc274c902014-10-03 14:58:27 -0700344 return true;
345 }
346 return false;
alshabibba5ac482014-10-02 17:15:20 -0700347 }
348
Ayaka Koshibe08eabaa2014-09-17 14:59:25 -0700349 // Posts the specified event to the local event dispatcher.
350 private void post(FlowRuleEvent event) {
351 if (event != null) {
352 eventDispatcher.post(event);
353 }
354 }
alshabib5c370ff2014-09-18 10:12:14 -0700355
356 @Override
alshabib1c319ff2014-10-04 20:29:09 -0700357 public void pushFlowMetrics(DeviceId deviceId, Iterable<FlowEntry> flowEntries) {
358 List<FlowEntry> storedRules = Lists.newLinkedList(store.getFlowEntries(deviceId));
alshabibbb8b1282014-09-22 17:00:18 -0700359
Yuta HIGUCHIf6f50a62014-10-19 15:58:49 -0700360 for (FlowEntry rule : flowEntries) {
alshabiba7f7ca82014-09-22 11:41:23 -0700361 if (storedRules.remove(rule)) {
alshabib219ebaa2014-09-22 15:41:24 -0700362 // we both have the rule, let's update some info then.
alshabiba7f7ca82014-09-22 11:41:23 -0700363 flowAdded(rule);
364 } else {
alshabib219ebaa2014-09-22 15:41:24 -0700365 // the device has a rule the store does not have
366 extraneousFlow(rule);
alshabiba7f7ca82014-09-22 11:41:23 -0700367 }
368 }
alshabib1c319ff2014-10-04 20:29:09 -0700369 for (FlowEntry rule : storedRules) {
alshabiba7f7ca82014-09-22 11:41:23 -0700370 // there are rules in the store that aren't on the switch
371 flowMissing(rule);
alshabib54ce5892014-09-23 17:50:51 -0700372
alshabiba7f7ca82014-09-22 11:41:23 -0700373 }
alshabib5c370ff2014-09-18 10:12:14 -0700374 }
alshabib57044ba2014-09-16 15:58:01 -0700375 }
376
tomc78acee2014-09-24 15:16:55 -0700377 // Store delegate to re-post events emitted from the store.
378 private class InternalStoreDelegate implements FlowRuleStoreDelegate {
Thomas Vachuska9b2da212014-11-10 19:30:25 -0800379
Yuta HIGUCHI82e53262014-11-27 10:28:51 -0800380 // FIXME set appropriate default and make it configurable
381 private static final int TIMEOUT_PER_OP = 500; // ms
Thomas Vachuska9b2da212014-11-10 19:30:25 -0800382
Madan Jampani117aaae2014-10-23 10:04:05 -0700383 // TODO: Right now we only dispatch events at individual flowEntry level.
384 // It may be more efficient for also dispatch events as a batch.
tomc78acee2014-09-24 15:16:55 -0700385 @Override
Madan Jampani117aaae2014-10-23 10:04:05 -0700386 public void notify(FlowRuleBatchEvent event) {
387 final FlowRuleBatchRequest request = event.subject();
Yuta HIGUCHIf3d51bd2014-10-21 01:05:33 -0700388 switch (event.type()) {
Madan Jampani117aaae2014-10-23 10:04:05 -0700389 case BATCH_OPERATION_REQUESTED:
Yuta HIGUCHI2fcb40c2014-11-03 14:39:10 -0800390 // Request has been forwarded to MASTER Node, and was
391 for (FlowRule entry : request.toAdd()) {
Madan Jampani31961c12014-10-23 12:06:58 -0700392 eventDispatcher.post(new FlowRuleEvent(FlowRuleEvent.Type.RULE_ADD_REQUESTED, entry));
393 }
Yuta HIGUCHI2fcb40c2014-11-03 14:39:10 -0800394 for (FlowRule entry : request.toRemove()) {
Madan Jampani31961c12014-10-23 12:06:58 -0700395 eventDispatcher.post(new FlowRuleEvent(FlowRuleEvent.Type.RULE_REMOVE_REQUESTED, entry));
396 }
397 // FIXME: what about op.equals(FlowRuleOperation.MODIFY) ?
Yuta HIGUCHIf3d51bd2014-10-21 01:05:33 -0700398
Madan Jampani117aaae2014-10-23 10:04:05 -0700399 FlowRuleBatchOperation batchOperation = request.asBatchOperation();
Yuta HIGUCHIf3d51bd2014-10-21 01:05:33 -0700400
Madan Jampani117aaae2014-10-23 10:04:05 -0700401 FlowRuleProvider flowRuleProvider =
402 getProvider(batchOperation.getOperations().get(0).getTarget().deviceId());
Thomas Vachuska9b2da212014-11-10 19:30:25 -0800403 final Future<CompletedBatchOperation> result =
Madan Jampani117aaae2014-10-23 10:04:05 -0700404 flowRuleProvider.executeBatch(batchOperation);
Thomas Vachuska9b2da212014-11-10 19:30:25 -0800405 futureService.submit(new Runnable() {
Madan Jampani117aaae2014-10-23 10:04:05 -0700406 @Override
407 public void run() {
Yuta HIGUCHIf1ccee82014-11-11 20:39:58 -0800408 CompletedBatchOperation res;
Thomas Vachuska9b2da212014-11-10 19:30:25 -0800409 try {
Yuta HIGUCHI82e53262014-11-27 10:28:51 -0800410 res = result.get(TIMEOUT_PER_OP * batchOperation.size(), TimeUnit.MILLISECONDS);
Yuta HIGUCHIf1ccee82014-11-11 20:39:58 -0800411 store.batchOperationComplete(FlowRuleBatchEvent.completed(request, res));
Thomas Vachuska9b2da212014-11-10 19:30:25 -0800412 } catch (TimeoutException | InterruptedException | ExecutionException e) {
413 log.warn("Something went wrong with the batch operation {}",
Yuta HIGUCHIf1ccee82014-11-11 20:39:58 -0800414 request.batchId(), e);
415
416 Set<FlowRule> failures = new HashSet<>(batchOperation.size());
417 for (FlowRuleBatchEntry op : batchOperation.getOperations()) {
418 failures.add(op.getTarget());
419 }
420 res = new CompletedBatchOperation(false, failures);
421 store.batchOperationComplete(FlowRuleBatchEvent.completed(request, res));
Thomas Vachuska9b2da212014-11-10 19:30:25 -0800422 }
Madan Jampani117aaae2014-10-23 10:04:05 -0700423 }
Thomas Vachuska9b2da212014-11-10 19:30:25 -0800424 });
Yuta HIGUCHI2fcb40c2014-11-03 14:39:10 -0800425 break;
Madan Jampani117aaae2014-10-23 10:04:05 -0700426
Madan Jampani117aaae2014-10-23 10:04:05 -0700427 case BATCH_OPERATION_COMPLETED:
Yuta HIGUCHI2fcb40c2014-11-03 14:39:10 -0800428 // MASTER Node has pushed the batch down to the Device
429
430 // Note: RULE_ADDED will be posted
431 // when Flow was actually confirmed by stats reply.
Yuta HIGUCHIf3d51bd2014-10-21 01:05:33 -0700432 break;
Yuta HIGUCHI2fcb40c2014-11-03 14:39:10 -0800433
Yuta HIGUCHIf3d51bd2014-10-21 01:05:33 -0700434 default:
435 break;
436 }
tomc78acee2014-09-24 15:16:55 -0700437 }
438 }
alshabib902d41b2014-10-07 16:52:05 -0700439
Madan Jampani117aaae2014-10-23 10:04:05 -0700440 private class FlowRuleBatchFuture implements Future<CompletedBatchOperation> {
alshabib902d41b2014-10-07 16:52:05 -0700441
alshabib193525b2014-10-08 18:58:03 -0700442 private final List<Future<CompletedBatchOperation>> futures;
Madan Jampani117aaae2014-10-23 10:04:05 -0700443 private final Multimap<DeviceId, FlowRuleBatchEntry> batches;
alshabib193525b2014-10-08 18:58:03 -0700444 private final AtomicReference<BatchState> state;
445 private CompletedBatchOperation overall;
alshabib902d41b2014-10-07 16:52:05 -0700446
alshabib193525b2014-10-08 18:58:03 -0700447 public FlowRuleBatchFuture(List<Future<CompletedBatchOperation>> futures,
Madan Jampani117aaae2014-10-23 10:04:05 -0700448 Multimap<DeviceId, FlowRuleBatchEntry> batches) {
alshabib902d41b2014-10-07 16:52:05 -0700449 this.futures = futures;
alshabib193525b2014-10-08 18:58:03 -0700450 this.batches = batches;
451 state = new AtomicReference<FlowRuleManager.BatchState>();
452 state.set(BatchState.STARTED);
alshabib902d41b2014-10-07 16:52:05 -0700453 }
454
455 @Override
456 public boolean cancel(boolean mayInterruptIfRunning) {
alshabib193525b2014-10-08 18:58:03 -0700457 if (state.get() == BatchState.FINISHED) {
458 return false;
459 }
Yuta HIGUCHI82e53262014-11-27 10:28:51 -0800460 if (log.isDebugEnabled()) {
461 log.debug("Cancelling FlowRuleBatchFuture",
462 new RuntimeException("Just printing backtrace"));
463 }
alshabib193525b2014-10-08 18:58:03 -0700464 if (!state.compareAndSet(BatchState.STARTED, BatchState.CANCELLED)) {
465 return false;
466 }
467 cleanUpBatch();
468 for (Future<CompletedBatchOperation> f : futures) {
469 f.cancel(mayInterruptIfRunning);
470 }
471 return true;
alshabib902d41b2014-10-07 16:52:05 -0700472 }
473
474 @Override
475 public boolean isCancelled() {
alshabib193525b2014-10-08 18:58:03 -0700476 return state.get() == BatchState.CANCELLED;
alshabib902d41b2014-10-07 16:52:05 -0700477 }
478
479 @Override
480 public boolean isDone() {
alshabib193525b2014-10-08 18:58:03 -0700481 return state.get() == BatchState.FINISHED;
alshabib902d41b2014-10-07 16:52:05 -0700482 }
483
alshabib193525b2014-10-08 18:58:03 -0700484
alshabib902d41b2014-10-07 16:52:05 -0700485 @Override
486 public CompletedBatchOperation get() throws InterruptedException,
alshabib193525b2014-10-08 18:58:03 -0700487 ExecutionException {
488
489 if (isDone()) {
490 return overall;
alshabib902d41b2014-10-07 16:52:05 -0700491 }
alshabib193525b2014-10-08 18:58:03 -0700492
493 boolean success = true;
Yuta HIGUCHI4b524442014-10-28 22:23:57 -0700494 Set<FlowRule> failed = Sets.newHashSet();
Brian O'Connor427a1762014-11-19 18:40:32 -0800495 Set<Long> failedIds = Sets.newHashSet();
alshabib193525b2014-10-08 18:58:03 -0700496 CompletedBatchOperation completed;
497 for (Future<CompletedBatchOperation> future : futures) {
498 completed = future.get();
Brian O'Connor427a1762014-11-19 18:40:32 -0800499 success = validateBatchOperation(failed, failedIds, completed);
alshabib193525b2014-10-08 18:58:03 -0700500 }
501
Brian O'Connor427a1762014-11-19 18:40:32 -0800502 return finalizeBatchOperation(success, failed, failedIds);
alshabib193525b2014-10-08 18:58:03 -0700503
alshabib902d41b2014-10-07 16:52:05 -0700504 }
505
506 @Override
507 public CompletedBatchOperation get(long timeout, TimeUnit unit)
508 throws InterruptedException, ExecutionException,
509 TimeoutException {
alshabib193525b2014-10-08 18:58:03 -0700510
511 if (isDone()) {
512 return overall;
513 }
514 boolean success = true;
Yuta HIGUCHI4b524442014-10-28 22:23:57 -0700515 Set<FlowRule> failed = Sets.newHashSet();
Brian O'Connor427a1762014-11-19 18:40:32 -0800516 Set<Long> failedIds = Sets.newHashSet();
alshabib193525b2014-10-08 18:58:03 -0700517 CompletedBatchOperation completed;
alshabib193525b2014-10-08 18:58:03 -0700518 for (Future<CompletedBatchOperation> future : futures) {
Brian O'Connorfa81eae2014-10-30 13:20:05 -0700519 completed = future.get(timeout, unit);
Brian O'Connor427a1762014-11-19 18:40:32 -0800520 success = validateBatchOperation(failed, failedIds, completed);
alshabib902d41b2014-10-07 16:52:05 -0700521 }
Brian O'Connor427a1762014-11-19 18:40:32 -0800522 return finalizeBatchOperation(success, failed, failedIds);
alshabib902d41b2014-10-07 16:52:05 -0700523 }
524
Yuta HIGUCHI4b524442014-10-28 22:23:57 -0700525 private boolean validateBatchOperation(Set<FlowRule> failed,
Brian O'Connor427a1762014-11-19 18:40:32 -0800526 Set<Long> failedIds,
527 CompletedBatchOperation completed) {
alshabib193525b2014-10-08 18:58:03 -0700528
529 if (isCancelled()) {
530 throw new CancellationException();
531 }
532 if (!completed.isSuccess()) {
Yuta HIGUCHI82e53262014-11-27 10:28:51 -0800533 log.warn("FlowRuleBatch failed: {}", completed);
alshabib193525b2014-10-08 18:58:03 -0700534 failed.addAll(completed.failedItems());
Brian O'Connor427a1762014-11-19 18:40:32 -0800535 failedIds.addAll(completed.failedIds());
alshabib193525b2014-10-08 18:58:03 -0700536 cleanUpBatch();
537 cancelAllSubBatches();
538 return false;
539 }
540 return true;
541 }
542
543 private void cancelAllSubBatches() {
544 for (Future<CompletedBatchOperation> f : futures) {
545 f.cancel(true);
546 }
547 }
548
549 private CompletedBatchOperation finalizeBatchOperation(boolean success,
Brian O'Connor427a1762014-11-19 18:40:32 -0800550 Set<FlowRule> failed,
551 Set<Long> failedIds) {
alshabib26834582014-10-08 20:15:46 -0700552 synchronized (this) {
alshabib193525b2014-10-08 18:58:03 -0700553 if (!state.compareAndSet(BatchState.STARTED, BatchState.FINISHED)) {
554 if (state.get() == BatchState.FINISHED) {
555 return overall;
556 }
557 throw new CancellationException();
558 }
Brian O'Connor427a1762014-11-19 18:40:32 -0800559 overall = new CompletedBatchOperation(success, failed, failedIds);
alshabib193525b2014-10-08 18:58:03 -0700560 return overall;
561 }
562 }
563
564 private void cleanUpBatch() {
Yuta HIGUCHI82e53262014-11-27 10:28:51 -0800565 log.debug("cleaning up batch");
566 // TODO convert these into a batch?
alshabib193525b2014-10-08 18:58:03 -0700567 for (FlowRuleBatchEntry fbe : batches.values()) {
568 if (fbe.getOperator() == FlowRuleOperation.ADD ||
Yuta HIGUCHI82e53262014-11-27 10:28:51 -0800569 fbe.getOperator() == FlowRuleOperation.MODIFY) {
alshabib193525b2014-10-08 18:58:03 -0700570 store.deleteFlowRule(fbe.getTarget());
571 } else if (fbe.getOperator() == FlowRuleOperation.REMOVE) {
alshabibcf369912014-10-13 14:16:42 -0700572 store.removeFlowRule(new DefaultFlowEntry(fbe.getTarget()));
alshabib193525b2014-10-08 18:58:03 -0700573 store.storeFlowRule(fbe.getTarget());
574 }
575 }
alshabib193525b2014-10-08 18:58:03 -0700576 }
alshabib902d41b2014-10-07 16:52:05 -0700577 }
alshabib57044ba2014-09-16 15:58:01 -0700578}