blob: b7dd1cb489f22a3552e48a6bea2f9c298c5a2ee6 [file] [log] [blame]
Claudine Chiu70222ad2016-11-17 22:29:20 -05001/*
2 * Copyright 2016-present Open Networking Laboratory
3 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
16
17package org.onosproject.incubator.net.virtual.impl;
18
yoonseon6b972c32016-12-06 16:45:03 -080019import com.google.common.collect.ArrayListMultimap;
20import com.google.common.collect.Iterables;
21import com.google.common.collect.Lists;
22import com.google.common.collect.Maps;
23import com.google.common.collect.Multimap;
24import com.google.common.collect.Sets;
25import org.onlab.osgi.ServiceDirectory;
Claudine Chiu70222ad2016-11-17 22:29:20 -050026import org.onosproject.core.ApplicationId;
yoonseon6b972c32016-12-06 16:45:03 -080027import org.onosproject.core.CoreService;
28import org.onosproject.core.IdGenerator;
Claudine Chiu70222ad2016-11-17 22:29:20 -050029import org.onosproject.event.AbstractListenerManager;
30import org.onosproject.incubator.net.virtual.VirtualNetwork;
yoonseon6b972c32016-12-06 16:45:03 -080031import org.onosproject.incubator.net.virtual.VirtualNetworkAdminService;
32import org.onosproject.incubator.net.virtual.VirtualNetworkFlowRuleStore;
Claudine Chiu70222ad2016-11-17 22:29:20 -050033import org.onosproject.incubator.net.virtual.VnetService;
yoonseon6b972c32016-12-06 16:45:03 -080034import org.onosproject.incubator.net.virtual.provider.AbstractVirtualProviderService;
35import org.onosproject.incubator.net.virtual.provider.VirtualFlowRuleProvider;
36import org.onosproject.incubator.net.virtual.provider.VirtualFlowRuleProviderService;
37import org.onosproject.incubator.net.virtual.provider.VirtualProviderRegistryService;
38import org.onosproject.net.Device;
Claudine Chiu70222ad2016-11-17 22:29:20 -050039import org.onosproject.net.DeviceId;
yoonseon6b972c32016-12-06 16:45:03 -080040import org.onosproject.net.device.DeviceService;
41import org.onosproject.net.flow.CompletedBatchOperation;
42import org.onosproject.net.flow.DefaultFlowEntry;
Claudine Chiu70222ad2016-11-17 22:29:20 -050043import org.onosproject.net.flow.FlowEntry;
44import org.onosproject.net.flow.FlowRule;
yoonseon6b972c32016-12-06 16:45:03 -080045import org.onosproject.net.flow.FlowRuleBatchEntry;
46import org.onosproject.net.flow.FlowRuleBatchEvent;
47import org.onosproject.net.flow.FlowRuleBatchOperation;
48import org.onosproject.net.flow.FlowRuleBatchRequest;
Claudine Chiu70222ad2016-11-17 22:29:20 -050049import org.onosproject.net.flow.FlowRuleEvent;
50import org.onosproject.net.flow.FlowRuleListener;
yoonseon6b972c32016-12-06 16:45:03 -080051import org.onosproject.net.flow.FlowRuleOperation;
Claudine Chiu70222ad2016-11-17 22:29:20 -050052import org.onosproject.net.flow.FlowRuleOperations;
53import org.onosproject.net.flow.FlowRuleService;
54import org.onosproject.net.flow.TableStatisticsEntry;
yoonseon6b972c32016-12-06 16:45:03 -080055import org.slf4j.Logger;
56import org.slf4j.LoggerFactory;
57
58import java.util.Collections;
59import java.util.HashSet;
60import java.util.List;
61import java.util.Map;
62import java.util.Set;
63import java.util.concurrent.ConcurrentHashMap;
64import java.util.concurrent.ExecutorService;
65import java.util.concurrent.Executors;
Claudine Chiu70222ad2016-11-17 22:29:20 -050066
67import static com.google.common.base.Preconditions.checkNotNull;
yoonseon6b972c32016-12-06 16:45:03 -080068import static org.onlab.util.Tools.groupedThreads;
Claudine Chiu70222ad2016-11-17 22:29:20 -050069
70/**
71 * Flow rule service implementation built on the virtual network service.
72 */
yoonseon6b972c32016-12-06 16:45:03 -080073public class VirtualNetworkFlowRuleManager
74 extends AbstractListenerManager<FlowRuleEvent, FlowRuleListener>
Claudine Chiu70222ad2016-11-17 22:29:20 -050075 implements FlowRuleService, VnetService {
76
77 private static final String NETWORK_NULL = "Network ID cannot be null";
yoonseon6b972c32016-12-06 16:45:03 -080078 private static final String VIRTUAL_FLOW_OP_TOPIC = "virtual-flow-ops-ids";
79 private static final String THREAD_GROUP_NAME = "onos/virtual-flowservice";
80 private static final String DEVICE_INSTALLER_PATTERN = "device-installer-%d";
81 private static final String OPERATION_PATTERN = "operations-%d";
82 public static final String FLOW_RULE_NULL = "FlowRule cannot be null";
83
84 private final Logger log = LoggerFactory.getLogger(getClass());
85
Claudine Chiu70222ad2016-11-17 22:29:20 -050086 private final VirtualNetwork network;
yoonseon6b972c32016-12-06 16:45:03 -080087 private final VirtualNetworkAdminService manager;
88 private final VirtualNetworkFlowRuleStore store;
89 private final DeviceService deviceService;
90
91 protected ExecutorService deviceInstallers =
92 Executors.newFixedThreadPool(32,
93 groupedThreads(THREAD_GROUP_NAME,
94 DEVICE_INSTALLER_PATTERN, log));
95 protected ExecutorService operationsService =
96 Executors.newFixedThreadPool(32,
97 groupedThreads(THREAD_GROUP_NAME,
98 OPERATION_PATTERN, log));
99 private IdGenerator idGenerator;
100
101 private final Map<Long, FlowOperationsProcessor> pendingFlowOperations = new ConcurrentHashMap<>();
102
103 private VirtualProviderRegistryService providerRegistryService = null;
104 private InternalFlowRuleProviderService innerProviderService = null;
105
106
Claudine Chiu70222ad2016-11-17 22:29:20 -0500107
108 /**
109 * Creates a new VirtualNetworkFlowRuleService object.
110 *
111 * @param virtualNetworkManager virtual network manager service
112 * @param network virtual network
yoonseon6b972c32016-12-06 16:45:03 -0800113 * @param serviceDirectory service directory
Claudine Chiu70222ad2016-11-17 22:29:20 -0500114 */
yoonseon6b972c32016-12-06 16:45:03 -0800115 public VirtualNetworkFlowRuleManager(VirtualNetworkAdminService virtualNetworkManager,
116 VirtualNetwork network,
117 ServiceDirectory serviceDirectory) {
Claudine Chiu70222ad2016-11-17 22:29:20 -0500118 checkNotNull(network, NETWORK_NULL);
119 this.network = network;
yoonseon6b972c32016-12-06 16:45:03 -0800120
121 manager = virtualNetworkManager;
122 store = serviceDirectory.get(VirtualNetworkFlowRuleStore.class);
123 idGenerator = serviceDirectory.get(CoreService.class)
124 .getIdGenerator(VIRTUAL_FLOW_OP_TOPIC + network.id().toString());
125
126 providerRegistryService =
127 serviceDirectory.get(VirtualProviderRegistryService.class);
128 innerProviderService = new InternalFlowRuleProviderService();
129 providerRegistryService.registerProviderService(network.id(), innerProviderService);
130
131 this.deviceService = manager.get(network.id(), DeviceService.class);
Claudine Chiu70222ad2016-11-17 22:29:20 -0500132 }
133
134 @Override
135 public int getFlowRuleCount() {
yoonseon6b972c32016-12-06 16:45:03 -0800136 return store.getFlowRuleCount(network.id());
Claudine Chiu70222ad2016-11-17 22:29:20 -0500137 }
138
139 @Override
140 public Iterable<FlowEntry> getFlowEntries(DeviceId deviceId) {
yoonseon6b972c32016-12-06 16:45:03 -0800141 return store.getFlowEntries(network.id(), deviceId);
Claudine Chiu70222ad2016-11-17 22:29:20 -0500142 }
143
144 @Override
145 public void applyFlowRules(FlowRule... flowRules) {
yoonseon6b972c32016-12-06 16:45:03 -0800146 FlowRuleOperations.Builder builder = FlowRuleOperations.builder();
147 for (FlowRule flowRule : flowRules) {
148 builder.add(flowRule);
149 }
150 apply(builder.build());
Claudine Chiu70222ad2016-11-17 22:29:20 -0500151 }
152
153 @Override
154 public void purgeFlowRules(DeviceId deviceId) {
yoonseon6b972c32016-12-06 16:45:03 -0800155 store.purgeFlowRule(network.id(), deviceId);
Claudine Chiu70222ad2016-11-17 22:29:20 -0500156 }
157
158 @Override
159 public void removeFlowRules(FlowRule... flowRules) {
yoonseon6b972c32016-12-06 16:45:03 -0800160 FlowRuleOperations.Builder builder = FlowRuleOperations.builder();
161 for (FlowRule flowRule : flowRules) {
162 builder.remove(flowRule);
163 }
164 apply(builder.build());
Claudine Chiu70222ad2016-11-17 22:29:20 -0500165 }
166
167 @Override
yoonseon6b972c32016-12-06 16:45:03 -0800168 public void removeFlowRulesById(ApplicationId id) {
169 removeFlowRules(Iterables.toArray(getFlowRulesById(id), FlowRule.class));
Claudine Chiu70222ad2016-11-17 22:29:20 -0500170 }
171
172 @Override
173 public Iterable<FlowRule> getFlowRulesById(ApplicationId id) {
yoonseon6b972c32016-12-06 16:45:03 -0800174 DeviceService deviceService = manager.get(network.id(), DeviceService.class);
175
176 Set<FlowRule> flowEntries = Sets.newHashSet();
177 for (Device d : deviceService.getDevices()) {
178 for (FlowEntry flowEntry : store.getFlowEntries(network.id(), d.id())) {
179 if (flowEntry.appId() == id.id()) {
180 flowEntries.add(flowEntry);
181 }
182 }
183 }
184 return flowEntries;
Claudine Chiu70222ad2016-11-17 22:29:20 -0500185 }
186
187 @Override
188 public Iterable<FlowEntry> getFlowEntriesById(ApplicationId id) {
yoonseon6b972c32016-12-06 16:45:03 -0800189 DeviceService deviceService = manager.get(network.id(), DeviceService.class);
190
191 Set<FlowEntry> flowEntries = Sets.newHashSet();
192 for (Device d : deviceService.getDevices()) {
193 for (FlowEntry flowEntry : store.getFlowEntries(network.id(), d.id())) {
194 if (flowEntry.appId() == id.id()) {
195 flowEntries.add(flowEntry);
196 }
197 }
198 }
199 return flowEntries;
Claudine Chiu70222ad2016-11-17 22:29:20 -0500200 }
201
202 @Override
203 public Iterable<FlowRule> getFlowRulesByGroupId(ApplicationId appId, short groupId) {
yoonseon6b972c32016-12-06 16:45:03 -0800204 DeviceService deviceService = manager.get(network.id(), DeviceService.class);
205
206 Set<FlowRule> matches = Sets.newHashSet();
207 long toLookUp = ((long) appId.id() << 16) | groupId;
208 for (Device d : deviceService.getDevices()) {
209 for (FlowEntry flowEntry : store.getFlowEntries(network.id(), d.id())) {
210 if ((flowEntry.id().value() >>> 32) == toLookUp) {
211 matches.add(flowEntry);
212 }
213 }
214 }
215 return matches;
Claudine Chiu70222ad2016-11-17 22:29:20 -0500216 }
217
218 @Override
219 public void apply(FlowRuleOperations ops) {
yoonseon6b972c32016-12-06 16:45:03 -0800220 operationsService.execute(new FlowOperationsProcessor(ops));
Claudine Chiu70222ad2016-11-17 22:29:20 -0500221 }
222
223 @Override
224 public Iterable<TableStatisticsEntry> getFlowTableStatistics(DeviceId deviceId) {
yoonseon6b972c32016-12-06 16:45:03 -0800225 return store.getTableStatistics(network.id(), deviceId);
226 }
227
228 private static FlowRuleBatchEntry.FlowRuleOperation mapOperationType(FlowRuleOperation.Type input) {
229 switch (input) {
230 case ADD:
231 return FlowRuleBatchEntry.FlowRuleOperation.ADD;
232 case MODIFY:
233 return FlowRuleBatchEntry.FlowRuleOperation.MODIFY;
234 case REMOVE:
235 return FlowRuleBatchEntry.FlowRuleOperation.REMOVE;
236 default:
237 throw new UnsupportedOperationException("Unknown flow rule type " + input);
238 }
Claudine Chiu70222ad2016-11-17 22:29:20 -0500239 }
240
241 @Override
242 public VirtualNetwork network() {
yoonseon6b972c32016-12-06 16:45:03 -0800243 return this.network;
244 }
245
246 private class FlowOperationsProcessor implements Runnable {
247 // Immutable
248 private final FlowRuleOperations fops;
249
250 // Mutable
251 private final List<Set<FlowRuleOperation>> stages;
252 private final Set<DeviceId> pendingDevices = new HashSet<>();
253 private boolean hasFailed = false;
254
255 FlowOperationsProcessor(FlowRuleOperations ops) {
256 this.stages = Lists.newArrayList(ops.stages());
257 this.fops = ops;
258 }
259
260 @Override
261 public synchronized void run() {
262 if (stages.size() > 0) {
263 process(stages.remove(0));
264 } else if (!hasFailed) {
265 fops.callback().onSuccess(fops);
266 }
267 }
268
269 private void process(Set<FlowRuleOperation> ops) {
270 Multimap<DeviceId, FlowRuleBatchEntry> perDeviceBatches = ArrayListMultimap.create();
271
272 for (FlowRuleOperation op : ops) {
273 perDeviceBatches.put(op.rule().deviceId(),
274 new FlowRuleBatchEntry(mapOperationType(op.type()), op.rule()));
275 }
276 pendingDevices.addAll(perDeviceBatches.keySet());
277
278 for (DeviceId deviceId : perDeviceBatches.keySet()) {
279 long id = idGenerator.getNewId();
280 final FlowRuleBatchOperation b = new FlowRuleBatchOperation(perDeviceBatches.get(deviceId),
281 deviceId, id);
282 pendingFlowOperations.put(id, this);
283 deviceInstallers.execute(() -> store.storeBatch(network.id(), b));
284 }
285 }
286
287 synchronized void satisfy(DeviceId devId) {
288 pendingDevices.remove(devId);
289 if (pendingDevices.isEmpty()) {
290 operationsService.execute(this);
291 }
292 }
293
294 synchronized void fail(DeviceId devId, Set<? extends FlowRule> failures) {
295 hasFailed = true;
296 pendingDevices.remove(devId);
297 if (pendingDevices.isEmpty()) {
298 operationsService.execute(this);
299 }
300
301 FlowRuleOperations.Builder failedOpsBuilder = FlowRuleOperations.builder();
302 failures.forEach(failedOpsBuilder::add);
303
304 fops.callback().onError(failedOpsBuilder.build());
305 }
306 }
307
308 private class InternalFlowRuleProviderService
309 extends AbstractVirtualProviderService<VirtualFlowRuleProvider>
310 implements VirtualFlowRuleProviderService {
311
312 final Map<FlowEntry, Long> firstSeen = Maps.newConcurrentMap();
313 final Map<FlowEntry, Long> lastSeen = Maps.newConcurrentMap();
314
315 @Override
316 public void flowRemoved(FlowEntry flowEntry) {
317 checkNotNull(flowEntry, FLOW_RULE_NULL);
318 checkValidity();
319
320 lastSeen.remove(flowEntry);
321 firstSeen.remove(flowEntry);
322 FlowEntry stored = store.getFlowEntry(network.id(), flowEntry);
323 if (stored == null) {
324 log.debug("Rule already evicted from store: {}", flowEntry);
325 return;
326 }
327 if (flowEntry.reason() == FlowEntry.FlowRemoveReason.HARD_TIMEOUT) {
328 ((DefaultFlowEntry) stored).setState(FlowEntry.FlowEntryState.REMOVED);
329 }
330 Device device = deviceService.getDevice(flowEntry.deviceId());
331
332 //FIXME: obtains provider from devices providerId()
333 FlowRuleEvent event = null;
334 switch (stored.state()) {
335 case ADDED:
336 case PENDING_ADD:
337 provider().applyFlowRule(network.id(), stored);
338 break;
339 case PENDING_REMOVE:
340 case REMOVED:
341 event = store.removeFlowRule(network.id(), stored);
342 break;
343 default:
344 break;
345
346 }
347 if (event != null) {
348 log.debug("Flow {} removed", flowEntry);
349 post(event);
350 }
351 }
352
353
354 private void flowMissing(FlowEntry flowRule) {
355 checkNotNull(flowRule, FLOW_RULE_NULL);
356 checkValidity();
357
358 FlowRuleEvent event = null;
359 switch (flowRule.state()) {
360 case PENDING_REMOVE:
361 case REMOVED:
362 event = store.removeFlowRule(network.id(), flowRule);
363 break;
364 case ADDED:
365 case PENDING_ADD:
366 event = store.pendingFlowRule(network.id(), flowRule);
367 try {
368 provider().applyFlowRule(network.id(), flowRule);
369 } catch (UnsupportedOperationException e) {
370 log.warn(e.getMessage());
371 if (flowRule instanceof DefaultFlowEntry) {
372 //FIXME modification of "stored" flow entry outside of store
373 ((DefaultFlowEntry) flowRule).setState(FlowEntry.FlowEntryState.FAILED);
374 }
375 }
376 break;
377 default:
378 log.debug("Flow {} has not been installed.", flowRule);
379 }
380
381 if (event != null) {
382 log.debug("Flow {} removed", flowRule);
383 post(event);
384 }
385 }
386
387 private void extraneousFlow(FlowRule flowRule) {
388 checkNotNull(flowRule, FLOW_RULE_NULL);
389 checkValidity();
390
391 provider().removeFlowRule(network.id(), flowRule);
392 log.debug("Flow {} is on switch but not in store.", flowRule);
393 }
394
395 private void flowAdded(FlowEntry flowEntry) {
396 checkNotNull(flowEntry, FLOW_RULE_NULL);
397
398 if (checkRuleLiveness(flowEntry, store.getFlowEntry(network.id(), flowEntry))) {
399 FlowRuleEvent event = store.addOrUpdateFlowRule(network.id(), flowEntry);
400 if (event == null) {
401 log.debug("No flow store event generated.");
402 } else {
403 log.trace("Flow {} {}", flowEntry, event.type());
404 post(event);
405 }
406 } else {
407 log.debug("Removing flow rules....");
408 removeFlowRules(flowEntry);
409 }
410 }
411
412 private boolean checkRuleLiveness(FlowEntry swRule, FlowEntry storedRule) {
413 if (storedRule == null) {
414 return false;
415 }
416 if (storedRule.isPermanent()) {
417 return true;
418 }
419
420 final long timeout = storedRule.timeout() * 1000;
421 final long currentTime = System.currentTimeMillis();
422
423 // Checking flow with hardTimeout
424 if (storedRule.hardTimeout() != 0) {
425 if (!firstSeen.containsKey(storedRule)) {
426 // First time rule adding
427 firstSeen.put(storedRule, currentTime);
428 } else {
429 Long first = firstSeen.get(storedRule);
430 final long hardTimeout = storedRule.hardTimeout() * 1000;
431 if ((currentTime - first) > hardTimeout) {
432 return false;
433 }
434 }
435 }
436
437 if (storedRule.packets() != swRule.packets()) {
438 lastSeen.put(storedRule, currentTime);
439 return true;
440 }
441 if (!lastSeen.containsKey(storedRule)) {
442 // checking for the first time
443 lastSeen.put(storedRule, storedRule.lastSeen());
444 // Use following if lastSeen attr. was removed.
445 //lastSeen.put(storedRule, currentTime);
446 }
447 Long last = lastSeen.get(storedRule);
448
449 // concurrently removed? let the liveness check fail
450 return last != null && (currentTime - last) <= timeout;
451 }
452
453 @Override
454 public void pushFlowMetrics(DeviceId deviceId, Iterable<FlowEntry> flowEntries) {
455 pushFlowMetricsInternal(deviceId, flowEntries, true);
456 }
457
458 @Override
459 public void pushFlowMetricsWithoutFlowMissing(DeviceId deviceId, Iterable<FlowEntry> flowEntries) {
460 pushFlowMetricsInternal(deviceId, flowEntries, false);
461 }
462
463 private void pushFlowMetricsInternal(DeviceId deviceId, Iterable<FlowEntry> flowEntries,
464 boolean useMissingFlow) {
465 Map<FlowEntry, FlowEntry> storedRules = Maps.newHashMap();
466 store.getFlowEntries(network.id(), deviceId).forEach(f -> storedRules.put(f, f));
467
468 for (FlowEntry rule : flowEntries) {
469 try {
470 FlowEntry storedRule = storedRules.remove(rule);
471 if (storedRule != null) {
472 if (storedRule.exactMatch(rule)) {
473 // we both have the rule, let's update some info then.
474 flowAdded(rule);
475 } else {
476 // the two rules are not an exact match - remove the
477 // switch's rule and install our rule
478 extraneousFlow(rule);
479 flowMissing(storedRule);
480 }
481 }
482 } catch (Exception e) {
483 log.debug("Can't process added or extra rule {}", e.getMessage());
484 }
485 }
486
487 // DO NOT reinstall
488 if (useMissingFlow) {
489 for (FlowEntry rule : storedRules.keySet()) {
490 try {
491 // there are rules in the store that aren't on the switch
492 log.debug("Adding rule in store, but not on switch {}", rule);
493 flowMissing(rule);
494 } catch (Exception e) {
495 log.debug("Can't add missing flow rule:", e);
496 }
497 }
498 }
499 }
500
501 public void batchOperationCompleted(long batchId, CompletedBatchOperation operation) {
502 store.batchOperationComplete(network.id(), FlowRuleBatchEvent.completed(
503 new FlowRuleBatchRequest(batchId, Collections.emptySet()),
504 operation
505 ));
506 }
507
508 @Override
509 public void pushTableStatistics(DeviceId deviceId,
510 List<TableStatisticsEntry> tableStats) {
511 store.updateTableStatistics(network.id(), deviceId, tableStats);
512 }
Claudine Chiu70222ad2016-11-17 22:29:20 -0500513 }
514}