blob: 0503d9a5aad7fbc8fb3b356e2e82f6136403648d [file] [log] [blame]
/*
 * Copyright 2018-present Open Networking Foundation
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
16
17package org.onosproject.net.pi.impl;
18
19import com.google.common.collect.Maps;
Carmelo Cascone3977ea42019-02-28 13:43:42 -080020import com.google.common.util.concurrent.Futures;
Carmelo Cascone9e4972c2018-08-30 00:29:16 -070021import com.google.common.util.concurrent.Striped;
Carmelo Cascone9e4972c2018-08-30 00:29:16 -070022import org.onlab.util.KryoNamespace;
23import org.onlab.util.Tools;
24import org.onosproject.cfg.ComponentConfigService;
25import org.onosproject.event.AbstractListenerManager;
26import org.onosproject.mastership.MastershipInfo;
27import org.onosproject.mastership.MastershipService;
28import org.onosproject.net.Device;
29import org.onosproject.net.DeviceId;
30import org.onosproject.net.MastershipRole;
31import org.onosproject.net.behaviour.PiPipelineProgrammable;
32import org.onosproject.net.device.DeviceEvent;
33import org.onosproject.net.device.DeviceHandshaker;
34import org.onosproject.net.device.DeviceListener;
35import org.onosproject.net.device.DeviceService;
36import org.onosproject.net.pi.model.PiPipeconf;
37import org.onosproject.net.pi.model.PiPipeconfId;
38import org.onosproject.net.pi.service.PiPipeconfMappingStore;
39import org.onosproject.net.pi.service.PiPipeconfService;
40import org.onosproject.net.pi.service.PiPipeconfWatchdogEvent;
41import org.onosproject.net.pi.service.PiPipeconfWatchdogListener;
42import org.onosproject.net.pi.service.PiPipeconfWatchdogService;
43import org.onosproject.store.serializers.KryoNamespaces;
44import org.onosproject.store.service.EventuallyConsistentMap;
45import org.onosproject.store.service.EventuallyConsistentMapEvent;
46import org.onosproject.store.service.EventuallyConsistentMapListener;
47import org.onosproject.store.service.StorageService;
48import org.onosproject.store.service.WallClockTimestamp;
49import org.osgi.service.component.ComponentContext;
Ray Milkeyd84f89b2018-08-17 14:54:17 -070050import org.osgi.service.component.annotations.Activate;
51import org.osgi.service.component.annotations.Component;
52import org.osgi.service.component.annotations.Deactivate;
53import org.osgi.service.component.annotations.Modified;
54import org.osgi.service.component.annotations.Reference;
55import org.osgi.service.component.annotations.ReferenceCardinality;
Carmelo Cascone9e4972c2018-08-30 00:29:16 -070056import org.slf4j.Logger;
57
Carmelo Cascone9e4972c2018-08-30 00:29:16 -070058import java.util.Dictionary;
59import java.util.Map;
60import java.util.Timer;
61import java.util.TimerTask;
Carmelo Cascone9e4972c2018-08-30 00:29:16 -070062import java.util.concurrent.ExecutorService;
63import java.util.concurrent.Executors;
Carmelo Cascone9e4972c2018-08-30 00:29:16 -070064import java.util.concurrent.locks.Lock;
65
Carmelo Cascone95308282019-03-18 17:18:04 -070066import static java.util.Collections.singleton;
Carmelo Cascone9e4972c2018-08-30 00:29:16 -070067import static org.onlab.util.Tools.groupedThreads;
Ray Milkeyd04e2272018-10-16 18:20:18 -070068import static org.onosproject.net.OsgiPropertyConstants.PWM_PROBE_INTERVAL;
69import static org.onosproject.net.OsgiPropertyConstants.PWM_PROBE_INTERVAL_DEFAULT;
Carmelo Cascone9e4972c2018-08-30 00:29:16 -070070import static org.slf4j.LoggerFactory.getLogger;
71
72/**
73 * Implementation of PiPipeconfWatchdogService that implements a periodic
74 * pipeline probe task and listens for device events to update the status of the
75 * pipeline.
76 */
Ray Milkeyd04e2272018-10-16 18:20:18 -070077@Component(
78 immediate = true,
79 service = PiPipeconfWatchdogService.class,
80 property = {
Ray Milkey2d7bca12018-10-17 14:51:52 -070081 PWM_PROBE_INTERVAL + ":Integer=" + PWM_PROBE_INTERVAL_DEFAULT
Ray Milkeyd04e2272018-10-16 18:20:18 -070082 }
83)
Carmelo Cascone9e4972c2018-08-30 00:29:16 -070084public class PiPipeconfWatchdogManager
85 extends AbstractListenerManager<PiPipeconfWatchdogEvent, PiPipeconfWatchdogListener>
86 implements PiPipeconfWatchdogService {
87
88 private final Logger log = getLogger(getClass());
89
90 private static final long SECONDS = 1000L;
Carmelo Cascone9e4972c2018-08-30 00:29:16 -070091
Ray Milkeyd84f89b2018-08-17 14:54:17 -070092 @Reference(cardinality = ReferenceCardinality.MANDATORY)
Carmelo Cascone9e4972c2018-08-30 00:29:16 -070093 private PiPipeconfMappingStore pipeconfMappingStore;
94
Ray Milkeyd84f89b2018-08-17 14:54:17 -070095 @Reference(cardinality = ReferenceCardinality.MANDATORY)
Carmelo Cascone9e4972c2018-08-30 00:29:16 -070096 private DeviceService deviceService;
97
Ray Milkeyd84f89b2018-08-17 14:54:17 -070098 @Reference(cardinality = ReferenceCardinality.MANDATORY)
Carmelo Cascone9e4972c2018-08-30 00:29:16 -070099 private MastershipService mastershipService;
100
Ray Milkeyd84f89b2018-08-17 14:54:17 -0700101 @Reference(cardinality = ReferenceCardinality.MANDATORY)
Carmelo Cascone9e4972c2018-08-30 00:29:16 -0700102 protected PiPipeconfService pipeconfService;
103
Ray Milkeyd84f89b2018-08-17 14:54:17 -0700104 @Reference(cardinality = ReferenceCardinality.MANDATORY)
Carmelo Cascone9e4972c2018-08-30 00:29:16 -0700105 protected StorageService storageService;
106
Ray Milkeyd84f89b2018-08-17 14:54:17 -0700107 @Reference(cardinality = ReferenceCardinality.MANDATORY)
Carmelo Cascone9e4972c2018-08-30 00:29:16 -0700108 private ComponentConfigService componentConfigService;
109
Thomas Vachuskaf566fa22018-10-30 14:03:36 -0700110 /** Configure interval in seconds for device pipeconf probing. */
Ray Milkeyd04e2272018-10-16 18:20:18 -0700111 private int probeInterval = PWM_PROBE_INTERVAL_DEFAULT;
Carmelo Cascone9e4972c2018-08-30 00:29:16 -0700112
113 protected ExecutorService executor = Executors.newFixedThreadPool(
114 30, groupedThreads("onos/pipeconf-watchdog", "%d", log));
115
116 private final InternalDeviceListener deviceListener = new InternalDeviceListener();
117
118 private Timer timer;
119 private TimerTask task;
120
121 private final Striped<Lock> locks = Striped.lock(30);
122
123 private EventuallyConsistentMap<DeviceId, PipelineStatus> statusMap;
124 private Map<DeviceId, PipelineStatus> localStatusMap;
125
126 @Activate
127 public void activate() {
128 eventDispatcher.addSink(PiPipeconfWatchdogEvent.class, listenerRegistry);
129 localStatusMap = Maps.newConcurrentMap();
130 // Init distributed status map.
131 KryoNamespace.Builder serializer = KryoNamespace.newBuilder()
132 .register(KryoNamespaces.API)
133 .register(PipelineStatus.class);
134 statusMap = storageService.<DeviceId, PipelineStatus>eventuallyConsistentMapBuilder()
135 .withName("onos-pipeconf-status-table")
136 .withSerializer(serializer)
137 .withTimestampProvider((k, v) -> new WallClockTimestamp()).build();
138 statusMap.addListener(new StatusMapListener());
139 // Register component configurable properties.
140 componentConfigService.registerProperties(getClass());
141 // Start periodic watchdog task.
142 timer = new Timer();
143 startProbeTask();
144 // Add device listener.
145 deviceService.addListener(deviceListener);
146 log.info("Started");
147 }
148
149 @Modified
150 public void modified(ComponentContext context) {
151 if (context == null) {
152 return;
153 }
154
155 Dictionary<?, ?> properties = context.getProperties();
156 final int oldProbeInterval = probeInterval;
157 probeInterval = Tools.getIntegerProperty(
Ray Milkeyd04e2272018-10-16 18:20:18 -0700158 properties, PWM_PROBE_INTERVAL, PWM_PROBE_INTERVAL_DEFAULT);
Carmelo Cascone9e4972c2018-08-30 00:29:16 -0700159 log.info("Configured. {} is configured to {} seconds",
Ray Milkeyd04e2272018-10-16 18:20:18 -0700160 PWM_PROBE_INTERVAL_DEFAULT, probeInterval);
Carmelo Cascone9e4972c2018-08-30 00:29:16 -0700161
162 if (oldProbeInterval != probeInterval) {
163 rescheduleProbeTask();
164 }
165 }
166
167 @Deactivate
168 public void deactivate() {
169 eventDispatcher.removeSink(PiPipeconfWatchdogEvent.class);
170 deviceService.removeListener(deviceListener);
171 stopProbeTask();
172 timer = null;
173 statusMap = null;
174 localStatusMap = null;
175 log.info("Stopped");
176 }
177
178 @Override
179 public void triggerProbe(DeviceId deviceId) {
180 final Device device = deviceService.getDevice(deviceId);
181 if (device != null) {
Carmelo Cascone95308282019-03-18 17:18:04 -0700182 filterAndTriggerTasks(singleton(device));
Carmelo Cascone9e4972c2018-08-30 00:29:16 -0700183 }
184 }
185
186 @Override
187 public PipelineStatus getStatus(DeviceId deviceId) {
188 final PipelineStatus status = statusMap.get(deviceId);
189 return status == null ? PipelineStatus.UNKNOWN : status;
190 }
191
192 private void triggerCheckAllDevices() {
193 filterAndTriggerTasks(deviceService.getDevices());
194 }
195
196 private void filterAndTriggerTasks(Iterable<Device> devices) {
197 devices.forEach(device -> {
198 if (!isLocalMaster(device)) {
199 return;
200 }
201
202 final PiPipeconfId pipeconfId = pipeconfMappingStore.getPipeconfId(device.id());
Carmelo Cascone95308282019-03-18 17:18:04 -0700203 if (pipeconfId == null || !device.is(PiPipelineProgrammable.class)) {
Carmelo Cascone9e4972c2018-08-30 00:29:16 -0700204 return;
205 }
206
207 if (!pipeconfService.getPipeconf(pipeconfId).isPresent()) {
208 log.error("Pipeconf {} is not registered", pipeconfId);
209 return;
210 }
211
212 final PiPipeconf pipeconf = pipeconfService.getPipeconf(pipeconfId).get();
213
214 if (!device.is(DeviceHandshaker.class)) {
215 log.error("Missing DeviceHandshaker behavior for {}", device.id());
216 return;
217 }
218
219 // Trigger task with per-device lock.
220 executor.execute(withLock(() -> {
221 final boolean success = doSetPipeconfIfRequired(device, pipeconf);
222 if (success) {
223 signalStatusReady(device.id());
224 } else {
225 signalStatusUnknown(device.id());
226 }
227 }, device.id()));
228 });
229 }
230
231 /**
232 * Returns true if the given device is known to be configured with the given
233 * pipeline, false otherwise. If necessary, this method enforces setting the
234 * given pipeconf using drivers.
235 *
236 * @param device device
237 * @param pipeconf pipeconf
238 * @return boolean
239 */
240 private boolean doSetPipeconfIfRequired(Device device, PiPipeconf pipeconf) {
241 log.debug("Starting watchdog task for {} ({})", device.id(), pipeconf.id());
242 final PiPipelineProgrammable pipelineProg = device.as(PiPipelineProgrammable.class);
243 final DeviceHandshaker handshaker = device.as(DeviceHandshaker.class);
244 if (!handshaker.isConnected()) {
245 return false;
246 }
Carmelo Cascone3977ea42019-02-28 13:43:42 -0800247 if (Futures.getUnchecked(pipelineProg.isPipeconfSet(pipeconf))) {
Carmelo Cascone9e4972c2018-08-30 00:29:16 -0700248 log.debug("Pipeconf {} already configured on {}",
249 pipeconf.id(), device.id());
250 return true;
251 }
Carmelo Cascone95308282019-03-18 17:18:04 -0700252 return Futures.getUnchecked(pipelineProg.setPipeconf(pipeconf));
Carmelo Cascone9e4972c2018-08-30 00:29:16 -0700253 }
254
255 private Runnable withLock(Runnable task, Object object) {
256 return () -> {
257 final Lock lock = locks.get(object);
258 lock.lock();
259 try {
260 task.run();
261 } finally {
262 lock.unlock();
263 }
264 };
265 }
266
267 private void signalStatusUnknown(DeviceId deviceId) {
268 statusMap.remove(deviceId);
269 }
270
271 private void signalStatusReady(DeviceId deviceId) {
272 statusMap.put(deviceId, PipelineStatus.READY);
273 }
274
275 private boolean isLocalMaster(Device device) {
276 if (mastershipService.isLocalMaster(device.id())) {
277 return true;
278 }
279 // The device might have no master (e.g. after it has been disconnected
280 // from core), hence we use device mastership state.
281 final MastershipInfo info = mastershipService.getMastershipFor(device.id());
282 return !info.master().isPresent() &&
283 device.is(DeviceHandshaker.class) &&
284 device.as(DeviceHandshaker.class).getRole()
285 .equals(MastershipRole.MASTER);
286 }
287
288 private void startProbeTask() {
Carmelo Cascone95308282019-03-18 17:18:04 -0700289 synchronized (this) {
Carmelo Cascone9e4972c2018-08-30 00:29:16 -0700290 log.info("Starting pipeline probe thread with {} seconds interval...", probeInterval);
291 task = new InternalTimerTask();
292 timer.scheduleAtFixedRate(task, probeInterval * SECONDS,
293 probeInterval * SECONDS);
294 }
295 }
296
297
298 private void stopProbeTask() {
Carmelo Cascone95308282019-03-18 17:18:04 -0700299 synchronized (this) {
Carmelo Cascone9e4972c2018-08-30 00:29:16 -0700300 log.info("Stopping pipeline probe thread...");
301 task.cancel();
302 task = null;
303 }
304 }
305
306
307 private synchronized void rescheduleProbeTask() {
Carmelo Cascone95308282019-03-18 17:18:04 -0700308 synchronized (this) {
Carmelo Cascone9e4972c2018-08-30 00:29:16 -0700309 stopProbeTask();
310 startProbeTask();
311 }
312 }
313
314 private class InternalTimerTask extends TimerTask {
315 @Override
316 public void run() {
317 triggerCheckAllDevices();
318 }
319 }
320
321 /**
322 * Listener of device events used to update the pipeline status.
323 */
324 private class InternalDeviceListener implements DeviceListener {
325
326 @Override
327 public void event(DeviceEvent event) {
328 final Device device = event.subject();
329 switch (event.type()) {
330 case DEVICE_ADDED:
331 case DEVICE_UPDATED:
332 case DEVICE_AVAILABILITY_CHANGED:
333 if (!deviceService.isAvailable(device.id())) {
334 signalStatusUnknown(device.id());
Carmelo Cascone95308282019-03-18 17:18:04 -0700335 } else {
336 // The GeneralDeviceProvider marks online devices that
337 // have ANY pipeline config set. Here we make sure the
338 // one configured in the pipeconf service is the
339 // expected one. Clearly, it would be better to let the
340 // GDP do this check and avoid sending twice the same
341 // message to the switch.
342 filterAndTriggerTasks(singleton(device));
Carmelo Cascone9e4972c2018-08-30 00:29:16 -0700343 }
344 break;
345 case DEVICE_REMOVED:
346 case DEVICE_SUSPENDED:
347 signalStatusUnknown(device.id());
348 break;
349 case PORT_ADDED:
350 case PORT_UPDATED:
351 case PORT_REMOVED:
352 case PORT_STATS_UPDATED:
353 default:
354 break;
355 }
356 }
357 }
358
359 private class StatusMapListener
360 implements EventuallyConsistentMapListener<DeviceId, PipelineStatus> {
361
362 @Override
363 public void event(EventuallyConsistentMapEvent<DeviceId, PipelineStatus> event) {
364 final DeviceId deviceId = event.key();
365 final PipelineStatus status = event.value();
366 switch (event.type()) {
367 case PUT:
368 postStatusEvent(deviceId, status);
369 break;
370 case REMOVE:
371 postStatusEvent(deviceId, PipelineStatus.UNKNOWN);
372 break;
373 default:
374 log.error("Unknown map event type {}", event.type());
375 }
376 }
377
378 private void postStatusEvent(DeviceId deviceId, PipelineStatus newStatus) {
379 PipelineStatus oldStatus = localStatusMap.put(deviceId, newStatus);
380 oldStatus = oldStatus == null ? PipelineStatus.UNKNOWN : oldStatus;
381 final PiPipeconfWatchdogEvent.Type eventType =
382 newStatus == PipelineStatus.READY
383 ? PiPipeconfWatchdogEvent.Type.PIPELINE_READY
384 : PiPipeconfWatchdogEvent.Type.PIPELINE_UNKNOWN;
385 if (newStatus != oldStatus) {
386 log.info("Pipeline status of {} is {}", deviceId, newStatus);
387 post(new PiPipeconfWatchdogEvent(eventType, deviceId));
388 }
389 }
390 }
391}