blob: 21df7781d8445593839b7098b991b2d96bd9d9a5 [file] [log] [blame]
Carmelo Cascone9e4972c2018-08-30 00:29:16 -07001/*
2 * Copyright 2018-present Open Networking Foundation
3 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
16
17package org.onosproject.net.pi.impl;
18
19import com.google.common.collect.Maps;
Carmelo Cascone3977ea42019-02-28 13:43:42 -080020import com.google.common.util.concurrent.Futures;
Carmelo Cascone9e4972c2018-08-30 00:29:16 -070021import com.google.common.util.concurrent.Striped;
Carmelo Cascone9e4972c2018-08-30 00:29:16 -070022import org.onlab.util.KryoNamespace;
23import org.onlab.util.Tools;
24import org.onosproject.cfg.ComponentConfigService;
25import org.onosproject.event.AbstractListenerManager;
26import org.onosproject.mastership.MastershipInfo;
27import org.onosproject.mastership.MastershipService;
28import org.onosproject.net.Device;
29import org.onosproject.net.DeviceId;
30import org.onosproject.net.MastershipRole;
31import org.onosproject.net.behaviour.PiPipelineProgrammable;
32import org.onosproject.net.device.DeviceEvent;
33import org.onosproject.net.device.DeviceHandshaker;
34import org.onosproject.net.device.DeviceListener;
35import org.onosproject.net.device.DeviceService;
36import org.onosproject.net.pi.model.PiPipeconf;
37import org.onosproject.net.pi.model.PiPipeconfId;
Carmelo Cascone75a9a892019-04-22 12:12:23 -070038import org.onosproject.net.pi.service.PiPipeconfEvent;
39import org.onosproject.net.pi.service.PiPipeconfListener;
Carmelo Cascone9e4972c2018-08-30 00:29:16 -070040import org.onosproject.net.pi.service.PiPipeconfMappingStore;
41import org.onosproject.net.pi.service.PiPipeconfService;
42import org.onosproject.net.pi.service.PiPipeconfWatchdogEvent;
43import org.onosproject.net.pi.service.PiPipeconfWatchdogListener;
44import org.onosproject.net.pi.service.PiPipeconfWatchdogService;
45import org.onosproject.store.serializers.KryoNamespaces;
46import org.onosproject.store.service.EventuallyConsistentMap;
47import org.onosproject.store.service.EventuallyConsistentMapEvent;
48import org.onosproject.store.service.EventuallyConsistentMapListener;
49import org.onosproject.store.service.StorageService;
50import org.onosproject.store.service.WallClockTimestamp;
51import org.osgi.service.component.ComponentContext;
Ray Milkeyd84f89b2018-08-17 14:54:17 -070052import org.osgi.service.component.annotations.Activate;
53import org.osgi.service.component.annotations.Component;
54import org.osgi.service.component.annotations.Deactivate;
55import org.osgi.service.component.annotations.Modified;
56import org.osgi.service.component.annotations.Reference;
57import org.osgi.service.component.annotations.ReferenceCardinality;
Carmelo Cascone9e4972c2018-08-30 00:29:16 -070058import org.slf4j.Logger;
59
Carmelo Cascone9e4972c2018-08-30 00:29:16 -070060import java.util.Dictionary;
61import java.util.Map;
Carmelo Cascone75a9a892019-04-22 12:12:23 -070062import java.util.Objects;
Carmelo Cascone9e4972c2018-08-30 00:29:16 -070063import java.util.Timer;
64import java.util.TimerTask;
Carmelo Cascone9e4972c2018-08-30 00:29:16 -070065import java.util.concurrent.ExecutorService;
66import java.util.concurrent.Executors;
Carmelo Cascone9e4972c2018-08-30 00:29:16 -070067import java.util.concurrent.locks.Lock;
68
Carmelo Cascone95308282019-03-18 17:18:04 -070069import static java.util.Collections.singleton;
Carmelo Cascone9e4972c2018-08-30 00:29:16 -070070import static org.onlab.util.Tools.groupedThreads;
Ray Milkeyd04e2272018-10-16 18:20:18 -070071import static org.onosproject.net.OsgiPropertyConstants.PWM_PROBE_INTERVAL;
72import static org.onosproject.net.OsgiPropertyConstants.PWM_PROBE_INTERVAL_DEFAULT;
Carmelo Cascone9e4972c2018-08-30 00:29:16 -070073import static org.slf4j.LoggerFactory.getLogger;
74
75/**
76 * Implementation of PiPipeconfWatchdogService that implements a periodic
77 * pipeline probe task and listens for device events to update the status of the
78 * pipeline.
79 */
Ray Milkeyd04e2272018-10-16 18:20:18 -070080@Component(
Carmelo Cascone75a9a892019-04-22 12:12:23 -070081 immediate = true,
82 service = PiPipeconfWatchdogService.class,
83 property = {
84 PWM_PROBE_INTERVAL + ":Integer=" + PWM_PROBE_INTERVAL_DEFAULT
85 }
Ray Milkeyd04e2272018-10-16 18:20:18 -070086)
Carmelo Cascone9e4972c2018-08-30 00:29:16 -070087public class PiPipeconfWatchdogManager
88 extends AbstractListenerManager<PiPipeconfWatchdogEvent, PiPipeconfWatchdogListener>
89 implements PiPipeconfWatchdogService {
90
91 private final Logger log = getLogger(getClass());
92
93 private static final long SECONDS = 1000L;
Carmelo Cascone9e4972c2018-08-30 00:29:16 -070094
Ray Milkeyd84f89b2018-08-17 14:54:17 -070095 @Reference(cardinality = ReferenceCardinality.MANDATORY)
Carmelo Cascone9e4972c2018-08-30 00:29:16 -070096 private PiPipeconfMappingStore pipeconfMappingStore;
97
Ray Milkeyd84f89b2018-08-17 14:54:17 -070098 @Reference(cardinality = ReferenceCardinality.MANDATORY)
Carmelo Cascone9e4972c2018-08-30 00:29:16 -070099 private DeviceService deviceService;
100
Ray Milkeyd84f89b2018-08-17 14:54:17 -0700101 @Reference(cardinality = ReferenceCardinality.MANDATORY)
Carmelo Cascone9e4972c2018-08-30 00:29:16 -0700102 private MastershipService mastershipService;
103
Ray Milkeyd84f89b2018-08-17 14:54:17 -0700104 @Reference(cardinality = ReferenceCardinality.MANDATORY)
Carmelo Cascone9e4972c2018-08-30 00:29:16 -0700105 protected PiPipeconfService pipeconfService;
106
Ray Milkeyd84f89b2018-08-17 14:54:17 -0700107 @Reference(cardinality = ReferenceCardinality.MANDATORY)
Carmelo Cascone9e4972c2018-08-30 00:29:16 -0700108 protected StorageService storageService;
109
Ray Milkeyd84f89b2018-08-17 14:54:17 -0700110 @Reference(cardinality = ReferenceCardinality.MANDATORY)
Carmelo Cascone9e4972c2018-08-30 00:29:16 -0700111 private ComponentConfigService componentConfigService;
112
Carmelo Cascone75a9a892019-04-22 12:12:23 -0700113 /**
114 * Configure interval in seconds for device pipeconf probing.
115 */
Ray Milkeyd04e2272018-10-16 18:20:18 -0700116 private int probeInterval = PWM_PROBE_INTERVAL_DEFAULT;
Carmelo Cascone9e4972c2018-08-30 00:29:16 -0700117
118 protected ExecutorService executor = Executors.newFixedThreadPool(
119 30, groupedThreads("onos/pipeconf-watchdog", "%d", log));
120
Carmelo Cascone75a9a892019-04-22 12:12:23 -0700121 private final DeviceListener deviceListener = new InternalDeviceListener();
122 private final PiPipeconfListener pipeconfListener = new InternalPipeconfListener();
Carmelo Cascone9e4972c2018-08-30 00:29:16 -0700123
124 private Timer timer;
125 private TimerTask task;
126
127 private final Striped<Lock> locks = Striped.lock(30);
128
129 private EventuallyConsistentMap<DeviceId, PipelineStatus> statusMap;
130 private Map<DeviceId, PipelineStatus> localStatusMap;
131
132 @Activate
133 public void activate() {
134 eventDispatcher.addSink(PiPipeconfWatchdogEvent.class, listenerRegistry);
135 localStatusMap = Maps.newConcurrentMap();
136 // Init distributed status map.
137 KryoNamespace.Builder serializer = KryoNamespace.newBuilder()
138 .register(KryoNamespaces.API)
139 .register(PipelineStatus.class);
140 statusMap = storageService.<DeviceId, PipelineStatus>eventuallyConsistentMapBuilder()
141 .withName("onos-pipeconf-status-table")
142 .withSerializer(serializer)
143 .withTimestampProvider((k, v) -> new WallClockTimestamp()).build();
144 statusMap.addListener(new StatusMapListener());
145 // Register component configurable properties.
146 componentConfigService.registerProperties(getClass());
147 // Start periodic watchdog task.
148 timer = new Timer();
149 startProbeTask();
Carmelo Cascone75a9a892019-04-22 12:12:23 -0700150 // Add listeners.
Carmelo Cascone9e4972c2018-08-30 00:29:16 -0700151 deviceService.addListener(deviceListener);
Carmelo Cascone75a9a892019-04-22 12:12:23 -0700152 pipeconfService.addListener(pipeconfListener);
Carmelo Cascone9e4972c2018-08-30 00:29:16 -0700153 log.info("Started");
154 }
155
156 @Modified
157 public void modified(ComponentContext context) {
158 if (context == null) {
159 return;
160 }
161
162 Dictionary<?, ?> properties = context.getProperties();
163 final int oldProbeInterval = probeInterval;
164 probeInterval = Tools.getIntegerProperty(
Ray Milkeyd04e2272018-10-16 18:20:18 -0700165 properties, PWM_PROBE_INTERVAL, PWM_PROBE_INTERVAL_DEFAULT);
Carmelo Cascone9e4972c2018-08-30 00:29:16 -0700166 log.info("Configured. {} is configured to {} seconds",
Ray Milkeyd04e2272018-10-16 18:20:18 -0700167 PWM_PROBE_INTERVAL_DEFAULT, probeInterval);
Carmelo Cascone9e4972c2018-08-30 00:29:16 -0700168
169 if (oldProbeInterval != probeInterval) {
170 rescheduleProbeTask();
171 }
172 }
173
174 @Deactivate
175 public void deactivate() {
176 eventDispatcher.removeSink(PiPipeconfWatchdogEvent.class);
Carmelo Cascone75a9a892019-04-22 12:12:23 -0700177 pipeconfService.removeListener(pipeconfListener);
Carmelo Cascone9e4972c2018-08-30 00:29:16 -0700178 deviceService.removeListener(deviceListener);
179 stopProbeTask();
180 timer = null;
181 statusMap = null;
182 localStatusMap = null;
183 log.info("Stopped");
184 }
185
186 @Override
187 public void triggerProbe(DeviceId deviceId) {
188 final Device device = deviceService.getDevice(deviceId);
189 if (device != null) {
Carmelo Cascone95308282019-03-18 17:18:04 -0700190 filterAndTriggerTasks(singleton(device));
Carmelo Cascone9e4972c2018-08-30 00:29:16 -0700191 }
192 }
193
194 @Override
195 public PipelineStatus getStatus(DeviceId deviceId) {
196 final PipelineStatus status = statusMap.get(deviceId);
197 return status == null ? PipelineStatus.UNKNOWN : status;
198 }
199
200 private void triggerCheckAllDevices() {
201 filterAndTriggerTasks(deviceService.getDevices());
202 }
203
204 private void filterAndTriggerTasks(Iterable<Device> devices) {
205 devices.forEach(device -> {
206 if (!isLocalMaster(device)) {
207 return;
208 }
209
210 final PiPipeconfId pipeconfId = pipeconfMappingStore.getPipeconfId(device.id());
Carmelo Cascone95308282019-03-18 17:18:04 -0700211 if (pipeconfId == null || !device.is(PiPipelineProgrammable.class)) {
Carmelo Cascone9e4972c2018-08-30 00:29:16 -0700212 return;
213 }
214
215 if (!pipeconfService.getPipeconf(pipeconfId).isPresent()) {
Carmelo Cascone75a9a892019-04-22 12:12:23 -0700216 log.warn("Pipeconf {} is not registered, skipping probe for {}",
217 pipeconfId, device.id());
Carmelo Cascone9e4972c2018-08-30 00:29:16 -0700218 return;
219 }
220
221 final PiPipeconf pipeconf = pipeconfService.getPipeconf(pipeconfId).get();
222
223 if (!device.is(DeviceHandshaker.class)) {
224 log.error("Missing DeviceHandshaker behavior for {}", device.id());
225 return;
226 }
227
228 // Trigger task with per-device lock.
229 executor.execute(withLock(() -> {
230 final boolean success = doSetPipeconfIfRequired(device, pipeconf);
231 if (success) {
232 signalStatusReady(device.id());
233 } else {
234 signalStatusUnknown(device.id());
235 }
236 }, device.id()));
237 });
238 }
239
240 /**
241 * Returns true if the given device is known to be configured with the given
242 * pipeline, false otherwise. If necessary, this method enforces setting the
243 * given pipeconf using drivers.
244 *
245 * @param device device
246 * @param pipeconf pipeconf
247 * @return boolean
248 */
249 private boolean doSetPipeconfIfRequired(Device device, PiPipeconf pipeconf) {
250 log.debug("Starting watchdog task for {} ({})", device.id(), pipeconf.id());
251 final PiPipelineProgrammable pipelineProg = device.as(PiPipelineProgrammable.class);
252 final DeviceHandshaker handshaker = device.as(DeviceHandshaker.class);
Carmelo Casconec2be50a2019-04-10 00:15:39 -0700253 if (!handshaker.hasConnection()) {
Carmelo Cascone9e4972c2018-08-30 00:29:16 -0700254 return false;
255 }
Carmelo Cascone3977ea42019-02-28 13:43:42 -0800256 if (Futures.getUnchecked(pipelineProg.isPipeconfSet(pipeconf))) {
Carmelo Cascone9e4972c2018-08-30 00:29:16 -0700257 log.debug("Pipeconf {} already configured on {}",
258 pipeconf.id(), device.id());
259 return true;
260 }
Carmelo Cascone95308282019-03-18 17:18:04 -0700261 return Futures.getUnchecked(pipelineProg.setPipeconf(pipeconf));
Carmelo Cascone9e4972c2018-08-30 00:29:16 -0700262 }
263
264 private Runnable withLock(Runnable task, Object object) {
265 return () -> {
266 final Lock lock = locks.get(object);
267 lock.lock();
268 try {
269 task.run();
270 } finally {
271 lock.unlock();
272 }
273 };
274 }
275
276 private void signalStatusUnknown(DeviceId deviceId) {
277 statusMap.remove(deviceId);
278 }
279
280 private void signalStatusReady(DeviceId deviceId) {
281 statusMap.put(deviceId, PipelineStatus.READY);
282 }
283
284 private boolean isLocalMaster(Device device) {
285 if (mastershipService.isLocalMaster(device.id())) {
286 return true;
287 }
288 // The device might have no master (e.g. after it has been disconnected
289 // from core), hence we use device mastership state.
290 final MastershipInfo info = mastershipService.getMastershipFor(device.id());
291 return !info.master().isPresent() &&
292 device.is(DeviceHandshaker.class) &&
293 device.as(DeviceHandshaker.class).getRole()
294 .equals(MastershipRole.MASTER);
295 }
296
297 private void startProbeTask() {
Carmelo Cascone95308282019-03-18 17:18:04 -0700298 synchronized (this) {
Carmelo Cascone9e4972c2018-08-30 00:29:16 -0700299 log.info("Starting pipeline probe thread with {} seconds interval...", probeInterval);
300 task = new InternalTimerTask();
301 timer.scheduleAtFixedRate(task, probeInterval * SECONDS,
302 probeInterval * SECONDS);
303 }
304 }
305
306
307 private void stopProbeTask() {
Carmelo Cascone95308282019-03-18 17:18:04 -0700308 synchronized (this) {
Carmelo Cascone9e4972c2018-08-30 00:29:16 -0700309 log.info("Stopping pipeline probe thread...");
310 task.cancel();
311 task = null;
312 }
313 }
314
315
316 private synchronized void rescheduleProbeTask() {
Carmelo Cascone95308282019-03-18 17:18:04 -0700317 synchronized (this) {
Carmelo Cascone9e4972c2018-08-30 00:29:16 -0700318 stopProbeTask();
319 startProbeTask();
320 }
321 }
322
323 private class InternalTimerTask extends TimerTask {
324 @Override
325 public void run() {
326 triggerCheckAllDevices();
327 }
328 }
329
330 /**
331 * Listener of device events used to update the pipeline status.
332 */
333 private class InternalDeviceListener implements DeviceListener {
334
335 @Override
336 public void event(DeviceEvent event) {
337 final Device device = event.subject();
338 switch (event.type()) {
339 case DEVICE_ADDED:
340 case DEVICE_UPDATED:
341 case DEVICE_AVAILABILITY_CHANGED:
342 if (!deviceService.isAvailable(device.id())) {
343 signalStatusUnknown(device.id());
Carmelo Cascone95308282019-03-18 17:18:04 -0700344 } else {
345 // The GeneralDeviceProvider marks online devices that
346 // have ANY pipeline config set. Here we make sure the
347 // one configured in the pipeconf service is the
348 // expected one. Clearly, it would be better to let the
349 // GDP do this check and avoid sending twice the same
350 // message to the switch.
351 filterAndTriggerTasks(singleton(device));
Carmelo Cascone9e4972c2018-08-30 00:29:16 -0700352 }
353 break;
354 case DEVICE_REMOVED:
355 case DEVICE_SUSPENDED:
356 signalStatusUnknown(device.id());
357 break;
358 case PORT_ADDED:
359 case PORT_UPDATED:
360 case PORT_REMOVED:
361 case PORT_STATS_UPDATED:
362 default:
363 break;
364 }
365 }
366 }
367
Carmelo Cascone75a9a892019-04-22 12:12:23 -0700368 private class InternalPipeconfListener implements PiPipeconfListener {
369 @Override
370 public void event(PiPipeconfEvent event) {
371 pipeconfMappingStore.getDevices(event.subject())
372 .forEach(PiPipeconfWatchdogManager.this::triggerProbe);
373 }
374
375 @Override
376 public boolean isRelevant(PiPipeconfEvent event) {
377 return Objects.equals(event.type(), PiPipeconfEvent.Type.REGISTERED);
378 }
379 }
380
Carmelo Cascone9e4972c2018-08-30 00:29:16 -0700381 private class StatusMapListener
382 implements EventuallyConsistentMapListener<DeviceId, PipelineStatus> {
383
384 @Override
385 public void event(EventuallyConsistentMapEvent<DeviceId, PipelineStatus> event) {
386 final DeviceId deviceId = event.key();
387 final PipelineStatus status = event.value();
388 switch (event.type()) {
389 case PUT:
390 postStatusEvent(deviceId, status);
391 break;
392 case REMOVE:
393 postStatusEvent(deviceId, PipelineStatus.UNKNOWN);
394 break;
395 default:
396 log.error("Unknown map event type {}", event.type());
397 }
398 }
399
400 private void postStatusEvent(DeviceId deviceId, PipelineStatus newStatus) {
401 PipelineStatus oldStatus = localStatusMap.put(deviceId, newStatus);
402 oldStatus = oldStatus == null ? PipelineStatus.UNKNOWN : oldStatus;
403 final PiPipeconfWatchdogEvent.Type eventType =
404 newStatus == PipelineStatus.READY
405 ? PiPipeconfWatchdogEvent.Type.PIPELINE_READY
406 : PiPipeconfWatchdogEvent.Type.PIPELINE_UNKNOWN;
407 if (newStatus != oldStatus) {
408 log.info("Pipeline status of {} is {}", deviceId, newStatus);
409 post(new PiPipeconfWatchdogEvent(eventType, deviceId));
410 }
411 }
412 }
413}