blob: 158764f061f30f40fcb23207e56932a8dc511204 [file] [log] [blame]
/*
 * Copyright 2014 Open Networking Laboratory
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
Brian O'Connorabafb502014-12-02 22:26:20 -080016package org.onosproject.net.flow.impl;
alshabib57044ba2014-09-16 15:58:01 -070017
Thomas Vachuska9b2da212014-11-10 19:30:25 -080018import com.google.common.collect.ArrayListMultimap;
19import com.google.common.collect.Iterables;
20import com.google.common.collect.Lists;
21import com.google.common.collect.Maps;
22import com.google.common.collect.Multimap;
23import com.google.common.collect.Sets;
Brian O'Connor72cb19a2015-01-16 16:14:41 -080024import com.google.common.util.concurrent.Futures;
alshabib57044ba2014-09-16 15:58:01 -070025import org.apache.felix.scr.annotations.Activate;
26import org.apache.felix.scr.annotations.Component;
27import org.apache.felix.scr.annotations.Deactivate;
28import org.apache.felix.scr.annotations.Reference;
29import org.apache.felix.scr.annotations.ReferenceCardinality;
30import org.apache.felix.scr.annotations.Service;
Brian O'Connorabafb502014-12-02 22:26:20 -080031import org.onosproject.core.ApplicationId;
Brian O'Connor72cb19a2015-01-16 16:14:41 -080032import org.onosproject.core.CoreService;
33import org.onosproject.core.IdGenerator;
Brian O'Connorabafb502014-12-02 22:26:20 -080034import org.onosproject.event.AbstractListenerRegistry;
35import org.onosproject.event.EventDeliveryService;
36import org.onosproject.net.Device;
37import org.onosproject.net.DeviceId;
38import org.onosproject.net.device.DeviceService;
39import org.onosproject.net.flow.CompletedBatchOperation;
Brian O'Connorabafb502014-12-02 22:26:20 -080040import org.onosproject.net.flow.FlowEntry;
41import org.onosproject.net.flow.FlowRule;
42import org.onosproject.net.flow.FlowRuleBatchEntry;
Brian O'Connorabafb502014-12-02 22:26:20 -080043import org.onosproject.net.flow.FlowRuleBatchEvent;
44import org.onosproject.net.flow.FlowRuleBatchOperation;
45import org.onosproject.net.flow.FlowRuleBatchRequest;
46import org.onosproject.net.flow.FlowRuleEvent;
47import org.onosproject.net.flow.FlowRuleListener;
Brian O'Connor72cb19a2015-01-16 16:14:41 -080048import org.onosproject.net.flow.FlowRuleOperation;
49import org.onosproject.net.flow.FlowRuleOperations;
50import org.onosproject.net.flow.FlowRuleOperationsContext;
Brian O'Connorabafb502014-12-02 22:26:20 -080051import org.onosproject.net.flow.FlowRuleProvider;
52import org.onosproject.net.flow.FlowRuleProviderRegistry;
53import org.onosproject.net.flow.FlowRuleProviderService;
54import org.onosproject.net.flow.FlowRuleService;
55import org.onosproject.net.flow.FlowRuleStore;
56import org.onosproject.net.flow.FlowRuleStoreDelegate;
57import org.onosproject.net.provider.AbstractProviderRegistry;
58import org.onosproject.net.provider.AbstractProviderService;
alshabib57044ba2014-09-16 15:58:01 -070059import org.slf4j.Logger;
60
Brian O'Connor72cb19a2015-01-16 16:14:41 -080061import java.util.Collections;
Thomas Vachuska9b2da212014-11-10 19:30:25 -080062import java.util.List;
63import java.util.Map;
64import java.util.Set;
Brian O'Connor72cb19a2015-01-16 16:14:41 -080065import java.util.concurrent.ConcurrentHashMap;
Thomas Vachuska9b2da212014-11-10 19:30:25 -080066import java.util.concurrent.ExecutorService;
67import java.util.concurrent.Executors;
68import java.util.concurrent.Future;
Brian O'Connor72cb19a2015-01-16 16:14:41 -080069import java.util.concurrent.atomic.AtomicBoolean;
70
Thomas Vachuska9b2da212014-11-10 19:30:25 -080071import static com.google.common.base.Preconditions.checkNotNull;
72import static org.onlab.util.Tools.namedThreads;
73import static org.slf4j.LoggerFactory.getLogger;
alshabiba7f7ca82014-09-22 11:41:23 -070074
/**
 * Provides implementation of the flow northbound and southbound APIs.
 */
