/*
 * Copyright 2015-present Open Networking Foundation
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
16package org.onosproject.inbandtelemetry.impl;
17
18import com.google.common.collect.Maps;
19import com.google.common.util.concurrent.Striped;
20import org.apache.felix.scr.annotations.Activate;
21import org.apache.felix.scr.annotations.Component;
22import org.apache.felix.scr.annotations.Deactivate;
23import org.apache.felix.scr.annotations.Reference;
24import org.apache.felix.scr.annotations.ReferenceCardinality;
25import org.apache.felix.scr.annotations.Service;
26import org.onlab.util.KryoNamespace;
27import org.onlab.util.SharedScheduledExecutors;
28import org.onosproject.core.ApplicationId;
29import org.onosproject.core.CoreService;
30import org.onosproject.inbandtelemetry.api.IntConfig;
31import org.onosproject.inbandtelemetry.api.IntIntent;
32import org.onosproject.inbandtelemetry.api.IntIntentId;
33import org.onosproject.inbandtelemetry.api.IntObjective;
34import org.onosproject.inbandtelemetry.api.IntProgrammable;
35import org.onosproject.inbandtelemetry.api.IntService;
36import org.onosproject.mastership.MastershipService;
37import org.onosproject.net.ConnectPoint;
38import org.onosproject.net.Device;
39import org.onosproject.net.DeviceId;
40import org.onosproject.net.MastershipRole;
41import org.onosproject.net.PortNumber;
42import org.onosproject.net.device.DeviceEvent;
43import org.onosproject.net.device.DeviceListener;
44import org.onosproject.net.device.DeviceService;
45import org.onosproject.net.host.HostEvent;
46import org.onosproject.net.host.HostListener;
47import org.onosproject.net.host.HostService;
48import org.onosproject.store.serializers.KryoNamespaces;
49import org.onosproject.store.service.AtomicIdGenerator;
50import org.onosproject.store.service.AtomicValue;
51import org.onosproject.store.service.AtomicValueEvent;
52import org.onosproject.store.service.AtomicValueEventListener;
53import org.onosproject.store.service.ConsistentMap;
54import org.onosproject.store.service.MapEvent;
55import org.onosproject.store.service.MapEventListener;
56import org.onosproject.store.service.Serializer;
57import org.onosproject.store.service.StorageService;
58import org.onosproject.store.service.Versioned;
59import org.slf4j.Logger;
60
61import java.util.Collection;
62import java.util.Map;
63import java.util.Optional;
64import java.util.Set;
65import java.util.concurrent.ConcurrentMap;
66import java.util.concurrent.ExecutionException;
67import java.util.concurrent.ScheduledFuture;
68import java.util.concurrent.TimeUnit;
69import java.util.concurrent.TimeoutException;
70import java.util.concurrent.locks.Lock;
71import java.util.stream.Collectors;
72
73import static com.google.common.base.Preconditions.checkNotNull;
74import static org.slf4j.LoggerFactory.getLogger;
75
76/**
77 * Simple implementation of IntService, for controlling INT-capable pipelines.
78 * <p>
79 * All INT intents are converted to an equivalent INT objective and applied to
80 * all SOURCE_SINK devices. A device is deemed SOURCE_SINK if it has at least
81 * one host attached.
82 * <p>
83 * The implementation listens for different types of events and when required it
84 * configures a device by cleaning-up any previous state and applying the new
85 * one.
86 */
87@Component(immediate = true)
88@Service
89public class SimpleIntManager implements IntService {
90
91 private final Logger log = getLogger(getClass());
92
93 private static final int CONFIG_EVENT_DELAY = 5; // Seconds.
94
95 private static final String APP_NAME = "org.onosproject.inbandtelemetry";
96
97 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
98 private CoreService coreService;
99
100 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
101 private DeviceService deviceService;
102
103 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
104 private StorageService storageService;
105
106 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
107 private MastershipService mastershipService;
108
109 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
110 private HostService hostService;
111
112 private final Striped<Lock> deviceLocks = Striped.lock(10);
113
114 private final ConcurrentMap<DeviceId, ScheduledFuture<?>> scheduledDeviceTasks = Maps.newConcurrentMap();
115
116 // Distributed state.
117 private ConsistentMap<IntIntentId, IntIntent> intentMap;
118 private ConsistentMap<DeviceId, Long> devicesToConfigure;
119 private AtomicValue<IntConfig> intConfig;
120 private AtomicValue<Boolean> intStarted;
121 private AtomicIdGenerator intentIds;
122
123 // Event listeners.
124 private final InternalHostListener hostListener = new InternalHostListener();
125 private final InternalDeviceListener deviceListener = new InternalDeviceListener();
126 private final InternalIntentMapListener intentMapListener = new InternalIntentMapListener();
127 private final InternalIntConfigListener intConfigListener = new InternalIntConfigListener();
128 private final InternalIntStartedListener intStartedListener = new InternalIntStartedListener();
129 private final InternalDeviceToConfigureListener devicesToConfigureListener =
130 new InternalDeviceToConfigureListener();
131
132 @Activate
133 public void activate() {
134
135 final ApplicationId appId = coreService.registerApplication(APP_NAME);
136
137 KryoNamespace.Builder serializer = KryoNamespace.newBuilder()
138 .register(KryoNamespaces.API)
139 .register(IntIntent.class)
140 .register(IntIntentId.class)
141 .register(IntDeviceRole.class)
142 .register(IntIntent.IntHeaderType.class)
143 .register(IntIntent.IntMetadataType.class)
144 .register(IntIntent.IntReportType.class)
145 .register(IntIntent.TelemetryMode.class)
146 .register(IntConfig.class)
147 .register(IntConfig.TelemetrySpec.class);
148
149 devicesToConfigure = storageService.<DeviceId, Long>consistentMapBuilder()
150 .withSerializer(Serializer.using(serializer.build()))
151 .withName("onos-int-devices-to-configure")
152 .withApplicationId(appId)
153 .withPurgeOnUninstall()
154 .build();
155 devicesToConfigure.addListener(devicesToConfigureListener);
156
157 intentMap = storageService.<IntIntentId, IntIntent>consistentMapBuilder()
158 .withSerializer(Serializer.using(serializer.build()))
159 .withName("onos-int-intents")
160 .withApplicationId(appId)
161 .withPurgeOnUninstall()
162 .build();
163 intentMap.addListener(intentMapListener);
164
165 intStarted = storageService.<Boolean>atomicValueBuilder()
166 .withSerializer(Serializer.using(serializer.build()))
167 .withName("onos-int-started")
168 .withApplicationId(appId)
169 .build()
170 .asAtomicValue();
171 intStarted.addListener(intStartedListener);
172
173 intConfig = storageService.<IntConfig>atomicValueBuilder()
174 .withSerializer(Serializer.using(serializer.build()))
175 .withName("onos-int-config")
176 .withApplicationId(appId)
177 .build()
178 .asAtomicValue();
179 intConfig.addListener(intConfigListener);
180
181 intentIds = storageService.getAtomicIdGenerator("int-intent-id-generator");
182
183 // Bootstrap config for already existing devices.
184 triggerAllDeviceConfigure();
185
186 hostService.addListener(hostListener);
187 deviceService.addListener(deviceListener);
188
189 startInt();
190 log.info("Started", appId.id());
191 }
192
193 @Deactivate
194 public void deactivate() {
195 deviceService.removeListener(deviceListener);
196 hostService.removeListener(hostListener);
197 intentIds = null;
198 intConfig.removeListener(intConfigListener);
199 intConfig = null;
200 intStarted.removeListener(intStartedListener);
201 intStarted = null;
202 intentMap.removeListener(intentMapListener);
203 intentMap = null;
204 devicesToConfigure.removeListener(devicesToConfigureListener);
205 devicesToConfigure.destroy();
206 devicesToConfigure = null;
207 // Cancel tasks (if any).
208 scheduledDeviceTasks.values().forEach(f -> {
209 f.cancel(true);
210 if (!f.isDone()) {
211 try {
212 f.get(1, TimeUnit.SECONDS);
213 } catch (InterruptedException | ExecutionException | TimeoutException e) {
214 // Don't care, we are terminating the service anyways.
215 }
216 }
217 });
218 // Clean up INT rules from existing devices.
219 deviceService.getDevices().forEach(d -> cleanupDevice(d.id()));
220 log.info("Deactivated");
221 }
222
223 @Override
224 public void startInt() {
225 // Atomic value event will trigger device configure.
226 intStarted.set(true);
227 }
228
229 @Override
230 public void startInt(Set<DeviceId> deviceIds) {
231 log.warn("Starting INT for a subset of devices is not supported");
232 }
233
234 @Override
235 public void stopInt() {
236 // Atomic value event will trigger device configure.
237 intStarted.set(false);
238 }
239
240 @Override
241 public void stopInt(Set<DeviceId> deviceIds) {
242 log.warn("Stopping INT for a subset of devices is not supported");
243 }
244
245 @Override
246 public void setConfig(IntConfig cfg) {
247 checkNotNull(cfg);
248 // Atomic value event will trigger device configure.
249 intConfig.set(cfg);
250 }
251
252 @Override
253 public IntConfig getConfig() {
254 return intConfig.get();
255 }
256
257 @Override
258 public IntIntentId installIntIntent(IntIntent intent) {
259 checkNotNull(intent);
260 final Integer intentId = (int) intentIds.nextId();
261 final IntIntentId intIntentId = IntIntentId.valueOf(intentId);
262 // Intent map event will trigger device configure.
263 intentMap.put(intIntentId, intent);
264 return intIntentId;
265 }
266
267 @Override
268 public void removeIntIntent(IntIntentId intentId) {
269 checkNotNull(intentId);
270 // Intent map event will trigger device configure.
271 intentMap.remove(intentId).value();
272 }
273
274 @Override
275 public IntIntent getIntIntent(IntIntentId intentId) {
276 return Optional.ofNullable(intentMap.get(intentId).value()).orElse(null);
277 }
278
279 @Override
280 public Map<IntIntentId, IntIntent> getIntIntents() {
281 return intentMap.asJavaMap();
282 }
283
284 private boolean isConfigTaskValid(DeviceId deviceId, long creationTime) {
285 Versioned<?> versioned = devicesToConfigure.get(deviceId);
286 return versioned != null && versioned.creationTime() == creationTime;
287 }
288
289 private boolean isIntStarted() {
290 return intStarted.get();
291 }
292
293 private boolean isNotIntConfigured() {
294 return intConfig.get() == null;
295 }
296
297 private boolean isIntProgrammable(DeviceId deviceId) {
298 final Device device = deviceService.getDevice(deviceId);
299 return device != null && device.is(IntProgrammable.class);
300 }
301
302 private void triggerDeviceConfigure(DeviceId deviceId) {
303 if (isIntProgrammable(deviceId)) {
304 devicesToConfigure.put(deviceId, System.nanoTime());
305 }
306 }
307
308 private void triggerAllDeviceConfigure() {
309 deviceService.getDevices().forEach(d -> triggerDeviceConfigure(d.id()));
310 }
311
312 private void configDeviceTask(DeviceId deviceId, long creationTime) {
313 if (isConfigTaskValid(deviceId, creationTime)) {
314 // Task outdated.
315 return;
316 }
317 if (!deviceService.isAvailable(deviceId)) {
318 return;
319 }
320 final MastershipRole role = mastershipService.requestRoleForSync(deviceId);
321 if (!role.equals(MastershipRole.MASTER)) {
322 return;
323 }
324 deviceLocks.get(deviceId).lock();
325 try {
326 // Clean up first.
327 cleanupDevice(deviceId);
328 if (!configDevice(deviceId)) {
329 // Clean up if fails.
330 cleanupDevice(deviceId);
331 return;
332 }
333 devicesToConfigure.remove(deviceId);
334 } finally {
335 deviceLocks.get(deviceId).unlock();
336 }
337 }
338
339 private void cleanupDevice(DeviceId deviceId) {
340 final Device device = deviceService.getDevice(deviceId);
341 if (device == null || !device.is(IntProgrammable.class)) {
342 return;
343 }
344 device.as(IntProgrammable.class).cleanup();
345 }
346
347 private boolean configDevice(DeviceId deviceId) {
348 // Returns true if config was successful, false if not and a clean up is
349 // needed.
350 final Device device = deviceService.getDevice(deviceId);
351 if (device == null || !device.is(IntProgrammable.class)) {
352 return true;
353 }
354
355 if (isNotIntConfigured()) {
356 log.warn("Missing INT config, aborting programming of INT device {}", deviceId);
357 return true;
358 }
359
360 final boolean isEdge = !hostService.getConnectedHosts(deviceId).isEmpty();
361 final IntDeviceRole intDeviceRole = isEdge
362 ? IntDeviceRole.SOURCE_SINK
363 : IntDeviceRole.TRANSIT;
364
365 log.info("Started programming of INT device {} with role {}...",
366 deviceId, intDeviceRole);
367
368 final IntProgrammable intProg = device.as(IntProgrammable.class);
369
370 if (!isIntStarted()) {
371 // Leave device with no INT configuration.
372 return true;
373 }
374
375 if (!intProg.init()) {
376 log.warn("Unable to init INT pipeline on {}", deviceId);
377 return false;
378 }
379
380 if (intDeviceRole != IntDeviceRole.SOURCE_SINK) {
381 // Stop here, no more configuration needed for transit devices.
382 return true;
383 }
384
385 if (intProg.supportsFunctionality(IntProgrammable.IntFunctionality.SINK)) {
386 if (!intProg.setupIntConfig(intConfig.get())) {
387 log.warn("Unable to apply INT report config on {}", deviceId);
388 return false;
389 }
390 }
391
392 // Port configuration.
393 final Set<PortNumber> hostPorts = deviceService.getPorts(deviceId)
394 .stream()
395 .map(port -> new ConnectPoint(deviceId, port.number()))
396 .filter(cp -> !hostService.getConnectedHosts(cp).isEmpty())
397 .map(ConnectPoint::port)
398 .collect(Collectors.toSet());
399
400 for (PortNumber port : hostPorts) {
401 if (intProg.supportsFunctionality(IntProgrammable.IntFunctionality.SOURCE)) {
402 log.info("Setting port {}/{} as INT source port...", deviceId, port);
403 if (!intProg.setSourcePort(port)) {
404 log.warn("Unable to set INT source port {} on {}", port, deviceId);
405 return false;
406 }
407 }
408 if (intProg.supportsFunctionality(IntProgrammable.IntFunctionality.SINK)) {
409 log.info("Setting port {}/{} as INT sink port...", deviceId, port);
410 if (!intProg.setSinkPort(port)) {
411 log.warn("Unable to set INT sink port {} on {}", port, deviceId);
412 return false;
413 }
414 }
415 }
416
417 if (!intProg.supportsFunctionality(IntProgrammable.IntFunctionality.SOURCE)) {
418 // Stop here, no more configuration needed for sink devices.
419 return true;
420 }
421
422 // Apply intents.
423 // This is a trivial implementation where we simply get the
424 // corresponding INT objective from an intent and we apply to all source
425 // device.
426 final Collection<IntObjective> objectives = intentMap.values().stream()
427 .map(v -> getIntObjective(v.value()))
428 .collect(Collectors.toList());
429 int appliedCount = 0;
430 for (IntObjective objective : objectives) {
431 if (intProg.addIntObjective(objective)) {
432 appliedCount = appliedCount + 1;
433 }
434 }
435
436 log.info("Completed programming of {}, applied {} INT objectives of {} total",
437 deviceId, appliedCount, objectives.size());
438
439 return true;
440 }
441
442 private IntObjective getIntObjective(IntIntent intent) {
443 return new IntObjective.Builder()
444 .withSelector(intent.selector())
445 .withMetadataTypes(intent.metadataTypes())
446 .withHeaderType(intent.headerType())
447 .build();
448 }
449
450 /* Event listeners which trigger device configuration. */
451
452 private class InternalHostListener implements HostListener {
453 @Override
454 public void event(HostEvent event) {
455 final DeviceId deviceId = event.subject().location().deviceId();
456 triggerDeviceConfigure(deviceId);
457 }
458 }
459
460 private class InternalDeviceListener implements DeviceListener {
461 @Override
462 public void event(DeviceEvent event) {
463 switch (event.type()) {
464 case DEVICE_ADDED:
465 case DEVICE_UPDATED:
466 case DEVICE_REMOVED:
467 case DEVICE_SUSPENDED:
468 case DEVICE_AVAILABILITY_CHANGED:
469 case PORT_ADDED:
470 case PORT_UPDATED:
471 case PORT_REMOVED:
472 triggerDeviceConfigure(event.subject().id());
473 return;
474 case PORT_STATS_UPDATED:
475 return;
476 default:
477 log.warn("Unknown device event type {}", event.type());
478 }
479 }
480 }
481
482 private class InternalIntentMapListener
483 implements MapEventListener<IntIntentId, IntIntent> {
484 @Override
485 public void event(MapEvent<IntIntentId, IntIntent> event) {
486 triggerAllDeviceConfigure();
487 }
488 }
489
490 private class InternalIntConfigListener
491 implements AtomicValueEventListener<IntConfig> {
492 @Override
493 public void event(AtomicValueEvent<IntConfig> event) {
494 triggerAllDeviceConfigure();
495 }
496 }
497
498 private class InternalIntStartedListener
499 implements AtomicValueEventListener<Boolean> {
500 @Override
501 public void event(AtomicValueEvent<Boolean> event) {
502 triggerAllDeviceConfigure();
503 }
504 }
505
506 private class InternalDeviceToConfigureListener
507 implements MapEventListener<DeviceId, Long> {
508 @Override
509 public void event(MapEvent<DeviceId, Long> event) {
510 if (event.type().equals(MapEvent.Type.REMOVE) ||
511 event.newValue() == null) {
512 return;
513 }
514 // Schedule task in the future. Wait for events for this device to
515 // stabilize.
516 final DeviceId deviceId = event.key();
517 final long creationTime = event.newValue().creationTime();
518 ScheduledFuture<?> newTask = SharedScheduledExecutors.newTimeout(
519 () -> configDeviceTask(deviceId, creationTime),
520 CONFIG_EVENT_DELAY, TimeUnit.SECONDS);
521 ScheduledFuture<?> oldTask = scheduledDeviceTasks.put(deviceId, newTask);
522 if (oldTask != null) {
523 oldTask.cancel(false);
524 }
525 }
526 }
527}