blob: eafb778cc2ae9abf73dd8c91aacfbc1637bae6a9 [file] [log] [blame]
/*
 * Copyright 2018-present Open Networking Foundation
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
16
17package org.onosproject.net.pi.impl;
18
19import com.google.common.collect.Maps;
Carmelo Cascone3977ea42019-02-28 13:43:42 -080020import com.google.common.util.concurrent.Futures;
Carmelo Cascone9e4972c2018-08-30 00:29:16 -070021import com.google.common.util.concurrent.Striped;
Carmelo Cascone9e4972c2018-08-30 00:29:16 -070022import org.onlab.util.KryoNamespace;
23import org.onlab.util.Tools;
24import org.onosproject.cfg.ComponentConfigService;
25import org.onosproject.event.AbstractListenerManager;
26import org.onosproject.mastership.MastershipInfo;
27import org.onosproject.mastership.MastershipService;
28import org.onosproject.net.Device;
29import org.onosproject.net.DeviceId;
30import org.onosproject.net.MastershipRole;
31import org.onosproject.net.behaviour.PiPipelineProgrammable;
32import org.onosproject.net.device.DeviceEvent;
33import org.onosproject.net.device.DeviceHandshaker;
34import org.onosproject.net.device.DeviceListener;
35import org.onosproject.net.device.DeviceService;
36import org.onosproject.net.pi.model.PiPipeconf;
37import org.onosproject.net.pi.model.PiPipeconfId;
38import org.onosproject.net.pi.service.PiPipeconfMappingStore;
39import org.onosproject.net.pi.service.PiPipeconfService;
40import org.onosproject.net.pi.service.PiPipeconfWatchdogEvent;
41import org.onosproject.net.pi.service.PiPipeconfWatchdogListener;
42import org.onosproject.net.pi.service.PiPipeconfWatchdogService;
43import org.onosproject.store.serializers.KryoNamespaces;
44import org.onosproject.store.service.EventuallyConsistentMap;
45import org.onosproject.store.service.EventuallyConsistentMapEvent;
46import org.onosproject.store.service.EventuallyConsistentMapListener;
47import org.onosproject.store.service.StorageService;
48import org.onosproject.store.service.WallClockTimestamp;
49import org.osgi.service.component.ComponentContext;
Ray Milkeyd84f89b2018-08-17 14:54:17 -070050import org.osgi.service.component.annotations.Activate;
51import org.osgi.service.component.annotations.Component;
52import org.osgi.service.component.annotations.Deactivate;
53import org.osgi.service.component.annotations.Modified;
54import org.osgi.service.component.annotations.Reference;
55import org.osgi.service.component.annotations.ReferenceCardinality;
Carmelo Cascone9e4972c2018-08-30 00:29:16 -070056import org.slf4j.Logger;
57
58import java.util.Collections;
59import java.util.Dictionary;
60import java.util.Map;
61import java.util.Timer;
62import java.util.TimerTask;
63import java.util.concurrent.ExecutionException;
64import java.util.concurrent.ExecutorService;
65import java.util.concurrent.Executors;
66import java.util.concurrent.TimeUnit;
67import java.util.concurrent.TimeoutException;
68import java.util.concurrent.locks.Lock;
69
70import static org.onlab.util.Tools.groupedThreads;
Ray Milkeyd04e2272018-10-16 18:20:18 -070071import static org.onosproject.net.OsgiPropertyConstants.PWM_PROBE_INTERVAL;
72import static org.onosproject.net.OsgiPropertyConstants.PWM_PROBE_INTERVAL_DEFAULT;
Carmelo Cascone9e4972c2018-08-30 00:29:16 -070073import static org.slf4j.LoggerFactory.getLogger;
74
75/**
76 * Implementation of PiPipeconfWatchdogService that implements a periodic
77 * pipeline probe task and listens for device events to update the status of the
78 * pipeline.
79 */
Ray Milkeyd04e2272018-10-16 18:20:18 -070080@Component(
81 immediate = true,
82 service = PiPipeconfWatchdogService.class,
83 property = {
Ray Milkey2d7bca12018-10-17 14:51:52 -070084 PWM_PROBE_INTERVAL + ":Integer=" + PWM_PROBE_INTERVAL_DEFAULT
Ray Milkeyd04e2272018-10-16 18:20:18 -070085 }
86)
Carmelo Cascone9e4972c2018-08-30 00:29:16 -070087public class PiPipeconfWatchdogManager
88 extends AbstractListenerManager<PiPipeconfWatchdogEvent, PiPipeconfWatchdogListener>
89 implements PiPipeconfWatchdogService {
90
91 private final Logger log = getLogger(getClass());
92
93 private static final long SECONDS = 1000L;
94 // Long enough to allow for network delay (e.g. to transfer large pipeline
95 // binaries over slow network).
96 private static final long PIPECONF_SET_TIMEOUT = 60; // Seconds.
97
Ray Milkeyd84f89b2018-08-17 14:54:17 -070098 @Reference(cardinality = ReferenceCardinality.MANDATORY)
Carmelo Cascone9e4972c2018-08-30 00:29:16 -070099 private PiPipeconfMappingStore pipeconfMappingStore;
100
Ray Milkeyd84f89b2018-08-17 14:54:17 -0700101 @Reference(cardinality = ReferenceCardinality.MANDATORY)
Carmelo Cascone9e4972c2018-08-30 00:29:16 -0700102 private DeviceService deviceService;
103
Ray Milkeyd84f89b2018-08-17 14:54:17 -0700104 @Reference(cardinality = ReferenceCardinality.MANDATORY)
Carmelo Cascone9e4972c2018-08-30 00:29:16 -0700105 private MastershipService mastershipService;
106
Ray Milkeyd84f89b2018-08-17 14:54:17 -0700107 @Reference(cardinality = ReferenceCardinality.MANDATORY)
Carmelo Cascone9e4972c2018-08-30 00:29:16 -0700108 protected PiPipeconfService pipeconfService;
109
Ray Milkeyd84f89b2018-08-17 14:54:17 -0700110 @Reference(cardinality = ReferenceCardinality.MANDATORY)
Carmelo Cascone9e4972c2018-08-30 00:29:16 -0700111 protected StorageService storageService;
112
Ray Milkeyd84f89b2018-08-17 14:54:17 -0700113 @Reference(cardinality = ReferenceCardinality.MANDATORY)
Carmelo Cascone9e4972c2018-08-30 00:29:16 -0700114 private ComponentConfigService componentConfigService;
115
Thomas Vachuskaf566fa22018-10-30 14:03:36 -0700116 /** Configure interval in seconds for device pipeconf probing. */
Ray Milkeyd04e2272018-10-16 18:20:18 -0700117 private int probeInterval = PWM_PROBE_INTERVAL_DEFAULT;
Carmelo Cascone9e4972c2018-08-30 00:29:16 -0700118
119 protected ExecutorService executor = Executors.newFixedThreadPool(
120 30, groupedThreads("onos/pipeconf-watchdog", "%d", log));
121
122 private final InternalDeviceListener deviceListener = new InternalDeviceListener();
123
124 private Timer timer;
125 private TimerTask task;
126
127 private final Striped<Lock> locks = Striped.lock(30);
128
129 private EventuallyConsistentMap<DeviceId, PipelineStatus> statusMap;
130 private Map<DeviceId, PipelineStatus> localStatusMap;
131
132 @Activate
133 public void activate() {
134 eventDispatcher.addSink(PiPipeconfWatchdogEvent.class, listenerRegistry);
135 localStatusMap = Maps.newConcurrentMap();
136 // Init distributed status map.
137 KryoNamespace.Builder serializer = KryoNamespace.newBuilder()
138 .register(KryoNamespaces.API)
139 .register(PipelineStatus.class);
140 statusMap = storageService.<DeviceId, PipelineStatus>eventuallyConsistentMapBuilder()
141 .withName("onos-pipeconf-status-table")
142 .withSerializer(serializer)
143 .withTimestampProvider((k, v) -> new WallClockTimestamp()).build();
144 statusMap.addListener(new StatusMapListener());
145 // Register component configurable properties.
146 componentConfigService.registerProperties(getClass());
147 // Start periodic watchdog task.
148 timer = new Timer();
149 startProbeTask();
150 // Add device listener.
151 deviceService.addListener(deviceListener);
152 log.info("Started");
153 }
154
155 @Modified
156 public void modified(ComponentContext context) {
157 if (context == null) {
158 return;
159 }
160
161 Dictionary<?, ?> properties = context.getProperties();
162 final int oldProbeInterval = probeInterval;
163 probeInterval = Tools.getIntegerProperty(
Ray Milkeyd04e2272018-10-16 18:20:18 -0700164 properties, PWM_PROBE_INTERVAL, PWM_PROBE_INTERVAL_DEFAULT);
Carmelo Cascone9e4972c2018-08-30 00:29:16 -0700165 log.info("Configured. {} is configured to {} seconds",
Ray Milkeyd04e2272018-10-16 18:20:18 -0700166 PWM_PROBE_INTERVAL_DEFAULT, probeInterval);
Carmelo Cascone9e4972c2018-08-30 00:29:16 -0700167
168 if (oldProbeInterval != probeInterval) {
169 rescheduleProbeTask();
170 }
171 }
172
173 @Deactivate
174 public void deactivate() {
175 eventDispatcher.removeSink(PiPipeconfWatchdogEvent.class);
176 deviceService.removeListener(deviceListener);
177 stopProbeTask();
178 timer = null;
179 statusMap = null;
180 localStatusMap = null;
181 log.info("Stopped");
182 }
183
184 @Override
185 public void triggerProbe(DeviceId deviceId) {
186 final Device device = deviceService.getDevice(deviceId);
187 if (device != null) {
188 filterAndTriggerTasks(Collections.singleton(device));
189 }
190 }
191
192 @Override
193 public PipelineStatus getStatus(DeviceId deviceId) {
194 final PipelineStatus status = statusMap.get(deviceId);
195 return status == null ? PipelineStatus.UNKNOWN : status;
196 }
197
198 private void triggerCheckAllDevices() {
199 filterAndTriggerTasks(deviceService.getDevices());
200 }
201
202 private void filterAndTriggerTasks(Iterable<Device> devices) {
203 devices.forEach(device -> {
204 if (!isLocalMaster(device)) {
205 return;
206 }
207
208 final PiPipeconfId pipeconfId = pipeconfMappingStore.getPipeconfId(device.id());
209 if (pipeconfId == null
210 || !device.is(PiPipelineProgrammable.class)) {
211 return;
212 }
213
214 if (!pipeconfService.getPipeconf(pipeconfId).isPresent()) {
215 log.error("Pipeconf {} is not registered", pipeconfId);
216 return;
217 }
218
219 final PiPipeconf pipeconf = pipeconfService.getPipeconf(pipeconfId).get();
220
221 if (!device.is(DeviceHandshaker.class)) {
222 log.error("Missing DeviceHandshaker behavior for {}", device.id());
223 return;
224 }
225
226 // Trigger task with per-device lock.
227 executor.execute(withLock(() -> {
228 final boolean success = doSetPipeconfIfRequired(device, pipeconf);
229 if (success) {
230 signalStatusReady(device.id());
231 } else {
232 signalStatusUnknown(device.id());
233 }
234 }, device.id()));
235 });
236 }
237
238 /**
239 * Returns true if the given device is known to be configured with the given
240 * pipeline, false otherwise. If necessary, this method enforces setting the
241 * given pipeconf using drivers.
242 *
243 * @param device device
244 * @param pipeconf pipeconf
245 * @return boolean
246 */
247 private boolean doSetPipeconfIfRequired(Device device, PiPipeconf pipeconf) {
248 log.debug("Starting watchdog task for {} ({})", device.id(), pipeconf.id());
249 final PiPipelineProgrammable pipelineProg = device.as(PiPipelineProgrammable.class);
250 final DeviceHandshaker handshaker = device.as(DeviceHandshaker.class);
251 if (!handshaker.isConnected()) {
252 return false;
253 }
Carmelo Cascone3977ea42019-02-28 13:43:42 -0800254 if (Futures.getUnchecked(pipelineProg.isPipeconfSet(pipeconf))) {
Carmelo Cascone9e4972c2018-08-30 00:29:16 -0700255 log.debug("Pipeconf {} already configured on {}",
256 pipeconf.id(), device.id());
257 return true;
258 }
259 try {
260 return pipelineProg.setPipeconf(pipeconf)
261 .get(PIPECONF_SET_TIMEOUT, TimeUnit.SECONDS);
262 } catch (InterruptedException e) {
263 log.error("Thread interrupted while setting pipeconf on {}",
264 device.id());
265 Thread.currentThread().interrupt();
266 } catch (ExecutionException e) {
267 log.error("Exception while setting pipeconf on {}",
268 device.id(), e.getCause());
269 } catch (TimeoutException e) {
270 log.error("Operation TIMEOUT while setting pipeconf on {}",
271 device.id());
272 }
273 return false;
274 }
275
276 private Runnable withLock(Runnable task, Object object) {
277 return () -> {
278 final Lock lock = locks.get(object);
279 lock.lock();
280 try {
281 task.run();
282 } finally {
283 lock.unlock();
284 }
285 };
286 }
287
288 private void signalStatusUnknown(DeviceId deviceId) {
289 statusMap.remove(deviceId);
290 }
291
292 private void signalStatusReady(DeviceId deviceId) {
293 statusMap.put(deviceId, PipelineStatus.READY);
294 }
295
296 private boolean isLocalMaster(Device device) {
297 if (mastershipService.isLocalMaster(device.id())) {
298 return true;
299 }
300 // The device might have no master (e.g. after it has been disconnected
301 // from core), hence we use device mastership state.
302 final MastershipInfo info = mastershipService.getMastershipFor(device.id());
303 return !info.master().isPresent() &&
304 device.is(DeviceHandshaker.class) &&
305 device.as(DeviceHandshaker.class).getRole()
306 .equals(MastershipRole.MASTER);
307 }
308
309 private void startProbeTask() {
310 synchronized (timer) {
311 log.info("Starting pipeline probe thread with {} seconds interval...", probeInterval);
312 task = new InternalTimerTask();
313 timer.scheduleAtFixedRate(task, probeInterval * SECONDS,
314 probeInterval * SECONDS);
315 }
316 }
317
318
319 private void stopProbeTask() {
320 synchronized (timer) {
321 log.info("Stopping pipeline probe thread...");
322 task.cancel();
323 task = null;
324 }
325 }
326
327
328 private synchronized void rescheduleProbeTask() {
329 synchronized (timer) {
330 stopProbeTask();
331 startProbeTask();
332 }
333 }
334
335 private class InternalTimerTask extends TimerTask {
336 @Override
337 public void run() {
338 triggerCheckAllDevices();
339 }
340 }
341
342 /**
343 * Listener of device events used to update the pipeline status.
344 */
345 private class InternalDeviceListener implements DeviceListener {
346
347 @Override
348 public void event(DeviceEvent event) {
349 final Device device = event.subject();
350 switch (event.type()) {
351 case DEVICE_ADDED:
352 case DEVICE_UPDATED:
353 case DEVICE_AVAILABILITY_CHANGED:
354 if (!deviceService.isAvailable(device.id())) {
355 signalStatusUnknown(device.id());
356 }
357 break;
358 case DEVICE_REMOVED:
359 case DEVICE_SUSPENDED:
360 signalStatusUnknown(device.id());
361 break;
362 case PORT_ADDED:
363 case PORT_UPDATED:
364 case PORT_REMOVED:
365 case PORT_STATS_UPDATED:
366 default:
367 break;
368 }
369 }
370 }
371
372 private class StatusMapListener
373 implements EventuallyConsistentMapListener<DeviceId, PipelineStatus> {
374
375 @Override
376 public void event(EventuallyConsistentMapEvent<DeviceId, PipelineStatus> event) {
377 final DeviceId deviceId = event.key();
378 final PipelineStatus status = event.value();
379 switch (event.type()) {
380 case PUT:
381 postStatusEvent(deviceId, status);
382 break;
383 case REMOVE:
384 postStatusEvent(deviceId, PipelineStatus.UNKNOWN);
385 break;
386 default:
387 log.error("Unknown map event type {}", event.type());
388 }
389 }
390
391 private void postStatusEvent(DeviceId deviceId, PipelineStatus newStatus) {
392 PipelineStatus oldStatus = localStatusMap.put(deviceId, newStatus);
393 oldStatus = oldStatus == null ? PipelineStatus.UNKNOWN : oldStatus;
394 final PiPipeconfWatchdogEvent.Type eventType =
395 newStatus == PipelineStatus.READY
396 ? PiPipeconfWatchdogEvent.Type.PIPELINE_READY
397 : PiPipeconfWatchdogEvent.Type.PIPELINE_UNKNOWN;
398 if (newStatus != oldStatus) {
399 log.info("Pipeline status of {} is {}", deviceId, newStatus);
400 post(new PiPipeconfWatchdogEvent(eventType, deviceId));
401 }
402 }
403 }
404}