blob: 62fb157268a176622c3f471add025330c49ce68a [file] [log] [blame]
/*
 * Copyright 2018-present Open Networking Foundation
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
16
17package org.onosproject.net.pi.impl;
18
19import com.google.common.collect.Maps;
20import com.google.common.util.concurrent.Striped;
Carmelo Cascone9e4972c2018-08-30 00:29:16 -070021import org.onlab.util.KryoNamespace;
22import org.onlab.util.Tools;
23import org.onosproject.cfg.ComponentConfigService;
24import org.onosproject.event.AbstractListenerManager;
25import org.onosproject.mastership.MastershipInfo;
26import org.onosproject.mastership.MastershipService;
27import org.onosproject.net.Device;
28import org.onosproject.net.DeviceId;
29import org.onosproject.net.MastershipRole;
30import org.onosproject.net.behaviour.PiPipelineProgrammable;
31import org.onosproject.net.device.DeviceEvent;
32import org.onosproject.net.device.DeviceHandshaker;
33import org.onosproject.net.device.DeviceListener;
34import org.onosproject.net.device.DeviceService;
35import org.onosproject.net.pi.model.PiPipeconf;
36import org.onosproject.net.pi.model.PiPipeconfId;
37import org.onosproject.net.pi.service.PiPipeconfMappingStore;
38import org.onosproject.net.pi.service.PiPipeconfService;
39import org.onosproject.net.pi.service.PiPipeconfWatchdogEvent;
40import org.onosproject.net.pi.service.PiPipeconfWatchdogListener;
41import org.onosproject.net.pi.service.PiPipeconfWatchdogService;
42import org.onosproject.store.serializers.KryoNamespaces;
43import org.onosproject.store.service.EventuallyConsistentMap;
44import org.onosproject.store.service.EventuallyConsistentMapEvent;
45import org.onosproject.store.service.EventuallyConsistentMapListener;
46import org.onosproject.store.service.StorageService;
47import org.onosproject.store.service.WallClockTimestamp;
48import org.osgi.service.component.ComponentContext;
Ray Milkeyd84f89b2018-08-17 14:54:17 -070049import org.osgi.service.component.annotations.Activate;
50import org.osgi.service.component.annotations.Component;
51import org.osgi.service.component.annotations.Deactivate;
52import org.osgi.service.component.annotations.Modified;
53import org.osgi.service.component.annotations.Reference;
54import org.osgi.service.component.annotations.ReferenceCardinality;
Carmelo Cascone9e4972c2018-08-30 00:29:16 -070055import org.slf4j.Logger;
56
57import java.util.Collections;
58import java.util.Dictionary;
59import java.util.Map;
60import java.util.Timer;
61import java.util.TimerTask;
62import java.util.concurrent.ExecutionException;
63import java.util.concurrent.ExecutorService;
64import java.util.concurrent.Executors;
65import java.util.concurrent.TimeUnit;
66import java.util.concurrent.TimeoutException;
67import java.util.concurrent.locks.Lock;
68
69import static org.onlab.util.Tools.groupedThreads;
Ray Milkeyd04e2272018-10-16 18:20:18 -070070import static org.onosproject.net.OsgiPropertyConstants.PWM_PROBE_INTERVAL;
71import static org.onosproject.net.OsgiPropertyConstants.PWM_PROBE_INTERVAL_DEFAULT;
Carmelo Cascone9e4972c2018-08-30 00:29:16 -070072import static org.slf4j.LoggerFactory.getLogger;
73
74/**
75 * Implementation of PiPipeconfWatchdogService that implements a periodic
76 * pipeline probe task and listens for device events to update the status of the
77 * pipeline.
78 */
Ray Milkeyd04e2272018-10-16 18:20:18 -070079@Component(
80 immediate = true,
81 service = PiPipeconfWatchdogService.class,
82 property = {
Ray Milkey2d7bca12018-10-17 14:51:52 -070083 PWM_PROBE_INTERVAL + ":Integer=" + PWM_PROBE_INTERVAL_DEFAULT
Ray Milkeyd04e2272018-10-16 18:20:18 -070084 }
85)
Carmelo Cascone9e4972c2018-08-30 00:29:16 -070086public class PiPipeconfWatchdogManager
87 extends AbstractListenerManager<PiPipeconfWatchdogEvent, PiPipeconfWatchdogListener>
88 implements PiPipeconfWatchdogService {
89
90 private final Logger log = getLogger(getClass());
91
92 private static final long SECONDS = 1000L;
93 // Long enough to allow for network delay (e.g. to transfer large pipeline
94 // binaries over slow network).
95 private static final long PIPECONF_SET_TIMEOUT = 60; // Seconds.
96
Ray Milkeyd84f89b2018-08-17 14:54:17 -070097 @Reference(cardinality = ReferenceCardinality.MANDATORY)
Carmelo Cascone9e4972c2018-08-30 00:29:16 -070098 private PiPipeconfMappingStore pipeconfMappingStore;
99
Ray Milkeyd84f89b2018-08-17 14:54:17 -0700100 @Reference(cardinality = ReferenceCardinality.MANDATORY)
Carmelo Cascone9e4972c2018-08-30 00:29:16 -0700101 private DeviceService deviceService;
102
Ray Milkeyd84f89b2018-08-17 14:54:17 -0700103 @Reference(cardinality = ReferenceCardinality.MANDATORY)
Carmelo Cascone9e4972c2018-08-30 00:29:16 -0700104 private MastershipService mastershipService;
105
Ray Milkeyd84f89b2018-08-17 14:54:17 -0700106 @Reference(cardinality = ReferenceCardinality.MANDATORY)
Carmelo Cascone9e4972c2018-08-30 00:29:16 -0700107 protected PiPipeconfService pipeconfService;
108
Ray Milkeyd84f89b2018-08-17 14:54:17 -0700109 @Reference(cardinality = ReferenceCardinality.MANDATORY)
Carmelo Cascone9e4972c2018-08-30 00:29:16 -0700110 protected StorageService storageService;
111
Ray Milkeyd84f89b2018-08-17 14:54:17 -0700112 @Reference(cardinality = ReferenceCardinality.MANDATORY)
Carmelo Cascone9e4972c2018-08-30 00:29:16 -0700113 private ComponentConfigService componentConfigService;
114
Thomas Vachuskaf566fa22018-10-30 14:03:36 -0700115 /** Configure interval in seconds for device pipeconf probing. */
Ray Milkeyd04e2272018-10-16 18:20:18 -0700116 private int probeInterval = PWM_PROBE_INTERVAL_DEFAULT;
Carmelo Cascone9e4972c2018-08-30 00:29:16 -0700117
118 protected ExecutorService executor = Executors.newFixedThreadPool(
119 30, groupedThreads("onos/pipeconf-watchdog", "%d", log));
120
121 private final InternalDeviceListener deviceListener = new InternalDeviceListener();
122
123 private Timer timer;
124 private TimerTask task;
125
126 private final Striped<Lock> locks = Striped.lock(30);
127
128 private EventuallyConsistentMap<DeviceId, PipelineStatus> statusMap;
129 private Map<DeviceId, PipelineStatus> localStatusMap;
130
131 @Activate
132 public void activate() {
133 eventDispatcher.addSink(PiPipeconfWatchdogEvent.class, listenerRegistry);
134 localStatusMap = Maps.newConcurrentMap();
135 // Init distributed status map.
136 KryoNamespace.Builder serializer = KryoNamespace.newBuilder()
137 .register(KryoNamespaces.API)
138 .register(PipelineStatus.class);
139 statusMap = storageService.<DeviceId, PipelineStatus>eventuallyConsistentMapBuilder()
140 .withName("onos-pipeconf-status-table")
141 .withSerializer(serializer)
142 .withTimestampProvider((k, v) -> new WallClockTimestamp()).build();
143 statusMap.addListener(new StatusMapListener());
144 // Register component configurable properties.
145 componentConfigService.registerProperties(getClass());
146 // Start periodic watchdog task.
147 timer = new Timer();
148 startProbeTask();
149 // Add device listener.
150 deviceService.addListener(deviceListener);
151 log.info("Started");
152 }
153
154 @Modified
155 public void modified(ComponentContext context) {
156 if (context == null) {
157 return;
158 }
159
160 Dictionary<?, ?> properties = context.getProperties();
161 final int oldProbeInterval = probeInterval;
162 probeInterval = Tools.getIntegerProperty(
Ray Milkeyd04e2272018-10-16 18:20:18 -0700163 properties, PWM_PROBE_INTERVAL, PWM_PROBE_INTERVAL_DEFAULT);
Carmelo Cascone9e4972c2018-08-30 00:29:16 -0700164 log.info("Configured. {} is configured to {} seconds",
Ray Milkeyd04e2272018-10-16 18:20:18 -0700165 PWM_PROBE_INTERVAL_DEFAULT, probeInterval);
Carmelo Cascone9e4972c2018-08-30 00:29:16 -0700166
167 if (oldProbeInterval != probeInterval) {
168 rescheduleProbeTask();
169 }
170 }
171
172 @Deactivate
173 public void deactivate() {
174 eventDispatcher.removeSink(PiPipeconfWatchdogEvent.class);
175 deviceService.removeListener(deviceListener);
176 stopProbeTask();
177 timer = null;
178 statusMap = null;
179 localStatusMap = null;
180 log.info("Stopped");
181 }
182
183 @Override
184 public void triggerProbe(DeviceId deviceId) {
185 final Device device = deviceService.getDevice(deviceId);
186 if (device != null) {
187 filterAndTriggerTasks(Collections.singleton(device));
188 }
189 }
190
191 @Override
192 public PipelineStatus getStatus(DeviceId deviceId) {
193 final PipelineStatus status = statusMap.get(deviceId);
194 return status == null ? PipelineStatus.UNKNOWN : status;
195 }
196
197 private void triggerCheckAllDevices() {
198 filterAndTriggerTasks(deviceService.getDevices());
199 }
200
201 private void filterAndTriggerTasks(Iterable<Device> devices) {
202 devices.forEach(device -> {
203 if (!isLocalMaster(device)) {
204 return;
205 }
206
207 final PiPipeconfId pipeconfId = pipeconfMappingStore.getPipeconfId(device.id());
208 if (pipeconfId == null
209 || !device.is(PiPipelineProgrammable.class)) {
210 return;
211 }
212
213 if (!pipeconfService.getPipeconf(pipeconfId).isPresent()) {
214 log.error("Pipeconf {} is not registered", pipeconfId);
215 return;
216 }
217
218 final PiPipeconf pipeconf = pipeconfService.getPipeconf(pipeconfId).get();
219
220 if (!device.is(DeviceHandshaker.class)) {
221 log.error("Missing DeviceHandshaker behavior for {}", device.id());
222 return;
223 }
224
225 // Trigger task with per-device lock.
226 executor.execute(withLock(() -> {
227 final boolean success = doSetPipeconfIfRequired(device, pipeconf);
228 if (success) {
229 signalStatusReady(device.id());
230 } else {
231 signalStatusUnknown(device.id());
232 }
233 }, device.id()));
234 });
235 }
236
237 /**
238 * Returns true if the given device is known to be configured with the given
239 * pipeline, false otherwise. If necessary, this method enforces setting the
240 * given pipeconf using drivers.
241 *
242 * @param device device
243 * @param pipeconf pipeconf
244 * @return boolean
245 */
246 private boolean doSetPipeconfIfRequired(Device device, PiPipeconf pipeconf) {
247 log.debug("Starting watchdog task for {} ({})", device.id(), pipeconf.id());
248 final PiPipelineProgrammable pipelineProg = device.as(PiPipelineProgrammable.class);
249 final DeviceHandshaker handshaker = device.as(DeviceHandshaker.class);
250 if (!handshaker.isConnected()) {
251 return false;
252 }
253 if (pipelineProg.isPipeconfSet(pipeconf)) {
254 log.debug("Pipeconf {} already configured on {}",
255 pipeconf.id(), device.id());
256 return true;
257 }
258 try {
259 return pipelineProg.setPipeconf(pipeconf)
260 .get(PIPECONF_SET_TIMEOUT, TimeUnit.SECONDS);
261 } catch (InterruptedException e) {
262 log.error("Thread interrupted while setting pipeconf on {}",
263 device.id());
264 Thread.currentThread().interrupt();
265 } catch (ExecutionException e) {
266 log.error("Exception while setting pipeconf on {}",
267 device.id(), e.getCause());
268 } catch (TimeoutException e) {
269 log.error("Operation TIMEOUT while setting pipeconf on {}",
270 device.id());
271 }
272 return false;
273 }
274
275 private Runnable withLock(Runnable task, Object object) {
276 return () -> {
277 final Lock lock = locks.get(object);
278 lock.lock();
279 try {
280 task.run();
281 } finally {
282 lock.unlock();
283 }
284 };
285 }
286
287 private void signalStatusUnknown(DeviceId deviceId) {
288 statusMap.remove(deviceId);
289 }
290
291 private void signalStatusReady(DeviceId deviceId) {
292 statusMap.put(deviceId, PipelineStatus.READY);
293 }
294
295 private boolean isLocalMaster(Device device) {
296 if (mastershipService.isLocalMaster(device.id())) {
297 return true;
298 }
299 // The device might have no master (e.g. after it has been disconnected
300 // from core), hence we use device mastership state.
301 final MastershipInfo info = mastershipService.getMastershipFor(device.id());
302 return !info.master().isPresent() &&
303 device.is(DeviceHandshaker.class) &&
304 device.as(DeviceHandshaker.class).getRole()
305 .equals(MastershipRole.MASTER);
306 }
307
308 private void startProbeTask() {
309 synchronized (timer) {
310 log.info("Starting pipeline probe thread with {} seconds interval...", probeInterval);
311 task = new InternalTimerTask();
312 timer.scheduleAtFixedRate(task, probeInterval * SECONDS,
313 probeInterval * SECONDS);
314 }
315 }
316
317
318 private void stopProbeTask() {
319 synchronized (timer) {
320 log.info("Stopping pipeline probe thread...");
321 task.cancel();
322 task = null;
323 }
324 }
325
326
327 private synchronized void rescheduleProbeTask() {
328 synchronized (timer) {
329 stopProbeTask();
330 startProbeTask();
331 }
332 }
333
334 private class InternalTimerTask extends TimerTask {
335 @Override
336 public void run() {
337 triggerCheckAllDevices();
338 }
339 }
340
341 /**
342 * Listener of device events used to update the pipeline status.
343 */
344 private class InternalDeviceListener implements DeviceListener {
345
346 @Override
347 public void event(DeviceEvent event) {
348 final Device device = event.subject();
349 switch (event.type()) {
350 case DEVICE_ADDED:
351 case DEVICE_UPDATED:
352 case DEVICE_AVAILABILITY_CHANGED:
353 if (!deviceService.isAvailable(device.id())) {
354 signalStatusUnknown(device.id());
355 }
356 break;
357 case DEVICE_REMOVED:
358 case DEVICE_SUSPENDED:
359 signalStatusUnknown(device.id());
360 break;
361 case PORT_ADDED:
362 case PORT_UPDATED:
363 case PORT_REMOVED:
364 case PORT_STATS_UPDATED:
365 default:
366 break;
367 }
368 }
369 }
370
371 private class StatusMapListener
372 implements EventuallyConsistentMapListener<DeviceId, PipelineStatus> {
373
374 @Override
375 public void event(EventuallyConsistentMapEvent<DeviceId, PipelineStatus> event) {
376 final DeviceId deviceId = event.key();
377 final PipelineStatus status = event.value();
378 switch (event.type()) {
379 case PUT:
380 postStatusEvent(deviceId, status);
381 break;
382 case REMOVE:
383 postStatusEvent(deviceId, PipelineStatus.UNKNOWN);
384 break;
385 default:
386 log.error("Unknown map event type {}", event.type());
387 }
388 }
389
390 private void postStatusEvent(DeviceId deviceId, PipelineStatus newStatus) {
391 PipelineStatus oldStatus = localStatusMap.put(deviceId, newStatus);
392 oldStatus = oldStatus == null ? PipelineStatus.UNKNOWN : oldStatus;
393 final PiPipeconfWatchdogEvent.Type eventType =
394 newStatus == PipelineStatus.READY
395 ? PiPipeconfWatchdogEvent.Type.PIPELINE_READY
396 : PiPipeconfWatchdogEvent.Type.PIPELINE_UNKNOWN;
397 if (newStatus != oldStatus) {
398 log.info("Pipeline status of {} is {}", deviceId, newStatus);
399 post(new PiPipeconfWatchdogEvent(eventType, deviceId));
400 }
401 }
402 }
403}