blob: e996dfc517777edc3ca3c651050a2d932bc1fd76 [file] [log] [blame]
Thomas Vachuska4f1a60c2014-10-28 13:39:07 -07001/*
2 * Copyright 2014 Open Networking Laboratory
3 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
tombe988312014-09-19 18:38:47 -070016package org.onlab.onos.net.flow.impl;
alshabib57044ba2014-09-16 15:58:01 -070017
alshabibbb42cad2014-09-25 11:43:05 -070018import static com.google.common.base.Preconditions.checkNotNull;
19import static org.slf4j.LoggerFactory.getLogger;
Yuta HIGUCHI9def0472014-10-23 15:51:10 -070020import static org.onlab.util.Tools.namedThreads;
alshabibbb42cad2014-09-25 11:43:05 -070021
alshabibbb42cad2014-09-25 11:43:05 -070022import java.util.List;
Yuta HIGUCHIf6f50a62014-10-19 15:58:49 -070023import java.util.Map;
Madan Jampani117aaae2014-10-23 10:04:05 -070024import java.util.Set;
alshabib193525b2014-10-08 18:58:03 -070025import java.util.concurrent.CancellationException;
alshabib902d41b2014-10-07 16:52:05 -070026import java.util.concurrent.ExecutionException;
Yuta HIGUCHI9def0472014-10-23 15:51:10 -070027import java.util.concurrent.ExecutorService;
Madan Jampani117aaae2014-10-23 10:04:05 -070028import java.util.concurrent.Executors;
alshabib902d41b2014-10-07 16:52:05 -070029import java.util.concurrent.Future;
30import java.util.concurrent.TimeUnit;
31import java.util.concurrent.TimeoutException;
alshabib193525b2014-10-08 18:58:03 -070032import java.util.concurrent.atomic.AtomicReference;
alshabibbb42cad2014-09-25 11:43:05 -070033
alshabib57044ba2014-09-16 15:58:01 -070034import org.apache.felix.scr.annotations.Activate;
35import org.apache.felix.scr.annotations.Component;
36import org.apache.felix.scr.annotations.Deactivate;
37import org.apache.felix.scr.annotations.Reference;
38import org.apache.felix.scr.annotations.ReferenceCardinality;
39import org.apache.felix.scr.annotations.Service;
Thomas Vachuskae0f804a2014-10-27 23:40:48 -070040import org.onlab.onos.core.ApplicationId;
alshabib57044ba2014-09-16 15:58:01 -070041import org.onlab.onos.event.AbstractListenerRegistry;
42import org.onlab.onos.event.EventDeliveryService;
43import org.onlab.onos.net.Device;
44import org.onlab.onos.net.DeviceId;
45import org.onlab.onos.net.device.DeviceService;
alshabib902d41b2014-10-07 16:52:05 -070046import org.onlab.onos.net.flow.CompletedBatchOperation;
alshabibcf369912014-10-13 14:16:42 -070047import org.onlab.onos.net.flow.DefaultFlowEntry;
alshabib1c319ff2014-10-04 20:29:09 -070048import org.onlab.onos.net.flow.FlowEntry;
alshabib57044ba2014-09-16 15:58:01 -070049import org.onlab.onos.net.flow.FlowRule;
alshabib902d41b2014-10-07 16:52:05 -070050import org.onlab.onos.net.flow.FlowRuleBatchEntry;
alshabib193525b2014-10-08 18:58:03 -070051import org.onlab.onos.net.flow.FlowRuleBatchEntry.FlowRuleOperation;
Madan Jampani117aaae2014-10-23 10:04:05 -070052import org.onlab.onos.net.flow.FlowRuleBatchEvent;
alshabib902d41b2014-10-07 16:52:05 -070053import org.onlab.onos.net.flow.FlowRuleBatchOperation;
Madan Jampani117aaae2014-10-23 10:04:05 -070054import org.onlab.onos.net.flow.FlowRuleBatchRequest;
alshabib57044ba2014-09-16 15:58:01 -070055import org.onlab.onos.net.flow.FlowRuleEvent;
56import org.onlab.onos.net.flow.FlowRuleListener;
57import org.onlab.onos.net.flow.FlowRuleProvider;
58import org.onlab.onos.net.flow.FlowRuleProviderRegistry;
59import org.onlab.onos.net.flow.FlowRuleProviderService;
60import org.onlab.onos.net.flow.FlowRuleService;
tombe988312014-09-19 18:38:47 -070061import org.onlab.onos.net.flow.FlowRuleStore;
tomc78acee2014-09-24 15:16:55 -070062import org.onlab.onos.net.flow.FlowRuleStoreDelegate;
alshabib57044ba2014-09-16 15:58:01 -070063import org.onlab.onos.net.provider.AbstractProviderRegistry;
64import org.onlab.onos.net.provider.AbstractProviderService;
65import org.slf4j.Logger;
66
alshabib902d41b2014-10-07 16:52:05 -070067import com.google.common.collect.ArrayListMultimap;
Madan Jampani6a456162014-10-24 11:36:17 -070068import com.google.common.collect.Iterables;
alshabibbb42cad2014-09-25 11:43:05 -070069import com.google.common.collect.Lists;
Yuta HIGUCHIf6f50a62014-10-19 15:58:49 -070070import com.google.common.collect.Maps;
alshabib902d41b2014-10-07 16:52:05 -070071import com.google.common.collect.Multimap;
Madan Jampani117aaae2014-10-23 10:04:05 -070072import com.google.common.collect.Sets;
73import com.google.common.util.concurrent.Futures;
74import com.google.common.util.concurrent.ListenableFuture;
alshabiba7f7ca82014-09-22 11:41:23 -070075
tome4729872014-09-23 00:37:37 -070076/**
77 * Provides implementation of the flow NB & SB APIs.
78 */
alshabib57044ba2014-09-16 15:58:01 -070079@Component(immediate = true)
80@Service
tom202175a2014-09-19 19:00:11 -070081public class FlowRuleManager
tom9b4030d2014-10-06 10:39:03 -070082 extends AbstractProviderRegistry<FlowRuleProvider, FlowRuleProviderService>
83 implements FlowRuleService, FlowRuleProviderRegistry {
alshabib57044ba2014-09-16 15:58:01 -070084
alshabib193525b2014-10-08 18:58:03 -070085 enum BatchState { STARTED, FINISHED, CANCELLED };
86
Ayaka Koshibe08eabaa2014-09-17 14:59:25 -070087 public static final String FLOW_RULE_NULL = "FlowRule cannot be null";
alshabib57044ba2014-09-16 15:58:01 -070088 private final Logger log = getLogger(getClass());
89
90 private final AbstractListenerRegistry<FlowRuleEvent, FlowRuleListener>
tom9b4030d2014-10-06 10:39:03 -070091 listenerRegistry = new AbstractListenerRegistry<>();
alshabib57044ba2014-09-16 15:58:01 -070092
alshabibbb42cad2014-09-25 11:43:05 -070093 private final FlowRuleStoreDelegate delegate = new InternalStoreDelegate();
tomc78acee2014-09-24 15:16:55 -070094
Thomas Vachuska8ac922d2014-10-23 16:17:03 -070095 private final ExecutorService futureListeners =
96 Executors.newCachedThreadPool(namedThreads("provider-future-listeners"));
Yuta HIGUCHI9def0472014-10-23 15:51:10 -070097
tombe988312014-09-19 18:38:47 -070098 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
99 protected FlowRuleStore store;
Ayaka Koshibe08eabaa2014-09-17 14:59:25 -0700100
alshabib57044ba2014-09-16 15:58:01 -0700101 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
Ayaka Koshibeb55524f2014-09-18 09:59:24 -0700102 protected EventDeliveryService eventDispatcher;
alshabib57044ba2014-09-16 15:58:01 -0700103
104 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
Ayaka Koshibeb55524f2014-09-18 09:59:24 -0700105 protected DeviceService deviceService;
alshabib57044ba2014-09-16 15:58:01 -0700106
107 @Activate
108 public void activate() {
tomc78acee2014-09-24 15:16:55 -0700109 store.setDelegate(delegate);
alshabib57044ba2014-09-16 15:58:01 -0700110 eventDispatcher.addSink(FlowRuleEvent.class, listenerRegistry);
111 log.info("Started");
112 }
113
114 @Deactivate
115 public void deactivate() {
Yuta HIGUCHI9def0472014-10-23 15:51:10 -0700116 futureListeners.shutdownNow();
117
tomc78acee2014-09-24 15:16:55 -0700118 store.unsetDelegate(delegate);
alshabib57044ba2014-09-16 15:58:01 -0700119 eventDispatcher.removeSink(FlowRuleEvent.class);
120 log.info("Stopped");
121 }
122
123 @Override
tom9b4030d2014-10-06 10:39:03 -0700124 public int getFlowRuleCount() {
125 return store.getFlowRuleCount();
126 }
127
128 @Override
alshabib1c319ff2014-10-04 20:29:09 -0700129 public Iterable<FlowEntry> getFlowEntries(DeviceId deviceId) {
Ayaka Koshibe08eabaa2014-09-17 14:59:25 -0700130 return store.getFlowEntries(deviceId);
alshabib57044ba2014-09-16 15:58:01 -0700131 }
132
133 @Override
alshabib219ebaa2014-09-22 15:41:24 -0700134 public void applyFlowRules(FlowRule... flowRules) {
Madan Jampani6a456162014-10-24 11:36:17 -0700135 Set<FlowRuleBatchEntry> toAddBatchEntries = Sets.newHashSet();
alshabib57044ba2014-09-16 15:58:01 -0700136 for (int i = 0; i < flowRules.length; i++) {
Madan Jampani6a456162014-10-24 11:36:17 -0700137 toAddBatchEntries.add(new FlowRuleBatchEntry(FlowRuleOperation.ADD, flowRules[i]));
Yuta HIGUCHIf3d51bd2014-10-21 01:05:33 -0700138 }
Madan Jampani6a456162014-10-24 11:36:17 -0700139 applyBatch(new FlowRuleBatchOperation(toAddBatchEntries));
alshabib57044ba2014-09-16 15:58:01 -0700140 }
141
142 @Override
143 public void removeFlowRules(FlowRule... flowRules) {
Madan Jampani6a456162014-10-24 11:36:17 -0700144 Set<FlowRuleBatchEntry> toRemoveBatchEntries = Sets.newHashSet();
alshabib57044ba2014-09-16 15:58:01 -0700145 for (int i = 0; i < flowRules.length; i++) {
Madan Jampani6a456162014-10-24 11:36:17 -0700146 toRemoveBatchEntries.add(new FlowRuleBatchEntry(FlowRuleOperation.REMOVE, flowRules[i]));
Yuta HIGUCHIf3d51bd2014-10-21 01:05:33 -0700147 }
Madan Jampani6a456162014-10-24 11:36:17 -0700148 applyBatch(new FlowRuleBatchOperation(toRemoveBatchEntries));
alshabiba68eb962014-09-24 20:34:13 -0700149 }
alshabib57044ba2014-09-16 15:58:01 -0700150
alshabiba68eb962014-09-24 20:34:13 -0700151 @Override
152 public void removeFlowRulesById(ApplicationId id) {
Madan Jampani6a456162014-10-24 11:36:17 -0700153 removeFlowRules(Iterables.toArray(getFlowRulesById(id), FlowRule.class));
alshabiba68eb962014-09-24 20:34:13 -0700154 }
155
156 @Override
157 public Iterable<FlowRule> getFlowRulesById(ApplicationId id) {
Madan Jampani6a456162014-10-24 11:36:17 -0700158 Set<FlowRule> flowEntries = Sets.newHashSet();
159 for (Device d : deviceService.getDevices()) {
160 for (FlowEntry flowEntry : store.getFlowEntries(d.id())) {
161 if (flowEntry.appId() == id.id()) {
162 flowEntries.add(flowEntry);
163 }
164 }
165 }
166 return flowEntries;
alshabib57044ba2014-09-16 15:58:01 -0700167 }
168
169 @Override
alshabib902d41b2014-10-07 16:52:05 -0700170 public Future<CompletedBatchOperation> applyBatch(
171 FlowRuleBatchOperation batch) {
Madan Jampani117aaae2014-10-23 10:04:05 -0700172 Multimap<DeviceId, FlowRuleBatchEntry> perDeviceBatches =
alshabib902d41b2014-10-07 16:52:05 -0700173 ArrayListMultimap.create();
alshabib193525b2014-10-08 18:58:03 -0700174 List<Future<CompletedBatchOperation>> futures = Lists.newArrayList();
alshabib902d41b2014-10-07 16:52:05 -0700175 for (FlowRuleBatchEntry fbe : batch.getOperations()) {
176 final FlowRule f = fbe.getTarget();
Madan Jampani117aaae2014-10-23 10:04:05 -0700177 perDeviceBatches.put(f.deviceId(), fbe);
alshabib902d41b2014-10-07 16:52:05 -0700178 }
Madan Jampani117aaae2014-10-23 10:04:05 -0700179
180 for (DeviceId deviceId : perDeviceBatches.keySet()) {
alshabib902d41b2014-10-07 16:52:05 -0700181 FlowRuleBatchOperation b =
Madan Jampani117aaae2014-10-23 10:04:05 -0700182 new FlowRuleBatchOperation(perDeviceBatches.get(deviceId));
183 Future<CompletedBatchOperation> future = store.storeBatch(b);
alshabib902d41b2014-10-07 16:52:05 -0700184 futures.add(future);
185 }
Madan Jampani117aaae2014-10-23 10:04:05 -0700186 return new FlowRuleBatchFuture(futures, perDeviceBatches);
alshabib902d41b2014-10-07 16:52:05 -0700187 }
188
189 @Override
alshabib57044ba2014-09-16 15:58:01 -0700190 public void addListener(FlowRuleListener listener) {
191 listenerRegistry.addListener(listener);
192 }
193
194 @Override
195 public void removeListener(FlowRuleListener listener) {
196 listenerRegistry.removeListener(listener);
197 }
198
199 @Override
200 protected FlowRuleProviderService createProviderService(
201 FlowRuleProvider provider) {
202 return new InternalFlowRuleProviderService(provider);
203 }
204
205 private class InternalFlowRuleProviderService
tom9b4030d2014-10-06 10:39:03 -0700206 extends AbstractProviderService<FlowRuleProvider>
207 implements FlowRuleProviderService {
alshabib57044ba2014-09-16 15:58:01 -0700208
Yuta HIGUCHIf6f50a62014-10-19 15:58:49 -0700209 final Map<FlowEntry, Long> lastSeen = Maps.newConcurrentMap();
210
alshabib57044ba2014-09-16 15:58:01 -0700211 protected InternalFlowRuleProviderService(FlowRuleProvider provider) {
212 super(provider);
213 }
214
215 @Override
alshabib1c319ff2014-10-04 20:29:09 -0700216 public void flowRemoved(FlowEntry flowEntry) {
217 checkNotNull(flowEntry, FLOW_RULE_NULL);
Ayaka Koshibe08eabaa2014-09-17 14:59:25 -0700218 checkValidity();
Yuta HIGUCHIf6f50a62014-10-19 15:58:49 -0700219 lastSeen.remove(flowEntry);
alshabib1c319ff2014-10-04 20:29:09 -0700220 FlowEntry stored = store.getFlowEntry(flowEntry);
alshabiba68eb962014-09-24 20:34:13 -0700221 if (stored == null) {
alshabib1c319ff2014-10-04 20:29:09 -0700222 log.info("Rule already evicted from store: {}", flowEntry);
alshabiba68eb962014-09-24 20:34:13 -0700223 return;
224 }
alshabib1c319ff2014-10-04 20:29:09 -0700225 Device device = deviceService.getDevice(flowEntry.deviceId());
alshabiba68eb962014-09-24 20:34:13 -0700226 FlowRuleProvider frp = getProvider(device.providerId());
227 FlowRuleEvent event = null;
228 switch (stored.state()) {
tom9b4030d2014-10-06 10:39:03 -0700229 case ADDED:
230 case PENDING_ADD:
alshabib6eb438a2014-10-01 16:39:37 -0700231 frp.applyFlowRule(stored);
tom9b4030d2014-10-06 10:39:03 -0700232 break;
233 case PENDING_REMOVE:
234 case REMOVED:
235 event = store.removeFlowRule(stored);
236 break;
237 default:
238 break;
alshabib57044ba2014-09-16 15:58:01 -0700239
alshabiba68eb962014-09-24 20:34:13 -0700240 }
Ayaka Koshibe08eabaa2014-09-17 14:59:25 -0700241 if (event != null) {
alshabib1c319ff2014-10-04 20:29:09 -0700242 log.debug("Flow {} removed", flowEntry);
Ayaka Koshibe08eabaa2014-09-17 14:59:25 -0700243 post(event);
244 }
alshabib57044ba2014-09-16 15:58:01 -0700245 }
246
alshabibba5ac482014-10-02 17:15:20 -0700247
alshabib1c319ff2014-10-04 20:29:09 -0700248 private void flowMissing(FlowEntry flowRule) {
Ayaka Koshibe08eabaa2014-09-17 14:59:25 -0700249 checkNotNull(flowRule, FLOW_RULE_NULL);
250 checkValidity();
alshabiba68eb962014-09-24 20:34:13 -0700251 Device device = deviceService.getDevice(flowRule.deviceId());
252 FlowRuleProvider frp = getProvider(device.providerId());
alshabibbb42cad2014-09-25 11:43:05 -0700253 FlowRuleEvent event = null;
alshabiba68eb962014-09-24 20:34:13 -0700254 switch (flowRule.state()) {
tom9b4030d2014-10-06 10:39:03 -0700255 case PENDING_REMOVE:
256 case REMOVED:
257 event = store.removeFlowRule(flowRule);
258 frp.removeFlowRule(flowRule);
259 break;
260 case ADDED:
261 case PENDING_ADD:
262 frp.applyFlowRule(flowRule);
263 break;
264 default:
265 log.debug("Flow {} has not been installed.", flowRule);
alshabiba68eb962014-09-24 20:34:13 -0700266 }
267
alshabibbb42cad2014-09-25 11:43:05 -0700268 if (event != null) {
269 log.debug("Flow {} removed", flowRule);
270 post(event);
271 }
alshabib57044ba2014-09-16 15:58:01 -0700272
273 }
274
alshabibba5ac482014-10-02 17:15:20 -0700275
276 private void extraneousFlow(FlowRule flowRule) {
alshabib219ebaa2014-09-22 15:41:24 -0700277 checkNotNull(flowRule, FLOW_RULE_NULL);
278 checkValidity();
alshabib2374fc92014-10-22 11:03:23 -0700279 FlowRuleProvider frp = getProvider(flowRule.deviceId());
280 frp.removeFlowRule(flowRule);
alshabib54ce5892014-09-23 17:50:51 -0700281 log.debug("Flow {} is on switch but not in store.", flowRule);
alshabib219ebaa2014-09-22 15:41:24 -0700282 }
283
alshabibba5ac482014-10-02 17:15:20 -0700284
alshabib1c319ff2014-10-04 20:29:09 -0700285 private void flowAdded(FlowEntry flowEntry) {
286 checkNotNull(flowEntry, FLOW_RULE_NULL);
Ayaka Koshibe08eabaa2014-09-17 14:59:25 -0700287 checkValidity();
alshabib57044ba2014-09-16 15:58:01 -0700288
alshabib1c319ff2014-10-04 20:29:09 -0700289 if (checkRuleLiveness(flowEntry, store.getFlowEntry(flowEntry))) {
alshabibba5ac482014-10-02 17:15:20 -0700290
alshabib1c319ff2014-10-04 20:29:09 -0700291 FlowRuleEvent event = store.addOrUpdateFlowRule(flowEntry);
alshabibba5ac482014-10-02 17:15:20 -0700292 if (event == null) {
293 log.debug("No flow store event generated.");
294 } else {
alshabib1c319ff2014-10-04 20:29:09 -0700295 log.debug("Flow {} {}", flowEntry, event.type());
alshabibba5ac482014-10-02 17:15:20 -0700296 post(event);
297 }
Ayaka Koshibe08eabaa2014-09-17 14:59:25 -0700298 } else {
Thomas Vachuska4830d392014-11-09 17:09:56 -0800299 log.debug("Removing flow rules....");
alshabib1c319ff2014-10-04 20:29:09 -0700300 removeFlowRules(flowEntry);
Ayaka Koshibe08eabaa2014-09-17 14:59:25 -0700301 }
alshabib219ebaa2014-09-22 15:41:24 -0700302
alshabib57044ba2014-09-16 15:58:01 -0700303 }
304
alshabib1c319ff2014-10-04 20:29:09 -0700305 private boolean checkRuleLiveness(FlowEntry swRule, FlowEntry storedRule) {
306 if (storedRule == null) {
307 return false;
308 }
Jonathan Hartbc4a7932014-10-21 11:46:00 -0700309 if (storedRule.isPermanent()) {
310 return true;
311 }
312
Yuta HIGUCHIf6f50a62014-10-19 15:58:49 -0700313 final long timeout = storedRule.timeout() * 1000;
314 final long currentTime = System.currentTimeMillis();
alshabib85c41972014-10-03 13:48:39 -0700315 if (storedRule.packets() != swRule.packets()) {
Yuta HIGUCHIf6f50a62014-10-19 15:58:49 -0700316 lastSeen.put(storedRule, currentTime);
alshabib85c41972014-10-03 13:48:39 -0700317 return true;
318 }
Yuta HIGUCHIf6f50a62014-10-19 15:58:49 -0700319 if (!lastSeen.containsKey(storedRule)) {
320 // checking for the first time
321 lastSeen.put(storedRule, storedRule.lastSeen());
322 // Use following if lastSeen attr. was removed.
323 //lastSeen.put(storedRule, currentTime);
324 }
325 Long last = lastSeen.get(storedRule);
326 if (last == null) {
327 // concurrently removed? let the liveness check fail
328 return false;
329 }
alshabib85c41972014-10-03 13:48:39 -0700330
Yuta HIGUCHIf6f50a62014-10-19 15:58:49 -0700331 if ((currentTime - last) <= timeout) {
alshabibc274c902014-10-03 14:58:27 -0700332 return true;
333 }
334 return false;
alshabibba5ac482014-10-02 17:15:20 -0700335 }
336
Ayaka Koshibe08eabaa2014-09-17 14:59:25 -0700337 // Posts the specified event to the local event dispatcher.
338 private void post(FlowRuleEvent event) {
339 if (event != null) {
340 eventDispatcher.post(event);
341 }
342 }
alshabib5c370ff2014-09-18 10:12:14 -0700343
344 @Override
alshabib1c319ff2014-10-04 20:29:09 -0700345 public void pushFlowMetrics(DeviceId deviceId, Iterable<FlowEntry> flowEntries) {
346 List<FlowEntry> storedRules = Lists.newLinkedList(store.getFlowEntries(deviceId));
alshabibbb8b1282014-09-22 17:00:18 -0700347
Yuta HIGUCHIf6f50a62014-10-19 15:58:49 -0700348 for (FlowEntry rule : flowEntries) {
alshabiba7f7ca82014-09-22 11:41:23 -0700349 if (storedRules.remove(rule)) {
alshabib219ebaa2014-09-22 15:41:24 -0700350 // we both have the rule, let's update some info then.
alshabiba7f7ca82014-09-22 11:41:23 -0700351 flowAdded(rule);
352 } else {
alshabib219ebaa2014-09-22 15:41:24 -0700353 // the device has a rule the store does not have
354 extraneousFlow(rule);
alshabiba7f7ca82014-09-22 11:41:23 -0700355 }
356 }
alshabib1c319ff2014-10-04 20:29:09 -0700357 for (FlowEntry rule : storedRules) {
alshabiba7f7ca82014-09-22 11:41:23 -0700358 // there are rules in the store that aren't on the switch
359 flowMissing(rule);
alshabib54ce5892014-09-23 17:50:51 -0700360
alshabiba7f7ca82014-09-22 11:41:23 -0700361 }
alshabib5c370ff2014-09-18 10:12:14 -0700362 }
alshabib57044ba2014-09-16 15:58:01 -0700363 }
364
tomc78acee2014-09-24 15:16:55 -0700365 // Store delegate to re-post events emitted from the store.
366 private class InternalStoreDelegate implements FlowRuleStoreDelegate {
Madan Jampani117aaae2014-10-23 10:04:05 -0700367 // TODO: Right now we only dispatch events at individual flowEntry level.
368 // It may be more efficient for also dispatch events as a batch.
tomc78acee2014-09-24 15:16:55 -0700369 @Override
Madan Jampani117aaae2014-10-23 10:04:05 -0700370 public void notify(FlowRuleBatchEvent event) {
371 final FlowRuleBatchRequest request = event.subject();
Yuta HIGUCHIf3d51bd2014-10-21 01:05:33 -0700372 switch (event.type()) {
Madan Jampani117aaae2014-10-23 10:04:05 -0700373 case BATCH_OPERATION_REQUESTED:
Yuta HIGUCHI2fcb40c2014-11-03 14:39:10 -0800374 // Request has been forwarded to MASTER Node, and was
375 for (FlowRule entry : request.toAdd()) {
Madan Jampani31961c12014-10-23 12:06:58 -0700376 eventDispatcher.post(new FlowRuleEvent(FlowRuleEvent.Type.RULE_ADD_REQUESTED, entry));
377 }
Yuta HIGUCHI2fcb40c2014-11-03 14:39:10 -0800378 for (FlowRule entry : request.toRemove()) {
Madan Jampani31961c12014-10-23 12:06:58 -0700379 eventDispatcher.post(new FlowRuleEvent(FlowRuleEvent.Type.RULE_REMOVE_REQUESTED, entry));
380 }
381 // FIXME: what about op.equals(FlowRuleOperation.MODIFY) ?
Yuta HIGUCHIf3d51bd2014-10-21 01:05:33 -0700382
Madan Jampani117aaae2014-10-23 10:04:05 -0700383 FlowRuleBatchOperation batchOperation = request.asBatchOperation();
Yuta HIGUCHIf3d51bd2014-10-21 01:05:33 -0700384
Madan Jampani117aaae2014-10-23 10:04:05 -0700385 FlowRuleProvider flowRuleProvider =
386 getProvider(batchOperation.getOperations().get(0).getTarget().deviceId());
387 final ListenableFuture<CompletedBatchOperation> result =
388 flowRuleProvider.executeBatch(batchOperation);
389 result.addListener(new Runnable() {
390 @Override
391 public void run() {
Thomas Vachuska8ac922d2014-10-23 16:17:03 -0700392 store.batchOperationComplete(FlowRuleBatchEvent.completed(request,
393 Futures.getUnchecked(result)));
Madan Jampani117aaae2014-10-23 10:04:05 -0700394 }
Yuta HIGUCHI9def0472014-10-23 15:51:10 -0700395 }, futureListeners);
Yuta HIGUCHI2fcb40c2014-11-03 14:39:10 -0800396 break;
Madan Jampani117aaae2014-10-23 10:04:05 -0700397
Madan Jampani117aaae2014-10-23 10:04:05 -0700398 case BATCH_OPERATION_COMPLETED:
Yuta HIGUCHI2fcb40c2014-11-03 14:39:10 -0800399 // MASTER Node has pushed the batch down to the Device
400
401 // Note: RULE_ADDED will be posted
402 // when Flow was actually confirmed by stats reply.
Yuta HIGUCHIf3d51bd2014-10-21 01:05:33 -0700403 break;
Yuta HIGUCHI2fcb40c2014-11-03 14:39:10 -0800404
Yuta HIGUCHIf3d51bd2014-10-21 01:05:33 -0700405 default:
406 break;
407 }
tomc78acee2014-09-24 15:16:55 -0700408 }
409 }
alshabib902d41b2014-10-07 16:52:05 -0700410
Madan Jampani117aaae2014-10-23 10:04:05 -0700411 private class FlowRuleBatchFuture implements Future<CompletedBatchOperation> {
alshabib902d41b2014-10-07 16:52:05 -0700412
alshabib193525b2014-10-08 18:58:03 -0700413 private final List<Future<CompletedBatchOperation>> futures;
Madan Jampani117aaae2014-10-23 10:04:05 -0700414 private final Multimap<DeviceId, FlowRuleBatchEntry> batches;
alshabib193525b2014-10-08 18:58:03 -0700415 private final AtomicReference<BatchState> state;
416 private CompletedBatchOperation overall;
alshabib902d41b2014-10-07 16:52:05 -0700417
alshabib193525b2014-10-08 18:58:03 -0700418 public FlowRuleBatchFuture(List<Future<CompletedBatchOperation>> futures,
Madan Jampani117aaae2014-10-23 10:04:05 -0700419 Multimap<DeviceId, FlowRuleBatchEntry> batches) {
alshabib902d41b2014-10-07 16:52:05 -0700420 this.futures = futures;
alshabib193525b2014-10-08 18:58:03 -0700421 this.batches = batches;
422 state = new AtomicReference<FlowRuleManager.BatchState>();
423 state.set(BatchState.STARTED);
alshabib902d41b2014-10-07 16:52:05 -0700424 }
425
426 @Override
427 public boolean cancel(boolean mayInterruptIfRunning) {
alshabib193525b2014-10-08 18:58:03 -0700428 if (state.get() == BatchState.FINISHED) {
429 return false;
430 }
431 if (!state.compareAndSet(BatchState.STARTED, BatchState.CANCELLED)) {
432 return false;
433 }
434 cleanUpBatch();
435 for (Future<CompletedBatchOperation> f : futures) {
436 f.cancel(mayInterruptIfRunning);
437 }
438 return true;
alshabib902d41b2014-10-07 16:52:05 -0700439 }
440
441 @Override
442 public boolean isCancelled() {
alshabib193525b2014-10-08 18:58:03 -0700443 return state.get() == BatchState.CANCELLED;
alshabib902d41b2014-10-07 16:52:05 -0700444 }
445
446 @Override
447 public boolean isDone() {
alshabib193525b2014-10-08 18:58:03 -0700448 return state.get() == BatchState.FINISHED;
alshabib902d41b2014-10-07 16:52:05 -0700449 }
450
alshabib193525b2014-10-08 18:58:03 -0700451
alshabib902d41b2014-10-07 16:52:05 -0700452 @Override
453 public CompletedBatchOperation get() throws InterruptedException,
alshabib193525b2014-10-08 18:58:03 -0700454 ExecutionException {
455
456 if (isDone()) {
457 return overall;
alshabib902d41b2014-10-07 16:52:05 -0700458 }
alshabib193525b2014-10-08 18:58:03 -0700459
460 boolean success = true;
Yuta HIGUCHI4b524442014-10-28 22:23:57 -0700461 Set<FlowRule> failed = Sets.newHashSet();
alshabib193525b2014-10-08 18:58:03 -0700462 CompletedBatchOperation completed;
463 for (Future<CompletedBatchOperation> future : futures) {
464 completed = future.get();
alshabib3effd042014-10-17 12:00:31 -0700465 success = validateBatchOperation(failed, completed);
alshabib193525b2014-10-08 18:58:03 -0700466 }
467
468 return finalizeBatchOperation(success, failed);
469
alshabib902d41b2014-10-07 16:52:05 -0700470 }
471
472 @Override
473 public CompletedBatchOperation get(long timeout, TimeUnit unit)
474 throws InterruptedException, ExecutionException,
475 TimeoutException {
alshabib193525b2014-10-08 18:58:03 -0700476
477 if (isDone()) {
478 return overall;
479 }
480 boolean success = true;
Yuta HIGUCHI4b524442014-10-28 22:23:57 -0700481 Set<FlowRule> failed = Sets.newHashSet();
alshabib193525b2014-10-08 18:58:03 -0700482 CompletedBatchOperation completed;
alshabib902d41b2014-10-07 16:52:05 -0700483 long start = System.nanoTime();
484 long end = start + unit.toNanos(timeout);
alshabib193525b2014-10-08 18:58:03 -0700485
486 for (Future<CompletedBatchOperation> future : futures) {
alshabib902d41b2014-10-07 16:52:05 -0700487 long now = System.nanoTime();
488 long thisTimeout = end - now;
alshabib193525b2014-10-08 18:58:03 -0700489 completed = future.get(thisTimeout, TimeUnit.NANOSECONDS);
alshabib3effd042014-10-17 12:00:31 -0700490 success = validateBatchOperation(failed, completed);
alshabib902d41b2014-10-07 16:52:05 -0700491 }
alshabib193525b2014-10-08 18:58:03 -0700492 return finalizeBatchOperation(success, failed);
alshabib902d41b2014-10-07 16:52:05 -0700493 }
494
Yuta HIGUCHI4b524442014-10-28 22:23:57 -0700495 private boolean validateBatchOperation(Set<FlowRule> failed,
alshabib3effd042014-10-17 12:00:31 -0700496 CompletedBatchOperation completed) {
alshabib193525b2014-10-08 18:58:03 -0700497
498 if (isCancelled()) {
499 throw new CancellationException();
500 }
501 if (!completed.isSuccess()) {
502 failed.addAll(completed.failedItems());
503 cleanUpBatch();
504 cancelAllSubBatches();
505 return false;
506 }
507 return true;
508 }
509
510 private void cancelAllSubBatches() {
511 for (Future<CompletedBatchOperation> f : futures) {
512 f.cancel(true);
513 }
514 }
515
516 private CompletedBatchOperation finalizeBatchOperation(boolean success,
Yuta HIGUCHI4b524442014-10-28 22:23:57 -0700517 Set<FlowRule> failed) {
alshabib26834582014-10-08 20:15:46 -0700518 synchronized (this) {
alshabib193525b2014-10-08 18:58:03 -0700519 if (!state.compareAndSet(BatchState.STARTED, BatchState.FINISHED)) {
520 if (state.get() == BatchState.FINISHED) {
521 return overall;
522 }
523 throw new CancellationException();
524 }
525 overall = new CompletedBatchOperation(success, failed);
526 return overall;
527 }
528 }
529
530 private void cleanUpBatch() {
531 for (FlowRuleBatchEntry fbe : batches.values()) {
532 if (fbe.getOperator() == FlowRuleOperation.ADD ||
533 fbe.getOperator() == FlowRuleOperation.MODIFY) {
534 store.deleteFlowRule(fbe.getTarget());
535 } else if (fbe.getOperator() == FlowRuleOperation.REMOVE) {
alshabibcf369912014-10-13 14:16:42 -0700536 store.removeFlowRule(new DefaultFlowEntry(fbe.getTarget()));
alshabib193525b2014-10-08 18:58:03 -0700537 store.storeFlowRule(fbe.getTarget());
538 }
539 }
alshabib193525b2014-10-08 18:58:03 -0700540 }
alshabib902d41b2014-10-07 16:52:05 -0700541 }
alshabib57044ba2014-09-16 15:58:01 -0700542}