blob: 54819979fe1ab00992229d158e2611f1bd245f14 [file] [log] [blame]
Carmelo Casconefa421582018-09-13 10:05:57 -07001/*
2 * Copyright 2015-present Open Networking Foundation
3 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
16package org.onosproject.inbandtelemetry.impl;
17
18import com.google.common.collect.Maps;
19import com.google.common.util.concurrent.Striped;
Carmelo Casconefa421582018-09-13 10:05:57 -070020import org.onlab.util.KryoNamespace;
21import org.onlab.util.SharedScheduledExecutors;
22import org.onosproject.core.ApplicationId;
23import org.onosproject.core.CoreService;
24import org.onosproject.inbandtelemetry.api.IntConfig;
25import org.onosproject.inbandtelemetry.api.IntIntent;
26import org.onosproject.inbandtelemetry.api.IntIntentId;
27import org.onosproject.inbandtelemetry.api.IntObjective;
28import org.onosproject.inbandtelemetry.api.IntProgrammable;
29import org.onosproject.inbandtelemetry.api.IntService;
30import org.onosproject.mastership.MastershipService;
31import org.onosproject.net.ConnectPoint;
32import org.onosproject.net.Device;
33import org.onosproject.net.DeviceId;
34import org.onosproject.net.MastershipRole;
35import org.onosproject.net.PortNumber;
36import org.onosproject.net.device.DeviceEvent;
37import org.onosproject.net.device.DeviceListener;
38import org.onosproject.net.device.DeviceService;
39import org.onosproject.net.host.HostEvent;
40import org.onosproject.net.host.HostListener;
41import org.onosproject.net.host.HostService;
42import org.onosproject.store.serializers.KryoNamespaces;
43import org.onosproject.store.service.AtomicIdGenerator;
44import org.onosproject.store.service.AtomicValue;
45import org.onosproject.store.service.AtomicValueEvent;
46import org.onosproject.store.service.AtomicValueEventListener;
47import org.onosproject.store.service.ConsistentMap;
48import org.onosproject.store.service.MapEvent;
49import org.onosproject.store.service.MapEventListener;
50import org.onosproject.store.service.Serializer;
51import org.onosproject.store.service.StorageService;
52import org.onosproject.store.service.Versioned;
Ray Milkeydb57f1c2018-10-09 10:39:29 -070053import org.osgi.service.component.annotations.Activate;
54import org.osgi.service.component.annotations.Component;
55import org.osgi.service.component.annotations.Deactivate;
56import org.osgi.service.component.annotations.Reference;
57import org.osgi.service.component.annotations.ReferenceCardinality;
Carmelo Casconefa421582018-09-13 10:05:57 -070058import org.slf4j.Logger;
59
60import java.util.Collection;
61import java.util.Map;
62import java.util.Optional;
63import java.util.Set;
64import java.util.concurrent.ConcurrentMap;
65import java.util.concurrent.ExecutionException;
66import java.util.concurrent.ScheduledFuture;
67import java.util.concurrent.TimeUnit;
68import java.util.concurrent.TimeoutException;
69import java.util.concurrent.locks.Lock;
70import java.util.stream.Collectors;
71
72import static com.google.common.base.Preconditions.checkNotNull;
73import static org.slf4j.LoggerFactory.getLogger;
74
75/**
76 * Simple implementation of IntService, for controlling INT-capable pipelines.
77 * <p>
78 * All INT intents are converted to an equivalent INT objective and applied to
79 * all SOURCE_SINK devices. A device is deemed SOURCE_SINK if it has at least
80 * one host attached.
81 * <p>
82 * The implementation listens for different types of events and when required it
83 * configures a device by cleaning-up any previous state and applying the new
84 * one.
85 */
Ray Milkeydb57f1c2018-10-09 10:39:29 -070086@Component(immediate = true, service = IntService.class)
Carmelo Casconefa421582018-09-13 10:05:57 -070087public class SimpleIntManager implements IntService {
88
89 private final Logger log = getLogger(getClass());
90
91 private static final int CONFIG_EVENT_DELAY = 5; // Seconds.
92
93 private static final String APP_NAME = "org.onosproject.inbandtelemetry";
94
Ray Milkeydb57f1c2018-10-09 10:39:29 -070095 @Reference(cardinality = ReferenceCardinality.MANDATORY)
Carmelo Casconefa421582018-09-13 10:05:57 -070096 private CoreService coreService;
97
Ray Milkeydb57f1c2018-10-09 10:39:29 -070098 @Reference(cardinality = ReferenceCardinality.MANDATORY)
Carmelo Casconefa421582018-09-13 10:05:57 -070099 private DeviceService deviceService;
100
Ray Milkeydb57f1c2018-10-09 10:39:29 -0700101 @Reference(cardinality = ReferenceCardinality.MANDATORY)
Carmelo Casconefa421582018-09-13 10:05:57 -0700102 private StorageService storageService;
103
Ray Milkeydb57f1c2018-10-09 10:39:29 -0700104 @Reference(cardinality = ReferenceCardinality.MANDATORY)
Carmelo Casconefa421582018-09-13 10:05:57 -0700105 private MastershipService mastershipService;
106
Ray Milkeydb57f1c2018-10-09 10:39:29 -0700107 @Reference(cardinality = ReferenceCardinality.MANDATORY)
Carmelo Casconefa421582018-09-13 10:05:57 -0700108 private HostService hostService;
109
110 private final Striped<Lock> deviceLocks = Striped.lock(10);
111
112 private final ConcurrentMap<DeviceId, ScheduledFuture<?>> scheduledDeviceTasks = Maps.newConcurrentMap();
113
114 // Distributed state.
115 private ConsistentMap<IntIntentId, IntIntent> intentMap;
116 private ConsistentMap<DeviceId, Long> devicesToConfigure;
117 private AtomicValue<IntConfig> intConfig;
118 private AtomicValue<Boolean> intStarted;
119 private AtomicIdGenerator intentIds;
120
121 // Event listeners.
122 private final InternalHostListener hostListener = new InternalHostListener();
123 private final InternalDeviceListener deviceListener = new InternalDeviceListener();
124 private final InternalIntentMapListener intentMapListener = new InternalIntentMapListener();
125 private final InternalIntConfigListener intConfigListener = new InternalIntConfigListener();
126 private final InternalIntStartedListener intStartedListener = new InternalIntStartedListener();
127 private final InternalDeviceToConfigureListener devicesToConfigureListener =
128 new InternalDeviceToConfigureListener();
129
130 @Activate
131 public void activate() {
132
133 final ApplicationId appId = coreService.registerApplication(APP_NAME);
134
135 KryoNamespace.Builder serializer = KryoNamespace.newBuilder()
136 .register(KryoNamespaces.API)
137 .register(IntIntent.class)
138 .register(IntIntentId.class)
139 .register(IntDeviceRole.class)
140 .register(IntIntent.IntHeaderType.class)
141 .register(IntIntent.IntMetadataType.class)
142 .register(IntIntent.IntReportType.class)
143 .register(IntIntent.TelemetryMode.class)
144 .register(IntConfig.class)
145 .register(IntConfig.TelemetrySpec.class);
146
147 devicesToConfigure = storageService.<DeviceId, Long>consistentMapBuilder()
148 .withSerializer(Serializer.using(serializer.build()))
149 .withName("onos-int-devices-to-configure")
150 .withApplicationId(appId)
151 .withPurgeOnUninstall()
152 .build();
153 devicesToConfigure.addListener(devicesToConfigureListener);
154
155 intentMap = storageService.<IntIntentId, IntIntent>consistentMapBuilder()
156 .withSerializer(Serializer.using(serializer.build()))
157 .withName("onos-int-intents")
158 .withApplicationId(appId)
159 .withPurgeOnUninstall()
160 .build();
161 intentMap.addListener(intentMapListener);
162
163 intStarted = storageService.<Boolean>atomicValueBuilder()
164 .withSerializer(Serializer.using(serializer.build()))
165 .withName("onos-int-started")
166 .withApplicationId(appId)
167 .build()
168 .asAtomicValue();
169 intStarted.addListener(intStartedListener);
170
171 intConfig = storageService.<IntConfig>atomicValueBuilder()
172 .withSerializer(Serializer.using(serializer.build()))
173 .withName("onos-int-config")
174 .withApplicationId(appId)
175 .build()
176 .asAtomicValue();
177 intConfig.addListener(intConfigListener);
178
179 intentIds = storageService.getAtomicIdGenerator("int-intent-id-generator");
180
181 // Bootstrap config for already existing devices.
182 triggerAllDeviceConfigure();
183
184 hostService.addListener(hostListener);
185 deviceService.addListener(deviceListener);
186
187 startInt();
188 log.info("Started", appId.id());
189 }
190
191 @Deactivate
192 public void deactivate() {
193 deviceService.removeListener(deviceListener);
194 hostService.removeListener(hostListener);
195 intentIds = null;
196 intConfig.removeListener(intConfigListener);
197 intConfig = null;
198 intStarted.removeListener(intStartedListener);
199 intStarted = null;
200 intentMap.removeListener(intentMapListener);
201 intentMap = null;
202 devicesToConfigure.removeListener(devicesToConfigureListener);
203 devicesToConfigure.destroy();
204 devicesToConfigure = null;
205 // Cancel tasks (if any).
206 scheduledDeviceTasks.values().forEach(f -> {
207 f.cancel(true);
208 if (!f.isDone()) {
209 try {
210 f.get(1, TimeUnit.SECONDS);
211 } catch (InterruptedException | ExecutionException | TimeoutException e) {
212 // Don't care, we are terminating the service anyways.
213 }
214 }
215 });
216 // Clean up INT rules from existing devices.
217 deviceService.getDevices().forEach(d -> cleanupDevice(d.id()));
218 log.info("Deactivated");
219 }
220
221 @Override
222 public void startInt() {
223 // Atomic value event will trigger device configure.
224 intStarted.set(true);
225 }
226
227 @Override
228 public void startInt(Set<DeviceId> deviceIds) {
229 log.warn("Starting INT for a subset of devices is not supported");
230 }
231
232 @Override
233 public void stopInt() {
234 // Atomic value event will trigger device configure.
235 intStarted.set(false);
236 }
237
238 @Override
239 public void stopInt(Set<DeviceId> deviceIds) {
240 log.warn("Stopping INT for a subset of devices is not supported");
241 }
242
243 @Override
244 public void setConfig(IntConfig cfg) {
245 checkNotNull(cfg);
246 // Atomic value event will trigger device configure.
247 intConfig.set(cfg);
248 }
249
250 @Override
251 public IntConfig getConfig() {
252 return intConfig.get();
253 }
254
255 @Override
256 public IntIntentId installIntIntent(IntIntent intent) {
257 checkNotNull(intent);
258 final Integer intentId = (int) intentIds.nextId();
259 final IntIntentId intIntentId = IntIntentId.valueOf(intentId);
260 // Intent map event will trigger device configure.
261 intentMap.put(intIntentId, intent);
262 return intIntentId;
263 }
264
265 @Override
266 public void removeIntIntent(IntIntentId intentId) {
267 checkNotNull(intentId);
268 // Intent map event will trigger device configure.
269 intentMap.remove(intentId).value();
270 }
271
272 @Override
273 public IntIntent getIntIntent(IntIntentId intentId) {
274 return Optional.ofNullable(intentMap.get(intentId).value()).orElse(null);
275 }
276
277 @Override
278 public Map<IntIntentId, IntIntent> getIntIntents() {
279 return intentMap.asJavaMap();
280 }
281
282 private boolean isConfigTaskValid(DeviceId deviceId, long creationTime) {
283 Versioned<?> versioned = devicesToConfigure.get(deviceId);
284 return versioned != null && versioned.creationTime() == creationTime;
285 }
286
287 private boolean isIntStarted() {
288 return intStarted.get();
289 }
290
291 private boolean isNotIntConfigured() {
292 return intConfig.get() == null;
293 }
294
295 private boolean isIntProgrammable(DeviceId deviceId) {
296 final Device device = deviceService.getDevice(deviceId);
297 return device != null && device.is(IntProgrammable.class);
298 }
299
300 private void triggerDeviceConfigure(DeviceId deviceId) {
301 if (isIntProgrammable(deviceId)) {
302 devicesToConfigure.put(deviceId, System.nanoTime());
303 }
304 }
305
306 private void triggerAllDeviceConfigure() {
307 deviceService.getDevices().forEach(d -> triggerDeviceConfigure(d.id()));
308 }
309
310 private void configDeviceTask(DeviceId deviceId, long creationTime) {
311 if (isConfigTaskValid(deviceId, creationTime)) {
312 // Task outdated.
313 return;
314 }
315 if (!deviceService.isAvailable(deviceId)) {
316 return;
317 }
318 final MastershipRole role = mastershipService.requestRoleForSync(deviceId);
319 if (!role.equals(MastershipRole.MASTER)) {
320 return;
321 }
322 deviceLocks.get(deviceId).lock();
323 try {
324 // Clean up first.
325 cleanupDevice(deviceId);
326 if (!configDevice(deviceId)) {
327 // Clean up if fails.
328 cleanupDevice(deviceId);
329 return;
330 }
331 devicesToConfigure.remove(deviceId);
332 } finally {
333 deviceLocks.get(deviceId).unlock();
334 }
335 }
336
337 private void cleanupDevice(DeviceId deviceId) {
338 final Device device = deviceService.getDevice(deviceId);
339 if (device == null || !device.is(IntProgrammable.class)) {
340 return;
341 }
342 device.as(IntProgrammable.class).cleanup();
343 }
344
345 private boolean configDevice(DeviceId deviceId) {
346 // Returns true if config was successful, false if not and a clean up is
347 // needed.
348 final Device device = deviceService.getDevice(deviceId);
349 if (device == null || !device.is(IntProgrammable.class)) {
350 return true;
351 }
352
353 if (isNotIntConfigured()) {
354 log.warn("Missing INT config, aborting programming of INT device {}", deviceId);
355 return true;
356 }
357
358 final boolean isEdge = !hostService.getConnectedHosts(deviceId).isEmpty();
359 final IntDeviceRole intDeviceRole = isEdge
360 ? IntDeviceRole.SOURCE_SINK
361 : IntDeviceRole.TRANSIT;
362
363 log.info("Started programming of INT device {} with role {}...",
364 deviceId, intDeviceRole);
365
366 final IntProgrammable intProg = device.as(IntProgrammable.class);
367
368 if (!isIntStarted()) {
369 // Leave device with no INT configuration.
370 return true;
371 }
372
373 if (!intProg.init()) {
374 log.warn("Unable to init INT pipeline on {}", deviceId);
375 return false;
376 }
377
378 if (intDeviceRole != IntDeviceRole.SOURCE_SINK) {
379 // Stop here, no more configuration needed for transit devices.
380 return true;
381 }
382
383 if (intProg.supportsFunctionality(IntProgrammable.IntFunctionality.SINK)) {
384 if (!intProg.setupIntConfig(intConfig.get())) {
385 log.warn("Unable to apply INT report config on {}", deviceId);
386 return false;
387 }
388 }
389
390 // Port configuration.
391 final Set<PortNumber> hostPorts = deviceService.getPorts(deviceId)
392 .stream()
393 .map(port -> new ConnectPoint(deviceId, port.number()))
394 .filter(cp -> !hostService.getConnectedHosts(cp).isEmpty())
395 .map(ConnectPoint::port)
396 .collect(Collectors.toSet());
397
398 for (PortNumber port : hostPorts) {
399 if (intProg.supportsFunctionality(IntProgrammable.IntFunctionality.SOURCE)) {
400 log.info("Setting port {}/{} as INT source port...", deviceId, port);
401 if (!intProg.setSourcePort(port)) {
402 log.warn("Unable to set INT source port {} on {}", port, deviceId);
403 return false;
404 }
405 }
406 if (intProg.supportsFunctionality(IntProgrammable.IntFunctionality.SINK)) {
407 log.info("Setting port {}/{} as INT sink port...", deviceId, port);
408 if (!intProg.setSinkPort(port)) {
409 log.warn("Unable to set INT sink port {} on {}", port, deviceId);
410 return false;
411 }
412 }
413 }
414
415 if (!intProg.supportsFunctionality(IntProgrammable.IntFunctionality.SOURCE)) {
416 // Stop here, no more configuration needed for sink devices.
417 return true;
418 }
419
420 // Apply intents.
421 // This is a trivial implementation where we simply get the
422 // corresponding INT objective from an intent and we apply to all source
423 // device.
424 final Collection<IntObjective> objectives = intentMap.values().stream()
425 .map(v -> getIntObjective(v.value()))
426 .collect(Collectors.toList());
427 int appliedCount = 0;
428 for (IntObjective objective : objectives) {
429 if (intProg.addIntObjective(objective)) {
430 appliedCount = appliedCount + 1;
431 }
432 }
433
434 log.info("Completed programming of {}, applied {} INT objectives of {} total",
435 deviceId, appliedCount, objectives.size());
436
437 return true;
438 }
439
440 private IntObjective getIntObjective(IntIntent intent) {
441 return new IntObjective.Builder()
442 .withSelector(intent.selector())
443 .withMetadataTypes(intent.metadataTypes())
444 .withHeaderType(intent.headerType())
445 .build();
446 }
447
448 /* Event listeners which trigger device configuration. */
449
450 private class InternalHostListener implements HostListener {
451 @Override
452 public void event(HostEvent event) {
453 final DeviceId deviceId = event.subject().location().deviceId();
454 triggerDeviceConfigure(deviceId);
455 }
456 }
457
458 private class InternalDeviceListener implements DeviceListener {
459 @Override
460 public void event(DeviceEvent event) {
461 switch (event.type()) {
462 case DEVICE_ADDED:
463 case DEVICE_UPDATED:
464 case DEVICE_REMOVED:
465 case DEVICE_SUSPENDED:
466 case DEVICE_AVAILABILITY_CHANGED:
467 case PORT_ADDED:
468 case PORT_UPDATED:
469 case PORT_REMOVED:
470 triggerDeviceConfigure(event.subject().id());
471 return;
472 case PORT_STATS_UPDATED:
473 return;
474 default:
475 log.warn("Unknown device event type {}", event.type());
476 }
477 }
478 }
479
480 private class InternalIntentMapListener
481 implements MapEventListener<IntIntentId, IntIntent> {
482 @Override
483 public void event(MapEvent<IntIntentId, IntIntent> event) {
484 triggerAllDeviceConfigure();
485 }
486 }
487
488 private class InternalIntConfigListener
489 implements AtomicValueEventListener<IntConfig> {
490 @Override
491 public void event(AtomicValueEvent<IntConfig> event) {
492 triggerAllDeviceConfigure();
493 }
494 }
495
496 private class InternalIntStartedListener
497 implements AtomicValueEventListener<Boolean> {
498 @Override
499 public void event(AtomicValueEvent<Boolean> event) {
500 triggerAllDeviceConfigure();
501 }
502 }
503
504 private class InternalDeviceToConfigureListener
505 implements MapEventListener<DeviceId, Long> {
506 @Override
507 public void event(MapEvent<DeviceId, Long> event) {
508 if (event.type().equals(MapEvent.Type.REMOVE) ||
509 event.newValue() == null) {
510 return;
511 }
512 // Schedule task in the future. Wait for events for this device to
513 // stabilize.
514 final DeviceId deviceId = event.key();
515 final long creationTime = event.newValue().creationTime();
516 ScheduledFuture<?> newTask = SharedScheduledExecutors.newTimeout(
517 () -> configDeviceTask(deviceId, creationTime),
518 CONFIG_EVENT_DELAY, TimeUnit.SECONDS);
519 ScheduledFuture<?> oldTask = scheduledDeviceTasks.put(deviceId, newTask);
520 if (oldTask != null) {
521 oldTask.cancel(false);
522 }
523 }
524 }
525}