blob: f501696f226a8f7471fd56609464439eaa7ddc31 [file] [log] [blame]
Claudine Chiu70222ad2016-11-17 22:29:20 -05001/*
Thomas Vachuska52f2cd12018-11-08 21:20:04 -08002 * Copyright 2018-present Open Networking Foundation
Claudine Chiu70222ad2016-11-17 22:29:20 -05003 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
16
17package org.onosproject.incubator.net.virtual.impl;
18
yoonseon6b972c32016-12-06 16:45:03 -080019import com.google.common.collect.ArrayListMultimap;
20import com.google.common.collect.Iterables;
21import com.google.common.collect.Lists;
22import com.google.common.collect.Maps;
23import com.google.common.collect.Multimap;
24import com.google.common.collect.Sets;
Claudine Chiu70222ad2016-11-17 22:29:20 -050025import org.onosproject.core.ApplicationId;
yoonseon6b972c32016-12-06 16:45:03 -080026import org.onosproject.core.CoreService;
27import org.onosproject.core.IdGenerator;
yoonseonc6a69272017-01-12 18:22:20 -080028import org.onosproject.incubator.net.virtual.NetworkId;
yoonseon6b972c32016-12-06 16:45:03 -080029import org.onosproject.incubator.net.virtual.VirtualNetworkFlowRuleStore;
yoonseonc6a69272017-01-12 18:22:20 -080030import org.onosproject.incubator.net.virtual.VirtualNetworkService;
31import org.onosproject.incubator.net.virtual.event.AbstractVirtualListenerManager;
yoonseon6b972c32016-12-06 16:45:03 -080032import org.onosproject.incubator.net.virtual.provider.AbstractVirtualProviderService;
33import org.onosproject.incubator.net.virtual.provider.VirtualFlowRuleProvider;
34import org.onosproject.incubator.net.virtual.provider.VirtualFlowRuleProviderService;
35import org.onosproject.incubator.net.virtual.provider.VirtualProviderRegistryService;
36import org.onosproject.net.Device;
Claudine Chiu70222ad2016-11-17 22:29:20 -050037import org.onosproject.net.DeviceId;
yoonseon6b972c32016-12-06 16:45:03 -080038import org.onosproject.net.device.DeviceService;
39import org.onosproject.net.flow.CompletedBatchOperation;
40import org.onosproject.net.flow.DefaultFlowEntry;
Claudine Chiu70222ad2016-11-17 22:29:20 -050041import org.onosproject.net.flow.FlowEntry;
42import org.onosproject.net.flow.FlowRule;
Ray Milkey7bf273c2017-09-27 16:15:15 -070043import org.onosproject.net.flow.oldbatch.FlowRuleBatchEntry;
44import org.onosproject.net.flow.oldbatch.FlowRuleBatchEvent;
45import org.onosproject.net.flow.oldbatch.FlowRuleBatchOperation;
46import org.onosproject.net.flow.oldbatch.FlowRuleBatchRequest;
Claudine Chiu70222ad2016-11-17 22:29:20 -050047import org.onosproject.net.flow.FlowRuleEvent;
48import org.onosproject.net.flow.FlowRuleListener;
yoonseon6b972c32016-12-06 16:45:03 -080049import org.onosproject.net.flow.FlowRuleOperation;
Claudine Chiu70222ad2016-11-17 22:29:20 -050050import org.onosproject.net.flow.FlowRuleOperations;
51import org.onosproject.net.flow.FlowRuleService;
yoonseonbd8a93d2016-12-07 15:51:21 -080052import org.onosproject.net.flow.FlowRuleStoreDelegate;
Claudine Chiu70222ad2016-11-17 22:29:20 -050053import org.onosproject.net.flow.TableStatisticsEntry;
yoonseonbd8a93d2016-12-07 15:51:21 -080054import org.onosproject.net.provider.ProviderId;
yoonseon6b972c32016-12-06 16:45:03 -080055import org.slf4j.Logger;
56import org.slf4j.LoggerFactory;
57
58import java.util.Collections;
59import java.util.HashSet;
60import java.util.List;
61import java.util.Map;
62import java.util.Set;
63import java.util.concurrent.ConcurrentHashMap;
64import java.util.concurrent.ExecutorService;
65import java.util.concurrent.Executors;
Claudine Chiu70222ad2016-11-17 22:29:20 -050066
67import static com.google.common.base.Preconditions.checkNotNull;
yoonseon6b972c32016-12-06 16:45:03 -080068import static org.onlab.util.Tools.groupedThreads;
yoonseonbd8a93d2016-12-07 15:51:21 -080069import static org.onosproject.net.flow.FlowRuleEvent.Type.RULE_ADD_REQUESTED;
70import static org.onosproject.net.flow.FlowRuleEvent.Type.RULE_REMOVE_REQUESTED;
Claudine Chiu70222ad2016-11-17 22:29:20 -050071
72/**
73 * Flow rule service implementation built on the virtual network service.
74 */
yoonseon6b972c32016-12-06 16:45:03 -080075public class VirtualNetworkFlowRuleManager
yoonseonc6a69272017-01-12 18:22:20 -080076 extends AbstractVirtualListenerManager<FlowRuleEvent, FlowRuleListener>
77 implements FlowRuleService {
Claudine Chiu70222ad2016-11-17 22:29:20 -050078
yoonseon6b972c32016-12-06 16:45:03 -080079 private static final String VIRTUAL_FLOW_OP_TOPIC = "virtual-flow-ops-ids";
80 private static final String THREAD_GROUP_NAME = "onos/virtual-flowservice";
81 private static final String DEVICE_INSTALLER_PATTERN = "device-installer-%d";
82 private static final String OPERATION_PATTERN = "operations-%d";
83 public static final String FLOW_RULE_NULL = "FlowRule cannot be null";
84
85 private final Logger log = LoggerFactory.getLogger(getClass());
86
yoonseon6b972c32016-12-06 16:45:03 -080087 private final VirtualNetworkFlowRuleStore store;
88 private final DeviceService deviceService;
89
90 protected ExecutorService deviceInstallers =
91 Executors.newFixedThreadPool(32,
92 groupedThreads(THREAD_GROUP_NAME,
93 DEVICE_INSTALLER_PATTERN, log));
94 protected ExecutorService operationsService =
95 Executors.newFixedThreadPool(32,
96 groupedThreads(THREAD_GROUP_NAME,
97 OPERATION_PATTERN, log));
98 private IdGenerator idGenerator;
99
100 private final Map<Long, FlowOperationsProcessor> pendingFlowOperations = new ConcurrentHashMap<>();
101
102 private VirtualProviderRegistryService providerRegistryService = null;
103 private InternalFlowRuleProviderService innerProviderService = null;
104
yoonseonbd8a93d2016-12-07 15:51:21 -0800105 private final FlowRuleStoreDelegate storeDelegate;
106
Claudine Chiu70222ad2016-11-17 22:29:20 -0500107 /**
108 * Creates a new VirtualNetworkFlowRuleService object.
109 *
110 * @param virtualNetworkManager virtual network manager service
yoonseonc6a69272017-01-12 18:22:20 -0800111 * @param networkId a virtual network identifier
Claudine Chiu70222ad2016-11-17 22:29:20 -0500112 */
yoonseonc6a69272017-01-12 18:22:20 -0800113 public VirtualNetworkFlowRuleManager(VirtualNetworkService virtualNetworkManager,
114 NetworkId networkId) {
Yoonseon Hanb14461b2017-03-07 14:08:01 +0900115 super(virtualNetworkManager, networkId, FlowRuleEvent.class);
yoonseon6b972c32016-12-06 16:45:03 -0800116
yoonseon6b972c32016-12-06 16:45:03 -0800117 store = serviceDirectory.get(VirtualNetworkFlowRuleStore.class);
yoonseonbd8a93d2016-12-07 15:51:21 -0800118
yoonseon6b972c32016-12-06 16:45:03 -0800119 idGenerator = serviceDirectory.get(CoreService.class)
yoonseonc6a69272017-01-12 18:22:20 -0800120 .getIdGenerator(VIRTUAL_FLOW_OP_TOPIC + networkId().toString());
yoonseon6b972c32016-12-06 16:45:03 -0800121 providerRegistryService =
122 serviceDirectory.get(VirtualProviderRegistryService.class);
123 innerProviderService = new InternalFlowRuleProviderService();
yoonseonc6a69272017-01-12 18:22:20 -0800124 providerRegistryService.registerProviderService(networkId(), innerProviderService);
yoonseon6b972c32016-12-06 16:45:03 -0800125
yoonseonbd8a93d2016-12-07 15:51:21 -0800126 this.deviceService = manager.get(networkId, DeviceService.class);
127 this.storeDelegate = new InternalStoreDelegate();
128 store.setDelegate(networkId, this.storeDelegate);
Claudine Chiu70222ad2016-11-17 22:29:20 -0500129 }
130
131 @Override
132 public int getFlowRuleCount() {
yoonseonc6a69272017-01-12 18:22:20 -0800133 return store.getFlowRuleCount(networkId());
Claudine Chiu70222ad2016-11-17 22:29:20 -0500134 }
135
136 @Override
137 public Iterable<FlowEntry> getFlowEntries(DeviceId deviceId) {
yoonseonc6a69272017-01-12 18:22:20 -0800138 return store.getFlowEntries(networkId(), deviceId);
Claudine Chiu70222ad2016-11-17 22:29:20 -0500139 }
140
141 @Override
142 public void applyFlowRules(FlowRule... flowRules) {
yoonseon6b972c32016-12-06 16:45:03 -0800143 FlowRuleOperations.Builder builder = FlowRuleOperations.builder();
144 for (FlowRule flowRule : flowRules) {
145 builder.add(flowRule);
146 }
147 apply(builder.build());
Claudine Chiu70222ad2016-11-17 22:29:20 -0500148 }
149
150 @Override
151 public void purgeFlowRules(DeviceId deviceId) {
yoonseonc6a69272017-01-12 18:22:20 -0800152 store.purgeFlowRule(networkId(), deviceId);
Claudine Chiu70222ad2016-11-17 22:29:20 -0500153 }
154
155 @Override
156 public void removeFlowRules(FlowRule... flowRules) {
yoonseon6b972c32016-12-06 16:45:03 -0800157 FlowRuleOperations.Builder builder = FlowRuleOperations.builder();
158 for (FlowRule flowRule : flowRules) {
159 builder.remove(flowRule);
160 }
161 apply(builder.build());
Claudine Chiu70222ad2016-11-17 22:29:20 -0500162 }
163
164 @Override
yoonseon6b972c32016-12-06 16:45:03 -0800165 public void removeFlowRulesById(ApplicationId id) {
166 removeFlowRules(Iterables.toArray(getFlowRulesById(id), FlowRule.class));
Claudine Chiu70222ad2016-11-17 22:29:20 -0500167 }
168
169 @Override
170 public Iterable<FlowRule> getFlowRulesById(ApplicationId id) {
yoonseonc6a69272017-01-12 18:22:20 -0800171 DeviceService deviceService = manager.get(networkId(), DeviceService.class);
yoonseon6b972c32016-12-06 16:45:03 -0800172
173 Set<FlowRule> flowEntries = Sets.newHashSet();
174 for (Device d : deviceService.getDevices()) {
yoonseonc6a69272017-01-12 18:22:20 -0800175 for (FlowEntry flowEntry : store.getFlowEntries(networkId(), d.id())) {
yoonseon6b972c32016-12-06 16:45:03 -0800176 if (flowEntry.appId() == id.id()) {
177 flowEntries.add(flowEntry);
178 }
179 }
180 }
181 return flowEntries;
Claudine Chiu70222ad2016-11-17 22:29:20 -0500182 }
183
184 @Override
185 public Iterable<FlowEntry> getFlowEntriesById(ApplicationId id) {
yoonseonc6a69272017-01-12 18:22:20 -0800186 DeviceService deviceService = manager.get(networkId(), DeviceService.class);
yoonseon6b972c32016-12-06 16:45:03 -0800187
188 Set<FlowEntry> flowEntries = Sets.newHashSet();
189 for (Device d : deviceService.getDevices()) {
yoonseonc6a69272017-01-12 18:22:20 -0800190 for (FlowEntry flowEntry : store.getFlowEntries(networkId(), d.id())) {
yoonseon6b972c32016-12-06 16:45:03 -0800191 if (flowEntry.appId() == id.id()) {
192 flowEntries.add(flowEntry);
193 }
194 }
195 }
196 return flowEntries;
Claudine Chiu70222ad2016-11-17 22:29:20 -0500197 }
198
199 @Override
200 public Iterable<FlowRule> getFlowRulesByGroupId(ApplicationId appId, short groupId) {
yoonseonc6a69272017-01-12 18:22:20 -0800201 DeviceService deviceService = manager.get(networkId(), DeviceService.class);
yoonseon6b972c32016-12-06 16:45:03 -0800202
203 Set<FlowRule> matches = Sets.newHashSet();
204 long toLookUp = ((long) appId.id() << 16) | groupId;
205 for (Device d : deviceService.getDevices()) {
yoonseonc6a69272017-01-12 18:22:20 -0800206 for (FlowEntry flowEntry : store.getFlowEntries(networkId(), d.id())) {
yoonseon6b972c32016-12-06 16:45:03 -0800207 if ((flowEntry.id().value() >>> 32) == toLookUp) {
208 matches.add(flowEntry);
209 }
210 }
211 }
212 return matches;
Claudine Chiu70222ad2016-11-17 22:29:20 -0500213 }
214
215 @Override
216 public void apply(FlowRuleOperations ops) {
yoonseon6b972c32016-12-06 16:45:03 -0800217 operationsService.execute(new FlowOperationsProcessor(ops));
Claudine Chiu70222ad2016-11-17 22:29:20 -0500218 }
219
220 @Override
221 public Iterable<TableStatisticsEntry> getFlowTableStatistics(DeviceId deviceId) {
yoonseonc6a69272017-01-12 18:22:20 -0800222 return store.getTableStatistics(networkId(), deviceId);
yoonseon6b972c32016-12-06 16:45:03 -0800223 }
224
225 private static FlowRuleBatchEntry.FlowRuleOperation mapOperationType(FlowRuleOperation.Type input) {
226 switch (input) {
227 case ADD:
228 return FlowRuleBatchEntry.FlowRuleOperation.ADD;
229 case MODIFY:
230 return FlowRuleBatchEntry.FlowRuleOperation.MODIFY;
231 case REMOVE:
232 return FlowRuleBatchEntry.FlowRuleOperation.REMOVE;
233 default:
234 throw new UnsupportedOperationException("Unknown flow rule type " + input);
235 }
Claudine Chiu70222ad2016-11-17 22:29:20 -0500236 }
237
yoonseon6b972c32016-12-06 16:45:03 -0800238 private class FlowOperationsProcessor implements Runnable {
239 // Immutable
240 private final FlowRuleOperations fops;
241
242 // Mutable
243 private final List<Set<FlowRuleOperation>> stages;
244 private final Set<DeviceId> pendingDevices = new HashSet<>();
245 private boolean hasFailed = false;
246
247 FlowOperationsProcessor(FlowRuleOperations ops) {
248 this.stages = Lists.newArrayList(ops.stages());
249 this.fops = ops;
250 }
251
252 @Override
253 public synchronized void run() {
Jon Hallcbd1b392017-01-18 20:15:44 -0800254 if (!stages.isEmpty()) {
yoonseon6b972c32016-12-06 16:45:03 -0800255 process(stages.remove(0));
256 } else if (!hasFailed) {
257 fops.callback().onSuccess(fops);
258 }
259 }
260
261 private void process(Set<FlowRuleOperation> ops) {
262 Multimap<DeviceId, FlowRuleBatchEntry> perDeviceBatches = ArrayListMultimap.create();
263
264 for (FlowRuleOperation op : ops) {
265 perDeviceBatches.put(op.rule().deviceId(),
266 new FlowRuleBatchEntry(mapOperationType(op.type()), op.rule()));
267 }
268 pendingDevices.addAll(perDeviceBatches.keySet());
269
270 for (DeviceId deviceId : perDeviceBatches.keySet()) {
271 long id = idGenerator.getNewId();
272 final FlowRuleBatchOperation b = new FlowRuleBatchOperation(perDeviceBatches.get(deviceId),
273 deviceId, id);
274 pendingFlowOperations.put(id, this);
yoonseonc6a69272017-01-12 18:22:20 -0800275 deviceInstallers.execute(() -> store.storeBatch(networkId(), b));
yoonseon6b972c32016-12-06 16:45:03 -0800276 }
277 }
278
279 synchronized void satisfy(DeviceId devId) {
280 pendingDevices.remove(devId);
281 if (pendingDevices.isEmpty()) {
282 operationsService.execute(this);
283 }
284 }
285
286 synchronized void fail(DeviceId devId, Set<? extends FlowRule> failures) {
287 hasFailed = true;
288 pendingDevices.remove(devId);
289 if (pendingDevices.isEmpty()) {
290 operationsService.execute(this);
291 }
292
293 FlowRuleOperations.Builder failedOpsBuilder = FlowRuleOperations.builder();
294 failures.forEach(failedOpsBuilder::add);
295
296 fops.callback().onError(failedOpsBuilder.build());
297 }
298 }
299
yoonseonbd8a93d2016-12-07 15:51:21 -0800300 private final class InternalFlowRuleProviderService
yoonseon6b972c32016-12-06 16:45:03 -0800301 extends AbstractVirtualProviderService<VirtualFlowRuleProvider>
302 implements VirtualFlowRuleProviderService {
303
304 final Map<FlowEntry, Long> firstSeen = Maps.newConcurrentMap();
305 final Map<FlowEntry, Long> lastSeen = Maps.newConcurrentMap();
306
yoonseonbd8a93d2016-12-07 15:51:21 -0800307 private InternalFlowRuleProviderService() {
308 //TODO: find a proper virtual provider.
309 Set<ProviderId> providerIds =
310 providerRegistryService.getProvidersByService(this);
311 ProviderId providerId = providerIds.stream().findFirst().get();
312 VirtualFlowRuleProvider provider = (VirtualFlowRuleProvider)
313 providerRegistryService.getProvider(providerId);
314 setProvider(provider);
315 }
316
yoonseon6b972c32016-12-06 16:45:03 -0800317 @Override
318 public void flowRemoved(FlowEntry flowEntry) {
319 checkNotNull(flowEntry, FLOW_RULE_NULL);
320 checkValidity();
321
322 lastSeen.remove(flowEntry);
323 firstSeen.remove(flowEntry);
yoonseonc6a69272017-01-12 18:22:20 -0800324 FlowEntry stored = store.getFlowEntry(networkId(), flowEntry);
yoonseon6b972c32016-12-06 16:45:03 -0800325 if (stored == null) {
326 log.debug("Rule already evicted from store: {}", flowEntry);
327 return;
328 }
329 if (flowEntry.reason() == FlowEntry.FlowRemoveReason.HARD_TIMEOUT) {
330 ((DefaultFlowEntry) stored).setState(FlowEntry.FlowEntryState.REMOVED);
331 }
yoonseon6b972c32016-12-06 16:45:03 -0800332
333 //FIXME: obtains provider from devices providerId()
334 FlowRuleEvent event = null;
335 switch (stored.state()) {
336 case ADDED:
337 case PENDING_ADD:
yoonseonc6a69272017-01-12 18:22:20 -0800338 provider().applyFlowRule(networkId(), stored);
yoonseon6b972c32016-12-06 16:45:03 -0800339 break;
340 case PENDING_REMOVE:
341 case REMOVED:
yoonseonc6a69272017-01-12 18:22:20 -0800342 event = store.removeFlowRule(networkId(), stored);
yoonseon6b972c32016-12-06 16:45:03 -0800343 break;
344 default:
345 break;
346
347 }
348 if (event != null) {
349 log.debug("Flow {} removed", flowEntry);
350 post(event);
351 }
352 }
353
yoonseon6b972c32016-12-06 16:45:03 -0800354 private void flowMissing(FlowEntry flowRule) {
355 checkNotNull(flowRule, FLOW_RULE_NULL);
356 checkValidity();
357
358 FlowRuleEvent event = null;
359 switch (flowRule.state()) {
360 case PENDING_REMOVE:
361 case REMOVED:
yoonseonc6a69272017-01-12 18:22:20 -0800362 event = store.removeFlowRule(networkId(), flowRule);
yoonseon6b972c32016-12-06 16:45:03 -0800363 break;
364 case ADDED:
365 case PENDING_ADD:
yoonseonc6a69272017-01-12 18:22:20 -0800366 event = store.pendingFlowRule(networkId(), flowRule);
yoonseonbd8a93d2016-12-07 15:51:21 -0800367
yoonseon6b972c32016-12-06 16:45:03 -0800368 try {
yoonseonc6a69272017-01-12 18:22:20 -0800369 provider().applyFlowRule(networkId(), flowRule);
yoonseon6b972c32016-12-06 16:45:03 -0800370 } catch (UnsupportedOperationException e) {
371 log.warn(e.getMessage());
372 if (flowRule instanceof DefaultFlowEntry) {
373 //FIXME modification of "stored" flow entry outside of store
374 ((DefaultFlowEntry) flowRule).setState(FlowEntry.FlowEntryState.FAILED);
375 }
376 }
377 break;
378 default:
379 log.debug("Flow {} has not been installed.", flowRule);
380 }
381
382 if (event != null) {
383 log.debug("Flow {} removed", flowRule);
384 post(event);
385 }
386 }
387
388 private void extraneousFlow(FlowRule flowRule) {
389 checkNotNull(flowRule, FLOW_RULE_NULL);
390 checkValidity();
391
yoonseonc6a69272017-01-12 18:22:20 -0800392 provider().removeFlowRule(networkId(), flowRule);
yoonseon6b972c32016-12-06 16:45:03 -0800393 log.debug("Flow {} is on switch but not in store.", flowRule);
394 }
395
396 private void flowAdded(FlowEntry flowEntry) {
397 checkNotNull(flowEntry, FLOW_RULE_NULL);
398
yoonseonc6a69272017-01-12 18:22:20 -0800399 if (checkRuleLiveness(flowEntry, store.getFlowEntry(networkId(), flowEntry))) {
400 FlowRuleEvent event = store.addOrUpdateFlowRule(networkId(), flowEntry);
yoonseon6b972c32016-12-06 16:45:03 -0800401 if (event == null) {
402 log.debug("No flow store event generated.");
403 } else {
404 log.trace("Flow {} {}", flowEntry, event.type());
405 post(event);
406 }
407 } else {
408 log.debug("Removing flow rules....");
409 removeFlowRules(flowEntry);
410 }
411 }
412
413 private boolean checkRuleLiveness(FlowEntry swRule, FlowEntry storedRule) {
414 if (storedRule == null) {
415 return false;
416 }
417 if (storedRule.isPermanent()) {
418 return true;
419 }
420
Ray Milkey3717e602018-02-01 13:49:47 -0800421 final long timeout = storedRule.timeout() * 1000L;
yoonseon6b972c32016-12-06 16:45:03 -0800422 final long currentTime = System.currentTimeMillis();
423
424 // Checking flow with hardTimeout
425 if (storedRule.hardTimeout() != 0) {
426 if (!firstSeen.containsKey(storedRule)) {
427 // First time rule adding
428 firstSeen.put(storedRule, currentTime);
429 } else {
430 Long first = firstSeen.get(storedRule);
Ray Milkey3717e602018-02-01 13:49:47 -0800431 final long hardTimeout = storedRule.hardTimeout() * 1000L;
yoonseon6b972c32016-12-06 16:45:03 -0800432 if ((currentTime - first) > hardTimeout) {
433 return false;
434 }
435 }
436 }
437
438 if (storedRule.packets() != swRule.packets()) {
439 lastSeen.put(storedRule, currentTime);
440 return true;
441 }
442 if (!lastSeen.containsKey(storedRule)) {
443 // checking for the first time
444 lastSeen.put(storedRule, storedRule.lastSeen());
445 // Use following if lastSeen attr. was removed.
446 //lastSeen.put(storedRule, currentTime);
447 }
448 Long last = lastSeen.get(storedRule);
449
450 // concurrently removed? let the liveness check fail
451 return last != null && (currentTime - last) <= timeout;
452 }
453
454 @Override
455 public void pushFlowMetrics(DeviceId deviceId, Iterable<FlowEntry> flowEntries) {
456 pushFlowMetricsInternal(deviceId, flowEntries, true);
457 }
458
459 @Override
460 public void pushFlowMetricsWithoutFlowMissing(DeviceId deviceId, Iterable<FlowEntry> flowEntries) {
461 pushFlowMetricsInternal(deviceId, flowEntries, false);
462 }
463
464 private void pushFlowMetricsInternal(DeviceId deviceId, Iterable<FlowEntry> flowEntries,
465 boolean useMissingFlow) {
466 Map<FlowEntry, FlowEntry> storedRules = Maps.newHashMap();
yoonseonc6a69272017-01-12 18:22:20 -0800467 store.getFlowEntries(networkId(), deviceId).forEach(f -> storedRules.put(f, f));
yoonseon6b972c32016-12-06 16:45:03 -0800468
469 for (FlowEntry rule : flowEntries) {
470 try {
471 FlowEntry storedRule = storedRules.remove(rule);
472 if (storedRule != null) {
Yoonseon Hanc8089db2017-03-22 20:22:12 +0900473 if (storedRule.id().equals(rule.id())) {
yoonseon6b972c32016-12-06 16:45:03 -0800474 // we both have the rule, let's update some info then.
475 flowAdded(rule);
476 } else {
477 // the two rules are not an exact match - remove the
478 // switch's rule and install our rule
479 extraneousFlow(rule);
480 flowMissing(storedRule);
481 }
482 }
483 } catch (Exception e) {
484 log.debug("Can't process added or extra rule {}", e.getMessage());
485 }
486 }
487
488 // DO NOT reinstall
489 if (useMissingFlow) {
490 for (FlowEntry rule : storedRules.keySet()) {
491 try {
492 // there are rules in the store that aren't on the switch
493 log.debug("Adding rule in store, but not on switch {}", rule);
494 flowMissing(rule);
495 } catch (Exception e) {
496 log.debug("Can't add missing flow rule:", e);
497 }
498 }
499 }
500 }
501
502 public void batchOperationCompleted(long batchId, CompletedBatchOperation operation) {
yoonseonc6a69272017-01-12 18:22:20 -0800503 store.batchOperationComplete(networkId(), FlowRuleBatchEvent.completed(
yoonseon6b972c32016-12-06 16:45:03 -0800504 new FlowRuleBatchRequest(batchId, Collections.emptySet()),
505 operation
506 ));
507 }
508
509 @Override
510 public void pushTableStatistics(DeviceId deviceId,
511 List<TableStatisticsEntry> tableStats) {
yoonseonc6a69272017-01-12 18:22:20 -0800512 store.updateTableStatistics(networkId(), deviceId, tableStats);
yoonseon6b972c32016-12-06 16:45:03 -0800513 }
Claudine Chiu70222ad2016-11-17 22:29:20 -0500514 }
yoonseonbd8a93d2016-12-07 15:51:21 -0800515
516 // Store delegate to re-post events emitted from the store.
517 private class InternalStoreDelegate implements FlowRuleStoreDelegate {
518
519 // TODO: Right now we only dispatch events at individual flowEntry level.
520 // It may be more efficient for also dispatch events as a batch.
521 @Override
522 public void notify(FlowRuleBatchEvent event) {
523 final FlowRuleBatchRequest request = event.subject();
524 switch (event.type()) {
525 case BATCH_OPERATION_REQUESTED:
526 // Request has been forwarded to MASTER Node, and was
527 request.ops().forEach(
528 op -> {
529 switch (op.operator()) {
530 case ADD:
531 post(new FlowRuleEvent(RULE_ADD_REQUESTED, op.target()));
532 break;
533 case REMOVE:
534 post(new FlowRuleEvent(RULE_REMOVE_REQUESTED, op.target()));
535 break;
536 case MODIFY:
537 //TODO: do something here when the time comes.
538 break;
539 default:
540 log.warn("Unknown flow operation operator: {}", op.operator());
541 }
542 }
543 );
544
545 DeviceId deviceId = event.deviceId();
546 FlowRuleBatchOperation batchOperation = request.asBatchOperation(deviceId);
547
548 VirtualFlowRuleProvider provider = innerProviderService.provider();
549 if (provider != null) {
550 provider.executeBatch(networkId, batchOperation);
551 }
552
553 break;
554
555 case BATCH_OPERATION_COMPLETED:
Yoonseon Hanc8089db2017-03-22 20:22:12 +0900556 FlowOperationsProcessor fops = pendingFlowOperations.remove(
557 event.subject().batchId());
558 if (fops == null) {
559 return;
560 }
561
562 if (event.result().isSuccess()) {
563 fops.satisfy(event.deviceId());
564 } else {
565 fops.fail(event.deviceId(), event.result().failedItems());
566 }
yoonseonbd8a93d2016-12-07 15:51:21 -0800567 break;
568
569 default:
570 break;
571 }
572 }
573 }
Claudine Chiu70222ad2016-11-17 22:29:20 -0500574}