alshabib57044ba2014-09-16 15:58:01 -070078@Component(immediate = true)
79@Service
tom202175a2014-09-19 19:00:11 -070080public class FlowRuleManager
tom9b4030d2014-10-06 10:39:03 -070081 extends AbstractProviderRegistry<FlowRuleProvider, FlowRuleProviderService>
82 implements FlowRuleService, FlowRuleProviderRegistry {
alshabib57044ba2014-09-16 15:58:01 -070083
Sho SHIMIZU183b12fd2015-01-20 14:56:39 -080084 enum BatchState { STARTED, FINISHED, CANCELLED }
alshabib193525b2014-10-08 18:58:03 -070085
Ayaka Koshibe08eabaa2014-09-17 14:59:25 -070086 public static final String FLOW_RULE_NULL = "FlowRule cannot be null";
alshabib57044ba2014-09-16 15:58:01 -070087 private final Logger log = getLogger(getClass());
88
89 private final AbstractListenerRegistry<FlowRuleEvent, FlowRuleListener>
tom9b4030d2014-10-06 10:39:03 -070090 listenerRegistry = new AbstractListenerRegistry<>();
alshabib57044ba2014-09-16 15:58:01 -070091
alshabibbb42cad2014-09-25 11:43:05 -070092 private final FlowRuleStoreDelegate delegate = new InternalStoreDelegate();
tomc78acee2014-09-24 15:16:55 -070093
Brian O'Connor72cb19a2015-01-16 16:14:41 -080094 protected ExecutorService deviceInstallers =
95 Executors.newCachedThreadPool(namedThreads("onos-device-installer-%d"));
96
97 protected ExecutorService operationsService =
98 Executors.newFixedThreadPool(32, namedThreads("onos-flowservice-operations-%d"));
99
100 private IdGenerator idGenerator;
101
102 private Map<Long, FlowOperationsProcessor> pendingFlowOperations = new
103 ConcurrentHashMap<>();
Yuta HIGUCHI9def0472014-10-23 15:51:10 -0700104
tombe988312014-09-19 18:38:47 -0700105 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
106 protected FlowRuleStore store;
Ayaka Koshibe08eabaa2014-09-17 14:59:25 -0700107
alshabib57044ba2014-09-16 15:58:01 -0700108 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
Ayaka Koshibeb55524f2014-09-18 09:59:24 -0700109 protected EventDeliveryService eventDispatcher;
alshabib57044ba2014-09-16 15:58:01 -0700110
111 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
Ayaka Koshibeb55524f2014-09-18 09:59:24 -0700112 protected DeviceService deviceService;
alshabib57044ba2014-09-16 15:58:01 -0700113
Brian O'Connor72cb19a2015-01-16 16:14:41 -0800114 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
115 protected CoreService coreService;
116
alshabib57044ba2014-09-16 15:58:01 -0700117 @Activate
118 public void activate() {
Brian O'Connor72cb19a2015-01-16 16:14:41 -0800119
120 idGenerator = coreService.getIdGenerator(FLOW_OP_TOPIC);
121
122
tomc78acee2014-09-24 15:16:55 -0700123 store.setDelegate(delegate);
alshabib57044ba2014-09-16 15:58:01 -0700124 eventDispatcher.addSink(FlowRuleEvent.class, listenerRegistry);
125 log.info("Started");
126 }
127
128 @Deactivate
129 public void deactivate() {
Brian O'Connor72cb19a2015-01-16 16:14:41 -0800130 deviceInstallers.shutdownNow();
131 operationsService.shutdownNow();
tomc78acee2014-09-24 15:16:55 -0700132 store.unsetDelegate(delegate);
alshabib57044ba2014-09-16 15:58:01 -0700133 eventDispatcher.removeSink(FlowRuleEvent.class);
134 log.info("Stopped");
135 }
136
137 @Override
tom9b4030d2014-10-06 10:39:03 -0700138 public int getFlowRuleCount() {
139 return store.getFlowRuleCount();
140 }
141
142 @Override
alshabib1c319ff2014-10-04 20:29:09 -0700143 public Iterable<FlowEntry> getFlowEntries(DeviceId deviceId) {
Ayaka Koshibe08eabaa2014-09-17 14:59:25 -0700144 return store.getFlowEntries(deviceId);
alshabib57044ba2014-09-16 15:58:01 -0700145 }
146
147 @Override
alshabib219ebaa2014-09-22 15:41:24 -0700148 public void applyFlowRules(FlowRule... flowRules) {
Brian O'Connor72cb19a2015-01-16 16:14:41 -0800149 FlowRuleOperations.Builder builder = FlowRuleOperations.builder();
alshabib57044ba2014-09-16 15:58:01 -0700150 for (int i = 0; i < flowRules.length; i++) {
Brian O'Connor72cb19a2015-01-16 16:14:41 -0800151 builder.add(flowRules[i]);
Yuta HIGUCHIf3d51bd2014-10-21 01:05:33 -0700152 }
Brian O'Connor72cb19a2015-01-16 16:14:41 -0800153 apply(builder.build());
alshabib57044ba2014-09-16 15:58:01 -0700154 }
155
156 @Override
157 public void removeFlowRules(FlowRule... flowRules) {
Brian O'Connor72cb19a2015-01-16 16:14:41 -0800158 FlowRuleOperations.Builder builder = FlowRuleOperations.builder();
alshabib57044ba2014-09-16 15:58:01 -0700159 for (int i = 0; i < flowRules.length; i++) {
Brian O'Connor72cb19a2015-01-16 16:14:41 -0800160 builder.remove(flowRules[i]);
Yuta HIGUCHIf3d51bd2014-10-21 01:05:33 -0700161 }
Brian O'Connor72cb19a2015-01-16 16:14:41 -0800162 apply(builder.build());
alshabiba68eb962014-09-24 20:34:13 -0700163 }
alshabib57044ba2014-09-16 15:58:01 -0700164
alshabiba68eb962014-09-24 20:34:13 -0700165 @Override
166 public void removeFlowRulesById(ApplicationId id) {
Madan Jampani6a456162014-10-24 11:36:17 -0700167 removeFlowRules(Iterables.toArray(getFlowRulesById(id), FlowRule.class));
alshabiba68eb962014-09-24 20:34:13 -0700168 }
169
170 @Override
171 public Iterable<FlowRule> getFlowRulesById(ApplicationId id) {
Madan Jampani6a456162014-10-24 11:36:17 -0700172 Set<FlowRule> flowEntries = Sets.newHashSet();
173 for (Device d : deviceService.getDevices()) {
174 for (FlowEntry flowEntry : store.getFlowEntries(d.id())) {
175 if (flowEntry.appId() == id.id()) {
176 flowEntries.add(flowEntry);
177 }
178 }
179 }
180 return flowEntries;
alshabib57044ba2014-09-16 15:58:01 -0700181 }
182
183 @Override
alshabibaa7e7de2014-11-12 19:20:44 -0800184 public Iterable<FlowRule> getFlowRulesByGroupId(ApplicationId appId, short groupId) {
185 Set<FlowRule> matches = Sets.newHashSet();
186 long toLookUp = ((long) appId.id() << 16) | groupId;
187 for (Device d : deviceService.getDevices()) {
188 for (FlowEntry flowEntry : store.getFlowEntries(d.id())) {
189 if ((flowEntry.id().value() >>> 32) == toLookUp) {
190 matches.add(flowEntry);
191 }
192 }
193 }
194 return matches;
195 }
196
197 @Override
Brian O'Connor72cb19a2015-01-16 16:14:41 -0800198 public Future<CompletedBatchOperation> applyBatch(FlowRuleBatchOperation batch) {
Madan Jampani117aaae2014-10-23 10:04:05 -0700199
Brian O'Connor72cb19a2015-01-16 16:14:41 -0800200
201 FlowRuleOperations.Builder fopsBuilder = FlowRuleOperations.builder();
202 batch.getOperations().stream().forEach(op -> {
203 switch (op.getOperator()) {
204 case ADD:
205 fopsBuilder.add(op.getTarget());
206 break;
207 case REMOVE:
208 fopsBuilder.remove(op.getTarget());
209 break;
210 case MODIFY:
211 fopsBuilder.modify(op.getTarget());
212 break;
213 default:
214 log.warn("Unknown flow operation operator: {}", op.getOperator());
215
216 }
217 }
218 );
219
220 apply(fopsBuilder.build());
221 return Futures.immediateFuture(
222 new CompletedBatchOperation(true,
223 Collections.emptySet(), null));
224
225 }
226
227 @Override
228 public void apply(FlowRuleOperations ops) {
229 operationsService.submit(new FlowOperationsProcessor(ops));
alshabib902d41b2014-10-07 16:52:05 -0700230 }
231
232 @Override
alshabib57044ba2014-09-16 15:58:01 -0700233 public void addListener(FlowRuleListener listener) {
234 listenerRegistry.addListener(listener);
235 }
236
237 @Override
238 public void removeListener(FlowRuleListener listener) {
239 listenerRegistry.removeListener(listener);
240 }
241
242 @Override
243 protected FlowRuleProviderService createProviderService(
244 FlowRuleProvider provider) {
245 return new InternalFlowRuleProviderService(provider);
246 }
247
248 private class InternalFlowRuleProviderService
tom9b4030d2014-10-06 10:39:03 -0700249 extends AbstractProviderService<FlowRuleProvider>
250 implements FlowRuleProviderService {
alshabib57044ba2014-09-16 15:58:01 -0700251
Yuta HIGUCHIf6f50a62014-10-19 15:58:49 -0700252 final Map<FlowEntry, Long> lastSeen = Maps.newConcurrentMap();
253
alshabib57044ba2014-09-16 15:58:01 -0700254 protected InternalFlowRuleProviderService(FlowRuleProvider provider) {
255 super(provider);
256 }
257
258 @Override
alshabib1c319ff2014-10-04 20:29:09 -0700259 public void flowRemoved(FlowEntry flowEntry) {
260 checkNotNull(flowEntry, FLOW_RULE_NULL);
Ayaka Koshibe08eabaa2014-09-17 14:59:25 -0700261 checkValidity();
Yuta HIGUCHIf6f50a62014-10-19 15:58:49 -0700262 lastSeen.remove(flowEntry);
alshabib1c319ff2014-10-04 20:29:09 -0700263 FlowEntry stored = store.getFlowEntry(flowEntry);
alshabiba68eb962014-09-24 20:34:13 -0700264 if (stored == null) {
Yuta HIGUCHI82e53262014-11-27 10:28:51 -0800265 log.debug("Rule already evicted from store: {}", flowEntry);
alshabiba68eb962014-09-24 20:34:13 -0700266 return;
267 }
alshabib1c319ff2014-10-04 20:29:09 -0700268 Device device = deviceService.getDevice(flowEntry.deviceId());
alshabiba68eb962014-09-24 20:34:13 -0700269 FlowRuleProvider frp = getProvider(device.providerId());
270 FlowRuleEvent event = null;
271 switch (stored.state()) {
tom9b4030d2014-10-06 10:39:03 -0700272 case ADDED:
273 case PENDING_ADD:
alshabib6eb438a2014-10-01 16:39:37 -0700274 frp.applyFlowRule(stored);
tom9b4030d2014-10-06 10:39:03 -0700275 break;
276 case PENDING_REMOVE:
277 case REMOVED:
278 event = store.removeFlowRule(stored);
279 break;
280 default:
281 break;
alshabib57044ba2014-09-16 15:58:01 -0700282
alshabiba68eb962014-09-24 20:34:13 -0700283 }
Ayaka Koshibe08eabaa2014-09-17 14:59:25 -0700284 if (event != null) {
alshabib1c319ff2014-10-04 20:29:09 -0700285 log.debug("Flow {} removed", flowEntry);
Ayaka Koshibe08eabaa2014-09-17 14:59:25 -0700286 post(event);
287 }
alshabib57044ba2014-09-16 15:58:01 -0700288 }
289
alshabibba5ac482014-10-02 17:15:20 -0700290
alshabib1c319ff2014-10-04 20:29:09 -0700291 private void flowMissing(FlowEntry flowRule) {
Ayaka Koshibe08eabaa2014-09-17 14:59:25 -0700292 checkNotNull(flowRule, FLOW_RULE_NULL);
293 checkValidity();
alshabiba68eb962014-09-24 20:34:13 -0700294 Device device = deviceService.getDevice(flowRule.deviceId());
295 FlowRuleProvider frp = getProvider(device.providerId());
alshabibbb42cad2014-09-25 11:43:05 -0700296 FlowRuleEvent event = null;
alshabiba68eb962014-09-24 20:34:13 -0700297 switch (flowRule.state()) {
tom9b4030d2014-10-06 10:39:03 -0700298 case PENDING_REMOVE:
299 case REMOVED:
300 event = store.removeFlowRule(flowRule);
301 frp.removeFlowRule(flowRule);
302 break;
303 case ADDED:
304 case PENDING_ADD:
305 frp.applyFlowRule(flowRule);
306 break;
307 default:
308 log.debug("Flow {} has not been installed.", flowRule);
alshabiba68eb962014-09-24 20:34:13 -0700309 }
310
alshabibbb42cad2014-09-25 11:43:05 -0700311 if (event != null) {
312 log.debug("Flow {} removed", flowRule);
313 post(event);
314 }
alshabib57044ba2014-09-16 15:58:01 -0700315
316 }
317
alshabibba5ac482014-10-02 17:15:20 -0700318
319 private void extraneousFlow(FlowRule flowRule) {
alshabib219ebaa2014-09-22 15:41:24 -0700320 checkNotNull(flowRule, FLOW_RULE_NULL);
321 checkValidity();
alshabib2374fc92014-10-22 11:03:23 -0700322 FlowRuleProvider frp = getProvider(flowRule.deviceId());
323 frp.removeFlowRule(flowRule);
alshabib54ce5892014-09-23 17:50:51 -0700324 log.debug("Flow {} is on switch but not in store.", flowRule);
alshabib219ebaa2014-09-22 15:41:24 -0700325 }
326
alshabibba5ac482014-10-02 17:15:20 -0700327
alshabib1c319ff2014-10-04 20:29:09 -0700328 private void flowAdded(FlowEntry flowEntry) {
329 checkNotNull(flowEntry, FLOW_RULE_NULL);
Ayaka Koshibe08eabaa2014-09-17 14:59:25 -0700330 checkValidity();
alshabib57044ba2014-09-16 15:58:01 -0700331
alshabib1c319ff2014-10-04 20:29:09 -0700332 if (checkRuleLiveness(flowEntry, store.getFlowEntry(flowEntry))) {
alshabibba5ac482014-10-02 17:15:20 -0700333
alshabib1c319ff2014-10-04 20:29:09 -0700334 FlowRuleEvent event = store.addOrUpdateFlowRule(flowEntry);
alshabibba5ac482014-10-02 17:15:20 -0700335 if (event == null) {
336 log.debug("No flow store event generated.");
337 } else {
Jonathan Hart58682dd2014-11-24 20:11:16 -0800338 log.trace("Flow {} {}", flowEntry, event.type());
alshabibba5ac482014-10-02 17:15:20 -0700339 post(event);
340 }
Ayaka Koshibe08eabaa2014-09-17 14:59:25 -0700341 } else {
Thomas Vachuska4830d392014-11-09 17:09:56 -0800342 log.debug("Removing flow rules....");
alshabib1c319ff2014-10-04 20:29:09 -0700343 removeFlowRules(flowEntry);
Ayaka Koshibe08eabaa2014-09-17 14:59:25 -0700344 }
alshabib219ebaa2014-09-22 15:41:24 -0700345
alshabib57044ba2014-09-16 15:58:01 -0700346 }
347
alshabib1c319ff2014-10-04 20:29:09 -0700348 private boolean checkRuleLiveness(FlowEntry swRule, FlowEntry storedRule) {
349 if (storedRule == null) {
350 return false;
351 }
Jonathan Hartbc4a7932014-10-21 11:46:00 -0700352 if (storedRule.isPermanent()) {
353 return true;
354 }
355
Yuta HIGUCHIf6f50a62014-10-19 15:58:49 -0700356 final long timeout = storedRule.timeout() * 1000;
357 final long currentTime = System.currentTimeMillis();
alshabib85c41972014-10-03 13:48:39 -0700358 if (storedRule.packets() != swRule.packets()) {
Yuta HIGUCHIf6f50a62014-10-19 15:58:49 -0700359 lastSeen.put(storedRule, currentTime);
alshabib85c41972014-10-03 13:48:39 -0700360 return true;
361 }
Yuta HIGUCHIf6f50a62014-10-19 15:58:49 -0700362 if (!lastSeen.containsKey(storedRule)) {
363 // checking for the first time
364 lastSeen.put(storedRule, storedRule.lastSeen());
365 // Use following if lastSeen attr. was removed.
366 //lastSeen.put(storedRule, currentTime);
367 }
368 Long last = lastSeen.get(storedRule);
369 if (last == null) {
370 // concurrently removed? let the liveness check fail
371 return false;
372 }
alshabib85c41972014-10-03 13:48:39 -0700373
Yuta HIGUCHIf6f50a62014-10-19 15:58:49 -0700374 if ((currentTime - last) <= timeout) {
alshabibc274c902014-10-03 14:58:27 -0700375 return true;
376 }
377 return false;
alshabibba5ac482014-10-02 17:15:20 -0700378 }
379
Ayaka Koshibe08eabaa2014-09-17 14:59:25 -0700380 // Posts the specified event to the local event dispatcher.
381 private void post(FlowRuleEvent event) {
382 if (event != null) {
383 eventDispatcher.post(event);
384 }
385 }
alshabib5c370ff2014-09-18 10:12:14 -0700386
387 @Override
alshabib1c319ff2014-10-04 20:29:09 -0700388 public void pushFlowMetrics(DeviceId deviceId, Iterable<FlowEntry> flowEntries) {
alshabib64def642014-12-02 23:27:37 -0800389 Set<FlowEntry> storedRules = Sets.newHashSet(store.getFlowEntries(deviceId));
alshabibbb8b1282014-09-22 17:00:18 -0700390
Yuta HIGUCHIf6f50a62014-10-19 15:58:49 -0700391 for (FlowEntry rule : flowEntries) {
alshabiba7f7ca82014-09-22 11:41:23 -0700392 if (storedRules.remove(rule)) {
alshabib219ebaa2014-09-22 15:41:24 -0700393 // we both have the rule, let's update some info then.
alshabiba7f7ca82014-09-22 11:41:23 -0700394 flowAdded(rule);
395 } else {
alshabib219ebaa2014-09-22 15:41:24 -0700396 // the device has a rule the store does not have
397 extraneousFlow(rule);
alshabiba7f7ca82014-09-22 11:41:23 -0700398 }
399 }
alshabib1c319ff2014-10-04 20:29:09 -0700400 for (FlowEntry rule : storedRules) {
alshabiba7f7ca82014-09-22 11:41:23 -0700401 // there are rules in the store that aren't on the switch
402 flowMissing(rule);
alshabib54ce5892014-09-23 17:50:51 -0700403
alshabiba7f7ca82014-09-22 11:41:23 -0700404 }
alshabib5c370ff2014-09-18 10:12:14 -0700405 }
Brian O'Connor72cb19a2015-01-16 16:14:41 -0800406
407 @Override
408 public void batchOperationCompleted(long batchId, CompletedBatchOperation operation) {
409 store.batchOperationComplete(FlowRuleBatchEvent.completed(
410 new FlowRuleBatchRequest(batchId, Collections.emptySet()),
411 operation
412 ));
413 }
alshabib57044ba2014-09-16 15:58:01 -0700414 }
415
tomc78acee2014-09-24 15:16:55 -0700416 // Store delegate to re-post events emitted from the store.
417 private class InternalStoreDelegate implements FlowRuleStoreDelegate {
Thomas Vachuska9b2da212014-11-10 19:30:25 -0800418
Thomas Vachuska9b2da212014-11-10 19:30:25 -0800419
Madan Jampani117aaae2014-10-23 10:04:05 -0700420 // TODO: Right now we only dispatch events at individual flowEntry level.
421 // It may be more efficient for also dispatch events as a batch.
tomc78acee2014-09-24 15:16:55 -0700422 @Override
Madan Jampani117aaae2014-10-23 10:04:05 -0700423 public void notify(FlowRuleBatchEvent event) {
424 final FlowRuleBatchRequest request = event.subject();
Yuta HIGUCHIf3d51bd2014-10-21 01:05:33 -0700425 switch (event.type()) {
Madan Jampani117aaae2014-10-23 10:04:05 -0700426 case BATCH_OPERATION_REQUESTED:
Yuta HIGUCHI2fcb40c2014-11-03 14:39:10 -0800427 // Request has been forwarded to MASTER Node, and was
Brian O'Connor72cb19a2015-01-16 16:14:41 -0800428 request.ops().stream().forEach(
429 op -> {
430 switch (op.getOperator()) {
Yuta HIGUCHIf3d51bd2014-10-21 01:05:33 -0700431
Brian O'Connor72cb19a2015-01-16 16:14:41 -0800432 case ADD:
433 eventDispatcher.post(
434 new FlowRuleEvent(
435 FlowRuleEvent.Type.RULE_ADD_REQUESTED,
436 op.getTarget()));
437 break;
438 case REMOVE:
439 eventDispatcher.post(
440 new FlowRuleEvent(
441 FlowRuleEvent.Type.RULE_REMOVE_REQUESTED,
442 op.getTarget()));
443 break;
444 case MODIFY:
445 //TODO: do something here when the time comes.
446 break;
447 default:
448 log.warn("Unknown flow operation operator: {}", op.getOperator());
449 }
450 }
451 );
452
453 DeviceId deviceId = event.deviceId();
454
455 FlowRuleBatchOperation batchOperation =
456 request.asBatchOperation(deviceId);
Yuta HIGUCHIf3d51bd2014-10-21 01:05:33 -0700457
Madan Jampani117aaae2014-10-23 10:04:05 -0700458 FlowRuleProvider flowRuleProvider =
Brian O'Connor72cb19a2015-01-16 16:14:41 -0800459 getProvider(deviceId);
Yuta HIGUCHIf1ccee82014-11-11 20:39:58 -0800460
Brian O'Connor72cb19a2015-01-16 16:14:41 -0800461 flowRuleProvider.executeBatch(batchOperation);
462
Yuta HIGUCHI2fcb40c2014-11-03 14:39:10 -0800463 break;
Madan Jampani117aaae2014-10-23 10:04:05 -0700464
Madan Jampani117aaae2014-10-23 10:04:05 -0700465 case BATCH_OPERATION_COMPLETED:
Yuta HIGUCHI2fcb40c2014-11-03 14:39:10 -0800466
Brian O'Connor72cb19a2015-01-16 16:14:41 -0800467 FlowOperationsProcessor fops = pendingFlowOperations.remove(
468 event.subject().batchId());
469 if (event.result().isSuccess()) {
470 if (fops != null) {
471 fops.satisfy(event.deviceId());
472 }
473 } else {
474 fops.fail(event.deviceId(), event.result().failedItems());
475 }
476
Yuta HIGUCHIf3d51bd2014-10-21 01:05:33 -0700477 break;
Yuta HIGUCHI2fcb40c2014-11-03 14:39:10 -0800478
Yuta HIGUCHIf3d51bd2014-10-21 01:05:33 -0700479 default:
480 break;
481 }
tomc78acee2014-09-24 15:16:55 -0700482 }
483 }
alshabib902d41b2014-10-07 16:52:05 -0700484
Brian O'Connor72cb19a2015-01-16 16:14:41 -0800485 private class FlowOperationsProcessor implements Runnable {
alshabib902d41b2014-10-07 16:52:05 -0700486
Brian O'Connor72cb19a2015-01-16 16:14:41 -0800487 private final List<Set<FlowRuleOperation>> stages;
488 private final FlowRuleOperationsContext context;
489 private final FlowRuleOperations fops;
490 private final AtomicBoolean hasFailed = new AtomicBoolean(false);
alshabib902d41b2014-10-07 16:52:05 -0700491
Brian O'Connor72cb19a2015-01-16 16:14:41 -0800492 private Set<DeviceId> pendingDevices;
alshabib902d41b2014-10-07 16:52:05 -0700493
Brian O'Connor72cb19a2015-01-16 16:14:41 -0800494 public FlowOperationsProcessor(FlowRuleOperations ops) {
alshabib902d41b2014-10-07 16:52:05 -0700495
Brian O'Connor72cb19a2015-01-16 16:14:41 -0800496 this.stages = Lists.newArrayList(ops.stages());
497 this.context = ops.callback();
498 this.fops = ops;
499 pendingDevices = Sets.newConcurrentHashSet();
alshabib902d41b2014-10-07 16:52:05 -0700500
alshabib193525b2014-10-08 18:58:03 -0700501
alshabib902d41b2014-10-07 16:52:05 -0700502 }
503
504 @Override
Brian O'Connor72cb19a2015-01-16 16:14:41 -0800505 public void run() {
506 if (stages.size() > 0) {
507 process(stages.remove(0));
508 } else if (!hasFailed.get() && context != null) {
509 context.onSuccess(fops);
alshabib193525b2014-10-08 18:58:03 -0700510 }
511 }
512
Brian O'Connor72cb19a2015-01-16 16:14:41 -0800513 private void process(Set<FlowRuleOperation> ops) {
514 Multimap<DeviceId, FlowRuleBatchEntry> perDeviceBatches =
515 ArrayListMultimap.create();
516
517 FlowRuleBatchEntry fbe;
518 for (FlowRuleOperation flowRuleOperation : ops) {
519 switch (flowRuleOperation.type()) {
520 // FIXME: Brian needs imagination when creating class names.
521 case ADD:
522 fbe = new FlowRuleBatchEntry(
523 FlowRuleBatchEntry.FlowRuleOperation.ADD, flowRuleOperation.rule());
524 break;
525 case MODIFY:
526 fbe = new FlowRuleBatchEntry(
527 FlowRuleBatchEntry.FlowRuleOperation.MODIFY, flowRuleOperation.rule());
528 break;
529 case REMOVE:
530 fbe = new FlowRuleBatchEntry(
531 FlowRuleBatchEntry.FlowRuleOperation.REMOVE, flowRuleOperation.rule());
532 break;
533 default:
534 throw new UnsupportedOperationException("Unknown flow rule type " + flowRuleOperation.type());
535 }
536 pendingDevices.add(flowRuleOperation.rule().deviceId());
537 perDeviceBatches.put(flowRuleOperation.rule().deviceId(), fbe);
538 }
539
540
541 for (DeviceId deviceId : perDeviceBatches.keySet()) {
542 Long id = idGenerator.getNewId();
543 final FlowRuleBatchOperation b = new FlowRuleBatchOperation(perDeviceBatches.get(deviceId),
544 deviceId, id);
545 pendingFlowOperations.put(id, this);
546 deviceInstallers.submit(new Runnable() {
547 @Override
548 public void run() {
549 store.storeBatch(b);
alshabib193525b2014-10-08 18:58:03 -0700550 }
Brian O'Connor72cb19a2015-01-16 16:14:41 -0800551 });
alshabib193525b2014-10-08 18:58:03 -0700552 }
553 }
554
Brian O'Connor72cb19a2015-01-16 16:14:41 -0800555 public void satisfy(DeviceId devId) {
556 pendingDevices.remove(devId);
557 if (pendingDevices.isEmpty()) {
558 operationsService.submit(this);
alshabib193525b2014-10-08 18:58:03 -0700559 }
alshabib193525b2014-10-08 18:58:03 -0700560 }
Brian O'Connor72cb19a2015-01-16 16:14:41 -0800561
562
563
564 public void fail(DeviceId devId, Set<? extends FlowRule> failures) {
565 hasFailed.set(true);
566 pendingDevices.remove(devId);
567 if (pendingDevices.isEmpty()) {
568 operationsService.submit(this);
569 }
570
571 if (context != null) {
572 final FlowRuleOperations.Builder failedOpsBuilder =
573 FlowRuleOperations.builder();
574 failures.stream().forEach(failedOpsBuilder::add);
575
576 context.onError(failedOpsBuilder.build());
577 }
578 }
579
alshabib902d41b2014-10-07 16:52:05 -0700580 }
alshabib57044ba2014-09-16 15:58:01 -0700581}