/*
 * Copyright 2015-present Open Networking Foundation
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
16package org.onosproject.inbandtelemetry.impl;
17
18import com.google.common.collect.Maps;
19import com.google.common.util.concurrent.Striped;
Carmelo Casconefa421582018-09-13 10:05:57 -070020import org.onlab.util.KryoNamespace;
21import org.onlab.util.SharedScheduledExecutors;
22import org.onosproject.core.ApplicationId;
23import org.onosproject.core.CoreService;
Carmelo Casconedefc74e2020-07-17 15:27:02 -070024import org.onosproject.net.behaviour.inbandtelemetry.IntMetadataType;
25import org.onosproject.net.behaviour.inbandtelemetry.IntDeviceConfig;
Carmelo Casconefa421582018-09-13 10:05:57 -070026import org.onosproject.inbandtelemetry.api.IntIntent;
27import org.onosproject.inbandtelemetry.api.IntIntentId;
Carmelo Casconedefc74e2020-07-17 15:27:02 -070028import org.onosproject.net.behaviour.inbandtelemetry.IntObjective;
29import org.onosproject.net.behaviour.inbandtelemetry.IntProgrammable;
Carmelo Casconefa421582018-09-13 10:05:57 -070030import org.onosproject.inbandtelemetry.api.IntService;
31import org.onosproject.mastership.MastershipService;
32import org.onosproject.net.ConnectPoint;
33import org.onosproject.net.Device;
34import org.onosproject.net.DeviceId;
35import org.onosproject.net.MastershipRole;
36import org.onosproject.net.PortNumber;
37import org.onosproject.net.device.DeviceEvent;
38import org.onosproject.net.device.DeviceListener;
39import org.onosproject.net.device.DeviceService;
40import org.onosproject.net.host.HostEvent;
41import org.onosproject.net.host.HostListener;
42import org.onosproject.net.host.HostService;
43import org.onosproject.store.serializers.KryoNamespaces;
44import org.onosproject.store.service.AtomicIdGenerator;
45import org.onosproject.store.service.AtomicValue;
46import org.onosproject.store.service.AtomicValueEvent;
47import org.onosproject.store.service.AtomicValueEventListener;
48import org.onosproject.store.service.ConsistentMap;
49import org.onosproject.store.service.MapEvent;
50import org.onosproject.store.service.MapEventListener;
51import org.onosproject.store.service.Serializer;
52import org.onosproject.store.service.StorageService;
53import org.onosproject.store.service.Versioned;
Ray Milkeydb57f1c2018-10-09 10:39:29 -070054import org.osgi.service.component.annotations.Activate;
55import org.osgi.service.component.annotations.Component;
56import org.osgi.service.component.annotations.Deactivate;
57import org.osgi.service.component.annotations.Reference;
58import org.osgi.service.component.annotations.ReferenceCardinality;
Carmelo Casconefa421582018-09-13 10:05:57 -070059import org.slf4j.Logger;
60
61import java.util.Collection;
62import java.util.Map;
63import java.util.Optional;
64import java.util.Set;
65import java.util.concurrent.ConcurrentMap;
66import java.util.concurrent.ExecutionException;
67import java.util.concurrent.ScheduledFuture;
68import java.util.concurrent.TimeUnit;
69import java.util.concurrent.TimeoutException;
70import java.util.concurrent.locks.Lock;
71import java.util.stream.Collectors;
72
73import static com.google.common.base.Preconditions.checkNotNull;
74import static org.slf4j.LoggerFactory.getLogger;
75
/**
 * Simple implementation of IntService, for controlling INT-capable pipelines.
 * <p>
 * All INT intents are converted to an equivalent INT objective and applied to
 * all SOURCE_SINK devices. A device is deemed SOURCE_SINK if it has at least
 * one host attached.
 * <p>
 * The implementation listens for different types of events and, when required,
 * it configures a device by cleaning up any previous state and applying the
 * new one.
 */
