blob: 012f3b6dc1dfca9e0b20728b1755d5fd95722614 [file] [log] [blame]
Claudine Chiu70222ad2016-11-17 22:29:20 -05001/*
Thomas Vachuska52f2cd12018-11-08 21:20:04 -08002 * Copyright 2018-present Open Networking Foundation
Claudine Chiu70222ad2016-11-17 22:29:20 -05003 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
16
17package org.onosproject.incubator.net.virtual.impl;
18
yoonseon6b972c32016-12-06 16:45:03 -080019import com.google.common.collect.ArrayListMultimap;
20import com.google.common.collect.Iterables;
21import com.google.common.collect.Lists;
22import com.google.common.collect.Maps;
23import com.google.common.collect.Multimap;
24import com.google.common.collect.Sets;
Claudine Chiu70222ad2016-11-17 22:29:20 -050025import org.onosproject.core.ApplicationId;
yoonseon6b972c32016-12-06 16:45:03 -080026import org.onosproject.core.CoreService;
27import org.onosproject.core.IdGenerator;
yoonseonc6a69272017-01-12 18:22:20 -080028import org.onosproject.incubator.net.virtual.NetworkId;
yoonseon6b972c32016-12-06 16:45:03 -080029import org.onosproject.incubator.net.virtual.VirtualNetworkFlowRuleStore;
yoonseonc6a69272017-01-12 18:22:20 -080030import org.onosproject.incubator.net.virtual.VirtualNetworkService;
31import org.onosproject.incubator.net.virtual.event.AbstractVirtualListenerManager;
yoonseon6b972c32016-12-06 16:45:03 -080032import org.onosproject.incubator.net.virtual.provider.AbstractVirtualProviderService;
33import org.onosproject.incubator.net.virtual.provider.VirtualFlowRuleProvider;
34import org.onosproject.incubator.net.virtual.provider.VirtualFlowRuleProviderService;
35import org.onosproject.incubator.net.virtual.provider.VirtualProviderRegistryService;
36import org.onosproject.net.Device;
Claudine Chiu70222ad2016-11-17 22:29:20 -050037import org.onosproject.net.DeviceId;
yoonseon6b972c32016-12-06 16:45:03 -080038import org.onosproject.net.device.DeviceService;
39import org.onosproject.net.flow.CompletedBatchOperation;
40import org.onosproject.net.flow.DefaultFlowEntry;
Claudine Chiu70222ad2016-11-17 22:29:20 -050041import org.onosproject.net.flow.FlowEntry;
42import org.onosproject.net.flow.FlowRule;
Ray Milkey7bf273c2017-09-27 16:15:15 -070043import org.onosproject.net.flow.oldbatch.FlowRuleBatchEntry;
44import org.onosproject.net.flow.oldbatch.FlowRuleBatchEvent;
45import org.onosproject.net.flow.oldbatch.FlowRuleBatchOperation;
46import org.onosproject.net.flow.oldbatch.FlowRuleBatchRequest;
Claudine Chiu70222ad2016-11-17 22:29:20 -050047import org.onosproject.net.flow.FlowRuleEvent;
48import org.onosproject.net.flow.FlowRuleListener;
yoonseon6b972c32016-12-06 16:45:03 -080049import org.onosproject.net.flow.FlowRuleOperation;
Claudine Chiu70222ad2016-11-17 22:29:20 -050050import org.onosproject.net.flow.FlowRuleOperations;
51import org.onosproject.net.flow.FlowRuleService;
yoonseonbd8a93d2016-12-07 15:51:21 -080052import org.onosproject.net.flow.FlowRuleStoreDelegate;
Claudine Chiu70222ad2016-11-17 22:29:20 -050053import org.onosproject.net.flow.TableStatisticsEntry;
yoonseonbd8a93d2016-12-07 15:51:21 -080054import org.onosproject.net.provider.ProviderId;
yoonseon6b972c32016-12-06 16:45:03 -080055import org.slf4j.Logger;
56import org.slf4j.LoggerFactory;
57
58import java.util.Collections;
59import java.util.HashSet;
60import java.util.List;
61import java.util.Map;
62import java.util.Set;
63import java.util.concurrent.ConcurrentHashMap;
64import java.util.concurrent.ExecutorService;
65import java.util.concurrent.Executors;
Claudine Chiu70222ad2016-11-17 22:29:20 -050066
67import static com.google.common.base.Preconditions.checkNotNull;
yoonseon6b972c32016-12-06 16:45:03 -080068import static org.onlab.util.Tools.groupedThreads;
yoonseonbd8a93d2016-12-07 15:51:21 -080069import static org.onosproject.net.flow.FlowRuleEvent.Type.RULE_ADD_REQUESTED;
70import static org.onosproject.net.flow.FlowRuleEvent.Type.RULE_REMOVE_REQUESTED;
Claudine Chiu70222ad2016-11-17 22:29:20 -050071
72/**
73 * Flow rule service implementation built on the virtual network service.
74 */
yoonseon6b972c32016-12-06 16:45:03 -080075public class VirtualNetworkFlowRuleManager
yoonseonc6a69272017-01-12 18:22:20 -080076 extends AbstractVirtualListenerManager<FlowRuleEvent, FlowRuleListener>
77 implements FlowRuleService {
Claudine Chiu70222ad2016-11-17 22:29:20 -050078
yoonseon6b972c32016-12-06 16:45:03 -080079 private static final String VIRTUAL_FLOW_OP_TOPIC = "virtual-flow-ops-ids";
80 private static final String THREAD_GROUP_NAME = "onos/virtual-flowservice";
81 private static final String DEVICE_INSTALLER_PATTERN = "device-installer-%d";
82 private static final String OPERATION_PATTERN = "operations-%d";
83 public static final String FLOW_RULE_NULL = "FlowRule cannot be null";
84
85 private final Logger log = LoggerFactory.getLogger(getClass());
86
yoonseon6b972c32016-12-06 16:45:03 -080087 private final VirtualNetworkFlowRuleStore store;
88 private final DeviceService deviceService;
89
90 protected ExecutorService deviceInstallers =
91 Executors.newFixedThreadPool(32,
92 groupedThreads(THREAD_GROUP_NAME,
93 DEVICE_INSTALLER_PATTERN, log));
94 protected ExecutorService operationsService =
95 Executors.newFixedThreadPool(32,
96 groupedThreads(THREAD_GROUP_NAME,
97 OPERATION_PATTERN, log));
98 private IdGenerator idGenerator;
99
100 private final Map<Long, FlowOperationsProcessor> pendingFlowOperations = new ConcurrentHashMap<>();
101
102 private VirtualProviderRegistryService providerRegistryService = null;
103 private InternalFlowRuleProviderService innerProviderService = null;
104
yoonseonbd8a93d2016-12-07 15:51:21 -0800105 private final FlowRuleStoreDelegate storeDelegate;
106
Claudine Chiu70222ad2016-11-17 22:29:20 -0500107 /**
108 * Creates a new VirtualNetworkFlowRuleService object.
109 *
110 * @param virtualNetworkManager virtual network manager service
yoonseonc6a69272017-01-12 18:22:20 -0800111 * @param networkId a virtual network identifier
Claudine Chiu70222ad2016-11-17 22:29:20 -0500112 */
yoonseonc6a69272017-01-12 18:22:20 -0800113 public VirtualNetworkFlowRuleManager(VirtualNetworkService virtualNetworkManager,
114 NetworkId networkId) {
Yoonseon Hanb14461b2017-03-07 14:08:01 +0900115 super(virtualNetworkManager, networkId, FlowRuleEvent.class);
yoonseon6b972c32016-12-06 16:45:03 -0800116
yoonseon6b972c32016-12-06 16:45:03 -0800117 store = serviceDirectory.get(VirtualNetworkFlowRuleStore.class);
yoonseonbd8a93d2016-12-07 15:51:21 -0800118
yoonseon6b972c32016-12-06 16:45:03 -0800119 idGenerator = serviceDirectory.get(CoreService.class)
yoonseonc6a69272017-01-12 18:22:20 -0800120 .getIdGenerator(VIRTUAL_FLOW_OP_TOPIC + networkId().toString());
yoonseon6b972c32016-12-06 16:45:03 -0800121 providerRegistryService =
122 serviceDirectory.get(VirtualProviderRegistryService.class);
123 innerProviderService = new InternalFlowRuleProviderService();
yoonseonc6a69272017-01-12 18:22:20 -0800124 providerRegistryService.registerProviderService(networkId(), innerProviderService);
yoonseon6b972c32016-12-06 16:45:03 -0800125
yoonseonbd8a93d2016-12-07 15:51:21 -0800126 this.deviceService = manager.get(networkId, DeviceService.class);
127 this.storeDelegate = new InternalStoreDelegate();
128 store.setDelegate(networkId, this.storeDelegate);
Claudine Chiu70222ad2016-11-17 22:29:20 -0500129 }
130
131 @Override
132 public int getFlowRuleCount() {
yoonseonc6a69272017-01-12 18:22:20 -0800133 return store.getFlowRuleCount(networkId());
Claudine Chiu70222ad2016-11-17 22:29:20 -0500134 }
135
136 @Override
137 public Iterable<FlowEntry> getFlowEntries(DeviceId deviceId) {
yoonseonc6a69272017-01-12 18:22:20 -0800138 return store.getFlowEntries(networkId(), deviceId);
Claudine Chiu70222ad2016-11-17 22:29:20 -0500139 }
140
141 @Override
142 public void applyFlowRules(FlowRule... flowRules) {
yoonseon6b972c32016-12-06 16:45:03 -0800143 FlowRuleOperations.Builder builder = FlowRuleOperations.builder();
144 for (FlowRule flowRule : flowRules) {
145 builder.add(flowRule);
146 }
147 apply(builder.build());
Claudine Chiu70222ad2016-11-17 22:29:20 -0500148 }
149
150 @Override
151 public void purgeFlowRules(DeviceId deviceId) {
yoonseonc6a69272017-01-12 18:22:20 -0800152 store.purgeFlowRule(networkId(), deviceId);
Claudine Chiu70222ad2016-11-17 22:29:20 -0500153 }
154
155 @Override
156 public void removeFlowRules(FlowRule... flowRules) {
yoonseon6b972c32016-12-06 16:45:03 -0800157 FlowRuleOperations.Builder builder = FlowRuleOperations.builder();
158 for (FlowRule flowRule : flowRules) {
159 builder.remove(flowRule);
160 }
161 apply(builder.build());
Claudine Chiu70222ad2016-11-17 22:29:20 -0500162 }
163
164 @Override
yoonseon6b972c32016-12-06 16:45:03 -0800165 public void removeFlowRulesById(ApplicationId id) {
yoonseon6b972c32016-12-06 16:45:03 -0800166 Set<FlowRule> flowEntries = Sets.newHashSet();
167 for (Device d : deviceService.getDevices()) {
yoonseonc6a69272017-01-12 18:22:20 -0800168 for (FlowEntry flowEntry : store.getFlowEntries(networkId(), d.id())) {
yoonseon6b972c32016-12-06 16:45:03 -0800169 if (flowEntry.appId() == id.id()) {
170 flowEntries.add(flowEntry);
171 }
172 }
173 }
Ray Milkey4f7e3632019-02-19 15:35:20 -0800174 removeFlowRules(Iterables.toArray(flowEntries, FlowRule.class));
Claudine Chiu70222ad2016-11-17 22:29:20 -0500175 }
176
177 @Override
178 public Iterable<FlowEntry> getFlowEntriesById(ApplicationId id) {
yoonseonc6a69272017-01-12 18:22:20 -0800179 DeviceService deviceService = manager.get(networkId(), DeviceService.class);
yoonseon6b972c32016-12-06 16:45:03 -0800180
181 Set<FlowEntry> flowEntries = Sets.newHashSet();
182 for (Device d : deviceService.getDevices()) {
yoonseonc6a69272017-01-12 18:22:20 -0800183 for (FlowEntry flowEntry : store.getFlowEntries(networkId(), d.id())) {
yoonseon6b972c32016-12-06 16:45:03 -0800184 if (flowEntry.appId() == id.id()) {
185 flowEntries.add(flowEntry);
186 }
187 }
188 }
189 return flowEntries;
Claudine Chiu70222ad2016-11-17 22:29:20 -0500190 }
191
192 @Override
193 public Iterable<FlowRule> getFlowRulesByGroupId(ApplicationId appId, short groupId) {
yoonseonc6a69272017-01-12 18:22:20 -0800194 DeviceService deviceService = manager.get(networkId(), DeviceService.class);
yoonseon6b972c32016-12-06 16:45:03 -0800195
196 Set<FlowRule> matches = Sets.newHashSet();
197 long toLookUp = ((long) appId.id() << 16) | groupId;
198 for (Device d : deviceService.getDevices()) {
yoonseonc6a69272017-01-12 18:22:20 -0800199 for (FlowEntry flowEntry : store.getFlowEntries(networkId(), d.id())) {
yoonseon6b972c32016-12-06 16:45:03 -0800200 if ((flowEntry.id().value() >>> 32) == toLookUp) {
201 matches.add(flowEntry);
202 }
203 }
204 }
205 return matches;
Claudine Chiu70222ad2016-11-17 22:29:20 -0500206 }
207
208 @Override
209 public void apply(FlowRuleOperations ops) {
yoonseon6b972c32016-12-06 16:45:03 -0800210 operationsService.execute(new FlowOperationsProcessor(ops));
Claudine Chiu70222ad2016-11-17 22:29:20 -0500211 }
212
213 @Override
214 public Iterable<TableStatisticsEntry> getFlowTableStatistics(DeviceId deviceId) {
yoonseonc6a69272017-01-12 18:22:20 -0800215 return store.getTableStatistics(networkId(), deviceId);
yoonseon6b972c32016-12-06 16:45:03 -0800216 }
217
218 private static FlowRuleBatchEntry.FlowRuleOperation mapOperationType(FlowRuleOperation.Type input) {
219 switch (input) {
220 case ADD:
221 return FlowRuleBatchEntry.FlowRuleOperation.ADD;
222 case MODIFY:
223 return FlowRuleBatchEntry.FlowRuleOperation.MODIFY;
224 case REMOVE:
225 return FlowRuleBatchEntry.FlowRuleOperation.REMOVE;
226 default:
227 throw new UnsupportedOperationException("Unknown flow rule type " + input);
228 }
Claudine Chiu70222ad2016-11-17 22:29:20 -0500229 }
230
yoonseon6b972c32016-12-06 16:45:03 -0800231 private class FlowOperationsProcessor implements Runnable {
232 // Immutable
233 private final FlowRuleOperations fops;
234
235 // Mutable
236 private final List<Set<FlowRuleOperation>> stages;
237 private final Set<DeviceId> pendingDevices = new HashSet<>();
238 private boolean hasFailed = false;
239
240 FlowOperationsProcessor(FlowRuleOperations ops) {
241 this.stages = Lists.newArrayList(ops.stages());
242 this.fops = ops;
243 }
244
245 @Override
246 public synchronized void run() {
Jon Hallcbd1b392017-01-18 20:15:44 -0800247 if (!stages.isEmpty()) {
yoonseon6b972c32016-12-06 16:45:03 -0800248 process(stages.remove(0));
249 } else if (!hasFailed) {
250 fops.callback().onSuccess(fops);
251 }
252 }
253
254 private void process(Set<FlowRuleOperation> ops) {
255 Multimap<DeviceId, FlowRuleBatchEntry> perDeviceBatches = ArrayListMultimap.create();
256
257 for (FlowRuleOperation op : ops) {
258 perDeviceBatches.put(op.rule().deviceId(),
259 new FlowRuleBatchEntry(mapOperationType(op.type()), op.rule()));
260 }
261 pendingDevices.addAll(perDeviceBatches.keySet());
262
263 for (DeviceId deviceId : perDeviceBatches.keySet()) {
264 long id = idGenerator.getNewId();
265 final FlowRuleBatchOperation b = new FlowRuleBatchOperation(perDeviceBatches.get(deviceId),
266 deviceId, id);
267 pendingFlowOperations.put(id, this);
yoonseonc6a69272017-01-12 18:22:20 -0800268 deviceInstallers.execute(() -> store.storeBatch(networkId(), b));
yoonseon6b972c32016-12-06 16:45:03 -0800269 }
270 }
271
272 synchronized void satisfy(DeviceId devId) {
273 pendingDevices.remove(devId);
274 if (pendingDevices.isEmpty()) {
275 operationsService.execute(this);
276 }
277 }
278
279 synchronized void fail(DeviceId devId, Set<? extends FlowRule> failures) {
280 hasFailed = true;
281 pendingDevices.remove(devId);
282 if (pendingDevices.isEmpty()) {
283 operationsService.execute(this);
284 }
285
286 FlowRuleOperations.Builder failedOpsBuilder = FlowRuleOperations.builder();
287 failures.forEach(failedOpsBuilder::add);
288
289 fops.callback().onError(failedOpsBuilder.build());
290 }
291 }
292
yoonseonbd8a93d2016-12-07 15:51:21 -0800293 private final class InternalFlowRuleProviderService
yoonseon6b972c32016-12-06 16:45:03 -0800294 extends AbstractVirtualProviderService<VirtualFlowRuleProvider>
295 implements VirtualFlowRuleProviderService {
296
297 final Map<FlowEntry, Long> firstSeen = Maps.newConcurrentMap();
298 final Map<FlowEntry, Long> lastSeen = Maps.newConcurrentMap();
299
yoonseonbd8a93d2016-12-07 15:51:21 -0800300 private InternalFlowRuleProviderService() {
301 //TODO: find a proper virtual provider.
302 Set<ProviderId> providerIds =
303 providerRegistryService.getProvidersByService(this);
304 ProviderId providerId = providerIds.stream().findFirst().get();
305 VirtualFlowRuleProvider provider = (VirtualFlowRuleProvider)
306 providerRegistryService.getProvider(providerId);
307 setProvider(provider);
308 }
309
yoonseon6b972c32016-12-06 16:45:03 -0800310 @Override
311 public void flowRemoved(FlowEntry flowEntry) {
312 checkNotNull(flowEntry, FLOW_RULE_NULL);
313 checkValidity();
314
315 lastSeen.remove(flowEntry);
316 firstSeen.remove(flowEntry);
yoonseonc6a69272017-01-12 18:22:20 -0800317 FlowEntry stored = store.getFlowEntry(networkId(), flowEntry);
yoonseon6b972c32016-12-06 16:45:03 -0800318 if (stored == null) {
319 log.debug("Rule already evicted from store: {}", flowEntry);
320 return;
321 }
322 if (flowEntry.reason() == FlowEntry.FlowRemoveReason.HARD_TIMEOUT) {
323 ((DefaultFlowEntry) stored).setState(FlowEntry.FlowEntryState.REMOVED);
324 }
yoonseon6b972c32016-12-06 16:45:03 -0800325
326 //FIXME: obtains provider from devices providerId()
327 FlowRuleEvent event = null;
328 switch (stored.state()) {
329 case ADDED:
330 case PENDING_ADD:
yoonseonc6a69272017-01-12 18:22:20 -0800331 provider().applyFlowRule(networkId(), stored);
yoonseon6b972c32016-12-06 16:45:03 -0800332 break;
333 case PENDING_REMOVE:
334 case REMOVED:
yoonseonc6a69272017-01-12 18:22:20 -0800335 event = store.removeFlowRule(networkId(), stored);
yoonseon6b972c32016-12-06 16:45:03 -0800336 break;
337 default:
338 break;
339
340 }
341 if (event != null) {
342 log.debug("Flow {} removed", flowEntry);
343 post(event);
344 }
345 }
346
yoonseon6b972c32016-12-06 16:45:03 -0800347 private void flowMissing(FlowEntry flowRule) {
348 checkNotNull(flowRule, FLOW_RULE_NULL);
349 checkValidity();
350
351 FlowRuleEvent event = null;
352 switch (flowRule.state()) {
353 case PENDING_REMOVE:
354 case REMOVED:
yoonseonc6a69272017-01-12 18:22:20 -0800355 event = store.removeFlowRule(networkId(), flowRule);
yoonseon6b972c32016-12-06 16:45:03 -0800356 break;
357 case ADDED:
358 case PENDING_ADD:
yoonseonc6a69272017-01-12 18:22:20 -0800359 event = store.pendingFlowRule(networkId(), flowRule);
yoonseonbd8a93d2016-12-07 15:51:21 -0800360
yoonseon6b972c32016-12-06 16:45:03 -0800361 try {
yoonseonc6a69272017-01-12 18:22:20 -0800362 provider().applyFlowRule(networkId(), flowRule);
yoonseon6b972c32016-12-06 16:45:03 -0800363 } catch (UnsupportedOperationException e) {
364 log.warn(e.getMessage());
365 if (flowRule instanceof DefaultFlowEntry) {
366 //FIXME modification of "stored" flow entry outside of store
367 ((DefaultFlowEntry) flowRule).setState(FlowEntry.FlowEntryState.FAILED);
368 }
369 }
370 break;
371 default:
372 log.debug("Flow {} has not been installed.", flowRule);
373 }
374
375 if (event != null) {
376 log.debug("Flow {} removed", flowRule);
377 post(event);
378 }
379 }
380
381 private void extraneousFlow(FlowRule flowRule) {
382 checkNotNull(flowRule, FLOW_RULE_NULL);
383 checkValidity();
384
yoonseonc6a69272017-01-12 18:22:20 -0800385 provider().removeFlowRule(networkId(), flowRule);
yoonseon6b972c32016-12-06 16:45:03 -0800386 log.debug("Flow {} is on switch but not in store.", flowRule);
387 }
388
389 private void flowAdded(FlowEntry flowEntry) {
390 checkNotNull(flowEntry, FLOW_RULE_NULL);
391
yoonseonc6a69272017-01-12 18:22:20 -0800392 if (checkRuleLiveness(flowEntry, store.getFlowEntry(networkId(), flowEntry))) {
393 FlowRuleEvent event = store.addOrUpdateFlowRule(networkId(), flowEntry);
yoonseon6b972c32016-12-06 16:45:03 -0800394 if (event == null) {
395 log.debug("No flow store event generated.");
396 } else {
397 log.trace("Flow {} {}", flowEntry, event.type());
398 post(event);
399 }
400 } else {
401 log.debug("Removing flow rules....");
402 removeFlowRules(flowEntry);
403 }
404 }
405
406 private boolean checkRuleLiveness(FlowEntry swRule, FlowEntry storedRule) {
407 if (storedRule == null) {
408 return false;
409 }
410 if (storedRule.isPermanent()) {
411 return true;
412 }
413
Ray Milkey3717e602018-02-01 13:49:47 -0800414 final long timeout = storedRule.timeout() * 1000L;
yoonseon6b972c32016-12-06 16:45:03 -0800415 final long currentTime = System.currentTimeMillis();
416
417 // Checking flow with hardTimeout
418 if (storedRule.hardTimeout() != 0) {
419 if (!firstSeen.containsKey(storedRule)) {
420 // First time rule adding
421 firstSeen.put(storedRule, currentTime);
422 } else {
423 Long first = firstSeen.get(storedRule);
Ray Milkey3717e602018-02-01 13:49:47 -0800424 final long hardTimeout = storedRule.hardTimeout() * 1000L;
yoonseon6b972c32016-12-06 16:45:03 -0800425 if ((currentTime - first) > hardTimeout) {
426 return false;
427 }
428 }
429 }
430
431 if (storedRule.packets() != swRule.packets()) {
432 lastSeen.put(storedRule, currentTime);
433 return true;
434 }
435 if (!lastSeen.containsKey(storedRule)) {
436 // checking for the first time
437 lastSeen.put(storedRule, storedRule.lastSeen());
438 // Use following if lastSeen attr. was removed.
439 //lastSeen.put(storedRule, currentTime);
440 }
441 Long last = lastSeen.get(storedRule);
442
443 // concurrently removed? let the liveness check fail
444 return last != null && (currentTime - last) <= timeout;
445 }
446
447 @Override
448 public void pushFlowMetrics(DeviceId deviceId, Iterable<FlowEntry> flowEntries) {
449 pushFlowMetricsInternal(deviceId, flowEntries, true);
450 }
451
452 @Override
453 public void pushFlowMetricsWithoutFlowMissing(DeviceId deviceId, Iterable<FlowEntry> flowEntries) {
454 pushFlowMetricsInternal(deviceId, flowEntries, false);
455 }
456
457 private void pushFlowMetricsInternal(DeviceId deviceId, Iterable<FlowEntry> flowEntries,
458 boolean useMissingFlow) {
459 Map<FlowEntry, FlowEntry> storedRules = Maps.newHashMap();
yoonseonc6a69272017-01-12 18:22:20 -0800460 store.getFlowEntries(networkId(), deviceId).forEach(f -> storedRules.put(f, f));
yoonseon6b972c32016-12-06 16:45:03 -0800461
462 for (FlowEntry rule : flowEntries) {
463 try {
464 FlowEntry storedRule = storedRules.remove(rule);
465 if (storedRule != null) {
Yoonseon Hanc8089db2017-03-22 20:22:12 +0900466 if (storedRule.id().equals(rule.id())) {
yoonseon6b972c32016-12-06 16:45:03 -0800467 // we both have the rule, let's update some info then.
468 flowAdded(rule);
469 } else {
470 // the two rules are not an exact match - remove the
471 // switch's rule and install our rule
472 extraneousFlow(rule);
473 flowMissing(storedRule);
474 }
475 }
476 } catch (Exception e) {
477 log.debug("Can't process added or extra rule {}", e.getMessage());
478 }
479 }
480
481 // DO NOT reinstall
482 if (useMissingFlow) {
483 for (FlowEntry rule : storedRules.keySet()) {
484 try {
485 // there are rules in the store that aren't on the switch
486 log.debug("Adding rule in store, but not on switch {}", rule);
487 flowMissing(rule);
488 } catch (Exception e) {
489 log.debug("Can't add missing flow rule:", e);
490 }
491 }
492 }
493 }
494
495 public void batchOperationCompleted(long batchId, CompletedBatchOperation operation) {
yoonseonc6a69272017-01-12 18:22:20 -0800496 store.batchOperationComplete(networkId(), FlowRuleBatchEvent.completed(
yoonseon6b972c32016-12-06 16:45:03 -0800497 new FlowRuleBatchRequest(batchId, Collections.emptySet()),
498 operation
499 ));
500 }
501
502 @Override
503 public void pushTableStatistics(DeviceId deviceId,
504 List<TableStatisticsEntry> tableStats) {
yoonseonc6a69272017-01-12 18:22:20 -0800505 store.updateTableStatistics(networkId(), deviceId, tableStats);
yoonseon6b972c32016-12-06 16:45:03 -0800506 }
Claudine Chiu70222ad2016-11-17 22:29:20 -0500507 }
yoonseonbd8a93d2016-12-07 15:51:21 -0800508
509 // Store delegate to re-post events emitted from the store.
510 private class InternalStoreDelegate implements FlowRuleStoreDelegate {
511
512 // TODO: Right now we only dispatch events at individual flowEntry level.
513 // It may be more efficient for also dispatch events as a batch.
514 @Override
515 public void notify(FlowRuleBatchEvent event) {
516 final FlowRuleBatchRequest request = event.subject();
517 switch (event.type()) {
518 case BATCH_OPERATION_REQUESTED:
519 // Request has been forwarded to MASTER Node, and was
520 request.ops().forEach(
521 op -> {
522 switch (op.operator()) {
523 case ADD:
524 post(new FlowRuleEvent(RULE_ADD_REQUESTED, op.target()));
525 break;
526 case REMOVE:
527 post(new FlowRuleEvent(RULE_REMOVE_REQUESTED, op.target()));
528 break;
529 case MODIFY:
530 //TODO: do something here when the time comes.
531 break;
532 default:
533 log.warn("Unknown flow operation operator: {}", op.operator());
534 }
535 }
536 );
537
538 DeviceId deviceId = event.deviceId();
539 FlowRuleBatchOperation batchOperation = request.asBatchOperation(deviceId);
540
541 VirtualFlowRuleProvider provider = innerProviderService.provider();
542 if (provider != null) {
543 provider.executeBatch(networkId, batchOperation);
544 }
545
546 break;
547
548 case BATCH_OPERATION_COMPLETED:
Yoonseon Hanc8089db2017-03-22 20:22:12 +0900549 FlowOperationsProcessor fops = pendingFlowOperations.remove(
550 event.subject().batchId());
551 if (fops == null) {
552 return;
553 }
554
555 if (event.result().isSuccess()) {
556 fops.satisfy(event.deviceId());
557 } else {
558 fops.fail(event.deviceId(), event.result().failedItems());
559 }
yoonseonbd8a93d2016-12-07 15:51:21 -0800560 break;
561
562 default:
563 break;
564 }
565 }
566 }
Claudine Chiu70222ad2016-11-17 22:29:20 -0500567}