blob: 8376295cf319a1c44b6cdd7da7f3c322902fdbe4 [file] [log] [blame]
Carmelo Cascone9e4972c2018-08-30 00:29:16 -07001/*
2 * Copyright 2018-present Open Networking Foundation
3 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
16
17package org.onosproject.net.pi.impl;
18
19import com.google.common.collect.Maps;
20import com.google.common.util.concurrent.Striped;
21import org.apache.felix.scr.annotations.Activate;
22import org.apache.felix.scr.annotations.Component;
23import org.apache.felix.scr.annotations.Deactivate;
24import org.apache.felix.scr.annotations.Modified;
25import org.apache.felix.scr.annotations.Property;
26import org.apache.felix.scr.annotations.Reference;
27import org.apache.felix.scr.annotations.ReferenceCardinality;
28import org.apache.felix.scr.annotations.Service;
29import org.onlab.util.KryoNamespace;
30import org.onlab.util.Tools;
31import org.onosproject.cfg.ComponentConfigService;
32import org.onosproject.event.AbstractListenerManager;
33import org.onosproject.mastership.MastershipInfo;
34import org.onosproject.mastership.MastershipService;
35import org.onosproject.net.Device;
36import org.onosproject.net.DeviceId;
37import org.onosproject.net.MastershipRole;
38import org.onosproject.net.behaviour.PiPipelineProgrammable;
39import org.onosproject.net.device.DeviceEvent;
40import org.onosproject.net.device.DeviceHandshaker;
41import org.onosproject.net.device.DeviceListener;
42import org.onosproject.net.device.DeviceService;
43import org.onosproject.net.pi.model.PiPipeconf;
44import org.onosproject.net.pi.model.PiPipeconfId;
45import org.onosproject.net.pi.service.PiPipeconfMappingStore;
46import org.onosproject.net.pi.service.PiPipeconfService;
47import org.onosproject.net.pi.service.PiPipeconfWatchdogEvent;
48import org.onosproject.net.pi.service.PiPipeconfWatchdogListener;
49import org.onosproject.net.pi.service.PiPipeconfWatchdogService;
50import org.onosproject.store.serializers.KryoNamespaces;
51import org.onosproject.store.service.EventuallyConsistentMap;
52import org.onosproject.store.service.EventuallyConsistentMapEvent;
53import org.onosproject.store.service.EventuallyConsistentMapListener;
54import org.onosproject.store.service.StorageService;
55import org.onosproject.store.service.WallClockTimestamp;
56import org.osgi.service.component.ComponentContext;
57import org.slf4j.Logger;
58
59import java.util.Collections;
60import java.util.Dictionary;
61import java.util.Map;
62import java.util.Timer;
63import java.util.TimerTask;
64import java.util.concurrent.ExecutionException;
65import java.util.concurrent.ExecutorService;
66import java.util.concurrent.Executors;
67import java.util.concurrent.TimeUnit;
68import java.util.concurrent.TimeoutException;
69import java.util.concurrent.locks.Lock;
70
71import static org.onlab.util.Tools.groupedThreads;
72import static org.slf4j.LoggerFactory.getLogger;
73
74/**
75 * Implementation of PiPipeconfWatchdogService that implements a periodic
76 * pipeline probe task and listens for device events to update the status of the
77 * pipeline.
78 */
79@Component(immediate = true)
80@Service
81public class PiPipeconfWatchdogManager
82 extends AbstractListenerManager<PiPipeconfWatchdogEvent, PiPipeconfWatchdogListener>
83 implements PiPipeconfWatchdogService {
84
85 private final Logger log = getLogger(getClass());
86
87 private static final long SECONDS = 1000L;
88 // Long enough to allow for network delay (e.g. to transfer large pipeline
89 // binaries over slow network).
90 private static final long PIPECONF_SET_TIMEOUT = 60; // Seconds.
91
92 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
93 private PiPipeconfMappingStore pipeconfMappingStore;
94
95 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
96 private DeviceService deviceService;
97
98 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
99 private MastershipService mastershipService;
100
101 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
102 protected PiPipeconfService pipeconfService;
103
104 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
105 protected StorageService storageService;
106
107 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
108 private ComponentConfigService componentConfigService;
109
110 private static final String PROBE_INTERVAL = "probeInterval";
111 private static final int DEFAULT_PROBE_INTERVAL = 30;
112 @Property(name = PROBE_INTERVAL, intValue = DEFAULT_PROBE_INTERVAL,
113 label = "Configure interval in seconds for device pipeconf probing")
114 private int probeInterval = DEFAULT_PROBE_INTERVAL;
115
116 protected ExecutorService executor = Executors.newFixedThreadPool(
117 30, groupedThreads("onos/pipeconf-watchdog", "%d", log));
118
119 private final InternalDeviceListener deviceListener = new InternalDeviceListener();
120
121 private Timer timer;
122 private TimerTask task;
123
124 private final Striped<Lock> locks = Striped.lock(30);
125
126 private EventuallyConsistentMap<DeviceId, PipelineStatus> statusMap;
127 private Map<DeviceId, PipelineStatus> localStatusMap;
128
129 @Activate
130 public void activate() {
131 eventDispatcher.addSink(PiPipeconfWatchdogEvent.class, listenerRegistry);
132 localStatusMap = Maps.newConcurrentMap();
133 // Init distributed status map.
134 KryoNamespace.Builder serializer = KryoNamespace.newBuilder()
135 .register(KryoNamespaces.API)
136 .register(PipelineStatus.class);
137 statusMap = storageService.<DeviceId, PipelineStatus>eventuallyConsistentMapBuilder()
138 .withName("onos-pipeconf-status-table")
139 .withSerializer(serializer)
140 .withTimestampProvider((k, v) -> new WallClockTimestamp()).build();
141 statusMap.addListener(new StatusMapListener());
142 // Register component configurable properties.
143 componentConfigService.registerProperties(getClass());
144 // Start periodic watchdog task.
145 timer = new Timer();
146 startProbeTask();
147 // Add device listener.
148 deviceService.addListener(deviceListener);
149 log.info("Started");
150 }
151
152 @Modified
153 public void modified(ComponentContext context) {
154 if (context == null) {
155 return;
156 }
157
158 Dictionary<?, ?> properties = context.getProperties();
159 final int oldProbeInterval = probeInterval;
160 probeInterval = Tools.getIntegerProperty(
161 properties, PROBE_INTERVAL, DEFAULT_PROBE_INTERVAL);
162 log.info("Configured. {} is configured to {} seconds",
163 PROBE_INTERVAL, probeInterval);
164
165 if (oldProbeInterval != probeInterval) {
166 rescheduleProbeTask();
167 }
168 }
169
170 @Deactivate
171 public void deactivate() {
172 eventDispatcher.removeSink(PiPipeconfWatchdogEvent.class);
173 deviceService.removeListener(deviceListener);
174 stopProbeTask();
175 timer = null;
176 statusMap = null;
177 localStatusMap = null;
178 log.info("Stopped");
179 }
180
181 @Override
182 public void triggerProbe(DeviceId deviceId) {
183 final Device device = deviceService.getDevice(deviceId);
184 if (device != null) {
185 filterAndTriggerTasks(Collections.singleton(device));
186 }
187 }
188
189 @Override
190 public PipelineStatus getStatus(DeviceId deviceId) {
191 final PipelineStatus status = statusMap.get(deviceId);
192 return status == null ? PipelineStatus.UNKNOWN : status;
193 }
194
195 private void triggerCheckAllDevices() {
196 filterAndTriggerTasks(deviceService.getDevices());
197 }
198
199 private void filterAndTriggerTasks(Iterable<Device> devices) {
200 devices.forEach(device -> {
201 if (!isLocalMaster(device)) {
202 return;
203 }
204
205 final PiPipeconfId pipeconfId = pipeconfMappingStore.getPipeconfId(device.id());
206 if (pipeconfId == null
207 || !device.is(PiPipelineProgrammable.class)) {
208 return;
209 }
210
211 if (!pipeconfService.getPipeconf(pipeconfId).isPresent()) {
212 log.error("Pipeconf {} is not registered", pipeconfId);
213 return;
214 }
215
216 final PiPipeconf pipeconf = pipeconfService.getPipeconf(pipeconfId).get();
217
218 if (!device.is(DeviceHandshaker.class)) {
219 log.error("Missing DeviceHandshaker behavior for {}", device.id());
220 return;
221 }
222
223 // Trigger task with per-device lock.
224 executor.execute(withLock(() -> {
225 final boolean success = doSetPipeconfIfRequired(device, pipeconf);
226 if (success) {
227 signalStatusReady(device.id());
228 } else {
229 signalStatusUnknown(device.id());
230 }
231 }, device.id()));
232 });
233 }
234
235 /**
236 * Returns true if the given device is known to be configured with the given
237 * pipeline, false otherwise. If necessary, this method enforces setting the
238 * given pipeconf using drivers.
239 *
240 * @param device device
241 * @param pipeconf pipeconf
242 * @return boolean
243 */
244 private boolean doSetPipeconfIfRequired(Device device, PiPipeconf pipeconf) {
245 log.debug("Starting watchdog task for {} ({})", device.id(), pipeconf.id());
246 final PiPipelineProgrammable pipelineProg = device.as(PiPipelineProgrammable.class);
247 final DeviceHandshaker handshaker = device.as(DeviceHandshaker.class);
248 if (!handshaker.isConnected()) {
249 return false;
250 }
251 if (pipelineProg.isPipeconfSet(pipeconf)) {
252 log.debug("Pipeconf {} already configured on {}",
253 pipeconf.id(), device.id());
254 return true;
255 }
256 try {
257 return pipelineProg.setPipeconf(pipeconf)
258 .get(PIPECONF_SET_TIMEOUT, TimeUnit.SECONDS);
259 } catch (InterruptedException e) {
260 log.error("Thread interrupted while setting pipeconf on {}",
261 device.id());
262 Thread.currentThread().interrupt();
263 } catch (ExecutionException e) {
264 log.error("Exception while setting pipeconf on {}",
265 device.id(), e.getCause());
266 } catch (TimeoutException e) {
267 log.error("Operation TIMEOUT while setting pipeconf on {}",
268 device.id());
269 }
270 return false;
271 }
272
273 private Runnable withLock(Runnable task, Object object) {
274 return () -> {
275 final Lock lock = locks.get(object);
276 lock.lock();
277 try {
278 task.run();
279 } finally {
280 lock.unlock();
281 }
282 };
283 }
284
285 private void signalStatusUnknown(DeviceId deviceId) {
286 statusMap.remove(deviceId);
287 }
288
289 private void signalStatusReady(DeviceId deviceId) {
290 statusMap.put(deviceId, PipelineStatus.READY);
291 }
292
293 private boolean isLocalMaster(Device device) {
294 if (mastershipService.isLocalMaster(device.id())) {
295 return true;
296 }
297 // The device might have no master (e.g. after it has been disconnected
298 // from core), hence we use device mastership state.
299 final MastershipInfo info = mastershipService.getMastershipFor(device.id());
300 return !info.master().isPresent() &&
301 device.is(DeviceHandshaker.class) &&
302 device.as(DeviceHandshaker.class).getRole()
303 .equals(MastershipRole.MASTER);
304 }
305
306 private void startProbeTask() {
307 synchronized (timer) {
308 log.info("Starting pipeline probe thread with {} seconds interval...", probeInterval);
309 task = new InternalTimerTask();
310 timer.scheduleAtFixedRate(task, probeInterval * SECONDS,
311 probeInterval * SECONDS);
312 }
313 }
314
315
316 private void stopProbeTask() {
317 synchronized (timer) {
318 log.info("Stopping pipeline probe thread...");
319 task.cancel();
320 task = null;
321 }
322 }
323
324
325 private synchronized void rescheduleProbeTask() {
326 synchronized (timer) {
327 stopProbeTask();
328 startProbeTask();
329 }
330 }
331
332 private class InternalTimerTask extends TimerTask {
333 @Override
334 public void run() {
335 triggerCheckAllDevices();
336 }
337 }
338
339 /**
340 * Listener of device events used to update the pipeline status.
341 */
342 private class InternalDeviceListener implements DeviceListener {
343
344 @Override
345 public void event(DeviceEvent event) {
346 final Device device = event.subject();
347 switch (event.type()) {
348 case DEVICE_ADDED:
349 case DEVICE_UPDATED:
350 case DEVICE_AVAILABILITY_CHANGED:
351 if (!deviceService.isAvailable(device.id())) {
352 signalStatusUnknown(device.id());
353 }
354 break;
355 case DEVICE_REMOVED:
356 case DEVICE_SUSPENDED:
357 signalStatusUnknown(device.id());
358 break;
359 case PORT_ADDED:
360 case PORT_UPDATED:
361 case PORT_REMOVED:
362 case PORT_STATS_UPDATED:
363 default:
364 break;
365 }
366 }
367 }
368
369 private class StatusMapListener
370 implements EventuallyConsistentMapListener<DeviceId, PipelineStatus> {
371
372 @Override
373 public void event(EventuallyConsistentMapEvent<DeviceId, PipelineStatus> event) {
374 final DeviceId deviceId = event.key();
375 final PipelineStatus status = event.value();
376 switch (event.type()) {
377 case PUT:
378 postStatusEvent(deviceId, status);
379 break;
380 case REMOVE:
381 postStatusEvent(deviceId, PipelineStatus.UNKNOWN);
382 break;
383 default:
384 log.error("Unknown map event type {}", event.type());
385 }
386 }
387
388 private void postStatusEvent(DeviceId deviceId, PipelineStatus newStatus) {
389 PipelineStatus oldStatus = localStatusMap.put(deviceId, newStatus);
390 oldStatus = oldStatus == null ? PipelineStatus.UNKNOWN : oldStatus;
391 final PiPipeconfWatchdogEvent.Type eventType =
392 newStatus == PipelineStatus.READY
393 ? PiPipeconfWatchdogEvent.Type.PIPELINE_READY
394 : PiPipeconfWatchdogEvent.Type.PIPELINE_UNKNOWN;
395 if (newStatus != oldStatus) {
396 log.info("Pipeline status of {} is {}", deviceId, newStatus);
397 post(new PiPipeconfWatchdogEvent(eventType, deviceId));
398 }
399 }
400 }
401}