Ray Milkeydb57f1c2018-10-09 10:39:29 -070087@Component(immediate = true, service = IntService.class)
Carmelo Casconefa421582018-09-13 10:05:57 -070088public class SimpleIntManager implements IntService {
89
90 private final Logger log = getLogger(getClass());
91
92 private static final int CONFIG_EVENT_DELAY = 5; // Seconds.
93
94 private static final String APP_NAME = "org.onosproject.inbandtelemetry";
95
Ray Milkeydb57f1c2018-10-09 10:39:29 -070096 @Reference(cardinality = ReferenceCardinality.MANDATORY)
Carmelo Casconefa421582018-09-13 10:05:57 -070097 private CoreService coreService;
98
Ray Milkeydb57f1c2018-10-09 10:39:29 -070099 @Reference(cardinality = ReferenceCardinality.MANDATORY)
Carmelo Casconefa421582018-09-13 10:05:57 -0700100 private DeviceService deviceService;
101
Ray Milkeydb57f1c2018-10-09 10:39:29 -0700102 @Reference(cardinality = ReferenceCardinality.MANDATORY)
Carmelo Casconefa421582018-09-13 10:05:57 -0700103 private StorageService storageService;
104
Ray Milkeydb57f1c2018-10-09 10:39:29 -0700105 @Reference(cardinality = ReferenceCardinality.MANDATORY)
Carmelo Casconefa421582018-09-13 10:05:57 -0700106 private MastershipService mastershipService;
107
Ray Milkeydb57f1c2018-10-09 10:39:29 -0700108 @Reference(cardinality = ReferenceCardinality.MANDATORY)
Carmelo Casconefa421582018-09-13 10:05:57 -0700109 private HostService hostService;
110
111 private final Striped<Lock> deviceLocks = Striped.lock(10);
112
113 private final ConcurrentMap<DeviceId, ScheduledFuture<?>> scheduledDeviceTasks = Maps.newConcurrentMap();
114
115 // Distributed state.
116 private ConsistentMap<IntIntentId, IntIntent> intentMap;
117 private ConsistentMap<DeviceId, Long> devicesToConfigure;
Carmelo Casconedefc74e2020-07-17 15:27:02 -0700118 private AtomicValue<IntDeviceConfig> intConfig;
Carmelo Casconefa421582018-09-13 10:05:57 -0700119 private AtomicValue<Boolean> intStarted;
120 private AtomicIdGenerator intentIds;
121
122 // Event listeners.
123 private final InternalHostListener hostListener = new InternalHostListener();
124 private final InternalDeviceListener deviceListener = new InternalDeviceListener();
125 private final InternalIntentMapListener intentMapListener = new InternalIntentMapListener();
126 private final InternalIntConfigListener intConfigListener = new InternalIntConfigListener();
127 private final InternalIntStartedListener intStartedListener = new InternalIntStartedListener();
128 private final InternalDeviceToConfigureListener devicesToConfigureListener =
129 new InternalDeviceToConfigureListener();
130
131 @Activate
132 public void activate() {
133
134 final ApplicationId appId = coreService.registerApplication(APP_NAME);
135
136 KryoNamespace.Builder serializer = KryoNamespace.newBuilder()
137 .register(KryoNamespaces.API)
138 .register(IntIntent.class)
139 .register(IntIntentId.class)
140 .register(IntDeviceRole.class)
141 .register(IntIntent.IntHeaderType.class)
Carmelo Casconedefc74e2020-07-17 15:27:02 -0700142 .register(IntMetadataType.class)
Carmelo Casconefa421582018-09-13 10:05:57 -0700143 .register(IntIntent.IntReportType.class)
144 .register(IntIntent.TelemetryMode.class)
Carmelo Casconedefc74e2020-07-17 15:27:02 -0700145 .register(IntDeviceConfig.class)
146 .register(IntDeviceConfig.TelemetrySpec.class);
Carmelo Casconefa421582018-09-13 10:05:57 -0700147
148 devicesToConfigure = storageService.<DeviceId, Long>consistentMapBuilder()
149 .withSerializer(Serializer.using(serializer.build()))
150 .withName("onos-int-devices-to-configure")
151 .withApplicationId(appId)
152 .withPurgeOnUninstall()
153 .build();
154 devicesToConfigure.addListener(devicesToConfigureListener);
155
156 intentMap = storageService.<IntIntentId, IntIntent>consistentMapBuilder()
157 .withSerializer(Serializer.using(serializer.build()))
158 .withName("onos-int-intents")
159 .withApplicationId(appId)
160 .withPurgeOnUninstall()
161 .build();
162 intentMap.addListener(intentMapListener);
163
164 intStarted = storageService.<Boolean>atomicValueBuilder()
165 .withSerializer(Serializer.using(serializer.build()))
166 .withName("onos-int-started")
167 .withApplicationId(appId)
168 .build()
169 .asAtomicValue();
170 intStarted.addListener(intStartedListener);
171
Carmelo Casconedefc74e2020-07-17 15:27:02 -0700172 intConfig = storageService.<IntDeviceConfig>atomicValueBuilder()
Carmelo Casconefa421582018-09-13 10:05:57 -0700173 .withSerializer(Serializer.using(serializer.build()))
174 .withName("onos-int-config")
175 .withApplicationId(appId)
176 .build()
177 .asAtomicValue();
178 intConfig.addListener(intConfigListener);
179
180 intentIds = storageService.getAtomicIdGenerator("int-intent-id-generator");
181
182 // Bootstrap config for already existing devices.
183 triggerAllDeviceConfigure();
184
185 hostService.addListener(hostListener);
186 deviceService.addListener(deviceListener);
187
188 startInt();
189 log.info("Started", appId.id());
190 }
191
192 @Deactivate
193 public void deactivate() {
194 deviceService.removeListener(deviceListener);
195 hostService.removeListener(hostListener);
196 intentIds = null;
197 intConfig.removeListener(intConfigListener);
198 intConfig = null;
199 intStarted.removeListener(intStartedListener);
200 intStarted = null;
201 intentMap.removeListener(intentMapListener);
202 intentMap = null;
203 devicesToConfigure.removeListener(devicesToConfigureListener);
204 devicesToConfigure.destroy();
205 devicesToConfigure = null;
206 // Cancel tasks (if any).
207 scheduledDeviceTasks.values().forEach(f -> {
208 f.cancel(true);
209 if (!f.isDone()) {
210 try {
211 f.get(1, TimeUnit.SECONDS);
212 } catch (InterruptedException | ExecutionException | TimeoutException e) {
213 // Don't care, we are terminating the service anyways.
214 }
215 }
216 });
217 // Clean up INT rules from existing devices.
218 deviceService.getDevices().forEach(d -> cleanupDevice(d.id()));
219 log.info("Deactivated");
220 }
221
222 @Override
223 public void startInt() {
224 // Atomic value event will trigger device configure.
225 intStarted.set(true);
226 }
227
228 @Override
229 public void startInt(Set<DeviceId> deviceIds) {
230 log.warn("Starting INT for a subset of devices is not supported");
231 }
232
233 @Override
234 public void stopInt() {
235 // Atomic value event will trigger device configure.
236 intStarted.set(false);
237 }
238
239 @Override
240 public void stopInt(Set<DeviceId> deviceIds) {
241 log.warn("Stopping INT for a subset of devices is not supported");
242 }
243
244 @Override
Carmelo Casconedefc74e2020-07-17 15:27:02 -0700245 public void setConfig(IntDeviceConfig cfg) {
Carmelo Casconefa421582018-09-13 10:05:57 -0700246 checkNotNull(cfg);
247 // Atomic value event will trigger device configure.
248 intConfig.set(cfg);
249 }
250
251 @Override
Carmelo Casconedefc74e2020-07-17 15:27:02 -0700252 public IntDeviceConfig getConfig() {
Carmelo Casconefa421582018-09-13 10:05:57 -0700253 return intConfig.get();
254 }
255
256 @Override
257 public IntIntentId installIntIntent(IntIntent intent) {
258 checkNotNull(intent);
259 final Integer intentId = (int) intentIds.nextId();
260 final IntIntentId intIntentId = IntIntentId.valueOf(intentId);
261 // Intent map event will trigger device configure.
262 intentMap.put(intIntentId, intent);
263 return intIntentId;
264 }
265
266 @Override
267 public void removeIntIntent(IntIntentId intentId) {
268 checkNotNull(intentId);
269 // Intent map event will trigger device configure.
270 intentMap.remove(intentId).value();
271 }
272
273 @Override
274 public IntIntent getIntIntent(IntIntentId intentId) {
275 return Optional.ofNullable(intentMap.get(intentId).value()).orElse(null);
276 }
277
278 @Override
279 public Map<IntIntentId, IntIntent> getIntIntents() {
280 return intentMap.asJavaMap();
281 }
282
283 private boolean isConfigTaskValid(DeviceId deviceId, long creationTime) {
284 Versioned<?> versioned = devicesToConfigure.get(deviceId);
285 return versioned != null && versioned.creationTime() == creationTime;
286 }
287
288 private boolean isIntStarted() {
289 return intStarted.get();
290 }
291
292 private boolean isNotIntConfigured() {
293 return intConfig.get() == null;
294 }
295
296 private boolean isIntProgrammable(DeviceId deviceId) {
297 final Device device = deviceService.getDevice(deviceId);
298 return device != null && device.is(IntProgrammable.class);
299 }
300
301 private void triggerDeviceConfigure(DeviceId deviceId) {
302 if (isIntProgrammable(deviceId)) {
303 devicesToConfigure.put(deviceId, System.nanoTime());
304 }
305 }
306
307 private void triggerAllDeviceConfigure() {
308 deviceService.getDevices().forEach(d -> triggerDeviceConfigure(d.id()));
309 }
310
311 private void configDeviceTask(DeviceId deviceId, long creationTime) {
312 if (isConfigTaskValid(deviceId, creationTime)) {
313 // Task outdated.
314 return;
315 }
316 if (!deviceService.isAvailable(deviceId)) {
317 return;
318 }
319 final MastershipRole role = mastershipService.requestRoleForSync(deviceId);
320 if (!role.equals(MastershipRole.MASTER)) {
321 return;
322 }
323 deviceLocks.get(deviceId).lock();
324 try {
325 // Clean up first.
326 cleanupDevice(deviceId);
327 if (!configDevice(deviceId)) {
328 // Clean up if fails.
329 cleanupDevice(deviceId);
330 return;
331 }
332 devicesToConfigure.remove(deviceId);
333 } finally {
334 deviceLocks.get(deviceId).unlock();
335 }
336 }
337
338 private void cleanupDevice(DeviceId deviceId) {
339 final Device device = deviceService.getDevice(deviceId);
340 if (device == null || !device.is(IntProgrammable.class)) {
341 return;
342 }
343 device.as(IntProgrammable.class).cleanup();
344 }
345
346 private boolean configDevice(DeviceId deviceId) {
347 // Returns true if config was successful, false if not and a clean up is
348 // needed.
349 final Device device = deviceService.getDevice(deviceId);
350 if (device == null || !device.is(IntProgrammable.class)) {
351 return true;
352 }
353
354 if (isNotIntConfigured()) {
355 log.warn("Missing INT config, aborting programming of INT device {}", deviceId);
356 return true;
357 }
358
359 final boolean isEdge = !hostService.getConnectedHosts(deviceId).isEmpty();
360 final IntDeviceRole intDeviceRole = isEdge
361 ? IntDeviceRole.SOURCE_SINK
362 : IntDeviceRole.TRANSIT;
363
364 log.info("Started programming of INT device {} with role {}...",
365 deviceId, intDeviceRole);
366
367 final IntProgrammable intProg = device.as(IntProgrammable.class);
368
369 if (!isIntStarted()) {
370 // Leave device with no INT configuration.
371 return true;
372 }
373
374 if (!intProg.init()) {
375 log.warn("Unable to init INT pipeline on {}", deviceId);
376 return false;
377 }
378
379 if (intDeviceRole != IntDeviceRole.SOURCE_SINK) {
380 // Stop here, no more configuration needed for transit devices.
381 return true;
382 }
383
384 if (intProg.supportsFunctionality(IntProgrammable.IntFunctionality.SINK)) {
385 if (!intProg.setupIntConfig(intConfig.get())) {
386 log.warn("Unable to apply INT report config on {}", deviceId);
387 return false;
388 }
389 }
390
391 // Port configuration.
392 final Set<PortNumber> hostPorts = deviceService.getPorts(deviceId)
393 .stream()
394 .map(port -> new ConnectPoint(deviceId, port.number()))
395 .filter(cp -> !hostService.getConnectedHosts(cp).isEmpty())
396 .map(ConnectPoint::port)
397 .collect(Collectors.toSet());
398
399 for (PortNumber port : hostPorts) {
400 if (intProg.supportsFunctionality(IntProgrammable.IntFunctionality.SOURCE)) {
401 log.info("Setting port {}/{} as INT source port...", deviceId, port);
402 if (!intProg.setSourcePort(port)) {
403 log.warn("Unable to set INT source port {} on {}", port, deviceId);
404 return false;
405 }
406 }
407 if (intProg.supportsFunctionality(IntProgrammable.IntFunctionality.SINK)) {
408 log.info("Setting port {}/{} as INT sink port...", deviceId, port);
409 if (!intProg.setSinkPort(port)) {
410 log.warn("Unable to set INT sink port {} on {}", port, deviceId);
411 return false;
412 }
413 }
414 }
415
416 if (!intProg.supportsFunctionality(IntProgrammable.IntFunctionality.SOURCE)) {
417 // Stop here, no more configuration needed for sink devices.
418 return true;
419 }
420
421 // Apply intents.
422 // This is a trivial implementation where we simply get the
423 // corresponding INT objective from an intent and we apply to all source
424 // device.
425 final Collection<IntObjective> objectives = intentMap.values().stream()
426 .map(v -> getIntObjective(v.value()))
427 .collect(Collectors.toList());
428 int appliedCount = 0;
429 for (IntObjective objective : objectives) {
430 if (intProg.addIntObjective(objective)) {
431 appliedCount = appliedCount + 1;
432 }
433 }
434
435 log.info("Completed programming of {}, applied {} INT objectives of {} total",
436 deviceId, appliedCount, objectives.size());
437
438 return true;
439 }
440
441 private IntObjective getIntObjective(IntIntent intent) {
Carmelo Casconedefc74e2020-07-17 15:27:02 -0700442 // FIXME: we are ignore intent.headerType()
443 // what should we do with it?
Carmelo Casconefa421582018-09-13 10:05:57 -0700444 return new IntObjective.Builder()
445 .withSelector(intent.selector())
446 .withMetadataTypes(intent.metadataTypes())
Carmelo Casconefa421582018-09-13 10:05:57 -0700447 .build();
448 }
449
450 /* Event listeners which trigger device configuration. */
451
452 private class InternalHostListener implements HostListener {
453 @Override
454 public void event(HostEvent event) {
455 final DeviceId deviceId = event.subject().location().deviceId();
456 triggerDeviceConfigure(deviceId);
457 }
458 }
459
460 private class InternalDeviceListener implements DeviceListener {
461 @Override
462 public void event(DeviceEvent event) {
463 switch (event.type()) {
464 case DEVICE_ADDED:
465 case DEVICE_UPDATED:
466 case DEVICE_REMOVED:
467 case DEVICE_SUSPENDED:
468 case DEVICE_AVAILABILITY_CHANGED:
469 case PORT_ADDED:
470 case PORT_UPDATED:
471 case PORT_REMOVED:
472 triggerDeviceConfigure(event.subject().id());
473 return;
474 case PORT_STATS_UPDATED:
475 return;
476 default:
477 log.warn("Unknown device event type {}", event.type());
478 }
479 }
480 }
481
482 private class InternalIntentMapListener
483 implements MapEventListener<IntIntentId, IntIntent> {
484 @Override
485 public void event(MapEvent<IntIntentId, IntIntent> event) {
486 triggerAllDeviceConfigure();
487 }
488 }
489
490 private class InternalIntConfigListener
Carmelo Casconedefc74e2020-07-17 15:27:02 -0700491 implements AtomicValueEventListener<IntDeviceConfig> {
Carmelo Casconefa421582018-09-13 10:05:57 -0700492 @Override
Carmelo Casconedefc74e2020-07-17 15:27:02 -0700493 public void event(AtomicValueEvent<IntDeviceConfig> event) {
Carmelo Casconefa421582018-09-13 10:05:57 -0700494 triggerAllDeviceConfigure();
495 }
496 }
497
498 private class InternalIntStartedListener
499 implements AtomicValueEventListener<Boolean> {
500 @Override
501 public void event(AtomicValueEvent<Boolean> event) {
502 triggerAllDeviceConfigure();
503 }
504 }
505
506 private class InternalDeviceToConfigureListener
507 implements MapEventListener<DeviceId, Long> {
508 @Override
509 public void event(MapEvent<DeviceId, Long> event) {
510 if (event.type().equals(MapEvent.Type.REMOVE) ||
511 event.newValue() == null) {
512 return;
513 }
514 // Schedule task in the future. Wait for events for this device to
515 // stabilize.
516 final DeviceId deviceId = event.key();
517 final long creationTime = event.newValue().creationTime();
518 ScheduledFuture<?> newTask = SharedScheduledExecutors.newTimeout(
519 () -> configDeviceTask(deviceId, creationTime),
520 CONFIG_EVENT_DELAY, TimeUnit.SECONDS);
521 ScheduledFuture<?> oldTask = scheduledDeviceTasks.put(deviceId, newTask);
522 if (oldTask != null) {
523 oldTask.cancel(false);
524 }
525 }
526 }
527}