blob: bfdf86afe86e87a2611f10f2fcfba72297c93552 [file] [log] [blame]
Thomas Vachuska4f1a60c2014-10-28 13:39:07 -07001/*
2 * Copyright 2014 Open Networking Laboratory
3 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
Brian O'Connorabafb502014-12-02 22:26:20 -080016package org.onosproject.net.flow.impl;
alshabib57044ba2014-09-16 15:58:01 -070017
Brian O'Connord12267c2015-02-17 18:17:08 -080018import com.google.common.collect.ArrayListMultimap;
19import com.google.common.collect.Iterables;
20import com.google.common.collect.Lists;
21import com.google.common.collect.Maps;
22import com.google.common.collect.Multimap;
23import com.google.common.collect.Sets;
alshabib57044ba2014-09-16 15:58:01 -070024import org.apache.felix.scr.annotations.Activate;
25import org.apache.felix.scr.annotations.Component;
26import org.apache.felix.scr.annotations.Deactivate;
27import org.apache.felix.scr.annotations.Reference;
28import org.apache.felix.scr.annotations.ReferenceCardinality;
29import org.apache.felix.scr.annotations.Service;
Brian O'Connorabafb502014-12-02 22:26:20 -080030import org.onosproject.core.ApplicationId;
Brian O'Connor72cb19a2015-01-16 16:14:41 -080031import org.onosproject.core.CoreService;
32import org.onosproject.core.IdGenerator;
Brian O'Connorabafb502014-12-02 22:26:20 -080033import org.onosproject.event.AbstractListenerRegistry;
34import org.onosproject.event.EventDeliveryService;
35import org.onosproject.net.Device;
36import org.onosproject.net.DeviceId;
37import org.onosproject.net.device.DeviceService;
38import org.onosproject.net.flow.CompletedBatchOperation;
Brian O'Connorabafb502014-12-02 22:26:20 -080039import org.onosproject.net.flow.FlowEntry;
40import org.onosproject.net.flow.FlowRule;
41import org.onosproject.net.flow.FlowRuleBatchEntry;
Brian O'Connorabafb502014-12-02 22:26:20 -080042import org.onosproject.net.flow.FlowRuleBatchEvent;
43import org.onosproject.net.flow.FlowRuleBatchOperation;
44import org.onosproject.net.flow.FlowRuleBatchRequest;
45import org.onosproject.net.flow.FlowRuleEvent;
46import org.onosproject.net.flow.FlowRuleListener;
Brian O'Connor72cb19a2015-01-16 16:14:41 -080047import org.onosproject.net.flow.FlowRuleOperation;
48import org.onosproject.net.flow.FlowRuleOperations;
49import org.onosproject.net.flow.FlowRuleOperationsContext;
Brian O'Connorabafb502014-12-02 22:26:20 -080050import org.onosproject.net.flow.FlowRuleProvider;
51import org.onosproject.net.flow.FlowRuleProviderRegistry;
52import org.onosproject.net.flow.FlowRuleProviderService;
53import org.onosproject.net.flow.FlowRuleService;
54import org.onosproject.net.flow.FlowRuleStore;
55import org.onosproject.net.flow.FlowRuleStoreDelegate;
56import org.onosproject.net.provider.AbstractProviderRegistry;
57import org.onosproject.net.provider.AbstractProviderService;
alshabib57044ba2014-09-16 15:58:01 -070058import org.slf4j.Logger;
59
Brian O'Connord12267c2015-02-17 18:17:08 -080060import java.util.Collections;
61import java.util.List;
62import java.util.Map;
63import java.util.Set;
64import java.util.concurrent.ConcurrentHashMap;
65import java.util.concurrent.ExecutorService;
66import java.util.concurrent.Executors;
67import java.util.concurrent.atomic.AtomicBoolean;
Brian O'Connor72cb19a2015-01-16 16:14:41 -080068
Thomas Vachuska9b2da212014-11-10 19:30:25 -080069import static com.google.common.base.Preconditions.checkNotNull;
Brian O'Connord12267c2015-02-17 18:17:08 -080070import static org.onlab.util.Tools.groupedThreads;
Thomas Vachuska9b2da212014-11-10 19:30:25 -080071import static org.slf4j.LoggerFactory.getLogger;
alshabiba7f7ca82014-09-22 11:41:23 -070072
tome4729872014-09-23 00:37:37 -070073/**
74 * Provides implementation of the flow NB & SB APIs.
75 */
Brian O'Connord12267c2015-02-17 18:17:08 -080076@Component(immediate = true, enabled = true)
alshabib57044ba2014-09-16 15:58:01 -070077@Service
tom202175a2014-09-19 19:00:11 -070078public class FlowRuleManager
tom9b4030d2014-10-06 10:39:03 -070079 extends AbstractProviderRegistry<FlowRuleProvider, FlowRuleProviderService>
80 implements FlowRuleService, FlowRuleProviderRegistry {
alshabib57044ba2014-09-16 15:58:01 -070081
Sho SHIMIZU183b12fd2015-01-20 14:56:39 -080082 enum BatchState { STARTED, FINISHED, CANCELLED }
alshabib193525b2014-10-08 18:58:03 -070083
Ayaka Koshibe08eabaa2014-09-17 14:59:25 -070084 public static final String FLOW_RULE_NULL = "FlowRule cannot be null";
alshabib57044ba2014-09-16 15:58:01 -070085 private final Logger log = getLogger(getClass());
86
87 private final AbstractListenerRegistry<FlowRuleEvent, FlowRuleListener>
tom9b4030d2014-10-06 10:39:03 -070088 listenerRegistry = new AbstractListenerRegistry<>();
alshabib57044ba2014-09-16 15:58:01 -070089
alshabibbb42cad2014-09-25 11:43:05 -070090 private final FlowRuleStoreDelegate delegate = new InternalStoreDelegate();
tomc78acee2014-09-24 15:16:55 -070091
Brian O'Connor72cb19a2015-01-16 16:14:41 -080092 protected ExecutorService deviceInstallers =
Brian O'Connord12267c2015-02-17 18:17:08 -080093 Executors.newFixedThreadPool(32, groupedThreads("onos/flowservice", "device-installer-%d"));
Brian O'Connor72cb19a2015-01-16 16:14:41 -080094
95 protected ExecutorService operationsService =
Brian O'Connord12267c2015-02-17 18:17:08 -080096 Executors.newFixedThreadPool(32, groupedThreads("onos/flowservice", "operations-%d"));
Brian O'Connor72cb19a2015-01-16 16:14:41 -080097
98 private IdGenerator idGenerator;
99
Brian O'Connord12267c2015-02-17 18:17:08 -0800100 private Map<Long, FlowOperationsProcessor> pendingFlowOperations
101 = new ConcurrentHashMap<>();
Yuta HIGUCHI9def0472014-10-23 15:51:10 -0700102
tombe988312014-09-19 18:38:47 -0700103 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
104 protected FlowRuleStore store;
Ayaka Koshibe08eabaa2014-09-17 14:59:25 -0700105
alshabib57044ba2014-09-16 15:58:01 -0700106 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
Ayaka Koshibeb55524f2014-09-18 09:59:24 -0700107 protected EventDeliveryService eventDispatcher;
alshabib57044ba2014-09-16 15:58:01 -0700108
109 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
Ayaka Koshibeb55524f2014-09-18 09:59:24 -0700110 protected DeviceService deviceService;
alshabib57044ba2014-09-16 15:58:01 -0700111
Brian O'Connor72cb19a2015-01-16 16:14:41 -0800112 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
113 protected CoreService coreService;
114
alshabib57044ba2014-09-16 15:58:01 -0700115 @Activate
116 public void activate() {
Brian O'Connor72cb19a2015-01-16 16:14:41 -0800117
118 idGenerator = coreService.getIdGenerator(FLOW_OP_TOPIC);
119
120
tomc78acee2014-09-24 15:16:55 -0700121 store.setDelegate(delegate);
alshabib57044ba2014-09-16 15:58:01 -0700122 eventDispatcher.addSink(FlowRuleEvent.class, listenerRegistry);
123 log.info("Started");
124 }
125
126 @Deactivate
127 public void deactivate() {
Brian O'Connor72cb19a2015-01-16 16:14:41 -0800128 deviceInstallers.shutdownNow();
129 operationsService.shutdownNow();
tomc78acee2014-09-24 15:16:55 -0700130 store.unsetDelegate(delegate);
alshabib57044ba2014-09-16 15:58:01 -0700131 eventDispatcher.removeSink(FlowRuleEvent.class);
132 log.info("Stopped");
133 }
134
135 @Override
tom9b4030d2014-10-06 10:39:03 -0700136 public int getFlowRuleCount() {
137 return store.getFlowRuleCount();
138 }
139
140 @Override
alshabib1c319ff2014-10-04 20:29:09 -0700141 public Iterable<FlowEntry> getFlowEntries(DeviceId deviceId) {
Ayaka Koshibe08eabaa2014-09-17 14:59:25 -0700142 return store.getFlowEntries(deviceId);
alshabib57044ba2014-09-16 15:58:01 -0700143 }
144
145 @Override
alshabib219ebaa2014-09-22 15:41:24 -0700146 public void applyFlowRules(FlowRule... flowRules) {
Brian O'Connor72cb19a2015-01-16 16:14:41 -0800147 FlowRuleOperations.Builder builder = FlowRuleOperations.builder();
alshabib57044ba2014-09-16 15:58:01 -0700148 for (int i = 0; i < flowRules.length; i++) {
Brian O'Connor72cb19a2015-01-16 16:14:41 -0800149 builder.add(flowRules[i]);
Yuta HIGUCHIf3d51bd2014-10-21 01:05:33 -0700150 }
Brian O'Connor72cb19a2015-01-16 16:14:41 -0800151 apply(builder.build());
alshabib57044ba2014-09-16 15:58:01 -0700152 }
153
154 @Override
155 public void removeFlowRules(FlowRule... flowRules) {
Brian O'Connor72cb19a2015-01-16 16:14:41 -0800156 FlowRuleOperations.Builder builder = FlowRuleOperations.builder();
alshabib57044ba2014-09-16 15:58:01 -0700157 for (int i = 0; i < flowRules.length; i++) {
Brian O'Connor72cb19a2015-01-16 16:14:41 -0800158 builder.remove(flowRules[i]);
Yuta HIGUCHIf3d51bd2014-10-21 01:05:33 -0700159 }
Brian O'Connor72cb19a2015-01-16 16:14:41 -0800160 apply(builder.build());
alshabiba68eb962014-09-24 20:34:13 -0700161 }
alshabib57044ba2014-09-16 15:58:01 -0700162
alshabiba68eb962014-09-24 20:34:13 -0700163 @Override
164 public void removeFlowRulesById(ApplicationId id) {
Madan Jampani6a456162014-10-24 11:36:17 -0700165 removeFlowRules(Iterables.toArray(getFlowRulesById(id), FlowRule.class));
alshabiba68eb962014-09-24 20:34:13 -0700166 }
167
168 @Override
169 public Iterable<FlowRule> getFlowRulesById(ApplicationId id) {
Madan Jampani6a456162014-10-24 11:36:17 -0700170 Set<FlowRule> flowEntries = Sets.newHashSet();
171 for (Device d : deviceService.getDevices()) {
172 for (FlowEntry flowEntry : store.getFlowEntries(d.id())) {
173 if (flowEntry.appId() == id.id()) {
174 flowEntries.add(flowEntry);
175 }
176 }
177 }
178 return flowEntries;
alshabib57044ba2014-09-16 15:58:01 -0700179 }
180
181 @Override
alshabibaa7e7de2014-11-12 19:20:44 -0800182 public Iterable<FlowRule> getFlowRulesByGroupId(ApplicationId appId, short groupId) {
183 Set<FlowRule> matches = Sets.newHashSet();
184 long toLookUp = ((long) appId.id() << 16) | groupId;
185 for (Device d : deviceService.getDevices()) {
186 for (FlowEntry flowEntry : store.getFlowEntries(d.id())) {
187 if ((flowEntry.id().value() >>> 32) == toLookUp) {
188 matches.add(flowEntry);
189 }
190 }
191 }
192 return matches;
193 }
194
195 @Override
Brian O'Connor72cb19a2015-01-16 16:14:41 -0800196 public void apply(FlowRuleOperations ops) {
197 operationsService.submit(new FlowOperationsProcessor(ops));
alshabib902d41b2014-10-07 16:52:05 -0700198 }
199
200 @Override
alshabib57044ba2014-09-16 15:58:01 -0700201 public void addListener(FlowRuleListener listener) {
202 listenerRegistry.addListener(listener);
203 }
204
205 @Override
206 public void removeListener(FlowRuleListener listener) {
207 listenerRegistry.removeListener(listener);
208 }
209
210 @Override
211 protected FlowRuleProviderService createProviderService(
212 FlowRuleProvider provider) {
213 return new InternalFlowRuleProviderService(provider);
214 }
215
216 private class InternalFlowRuleProviderService
tom9b4030d2014-10-06 10:39:03 -0700217 extends AbstractProviderService<FlowRuleProvider>
218 implements FlowRuleProviderService {
alshabib57044ba2014-09-16 15:58:01 -0700219
Yuta HIGUCHIf6f50a62014-10-19 15:58:49 -0700220 final Map<FlowEntry, Long> lastSeen = Maps.newConcurrentMap();
221
alshabib57044ba2014-09-16 15:58:01 -0700222 protected InternalFlowRuleProviderService(FlowRuleProvider provider) {
223 super(provider);
224 }
225
226 @Override
alshabib1c319ff2014-10-04 20:29:09 -0700227 public void flowRemoved(FlowEntry flowEntry) {
228 checkNotNull(flowEntry, FLOW_RULE_NULL);
Ayaka Koshibe08eabaa2014-09-17 14:59:25 -0700229 checkValidity();
Yuta HIGUCHIf6f50a62014-10-19 15:58:49 -0700230 lastSeen.remove(flowEntry);
alshabib1c319ff2014-10-04 20:29:09 -0700231 FlowEntry stored = store.getFlowEntry(flowEntry);
alshabiba68eb962014-09-24 20:34:13 -0700232 if (stored == null) {
Yuta HIGUCHI82e53262014-11-27 10:28:51 -0800233 log.debug("Rule already evicted from store: {}", flowEntry);
alshabiba68eb962014-09-24 20:34:13 -0700234 return;
235 }
alshabib1c319ff2014-10-04 20:29:09 -0700236 Device device = deviceService.getDevice(flowEntry.deviceId());
alshabiba68eb962014-09-24 20:34:13 -0700237 FlowRuleProvider frp = getProvider(device.providerId());
238 FlowRuleEvent event = null;
239 switch (stored.state()) {
tom9b4030d2014-10-06 10:39:03 -0700240 case ADDED:
241 case PENDING_ADD:
alshabib6eb438a2014-10-01 16:39:37 -0700242 frp.applyFlowRule(stored);
tom9b4030d2014-10-06 10:39:03 -0700243 break;
244 case PENDING_REMOVE:
245 case REMOVED:
246 event = store.removeFlowRule(stored);
247 break;
248 default:
249 break;
alshabib57044ba2014-09-16 15:58:01 -0700250
alshabiba68eb962014-09-24 20:34:13 -0700251 }
Ayaka Koshibe08eabaa2014-09-17 14:59:25 -0700252 if (event != null) {
alshabib1c319ff2014-10-04 20:29:09 -0700253 log.debug("Flow {} removed", flowEntry);
Ayaka Koshibe08eabaa2014-09-17 14:59:25 -0700254 post(event);
255 }
alshabib57044ba2014-09-16 15:58:01 -0700256 }
257
alshabibba5ac482014-10-02 17:15:20 -0700258
alshabib1c319ff2014-10-04 20:29:09 -0700259 private void flowMissing(FlowEntry flowRule) {
Ayaka Koshibe08eabaa2014-09-17 14:59:25 -0700260 checkNotNull(flowRule, FLOW_RULE_NULL);
261 checkValidity();
alshabiba68eb962014-09-24 20:34:13 -0700262 Device device = deviceService.getDevice(flowRule.deviceId());
263 FlowRuleProvider frp = getProvider(device.providerId());
alshabibbb42cad2014-09-25 11:43:05 -0700264 FlowRuleEvent event = null;
alshabiba68eb962014-09-24 20:34:13 -0700265 switch (flowRule.state()) {
tom9b4030d2014-10-06 10:39:03 -0700266 case PENDING_REMOVE:
267 case REMOVED:
268 event = store.removeFlowRule(flowRule);
269 frp.removeFlowRule(flowRule);
270 break;
271 case ADDED:
272 case PENDING_ADD:
273 frp.applyFlowRule(flowRule);
274 break;
275 default:
276 log.debug("Flow {} has not been installed.", flowRule);
alshabiba68eb962014-09-24 20:34:13 -0700277 }
278
alshabibbb42cad2014-09-25 11:43:05 -0700279 if (event != null) {
280 log.debug("Flow {} removed", flowRule);
281 post(event);
282 }
alshabib57044ba2014-09-16 15:58:01 -0700283
284 }
285
alshabibba5ac482014-10-02 17:15:20 -0700286
287 private void extraneousFlow(FlowRule flowRule) {
alshabib219ebaa2014-09-22 15:41:24 -0700288 checkNotNull(flowRule, FLOW_RULE_NULL);
289 checkValidity();
alshabib2374fc92014-10-22 11:03:23 -0700290 FlowRuleProvider frp = getProvider(flowRule.deviceId());
291 frp.removeFlowRule(flowRule);
alshabib54ce5892014-09-23 17:50:51 -0700292 log.debug("Flow {} is on switch but not in store.", flowRule);
alshabib219ebaa2014-09-22 15:41:24 -0700293 }
294
alshabibba5ac482014-10-02 17:15:20 -0700295
alshabib1c319ff2014-10-04 20:29:09 -0700296 private void flowAdded(FlowEntry flowEntry) {
297 checkNotNull(flowEntry, FLOW_RULE_NULL);
Ayaka Koshibe08eabaa2014-09-17 14:59:25 -0700298 checkValidity();
alshabib57044ba2014-09-16 15:58:01 -0700299
alshabib1c319ff2014-10-04 20:29:09 -0700300 if (checkRuleLiveness(flowEntry, store.getFlowEntry(flowEntry))) {
alshabibba5ac482014-10-02 17:15:20 -0700301
alshabib1c319ff2014-10-04 20:29:09 -0700302 FlowRuleEvent event = store.addOrUpdateFlowRule(flowEntry);
alshabibba5ac482014-10-02 17:15:20 -0700303 if (event == null) {
304 log.debug("No flow store event generated.");
305 } else {
Jonathan Hart58682dd2014-11-24 20:11:16 -0800306 log.trace("Flow {} {}", flowEntry, event.type());
alshabibba5ac482014-10-02 17:15:20 -0700307 post(event);
308 }
Ayaka Koshibe08eabaa2014-09-17 14:59:25 -0700309 } else {
Thomas Vachuska4830d392014-11-09 17:09:56 -0800310 log.debug("Removing flow rules....");
alshabib1c319ff2014-10-04 20:29:09 -0700311 removeFlowRules(flowEntry);
Ayaka Koshibe08eabaa2014-09-17 14:59:25 -0700312 }
alshabib219ebaa2014-09-22 15:41:24 -0700313
alshabib57044ba2014-09-16 15:58:01 -0700314 }
315
alshabib1c319ff2014-10-04 20:29:09 -0700316 private boolean checkRuleLiveness(FlowEntry swRule, FlowEntry storedRule) {
317 if (storedRule == null) {
318 return false;
319 }
Jonathan Hartbc4a7932014-10-21 11:46:00 -0700320 if (storedRule.isPermanent()) {
321 return true;
322 }
323
Yuta HIGUCHIf6f50a62014-10-19 15:58:49 -0700324 final long timeout = storedRule.timeout() * 1000;
325 final long currentTime = System.currentTimeMillis();
alshabib85c41972014-10-03 13:48:39 -0700326 if (storedRule.packets() != swRule.packets()) {
Yuta HIGUCHIf6f50a62014-10-19 15:58:49 -0700327 lastSeen.put(storedRule, currentTime);
alshabib85c41972014-10-03 13:48:39 -0700328 return true;
329 }
Yuta HIGUCHIf6f50a62014-10-19 15:58:49 -0700330 if (!lastSeen.containsKey(storedRule)) {
331 // checking for the first time
332 lastSeen.put(storedRule, storedRule.lastSeen());
333 // Use following if lastSeen attr. was removed.
334 //lastSeen.put(storedRule, currentTime);
335 }
336 Long last = lastSeen.get(storedRule);
337 if (last == null) {
338 // concurrently removed? let the liveness check fail
339 return false;
340 }
alshabib85c41972014-10-03 13:48:39 -0700341
Yuta HIGUCHIf6f50a62014-10-19 15:58:49 -0700342 if ((currentTime - last) <= timeout) {
alshabibc274c902014-10-03 14:58:27 -0700343 return true;
344 }
345 return false;
alshabibba5ac482014-10-02 17:15:20 -0700346 }
347
Ayaka Koshibe08eabaa2014-09-17 14:59:25 -0700348 // Posts the specified event to the local event dispatcher.
349 private void post(FlowRuleEvent event) {
350 if (event != null) {
351 eventDispatcher.post(event);
352 }
353 }
alshabib5c370ff2014-09-18 10:12:14 -0700354
355 @Override
alshabib1c319ff2014-10-04 20:29:09 -0700356 public void pushFlowMetrics(DeviceId deviceId, Iterable<FlowEntry> flowEntries) {
alshabib64def642014-12-02 23:27:37 -0800357 Set<FlowEntry> storedRules = Sets.newHashSet(store.getFlowEntries(deviceId));
Saurav Dasfa2fa932015-03-03 11:29:48 -0800358 for (FlowEntry rule : flowEntries) {
359 try {
360 if (storedRules.remove(rule)) {
361 // we both have the rule, let's update some info then.
362 flowAdded(rule);
363 } else {
364 // the device has a rule the store does not have
365 extraneousFlow(rule);
alshabib93cb57f2015-02-12 17:43:26 -0800366 }
Saurav Dasfa2fa932015-03-03 11:29:48 -0800367 } catch (Throwable e) {
368 log.debug("Can't process added or extra rule {}", e.getMessage());
369 continue;
alshabib93cb57f2015-02-12 17:43:26 -0800370 }
Saurav Dasfa2fa932015-03-03 11:29:48 -0800371 }
372 for (FlowEntry rule : storedRules) {
373 try {
374 // there are rules in the store that aren't on the switch
375 flowMissing(rule);
376 } catch (Throwable e) {
377 log.debug("Can't add missing flow rule {}", e.getMessage());
378 continue;
alshabib93cb57f2015-02-12 17:43:26 -0800379 }
Saurav Dasfa2fa932015-03-03 11:29:48 -0800380 }
alshabib93cb57f2015-02-12 17:43:26 -0800381
alshabib5c370ff2014-09-18 10:12:14 -0700382 }
Brian O'Connor72cb19a2015-01-16 16:14:41 -0800383
384 @Override
385 public void batchOperationCompleted(long batchId, CompletedBatchOperation operation) {
386 store.batchOperationComplete(FlowRuleBatchEvent.completed(
387 new FlowRuleBatchRequest(batchId, Collections.emptySet()),
388 operation
389 ));
390 }
alshabib57044ba2014-09-16 15:58:01 -0700391 }
392
tomc78acee2014-09-24 15:16:55 -0700393 // Store delegate to re-post events emitted from the store.
394 private class InternalStoreDelegate implements FlowRuleStoreDelegate {
Thomas Vachuska9b2da212014-11-10 19:30:25 -0800395
Thomas Vachuska9b2da212014-11-10 19:30:25 -0800396
Madan Jampani117aaae2014-10-23 10:04:05 -0700397 // TODO: Right now we only dispatch events at individual flowEntry level.
398 // It may be more efficient for also dispatch events as a batch.
tomc78acee2014-09-24 15:16:55 -0700399 @Override
Madan Jampani117aaae2014-10-23 10:04:05 -0700400 public void notify(FlowRuleBatchEvent event) {
401 final FlowRuleBatchRequest request = event.subject();
Yuta HIGUCHIf3d51bd2014-10-21 01:05:33 -0700402 switch (event.type()) {
Madan Jampani117aaae2014-10-23 10:04:05 -0700403 case BATCH_OPERATION_REQUESTED:
Yuta HIGUCHI2fcb40c2014-11-03 14:39:10 -0800404 // Request has been forwarded to MASTER Node, and was
Brian O'Connor72cb19a2015-01-16 16:14:41 -0800405 request.ops().stream().forEach(
406 op -> {
Ray Milkeyf7329c72015-02-17 11:37:01 -0800407 switch (op.operator()) {
Yuta HIGUCHIf3d51bd2014-10-21 01:05:33 -0700408
Brian O'Connor72cb19a2015-01-16 16:14:41 -0800409 case ADD:
410 eventDispatcher.post(
411 new FlowRuleEvent(
412 FlowRuleEvent.Type.RULE_ADD_REQUESTED,
Ray Milkeyf7329c72015-02-17 11:37:01 -0800413 op.target()));
Brian O'Connor72cb19a2015-01-16 16:14:41 -0800414 break;
415 case REMOVE:
416 eventDispatcher.post(
417 new FlowRuleEvent(
418 FlowRuleEvent.Type.RULE_REMOVE_REQUESTED,
Ray Milkeyf7329c72015-02-17 11:37:01 -0800419 op.target()));
Brian O'Connor72cb19a2015-01-16 16:14:41 -0800420 break;
421 case MODIFY:
422 //TODO: do something here when the time comes.
423 break;
424 default:
Ray Milkeyf7329c72015-02-17 11:37:01 -0800425 log.warn("Unknown flow operation operator: {}", op.operator());
Brian O'Connor72cb19a2015-01-16 16:14:41 -0800426 }
427 }
428 );
429
430 DeviceId deviceId = event.deviceId();
431
432 FlowRuleBatchOperation batchOperation =
433 request.asBatchOperation(deviceId);
Yuta HIGUCHIf3d51bd2014-10-21 01:05:33 -0700434
Madan Jampani117aaae2014-10-23 10:04:05 -0700435 FlowRuleProvider flowRuleProvider =
Brian O'Connor72cb19a2015-01-16 16:14:41 -0800436 getProvider(deviceId);
Yuta HIGUCHIf1ccee82014-11-11 20:39:58 -0800437
Brian O'Connor72cb19a2015-01-16 16:14:41 -0800438 flowRuleProvider.executeBatch(batchOperation);
439
Yuta HIGUCHI2fcb40c2014-11-03 14:39:10 -0800440 break;
Madan Jampani117aaae2014-10-23 10:04:05 -0700441
Madan Jampani117aaae2014-10-23 10:04:05 -0700442 case BATCH_OPERATION_COMPLETED:
Yuta HIGUCHI2fcb40c2014-11-03 14:39:10 -0800443
Brian O'Connor72cb19a2015-01-16 16:14:41 -0800444 FlowOperationsProcessor fops = pendingFlowOperations.remove(
445 event.subject().batchId());
446 if (event.result().isSuccess()) {
447 if (fops != null) {
448 fops.satisfy(event.deviceId());
449 }
450 } else {
451 fops.fail(event.deviceId(), event.result().failedItems());
452 }
453
Yuta HIGUCHIf3d51bd2014-10-21 01:05:33 -0700454 break;
Yuta HIGUCHI2fcb40c2014-11-03 14:39:10 -0800455
Yuta HIGUCHIf3d51bd2014-10-21 01:05:33 -0700456 default:
457 break;
458 }
tomc78acee2014-09-24 15:16:55 -0700459 }
460 }
alshabib902d41b2014-10-07 16:52:05 -0700461
Brian O'Connor72cb19a2015-01-16 16:14:41 -0800462 private class FlowOperationsProcessor implements Runnable {
alshabib902d41b2014-10-07 16:52:05 -0700463
Brian O'Connor72cb19a2015-01-16 16:14:41 -0800464 private final List<Set<FlowRuleOperation>> stages;
465 private final FlowRuleOperationsContext context;
466 private final FlowRuleOperations fops;
467 private final AtomicBoolean hasFailed = new AtomicBoolean(false);
alshabib902d41b2014-10-07 16:52:05 -0700468
Brian O'Connor72cb19a2015-01-16 16:14:41 -0800469 private Set<DeviceId> pendingDevices;
alshabib902d41b2014-10-07 16:52:05 -0700470
Brian O'Connor72cb19a2015-01-16 16:14:41 -0800471 public FlowOperationsProcessor(FlowRuleOperations ops) {
alshabib902d41b2014-10-07 16:52:05 -0700472
Brian O'Connor72cb19a2015-01-16 16:14:41 -0800473 this.stages = Lists.newArrayList(ops.stages());
474 this.context = ops.callback();
475 this.fops = ops;
476 pendingDevices = Sets.newConcurrentHashSet();
alshabib902d41b2014-10-07 16:52:05 -0700477
alshabib193525b2014-10-08 18:58:03 -0700478
alshabib902d41b2014-10-07 16:52:05 -0700479 }
480
481 @Override
Brian O'Connor72cb19a2015-01-16 16:14:41 -0800482 public void run() {
483 if (stages.size() > 0) {
484 process(stages.remove(0));
485 } else if (!hasFailed.get() && context != null) {
486 context.onSuccess(fops);
alshabib193525b2014-10-08 18:58:03 -0700487 }
488 }
489
Brian O'Connor72cb19a2015-01-16 16:14:41 -0800490 private void process(Set<FlowRuleOperation> ops) {
491 Multimap<DeviceId, FlowRuleBatchEntry> perDeviceBatches =
492 ArrayListMultimap.create();
493
494 FlowRuleBatchEntry fbe;
495 for (FlowRuleOperation flowRuleOperation : ops) {
496 switch (flowRuleOperation.type()) {
497 // FIXME: Brian needs imagination when creating class names.
498 case ADD:
499 fbe = new FlowRuleBatchEntry(
500 FlowRuleBatchEntry.FlowRuleOperation.ADD, flowRuleOperation.rule());
501 break;
502 case MODIFY:
503 fbe = new FlowRuleBatchEntry(
504 FlowRuleBatchEntry.FlowRuleOperation.MODIFY, flowRuleOperation.rule());
505 break;
506 case REMOVE:
507 fbe = new FlowRuleBatchEntry(
508 FlowRuleBatchEntry.FlowRuleOperation.REMOVE, flowRuleOperation.rule());
509 break;
510 default:
511 throw new UnsupportedOperationException("Unknown flow rule type " + flowRuleOperation.type());
512 }
513 pendingDevices.add(flowRuleOperation.rule().deviceId());
514 perDeviceBatches.put(flowRuleOperation.rule().deviceId(), fbe);
515 }
516
517
518 for (DeviceId deviceId : perDeviceBatches.keySet()) {
519 Long id = idGenerator.getNewId();
520 final FlowRuleBatchOperation b = new FlowRuleBatchOperation(perDeviceBatches.get(deviceId),
521 deviceId, id);
522 pendingFlowOperations.put(id, this);
523 deviceInstallers.submit(new Runnable() {
524 @Override
525 public void run() {
526 store.storeBatch(b);
alshabib193525b2014-10-08 18:58:03 -0700527 }
Brian O'Connor72cb19a2015-01-16 16:14:41 -0800528 });
alshabib193525b2014-10-08 18:58:03 -0700529 }
530 }
531
Brian O'Connor72cb19a2015-01-16 16:14:41 -0800532 public void satisfy(DeviceId devId) {
533 pendingDevices.remove(devId);
534 if (pendingDevices.isEmpty()) {
535 operationsService.submit(this);
alshabib193525b2014-10-08 18:58:03 -0700536 }
alshabib193525b2014-10-08 18:58:03 -0700537 }
Brian O'Connor72cb19a2015-01-16 16:14:41 -0800538
539
540
541 public void fail(DeviceId devId, Set<? extends FlowRule> failures) {
542 hasFailed.set(true);
543 pendingDevices.remove(devId);
544 if (pendingDevices.isEmpty()) {
545 operationsService.submit(this);
546 }
547
548 if (context != null) {
549 final FlowRuleOperations.Builder failedOpsBuilder =
550 FlowRuleOperations.builder();
551 failures.stream().forEach(failedOpsBuilder::add);
552
553 context.onError(failedOpsBuilder.build());
554 }
555 }
556
alshabib902d41b2014-10-07 16:52:05 -0700557 }
alshabib57044ba2014-09-16 15:58:01 -0700558}