blob: 0b1daf12b366b1f69b320320411c2131aa9549b5 [file] [log] [blame]
Carmelo Cascone9e4972c2018-08-30 00:29:16 -07001/*
2 * Copyright 2018-present Open Networking Foundation
3 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
16
17package org.onosproject.net.pi.impl;
18
19import com.google.common.collect.Maps;
20import com.google.common.util.concurrent.Striped;
Carmelo Cascone9e4972c2018-08-30 00:29:16 -070021import org.onlab.util.KryoNamespace;
22import org.onlab.util.Tools;
23import org.onosproject.cfg.ComponentConfigService;
24import org.onosproject.event.AbstractListenerManager;
25import org.onosproject.mastership.MastershipInfo;
26import org.onosproject.mastership.MastershipService;
27import org.onosproject.net.Device;
28import org.onosproject.net.DeviceId;
29import org.onosproject.net.MastershipRole;
30import org.onosproject.net.behaviour.PiPipelineProgrammable;
31import org.onosproject.net.device.DeviceEvent;
32import org.onosproject.net.device.DeviceHandshaker;
33import org.onosproject.net.device.DeviceListener;
34import org.onosproject.net.device.DeviceService;
35import org.onosproject.net.pi.model.PiPipeconf;
36import org.onosproject.net.pi.model.PiPipeconfId;
37import org.onosproject.net.pi.service.PiPipeconfMappingStore;
38import org.onosproject.net.pi.service.PiPipeconfService;
39import org.onosproject.net.pi.service.PiPipeconfWatchdogEvent;
40import org.onosproject.net.pi.service.PiPipeconfWatchdogListener;
41import org.onosproject.net.pi.service.PiPipeconfWatchdogService;
42import org.onosproject.store.serializers.KryoNamespaces;
43import org.onosproject.store.service.EventuallyConsistentMap;
44import org.onosproject.store.service.EventuallyConsistentMapEvent;
45import org.onosproject.store.service.EventuallyConsistentMapListener;
46import org.onosproject.store.service.StorageService;
47import org.onosproject.store.service.WallClockTimestamp;
48import org.osgi.service.component.ComponentContext;
Ray Milkeyd84f89b2018-08-17 14:54:17 -070049import org.osgi.service.component.annotations.Activate;
50import org.osgi.service.component.annotations.Component;
51import org.osgi.service.component.annotations.Deactivate;
52import org.osgi.service.component.annotations.Modified;
53import org.osgi.service.component.annotations.Reference;
54import org.osgi.service.component.annotations.ReferenceCardinality;
Carmelo Cascone9e4972c2018-08-30 00:29:16 -070055import org.slf4j.Logger;
56
57import java.util.Collections;
58import java.util.Dictionary;
59import java.util.Map;
60import java.util.Timer;
61import java.util.TimerTask;
62import java.util.concurrent.ExecutionException;
63import java.util.concurrent.ExecutorService;
64import java.util.concurrent.Executors;
65import java.util.concurrent.TimeUnit;
66import java.util.concurrent.TimeoutException;
67import java.util.concurrent.locks.Lock;
68
69import static org.onlab.util.Tools.groupedThreads;
70import static org.slf4j.LoggerFactory.getLogger;
71
72/**
73 * Implementation of PiPipeconfWatchdogService that implements a periodic
74 * pipeline probe task and listens for device events to update the status of the
75 * pipeline.
76 */
Ray Milkeyd84f89b2018-08-17 14:54:17 -070077@Component(immediate = true, service = PiPipeconfWatchdogService.class)
Carmelo Cascone9e4972c2018-08-30 00:29:16 -070078public class PiPipeconfWatchdogManager
79 extends AbstractListenerManager<PiPipeconfWatchdogEvent, PiPipeconfWatchdogListener>
80 implements PiPipeconfWatchdogService {
81
82 private final Logger log = getLogger(getClass());
83
84 private static final long SECONDS = 1000L;
85 // Long enough to allow for network delay (e.g. to transfer large pipeline
86 // binaries over slow network).
87 private static final long PIPECONF_SET_TIMEOUT = 60; // Seconds.
88
Ray Milkeyd84f89b2018-08-17 14:54:17 -070089 @Reference(cardinality = ReferenceCardinality.MANDATORY)
Carmelo Cascone9e4972c2018-08-30 00:29:16 -070090 private PiPipeconfMappingStore pipeconfMappingStore;
91
Ray Milkeyd84f89b2018-08-17 14:54:17 -070092 @Reference(cardinality = ReferenceCardinality.MANDATORY)
Carmelo Cascone9e4972c2018-08-30 00:29:16 -070093 private DeviceService deviceService;
94
Ray Milkeyd84f89b2018-08-17 14:54:17 -070095 @Reference(cardinality = ReferenceCardinality.MANDATORY)
Carmelo Cascone9e4972c2018-08-30 00:29:16 -070096 private MastershipService mastershipService;
97
Ray Milkeyd84f89b2018-08-17 14:54:17 -070098 @Reference(cardinality = ReferenceCardinality.MANDATORY)
Carmelo Cascone9e4972c2018-08-30 00:29:16 -070099 protected PiPipeconfService pipeconfService;
100
Ray Milkeyd84f89b2018-08-17 14:54:17 -0700101 @Reference(cardinality = ReferenceCardinality.MANDATORY)
Carmelo Cascone9e4972c2018-08-30 00:29:16 -0700102 protected StorageService storageService;
103
Ray Milkeyd84f89b2018-08-17 14:54:17 -0700104 @Reference(cardinality = ReferenceCardinality.MANDATORY)
Carmelo Cascone9e4972c2018-08-30 00:29:16 -0700105 private ComponentConfigService componentConfigService;
106
107 private static final String PROBE_INTERVAL = "probeInterval";
Carmelo Casconede3b6842018-09-05 17:45:10 -0700108 private static final int DEFAULT_PROBE_INTERVAL = 15;
Ray Milkeyd84f89b2018-08-17 14:54:17 -0700109 //@Property(name = PROBE_INTERVAL, intValue = DEFAULT_PROBE_INTERVAL,
110 // label = "Configure interval in seconds for device pipeconf probing")
Carmelo Cascone9e4972c2018-08-30 00:29:16 -0700111 private int probeInterval = DEFAULT_PROBE_INTERVAL;
112
113 protected ExecutorService executor = Executors.newFixedThreadPool(
114 30, groupedThreads("onos/pipeconf-watchdog", "%d", log));
115
116 private final InternalDeviceListener deviceListener = new InternalDeviceListener();
117
118 private Timer timer;
119 private TimerTask task;
120
121 private final Striped<Lock> locks = Striped.lock(30);
122
123 private EventuallyConsistentMap<DeviceId, PipelineStatus> statusMap;
124 private Map<DeviceId, PipelineStatus> localStatusMap;
125
126 @Activate
127 public void activate() {
128 eventDispatcher.addSink(PiPipeconfWatchdogEvent.class, listenerRegistry);
129 localStatusMap = Maps.newConcurrentMap();
130 // Init distributed status map.
131 KryoNamespace.Builder serializer = KryoNamespace.newBuilder()
132 .register(KryoNamespaces.API)
133 .register(PipelineStatus.class);
134 statusMap = storageService.<DeviceId, PipelineStatus>eventuallyConsistentMapBuilder()
135 .withName("onos-pipeconf-status-table")
136 .withSerializer(serializer)
137 .withTimestampProvider((k, v) -> new WallClockTimestamp()).build();
138 statusMap.addListener(new StatusMapListener());
139 // Register component configurable properties.
140 componentConfigService.registerProperties(getClass());
141 // Start periodic watchdog task.
142 timer = new Timer();
143 startProbeTask();
144 // Add device listener.
145 deviceService.addListener(deviceListener);
146 log.info("Started");
147 }
148
149 @Modified
150 public void modified(ComponentContext context) {
151 if (context == null) {
152 return;
153 }
154
155 Dictionary<?, ?> properties = context.getProperties();
156 final int oldProbeInterval = probeInterval;
157 probeInterval = Tools.getIntegerProperty(
158 properties, PROBE_INTERVAL, DEFAULT_PROBE_INTERVAL);
159 log.info("Configured. {} is configured to {} seconds",
160 PROBE_INTERVAL, probeInterval);
161
162 if (oldProbeInterval != probeInterval) {
163 rescheduleProbeTask();
164 }
165 }
166
167 @Deactivate
168 public void deactivate() {
169 eventDispatcher.removeSink(PiPipeconfWatchdogEvent.class);
170 deviceService.removeListener(deviceListener);
171 stopProbeTask();
172 timer = null;
173 statusMap = null;
174 localStatusMap = null;
175 log.info("Stopped");
176 }
177
178 @Override
179 public void triggerProbe(DeviceId deviceId) {
180 final Device device = deviceService.getDevice(deviceId);
181 if (device != null) {
182 filterAndTriggerTasks(Collections.singleton(device));
183 }
184 }
185
186 @Override
187 public PipelineStatus getStatus(DeviceId deviceId) {
188 final PipelineStatus status = statusMap.get(deviceId);
189 return status == null ? PipelineStatus.UNKNOWN : status;
190 }
191
192 private void triggerCheckAllDevices() {
193 filterAndTriggerTasks(deviceService.getDevices());
194 }
195
196 private void filterAndTriggerTasks(Iterable<Device> devices) {
197 devices.forEach(device -> {
198 if (!isLocalMaster(device)) {
199 return;
200 }
201
202 final PiPipeconfId pipeconfId = pipeconfMappingStore.getPipeconfId(device.id());
203 if (pipeconfId == null
204 || !device.is(PiPipelineProgrammable.class)) {
205 return;
206 }
207
208 if (!pipeconfService.getPipeconf(pipeconfId).isPresent()) {
209 log.error("Pipeconf {} is not registered", pipeconfId);
210 return;
211 }
212
213 final PiPipeconf pipeconf = pipeconfService.getPipeconf(pipeconfId).get();
214
215 if (!device.is(DeviceHandshaker.class)) {
216 log.error("Missing DeviceHandshaker behavior for {}", device.id());
217 return;
218 }
219
220 // Trigger task with per-device lock.
221 executor.execute(withLock(() -> {
222 final boolean success = doSetPipeconfIfRequired(device, pipeconf);
223 if (success) {
224 signalStatusReady(device.id());
225 } else {
226 signalStatusUnknown(device.id());
227 }
228 }, device.id()));
229 });
230 }
231
232 /**
233 * Returns true if the given device is known to be configured with the given
234 * pipeline, false otherwise. If necessary, this method enforces setting the
235 * given pipeconf using drivers.
236 *
237 * @param device device
238 * @param pipeconf pipeconf
239 * @return boolean
240 */
241 private boolean doSetPipeconfIfRequired(Device device, PiPipeconf pipeconf) {
242 log.debug("Starting watchdog task for {} ({})", device.id(), pipeconf.id());
243 final PiPipelineProgrammable pipelineProg = device.as(PiPipelineProgrammable.class);
244 final DeviceHandshaker handshaker = device.as(DeviceHandshaker.class);
245 if (!handshaker.isConnected()) {
246 return false;
247 }
248 if (pipelineProg.isPipeconfSet(pipeconf)) {
249 log.debug("Pipeconf {} already configured on {}",
250 pipeconf.id(), device.id());
251 return true;
252 }
253 try {
254 return pipelineProg.setPipeconf(pipeconf)
255 .get(PIPECONF_SET_TIMEOUT, TimeUnit.SECONDS);
256 } catch (InterruptedException e) {
257 log.error("Thread interrupted while setting pipeconf on {}",
258 device.id());
259 Thread.currentThread().interrupt();
260 } catch (ExecutionException e) {
261 log.error("Exception while setting pipeconf on {}",
262 device.id(), e.getCause());
263 } catch (TimeoutException e) {
264 log.error("Operation TIMEOUT while setting pipeconf on {}",
265 device.id());
266 }
267 return false;
268 }
269
270 private Runnable withLock(Runnable task, Object object) {
271 return () -> {
272 final Lock lock = locks.get(object);
273 lock.lock();
274 try {
275 task.run();
276 } finally {
277 lock.unlock();
278 }
279 };
280 }
281
282 private void signalStatusUnknown(DeviceId deviceId) {
283 statusMap.remove(deviceId);
284 }
285
286 private void signalStatusReady(DeviceId deviceId) {
287 statusMap.put(deviceId, PipelineStatus.READY);
288 }
289
290 private boolean isLocalMaster(Device device) {
291 if (mastershipService.isLocalMaster(device.id())) {
292 return true;
293 }
294 // The device might have no master (e.g. after it has been disconnected
295 // from core), hence we use device mastership state.
296 final MastershipInfo info = mastershipService.getMastershipFor(device.id());
297 return !info.master().isPresent() &&
298 device.is(DeviceHandshaker.class) &&
299 device.as(DeviceHandshaker.class).getRole()
300 .equals(MastershipRole.MASTER);
301 }
302
303 private void startProbeTask() {
304 synchronized (timer) {
305 log.info("Starting pipeline probe thread with {} seconds interval...", probeInterval);
306 task = new InternalTimerTask();
307 timer.scheduleAtFixedRate(task, probeInterval * SECONDS,
308 probeInterval * SECONDS);
309 }
310 }
311
312
313 private void stopProbeTask() {
314 synchronized (timer) {
315 log.info("Stopping pipeline probe thread...");
316 task.cancel();
317 task = null;
318 }
319 }
320
321
322 private synchronized void rescheduleProbeTask() {
323 synchronized (timer) {
324 stopProbeTask();
325 startProbeTask();
326 }
327 }
328
329 private class InternalTimerTask extends TimerTask {
330 @Override
331 public void run() {
332 triggerCheckAllDevices();
333 }
334 }
335
336 /**
337 * Listener of device events used to update the pipeline status.
338 */
339 private class InternalDeviceListener implements DeviceListener {
340
341 @Override
342 public void event(DeviceEvent event) {
343 final Device device = event.subject();
344 switch (event.type()) {
345 case DEVICE_ADDED:
346 case DEVICE_UPDATED:
347 case DEVICE_AVAILABILITY_CHANGED:
348 if (!deviceService.isAvailable(device.id())) {
349 signalStatusUnknown(device.id());
350 }
351 break;
352 case DEVICE_REMOVED:
353 case DEVICE_SUSPENDED:
354 signalStatusUnknown(device.id());
355 break;
356 case PORT_ADDED:
357 case PORT_UPDATED:
358 case PORT_REMOVED:
359 case PORT_STATS_UPDATED:
360 default:
361 break;
362 }
363 }
364 }
365
366 private class StatusMapListener
367 implements EventuallyConsistentMapListener<DeviceId, PipelineStatus> {
368
369 @Override
370 public void event(EventuallyConsistentMapEvent<DeviceId, PipelineStatus> event) {
371 final DeviceId deviceId = event.key();
372 final PipelineStatus status = event.value();
373 switch (event.type()) {
374 case PUT:
375 postStatusEvent(deviceId, status);
376 break;
377 case REMOVE:
378 postStatusEvent(deviceId, PipelineStatus.UNKNOWN);
379 break;
380 default:
381 log.error("Unknown map event type {}", event.type());
382 }
383 }
384
385 private void postStatusEvent(DeviceId deviceId, PipelineStatus newStatus) {
386 PipelineStatus oldStatus = localStatusMap.put(deviceId, newStatus);
387 oldStatus = oldStatus == null ? PipelineStatus.UNKNOWN : oldStatus;
388 final PiPipeconfWatchdogEvent.Type eventType =
389 newStatus == PipelineStatus.READY
390 ? PiPipeconfWatchdogEvent.Type.PIPELINE_READY
391 : PiPipeconfWatchdogEvent.Type.PIPELINE_UNKNOWN;
392 if (newStatus != oldStatus) {
393 log.info("Pipeline status of {} is {}", deviceId, newStatus);
394 post(new PiPipeconfWatchdogEvent(eventType, deviceId));
395 }
396 }
397 }
398}