blob: e5c321e06879ea65ff4e59e3ec5e40a7e8a251b7 [file] [log] [blame]
Andrea Campanella241896c2017-05-10 13:11:04 -07001/*
Brian O'Connora09fe5b2017-08-03 21:12:30 -07002 * Copyright 2017-present Open Networking Foundation
Andrea Campanella241896c2017-05-10 13:11:04 -07003 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
16
17package org.onosproject.provider.general.device.impl;
18
19import com.google.common.annotations.Beta;
Andrea Campanellabc112a92017-06-26 19:06:43 +020020import com.google.common.collect.ImmutableSet;
21import com.google.common.collect.Maps;
Carmelo Cascone158b8c42018-07-04 19:42:37 +020022import com.google.common.util.concurrent.Striped;
Andrea Campanella241896c2017-05-10 13:11:04 -070023import org.apache.felix.scr.annotations.Activate;
24import org.apache.felix.scr.annotations.Component;
25import org.apache.felix.scr.annotations.Deactivate;
Andrea Campanella19090322017-08-22 10:31:37 +020026import org.apache.felix.scr.annotations.Modified;
27import org.apache.felix.scr.annotations.Property;
Andrea Campanella241896c2017-05-10 13:11:04 -070028import org.apache.felix.scr.annotations.Reference;
29import org.apache.felix.scr.annotations.ReferenceCardinality;
30import org.onlab.packet.ChassisId;
31import org.onlab.util.ItemNotFoundException;
Andrea Campanella19090322017-08-22 10:31:37 +020032import org.onlab.util.Tools;
Andrea Campanella4929a812017-10-09 18:38:23 +020033import org.onosproject.cfg.ComponentConfigService;
Andrea Campanella241896c2017-05-10 13:11:04 -070034import org.onosproject.core.CoreService;
Andrea Campanella14e196d2017-07-24 18:11:36 +020035import org.onosproject.mastership.MastershipService;
Andrea Campanella241896c2017-05-10 13:11:04 -070036import org.onosproject.net.AnnotationKeys;
37import org.onosproject.net.DefaultAnnotations;
38import org.onosproject.net.Device;
39import org.onosproject.net.DeviceId;
40import org.onosproject.net.MastershipRole;
41import org.onosproject.net.PortNumber;
Carmelo Cascone87892e22017-11-13 16:01:29 -080042import org.onosproject.net.behaviour.PiPipelineProgrammable;
Andrea Campanella241896c2017-05-10 13:11:04 -070043import org.onosproject.net.behaviour.PortAdmin;
44import org.onosproject.net.config.ConfigFactory;
45import org.onosproject.net.config.NetworkConfigEvent;
46import org.onosproject.net.config.NetworkConfigListener;
47import org.onosproject.net.config.NetworkConfigRegistry;
48import org.onosproject.net.config.basics.BasicDeviceConfig;
49import org.onosproject.net.config.basics.SubjectFactories;
50import org.onosproject.net.device.DefaultDeviceDescription;
Carmelo Casconee5b28722018-06-22 17:28:28 +020051import org.onosproject.net.device.DeviceAgentEvent;
52import org.onosproject.net.device.DeviceAgentListener;
Andrea Campanella241896c2017-05-10 13:11:04 -070053import org.onosproject.net.device.DeviceDescription;
54import org.onosproject.net.device.DeviceDescriptionDiscovery;
55import org.onosproject.net.device.DeviceEvent;
56import org.onosproject.net.device.DeviceHandshaker;
57import org.onosproject.net.device.DeviceListener;
58import org.onosproject.net.device.DeviceProvider;
59import org.onosproject.net.device.DeviceProviderRegistry;
60import org.onosproject.net.device.DeviceProviderService;
61import org.onosproject.net.device.DeviceService;
62import org.onosproject.net.device.PortDescription;
63import org.onosproject.net.device.PortStatistics;
64import org.onosproject.net.device.PortStatisticsDiscovery;
65import org.onosproject.net.driver.Behaviour;
66import org.onosproject.net.driver.DefaultDriverData;
67import org.onosproject.net.driver.DefaultDriverHandler;
68import org.onosproject.net.driver.Driver;
69import org.onosproject.net.driver.DriverData;
70import org.onosproject.net.driver.DriverService;
Carmelo Cascone59f57de2017-07-11 19:55:09 -040071import org.onosproject.net.pi.model.PiPipeconf;
Andrea Campanellabc112a92017-06-26 19:06:43 +020072import org.onosproject.net.pi.model.PiPipeconfId;
Carmelo Cascone39c28ca2017-11-15 13:03:57 -080073import org.onosproject.net.pi.service.PiPipeconfConfig;
74import org.onosproject.net.pi.service.PiPipeconfService;
Carmelo Cascone9e4972c2018-08-30 00:29:16 -070075import org.onosproject.net.pi.service.PiPipeconfWatchdogEvent;
76import org.onosproject.net.pi.service.PiPipeconfWatchdogListener;
77import org.onosproject.net.pi.service.PiPipeconfWatchdogService;
Andrea Campanella241896c2017-05-10 13:11:04 -070078import org.onosproject.net.provider.AbstractProvider;
79import org.onosproject.net.provider.ProviderId;
80import org.onosproject.provider.general.device.api.GeneralProviderDeviceConfig;
Andrea Campanella19090322017-08-22 10:31:37 +020081import org.osgi.service.component.ComponentContext;
Andrea Campanella241896c2017-05-10 13:11:04 -070082import org.slf4j.Logger;
83
Ray Milkeyb68bbbc2017-12-18 10:05:49 -080084import java.security.SecureRandom;
Andrea Campanella241896c2017-05-10 13:11:04 -070085import java.util.Collection;
Andrea Campanellabc112a92017-06-26 19:06:43 +020086import java.util.Collections;
Andrea Campanella19090322017-08-22 10:31:37 +020087import java.util.Dictionary;
Andrea Campanella241896c2017-05-10 13:11:04 -070088import java.util.List;
Thomas Vachuska5b38dc02018-05-10 15:24:40 -070089import java.util.Map;
Andrea Campanellabc112a92017-06-26 19:06:43 +020090import java.util.Set;
Carmelo Cascone9e4972c2018-08-30 00:29:16 -070091import java.util.StringJoiner;
Andrea Campanella241896c2017-05-10 13:11:04 -070092import java.util.concurrent.CompletableFuture;
Andrea Campanella19090322017-08-22 10:31:37 +020093import java.util.concurrent.ConcurrentHashMap;
Andrea Campanellabc112a92017-06-26 19:06:43 +020094import java.util.concurrent.ConcurrentMap;
95import java.util.concurrent.CopyOnWriteArraySet;
Andrea Campanella241896c2017-05-10 13:11:04 -070096import java.util.concurrent.ExecutionException;
Carmelo Casconee5b28722018-06-22 17:28:28 +020097import java.util.concurrent.ExecutorService;
Andrea Campanella241896c2017-05-10 13:11:04 -070098import java.util.concurrent.ScheduledExecutorService;
Andrea Campanella19090322017-08-22 10:31:37 +020099import java.util.concurrent.ScheduledFuture;
Andrea Campanella241896c2017-05-10 13:11:04 -0700100import java.util.concurrent.TimeUnit;
101import java.util.concurrent.TimeoutException;
Andrea Campanellabc112a92017-06-26 19:06:43 +0200102import java.util.concurrent.locks.Lock;
Carmelo Cascone158b8c42018-07-04 19:42:37 +0200103import java.util.function.Supplier;
Andrea Campanella241896c2017-05-10 13:11:04 -0700104
Carmelo Casconee5b28722018-06-22 17:28:28 +0200105import static java.util.concurrent.Executors.newFixedThreadPool;
Andrea Campanella241896c2017-05-10 13:11:04 -0700106import static java.util.concurrent.Executors.newScheduledThreadPool;
Carmelo Cascone158b8c42018-07-04 19:42:37 +0200107import static java.util.concurrent.Executors.newSingleThreadScheduledExecutor;
Andrea Campanella241896c2017-05-10 13:11:04 -0700108import static org.onlab.util.Tools.groupedThreads;
109import static org.onosproject.net.device.DeviceEvent.Type;
110import static org.slf4j.LoggerFactory.getLogger;
111
112/**
Carmelo Cascone96beb6f2018-06-27 18:07:12 +0200113 * Provider which uses drivers to detect device and do initial handshake and
114 * channel establishment with devices. Any other provider specific operation is
115 * also delegated to the DeviceHandshaker driver.
Andrea Campanella241896c2017-05-10 13:11:04 -0700116 */
117@Beta
118@Component(immediate = true)
119public class GeneralDeviceProvider extends AbstractProvider
120 implements DeviceProvider {
Carmelo Casconee5b28722018-06-22 17:28:28 +0200121
Andrea Campanella241896c2017-05-10 13:11:04 -0700122 private final Logger log = getLogger(getClass());
123
Carmelo Cascone9e4972c2018-08-30 00:29:16 -0700124 private static final String APP_NAME = "org.onosproject.gdp";
125 private static final String URI_SCHEME = "device";
126 private static final String CFG_SCHEME = "generalprovider";
127 private static final String DEVICE_PROVIDER_PACKAGE =
128 "org.onosproject.general.provider.device";
129 private static final int CORE_POOL_SIZE = 10;
130 private static final String UNKNOWN = "unknown";
131 private static final String DRIVER = "driver";
132 private static final Set<String> PIPELINE_CONFIGURABLE_PROTOCOLS =
133 ImmutableSet.of("p4runtime");
134
Andrea Campanella241896c2017-05-10 13:11:04 -0700135 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
Carmelo Casconee5b28722018-06-22 17:28:28 +0200136 private DeviceProviderRegistry providerRegistry;
Andrea Campanella241896c2017-05-10 13:11:04 -0700137
138 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
Carmelo Casconee5b28722018-06-22 17:28:28 +0200139 private ComponentConfigService componentConfigService;
Andrea Campanella4929a812017-10-09 18:38:23 +0200140
141 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
Carmelo Casconee5b28722018-06-22 17:28:28 +0200142 private NetworkConfigRegistry cfgService;
Andrea Campanella241896c2017-05-10 13:11:04 -0700143
144 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
Carmelo Casconee5b28722018-06-22 17:28:28 +0200145 private CoreService coreService;
Andrea Campanella241896c2017-05-10 13:11:04 -0700146
147 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
Carmelo Casconee5b28722018-06-22 17:28:28 +0200148 private DeviceService deviceService;
Andrea Campanella241896c2017-05-10 13:11:04 -0700149
150 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
Carmelo Casconee5b28722018-06-22 17:28:28 +0200151 private DriverService driverService;
Andrea Campanella241896c2017-05-10 13:11:04 -0700152
153 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
Carmelo Casconee5b28722018-06-22 17:28:28 +0200154 private MastershipService mastershipService;
Andrea Campanella14e196d2017-07-24 18:11:36 +0200155
156 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
Carmelo Cascone9e4972c2018-08-30 00:29:16 -0700157 private PiPipeconfService pipeconfService;
Andrea Campanella241896c2017-05-10 13:11:04 -0700158
Andrea Campanella14e196d2017-07-24 18:11:36 +0200159 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
Carmelo Cascone9e4972c2018-08-30 00:29:16 -0700160 private PiPipeconfWatchdogService pipeconfWatchdogService;
Andrea Campanella14e196d2017-07-24 18:11:36 +0200161
Carmelo Cascone158b8c42018-07-04 19:42:37 +0200162 private static final String STATS_POLL_FREQUENCY = "deviceStatsPollFrequency";
163 private static final int DEFAULT_STATS_POLL_FREQUENCY = 10;
164 @Property(name = STATS_POLL_FREQUENCY, intValue = DEFAULT_STATS_POLL_FREQUENCY,
Andrea Campanella19090322017-08-22 10:31:37 +0200165 label = "Configure poll frequency for port status and statistics; " +
166 "default is 10 sec")
Carmelo Cascone158b8c42018-07-04 19:42:37 +0200167 private int statsPollFrequency = DEFAULT_STATS_POLL_FREQUENCY;
Andrea Campanella19090322017-08-22 10:31:37 +0200168
Carmelo Cascone158b8c42018-07-04 19:42:37 +0200169 private static final String PROBE_FREQUENCY = "deviceProbeFrequency";
170 private static final int DEFAULT_PROBE_FREQUENCY = 10;
171 @Property(name = PROBE_FREQUENCY, intValue = DEFAULT_PROBE_FREQUENCY,
172 label = "Configure probe frequency for checking device availability; " +
Andrea Campanella1e573442018-05-17 17:07:13 +0200173 "default is 10 sec")
Carmelo Cascone158b8c42018-07-04 19:42:37 +0200174 private int probeFrequency = DEFAULT_PROBE_FREQUENCY;
175
176 private static final String OP_TIMEOUT_SHORT = "deviceOperationTimeoutShort";
177 private static final int DEFAULT_OP_TIMEOUT_SHORT = 10;
178 @Property(name = OP_TIMEOUT_SHORT, intValue = DEFAULT_OP_TIMEOUT_SHORT,
179 label = "Configure timeout in seconds for device operations " +
180 "that are supposed to take a short time " +
181 "(e.g. checking device reachability); default is 10 seconds")
182 private int opTimeoutShort = DEFAULT_OP_TIMEOUT_SHORT;
183
Andrea Campanellabc112a92017-06-26 19:06:43 +0200184 //FIXME to be removed when netcfg will issue device events in a bundle or
185 //ensures all configuration needed is present
Carmelo Cascone9e4972c2018-08-30 00:29:16 -0700186 private final Set<DeviceId> deviceConfigured = new CopyOnWriteArraySet<>();
187 private final Set<DeviceId> driverConfigured = new CopyOnWriteArraySet<>();
188 private final Set<DeviceId> pipelineConfigured = new CopyOnWriteArraySet<>();
Andrea Campanella241896c2017-05-10 13:11:04 -0700189
Carmelo Casconee5b28722018-06-22 17:28:28 +0200190 private final Map<DeviceId, MastershipRole> requestedRoles = Maps.newConcurrentMap();
Carmelo Cascone9e4972c2018-08-30 00:29:16 -0700191 private final ConcurrentMap<DeviceId, ScheduledFuture<?>> statsPollingTasks = new ConcurrentHashMap<>();
192 private final Map<DeviceId, DeviceHandshaker> handshakersWithListeners = Maps.newConcurrentMap();
193 private final InternalDeviceListener deviceListener = new InternalDeviceListener();
194 private final InternalPipeconfWatchdogListener pipeconfWatchdogListener = new InternalPipeconfWatchdogListener();
Carmelo Casconee5b28722018-06-22 17:28:28 +0200195 private final NetworkConfigListener cfgListener = new InternalNetworkConfigListener();
196 private final DeviceAgentListener deviceAgentListener = new InternalDeviceAgentListener();
Carmelo Cascone9e4972c2018-08-30 00:29:16 -0700197 private final ConfigFactory factory = new InternalConfigFactory();
198 private final Striped<Lock> deviceLocks = Striped.lock(30);
Andrea Campanella241896c2017-05-10 13:11:04 -0700199
Carmelo Cascone9e4972c2018-08-30 00:29:16 -0700200 private ExecutorService connectionExecutor;
201 private ScheduledExecutorService statsExecutor;
202 private ScheduledExecutorService probeExecutor;
203 private ScheduledFuture<?> probeTask;
204 private DeviceProviderService providerService;
205
206 public GeneralDeviceProvider() {
207 super(new ProviderId(URI_SCHEME, DEVICE_PROVIDER_PACKAGE));
208 }
Andrea Campanella241896c2017-05-10 13:11:04 -0700209
210 @Activate
Andrea Campanella1e573442018-05-17 17:07:13 +0200211 public void activate(ComponentContext context) {
Carmelo Cascone9e4972c2018-08-30 00:29:16 -0700212 connectionExecutor = newFixedThreadPool(CORE_POOL_SIZE, groupedThreads(
213 "onos/gdp-connect", "%d", log));
214 statsExecutor = newScheduledThreadPool(CORE_POOL_SIZE, groupedThreads(
215 "onos/gdp-stats", "%d", log));
216 probeExecutor = newSingleThreadScheduledExecutor(groupedThreads(
217 "onos/gdp-probe", "%d", log));
Andrea Campanella241896c2017-05-10 13:11:04 -0700218 providerService = providerRegistry.register(this);
Andrea Campanella4929a812017-10-09 18:38:23 +0200219 componentConfigService.registerProperties(getClass());
Andrea Campanella241896c2017-05-10 13:11:04 -0700220 coreService.registerApplication(APP_NAME);
221 cfgService.registerConfigFactory(factory);
222 cfgService.addListener(cfgListener);
223 deviceService.addListener(deviceListener);
Carmelo Cascone9e4972c2018-08-30 00:29:16 -0700224 pipeconfWatchdogService.addListener(pipeconfWatchdogListener);
225 rescheduleProbeTask(false);
Andrea Campanella1e573442018-05-17 17:07:13 +0200226 modified(context);
Andrea Campanella241896c2017-05-10 13:11:04 -0700227 log.info("Started");
228 }
229
Andrea Campanella19090322017-08-22 10:31:37 +0200230 @Modified
231 public void modified(ComponentContext context) {
Carmelo Cascone158b8c42018-07-04 19:42:37 +0200232 if (context == null) {
233 return;
Andrea Campanella19090322017-08-22 10:31:37 +0200234 }
235
Carmelo Cascone158b8c42018-07-04 19:42:37 +0200236 Dictionary<?, ?> properties = context.getProperties();
237 final int oldStatsPollFrequency = statsPollFrequency;
238 statsPollFrequency = Tools.getIntegerProperty(
239 properties, STATS_POLL_FREQUENCY, DEFAULT_STATS_POLL_FREQUENCY);
240 log.info("Configured. {} is configured to {} seconds",
241 STATS_POLL_FREQUENCY, statsPollFrequency);
242 final int oldProbeFrequency = probeFrequency;
243 probeFrequency = Tools.getIntegerProperty(
244 properties, PROBE_FREQUENCY, DEFAULT_PROBE_FREQUENCY);
245 log.info("Configured. {} is configured to {} seconds",
246 PROBE_FREQUENCY, probeFrequency);
247 opTimeoutShort = Tools.getIntegerProperty(
248 properties, OP_TIMEOUT_SHORT, DEFAULT_OP_TIMEOUT_SHORT);
249 log.info("Configured. {} is configured to {} seconds",
250 OP_TIMEOUT_SHORT, opTimeoutShort);
Carmelo Cascone158b8c42018-07-04 19:42:37 +0200251
252 if (oldStatsPollFrequency != statsPollFrequency) {
253 rescheduleStatsPollingTasks();
Andrea Campanella19090322017-08-22 10:31:37 +0200254 }
Carmelo Cascone158b8c42018-07-04 19:42:37 +0200255
256 if (oldProbeFrequency != probeFrequency) {
Carmelo Cascone9e4972c2018-08-30 00:29:16 -0700257 rescheduleProbeTask(true);
Carmelo Cascone158b8c42018-07-04 19:42:37 +0200258 }
259 }
260
Carmelo Cascone9e4972c2018-08-30 00:29:16 -0700261 private void rescheduleProbeTask(boolean deelay) {
262 synchronized (this) {
263 if (probeTask != null) {
264 probeTask.cancel(false);
265 }
266 probeTask = probeExecutor.scheduleAtFixedRate(
267 this::triggerProbeAllDevices,
268 deelay ? probeFrequency : 0,
269 probeFrequency,
270 TimeUnit.SECONDS);
Carmelo Cascone158b8c42018-07-04 19:42:37 +0200271 }
Andrea Campanella19090322017-08-22 10:31:37 +0200272 }
273
Andrea Campanella241896c2017-05-10 13:11:04 -0700274 @Deactivate
275 public void deactivate() {
Carmelo Cascone9e4972c2018-08-30 00:29:16 -0700276 // Shutdown stats polling tasks.
277 statsPollingTasks.keySet().forEach(this::cancelStatsPolling);
278 statsPollingTasks.clear();
279 statsExecutor.shutdownNow();
280 try {
281 statsExecutor.awaitTermination(5, TimeUnit.SECONDS);
282 } catch (InterruptedException e) {
283 log.warn("statsExecutor not terminated properly");
284 }
285 statsExecutor = null;
286 // Shutdown probe executor.
287 probeTask.cancel(true);
288 probeTask = null;
289 probeExecutor.shutdownNow();
290 try {
291 probeExecutor.awaitTermination(5, TimeUnit.SECONDS);
292 } catch (InterruptedException e) {
293 log.warn("probeExecutor not terminated properly");
294 }
295 probeExecutor = null;
296 // Shutdown connection executor.
297 connectionExecutor.shutdownNow();
298 try {
299 connectionExecutor.awaitTermination(5, TimeUnit.SECONDS);
300 } catch (InterruptedException e) {
301 log.warn("connectionExecutor not terminated properly");
302 }
303 connectionExecutor = null;
304 // Remove all device agent listeners
305 handshakersWithListeners.values().forEach(h -> h.removeDeviceAgentListener(id()));
306 handshakersWithListeners.clear();
307 // Other cleanup.
Andrea Campanella4929a812017-10-09 18:38:23 +0200308 componentConfigService.unregisterProperties(getClass(), false);
Andrea Campanella241896c2017-05-10 13:11:04 -0700309 cfgService.removeListener(cfgListener);
Andrea Campanella241896c2017-05-10 13:11:04 -0700310 deviceService.removeListener(deviceListener);
Carmelo Cascone9e4972c2018-08-30 00:29:16 -0700311 pipeconfWatchdogService.removeListener(pipeconfWatchdogListener);
Andrea Campanella241896c2017-05-10 13:11:04 -0700312 providerRegistry.unregister(this);
313 providerService = null;
314 cfgService.unregisterConfigFactory(factory);
315 log.info("Stopped");
316 }
317
Andrea Campanella241896c2017-05-10 13:11:04 -0700318
319 @Override
320 public void triggerProbe(DeviceId deviceId) {
Carmelo Cascone158b8c42018-07-04 19:42:37 +0200321 connectionExecutor.execute(withDeviceLock(
322 () -> doDeviceProbe(deviceId), deviceId));
Andrea Campanella241896c2017-05-10 13:11:04 -0700323 }
324
325 @Override
326 public void roleChanged(DeviceId deviceId, MastershipRole newRole) {
Carmelo Cascone9e4972c2018-08-30 00:29:16 -0700327 log.info("Notifying role {} to device {}", newRole, deviceId);
Carmelo Casconee5b28722018-06-22 17:28:28 +0200328 requestedRoles.put(deviceId, newRole);
Carmelo Cascone158b8c42018-07-04 19:42:37 +0200329 connectionExecutor.execute(() -> doRoleChanged(deviceId, newRole));
Carmelo Casconee5b28722018-06-22 17:28:28 +0200330 }
331
332 private void doRoleChanged(DeviceId deviceId, MastershipRole newRole) {
Carmelo Cascone9e4972c2018-08-30 00:29:16 -0700333 final DeviceHandshaker handshaker = getBehaviour(
334 deviceId, DeviceHandshaker.class);
Carmelo Casconee5b28722018-06-22 17:28:28 +0200335 if (handshaker == null) {
Carmelo Cascone96beb6f2018-06-27 18:07:12 +0200336 log.error("Null handshaker. Unable to notify new role {} to {}",
337 newRole, deviceId);
Carmelo Casconee5b28722018-06-22 17:28:28 +0200338 return;
339 }
340 handshaker.roleChanged(newRole);
Andrea Campanella241896c2017-05-10 13:11:04 -0700341 }
342
343 @Override
344 public boolean isReachable(DeviceId deviceId) {
Andrea Campanella14e196d2017-07-24 18:11:36 +0200345 log.debug("Testing reachability for device {}", deviceId);
Carmelo Cascone9e4972c2018-08-30 00:29:16 -0700346 final DeviceHandshaker handshaker = getBehaviour(
347 deviceId, DeviceHandshaker.class);
Andrea Campanellac1ecdd02018-01-12 12:48:24 +0100348 if (handshaker == null) {
349 return false;
350 }
Carmelo Cascone96beb6f2018-06-27 18:07:12 +0200351 return getFutureWithDeadline(
352 handshaker.isReachable(), "checking reachability",
Carmelo Cascone158b8c42018-07-04 19:42:37 +0200353 deviceId, false, opTimeoutShort);
Andrea Campanella241896c2017-05-10 13:11:04 -0700354 }
355
Carmelo Cascone9e4972c2018-08-30 00:29:16 -0700356 private boolean isConnected(DeviceId deviceId) {
357 log.debug("Testing connection to device {}", deviceId);
358 final DeviceHandshaker handshaker = getBehaviour(
359 deviceId, DeviceHandshaker.class);
360 if (handshaker == null) {
361 return false;
362 }
363 return handshaker.isConnected();
364 }
365
Andrea Campanella241896c2017-05-10 13:11:04 -0700366 @Override
367 public void changePortState(DeviceId deviceId, PortNumber portNumber,
368 boolean enable) {
Carmelo Cascone158b8c42018-07-04 19:42:37 +0200369 connectionExecutor.execute(
370 () -> doChangePortState(deviceId, portNumber, enable));
371 }
372
373 private void doChangePortState(DeviceId deviceId, PortNumber portNumber,
374 boolean enable) {
Carmelo Cascone96beb6f2018-06-27 18:07:12 +0200375 if (!deviceService.getDevice(deviceId).is(PortAdmin.class)) {
376 log.warn("Missing PortAdmin behaviour on {}, aborting port state change",
377 deviceId);
378 return;
Andrea Campanella241896c2017-05-10 13:11:04 -0700379 }
Carmelo Cascone158b8c42018-07-04 19:42:37 +0200380 final PortAdmin portAdmin = deviceService.getDevice(deviceId)
381 .as(PortAdmin.class);
382 final CompletableFuture<Boolean> modifyTask = enable
Carmelo Cascone96beb6f2018-06-27 18:07:12 +0200383 ? portAdmin.enable(portNumber)
384 : portAdmin.disable(portNumber);
Carmelo Cascone158b8c42018-07-04 19:42:37 +0200385 final String descr = (enable ? "enabling" : "disabling") + " port " + portNumber;
386 getFutureWithDeadline(
387 modifyTask, descr, deviceId, null, opTimeoutShort);
Andrea Campanella241896c2017-05-10 13:11:04 -0700388 }
389
Thomas Vachuska5b38dc02018-05-10 15:24:40 -0700390 @Override
391 public void triggerDisconnect(DeviceId deviceId) {
Andrea Campanella1e573442018-05-17 17:07:13 +0200392 log.debug("Triggering disconnection of device {}", deviceId);
Carmelo Cascone158b8c42018-07-04 19:42:37 +0200393 connectionExecutor.execute(withDeviceLock(
394 () -> doDisconnectDevice(deviceId), deviceId));
Thomas Vachuska5b38dc02018-05-10 15:24:40 -0700395 }
396
Andrea Campanella241896c2017-05-10 13:11:04 -0700397 private Driver getDriver(DeviceId deviceId) {
Andrea Campanella241896c2017-05-10 13:11:04 -0700398 try {
Carmelo Cascone158b8c42018-07-04 19:42:37 +0200399 // DriverManager checks first using basic device config.
400 return driverService.getDriver(deviceId);
Andrea Campanella241896c2017-05-10 13:11:04 -0700401 } catch (ItemNotFoundException e) {
Carmelo Cascone158b8c42018-07-04 19:42:37 +0200402 log.error("Driver not found for {}", deviceId);
403 return null;
Andrea Campanella241896c2017-05-10 13:11:04 -0700404 }
Andrea Campanella241896c2017-05-10 13:11:04 -0700405 }
406
Carmelo Cascone9e4972c2018-08-30 00:29:16 -0700407 private <T extends Behaviour> T getBehaviour(DeviceId deviceId, Class<T> type) {
408 // Get handshaker.
409
410 Driver driver = getDriver(deviceId);
411 if (driver == null) {
Andrea Campanella241896c2017-05-10 13:11:04 -0700412 return null;
413 }
Carmelo Cascone9e4972c2018-08-30 00:29:16 -0700414 if (!driver.hasBehaviour(type)) {
415 return null;
416 }
417 final DriverData data = new DefaultDriverData(driver, deviceId);
418 // Storing deviceKeyId and all other config values as data in the driver
419 // with protocol_<info> name as the key. e.g protocol_ip.
420 final GeneralProviderDeviceConfig providerConfig = cfgService.getConfig(
421 deviceId, GeneralProviderDeviceConfig.class);
422 if (providerConfig != null) {
423 providerConfig.protocolsInfo().forEach((protocol, info) -> {
424 info.configValues().forEach(
425 (k, v) -> data.set(protocol + "_" + k, v));
426 data.set(protocol + "_key", info.deviceKeyId());
427 });
428 }
429 final DefaultDriverHandler handler = new DefaultDriverHandler(data);
430 return driver.createBehaviour(handler, type);
Andrea Campanella241896c2017-05-10 13:11:04 -0700431 }
432
Carmelo Cascone158b8c42018-07-04 19:42:37 +0200433 private void doConnectDevice(DeviceId deviceId) {
Carmelo Cascone9e4972c2018-08-30 00:29:16 -0700434 log.debug("Initiating connection to device {}...", deviceId);
Carmelo Cascone96beb6f2018-06-27 18:07:12 +0200435 // Retrieve config
Carmelo Cascone9e4972c2018-08-30 00:29:16 -0700436 if (configIsMissing(deviceId)) {
Carmelo Cascone96beb6f2018-06-27 18:07:12 +0200437 return;
438 }
Carmelo Cascone9e4972c2018-08-30 00:29:16 -0700439 // Bind pipeconf (if any and if device is capable).
440 if (!bindPipeconfIfRequired(deviceId)) {
441 // We already logged the error.
442 return;
443 }
444 // Get handshaker.
445 final DeviceHandshaker handshaker = getBehaviour(
446 deviceId, DeviceHandshaker.class);
Carmelo Cascone96beb6f2018-06-27 18:07:12 +0200447 if (handshaker == null) {
Carmelo Cascone9e4972c2018-08-30 00:29:16 -0700448 log.error("Missing handshaker behavior for {}, aborting connection",
Carmelo Cascone96beb6f2018-06-27 18:07:12 +0200449 deviceId);
450 return;
451 }
Carmelo Cascone9e4972c2018-08-30 00:29:16 -0700452 // Add device agent listener.
453 handshaker.addDeviceAgentListener(id(), deviceAgentListener);
454 handshakersWithListeners.put(deviceId, handshaker);
Carmelo Cascone96beb6f2018-06-27 18:07:12 +0200455 // Start connection via handshaker.
456 final Boolean connectSuccess = getFutureWithDeadline(
457 handshaker.connect(), "initiating connection",
Carmelo Casconede3b6842018-09-05 17:45:10 -0700458 deviceId, false, opTimeoutShort);
Carmelo Cascone9e4972c2018-08-30 00:29:16 -0700459 if (!connectSuccess) {
Carmelo Cascone96beb6f2018-06-27 18:07:12 +0200460 log.warn("Unable to connect to {}", deviceId);
Carmelo Cascone9e4972c2018-08-30 00:29:16 -0700461 }
462 }
463
464 private void triggerAdvertiseDevice(DeviceId deviceId) {
465 connectionExecutor.execute(withDeviceLock(
466 () -> doAdvertiseDevice(deviceId), deviceId));
467 }
468
469 private void doAdvertiseDevice(DeviceId deviceId) {
470 // Retrieve config
471 if (configIsMissing(deviceId)) {
Carmelo Cascone96beb6f2018-06-27 18:07:12 +0200472 return;
473 }
Carmelo Cascone9e4972c2018-08-30 00:29:16 -0700474 // Obtain device and port description.
475 final boolean isPipelineReady = isPipelineReady(deviceId);
476 final DeviceDescription description = getDeviceDescription(
477 deviceId, isPipelineReady);
478 final List<PortDescription> ports = getPortDetails(deviceId);
479 // Advertise to core.
480 if (deviceService.getDevice(deviceId) == null ||
481 (description.isDefaultAvailable() &&
482 !deviceService.isAvailable(deviceId))) {
483 if (!isPipelineReady) {
484 log.info("Advertising device to core with available={} as " +
485 "device pipeline is not ready yet",
486 description.isDefaultAvailable());
Carmelo Cascone59f57de2017-07-11 19:55:09 -0400487 }
Carmelo Cascone9e4972c2018-08-30 00:29:16 -0700488 providerService.deviceConnected(deviceId, description);
Carmelo Cascone1fb27d32017-08-25 20:40:20 +0200489 }
Carmelo Cascone9e4972c2018-08-30 00:29:16 -0700490 providerService.updatePorts(deviceId, ports);
491 // If pipeline is not ready, encourage watchdog to perform probe ASAP.
492 if (!isPipelineReady) {
493 pipeconfWatchdogService.triggerProbe(deviceId);
494 }
495 }
496
497 private DeviceDescription getDeviceDescription(
498 DeviceId deviceId, boolean defaultAvailable) {
499 // Get one from driver or forge.
500 final DeviceDescriptionDiscovery deviceDiscovery = getBehaviour(
501 deviceId, DeviceDescriptionDiscovery.class);
502 if (deviceDiscovery != null) {
503 // Enforce defaultAvailable flag over the one obtained from driver.
504 final DeviceDescription d = deviceDiscovery.discoverDeviceDetails();
505 return new DefaultDeviceDescription(d, defaultAvailable, d.annotations());
506 } else {
507 return forgeDeviceDescription(deviceId, defaultAvailable);
508 }
509 }
510
511 private List<PortDescription> getPortDetails(DeviceId deviceId) {
512 final DeviceDescriptionDiscovery deviceDiscovery = getBehaviour(
513 deviceId, DeviceDescriptionDiscovery.class);
514 if (deviceDiscovery != null) {
515 return deviceDiscovery.discoverPortDetails();
516 } else {
517 return Collections.emptyList();
518 }
519 }
520
521 private DeviceDescription forgeDeviceDescription(
522 DeviceId deviceId, boolean defaultAvailable) {
523 // Uses handshaker and provider config to get driver data.
524 final DeviceHandshaker handshaker = getBehaviour(
525 deviceId, DeviceHandshaker.class);
526 final Driver driver = handshaker != null
527 ? handshaker.handler().driver() : null;
528 final GeneralProviderDeviceConfig cfg = cfgService.getConfig(
529 deviceId, GeneralProviderDeviceConfig.class);
530 final DefaultAnnotations.Builder annBuilder = DefaultAnnotations.builder();
531 // If device is pipeline programmable, let this provider decide when the
532 // device can be marked online.
533 annBuilder.set(AnnotationKeys.PROVIDER_MARK_ONLINE,
534 String.valueOf(isPipelineProgrammable(deviceId)));
535 if (cfg != null) {
536 StringJoiner protoStringBuilder = new StringJoiner(", ");
537 cfg.protocolsInfo().keySet().forEach(protoStringBuilder::add);
538 annBuilder.set(AnnotationKeys.PROTOCOL, protoStringBuilder.toString());
539 }
540 return new DefaultDeviceDescription(
541 deviceId.uri(),
542 Device.Type.SWITCH,
543 driver != null ? driver.manufacturer() : UNKNOWN,
544 driver != null ? driver.hwVersion() : UNKNOWN,
545 driver != null ? driver.swVersion() : UNKNOWN,
546 UNKNOWN,
547 new ChassisId(),
548 defaultAvailable,
549 annBuilder.build());
550 }
551
552 private void triggerMarkAvailable(DeviceId deviceId) {
553 connectionExecutor.execute(withDeviceLock(
554 () -> doMarkAvailable(deviceId), deviceId));
555 }
556
557 private void doMarkAvailable(DeviceId deviceId) {
558 if (deviceService.isAvailable(deviceId)) {
559 return;
560 }
561 final DeviceDescription descr = getDeviceDescription(deviceId, true);
562 // It has been observed that devices that were marked offline (e.g.
563 // after device disconnection) might end up with no master. Here we
564 // trigger a new master election (if device has no master).
565 mastershipService.requestRoleForSync(deviceId);
566 providerService.deviceConnected(deviceId, descr);
567 }
568
569 private boolean bindPipeconfIfRequired(DeviceId deviceId) {
570 if (pipeconfService.ofDevice(deviceId).isPresent()
571 || !isPipelineProgrammable(deviceId)) {
572 // Nothing to do, all good.
573 return true;
574 }
575 // Get pipeconf from netcfg or driver (default one).
576 final PiPipelineProgrammable pipelineProg = getBehaviour(
577 deviceId, PiPipelineProgrammable.class);
578 final PiPipeconfId pipeconfId = getPipeconfId(deviceId, pipelineProg);
579 if (pipeconfId == null) {
580 return false;
581 }
582 // Store binding in pipeconf service.
583 pipeconfService.bindToDevice(pipeconfId, deviceId);
584 return true;
585 }
586
587 private PiPipeconfId getPipeconfId(DeviceId deviceId, PiPipelineProgrammable pipelineProg) {
588 // Places to look for a pipeconf ID (in priority order)):
589 // 1) netcfg
590 // 2) device driver (default one)
591 final PiPipeconfId pipeconfId = getPipeconfFromCfg(deviceId);
592 if (pipeconfId != null && !pipeconfId.id().isEmpty()) {
593 return pipeconfId;
594 }
595 if (pipelineProg != null
596 && pipelineProg.getDefaultPipeconf().isPresent()) {
597 final PiPipeconf defaultPipeconf = pipelineProg.getDefaultPipeconf().get();
598 log.info("Using default pipeconf {} for {}", defaultPipeconf.id(), deviceId);
599 return defaultPipeconf.id();
600 } else {
601 log.warn("Unable to associate a pipeconf to {}", deviceId);
Andrea Campanella14e196d2017-07-24 18:11:36 +0200602 return null;
Carmelo Cascone59f57de2017-07-11 19:55:09 -0400603 }
Carmelo Cascone59f57de2017-07-11 19:55:09 -0400604 }
605
Carmelo Cascone158b8c42018-07-04 19:42:37 +0200606 private void doDisconnectDevice(DeviceId deviceId) {
Carmelo Cascone7044efd2018-07-06 13:01:36 +0200607 log.debug("Initiating disconnection from {}...", deviceId);
Carmelo Cascone9e4972c2018-08-30 00:29:16 -0700608 final DeviceHandshaker handshaker = getBehaviour(
609 deviceId, DeviceHandshaker.class);
Carmelo Cascone7044efd2018-07-06 13:01:36 +0200610 final boolean isAvailable = deviceService.isAvailable(deviceId);
611 // Signal disconnection to core (if master).
612 if (isAvailable && mastershipService.isLocalMaster(deviceId)) {
Carmelo Cascone96beb6f2018-06-27 18:07:12 +0200613 providerService.deviceDisconnected(deviceId);
614 }
Carmelo Cascone7044efd2018-07-06 13:01:36 +0200615 // Cancel tasks.
Carmelo Cascone158b8c42018-07-04 19:42:37 +0200616 cancelStatsPolling(deviceId);
Carmelo Cascone7044efd2018-07-06 13:01:36 +0200617 // Disconnect device.
Carmelo Casconee5b28722018-06-22 17:28:28 +0200618 if (handshaker == null) {
Carmelo Cascone7044efd2018-07-06 13:01:36 +0200619 if (isAvailable) {
620 // If not available don't bother logging. We are probably
621 // invoking this method multiple times for the same device.
622 log.warn("Missing DeviceHandshaker behavior for {}, " +
623 "no guarantees of complete disconnection",
624 deviceId);
625 }
Carmelo Cascone158b8c42018-07-04 19:42:37 +0200626 return;
Andrea Campanella241896c2017-05-10 13:11:04 -0700627 }
Carmelo Cascone9e4972c2018-08-30 00:29:16 -0700628 handshaker.removeDeviceAgentListener(id());
629 handshakersWithListeners.remove(deviceId);
Carmelo Cascone158b8c42018-07-04 19:42:37 +0200630 final boolean disconnectSuccess = getFutureWithDeadline(
631 handshaker.disconnect(), "performing disconnection",
632 deviceId, false, opTimeoutShort);
633 if (!disconnectSuccess) {
634 log.warn("Unable to disconnect from {}", deviceId);
635 }
Andrea Campanella241896c2017-05-10 13:11:04 -0700636 }
637
Carmelo Cascone96beb6f2018-06-27 18:07:12 +0200638 // Needed to catch the exception in the executors since are not rethrown otherwise.
Andrea Campanella241896c2017-05-10 13:11:04 -0700639 private Runnable exceptionSafe(Runnable runnable) {
640 return () -> {
641 try {
642 runnable.run();
643 } catch (Exception e) {
644 log.error("Unhandled Exception", e);
645 }
646 };
647 }
648
Carmelo Cascone158b8c42018-07-04 19:42:37 +0200649 private <U> U withDeviceLock(Supplier<U> task, DeviceId deviceId) {
650 final Lock lock = deviceLocks.get(deviceId);
651 lock.lock();
652 try {
653 return task.get();
654 } finally {
655 lock.unlock();
656 }
657 }
658
659 private Runnable withDeviceLock(Runnable task, DeviceId deviceId) {
660 // Wrapper of withDeviceLock(Supplier, ...) for void tasks.
661 return () -> withDeviceLock(() -> {
662 task.run();
663 return null;
664 }, deviceId);
Carmelo Cascone96beb6f2018-06-27 18:07:12 +0200665 }
666
Andrea Campanella241896c2017-05-10 13:11:04 -0700667 private void updatePortStatistics(DeviceId deviceId) {
Andrea Campanellace111932017-09-18 16:59:56 +0900668 Device device = deviceService.getDevice(deviceId);
Carmelo Cascone158b8c42018-07-04 19:42:37 +0200669 if (device != null && deviceService.isAvailable(deviceId) &&
Andrea Campanellace111932017-09-18 16:59:56 +0900670 device.is(PortStatisticsDiscovery.class)) {
671 Collection<PortStatistics> statistics = device.as(PortStatisticsDiscovery.class)
672 .discoverPortStatistics();
673 //updating statistcs only if not empty
674 if (!statistics.isEmpty()) {
675 providerService.updatePortStatistics(deviceId, statistics);
676 }
677 } else {
678 log.debug("Can't update port statistics for device {}", deviceId);
Andrea Campanella19090322017-08-22 10:31:37 +0200679 }
680 }
681
Carmelo Casconee5b28722018-06-22 17:28:28 +0200682 private boolean notMyScheme(DeviceId deviceId) {
683 return !deviceId.uri().getScheme().equals(URI_SCHEME);
Andrea Campanella241896c2017-05-10 13:11:04 -0700684 }
685
Carmelo Cascone158b8c42018-07-04 19:42:37 +0200686 private void triggerConnect(DeviceId deviceId) {
687 connectionExecutor.execute(withDeviceLock(
688 () -> doConnectDevice(deviceId), deviceId));
Carmelo Cascone96beb6f2018-06-27 18:07:12 +0200689 }
690
Carmelo Cascone9e4972c2018-08-30 00:29:16 -0700691 private boolean isPipelineProgrammable(DeviceId deviceId) {
692 final GeneralProviderDeviceConfig providerConfig = cfgService.getConfig(
693 deviceId, GeneralProviderDeviceConfig.class);
694 if (providerConfig == null) {
695 return false;
696 }
697 return !Collections.disjoint(
698 ImmutableSet.copyOf(providerConfig.node().fieldNames()),
699 PIPELINE_CONFIGURABLE_PROTOCOLS);
700 }
701
Andrea Campanella241896c2017-05-10 13:11:04 -0700702 /**
703 * Listener for configuration events.
704 */
705 private class InternalNetworkConfigListener implements NetworkConfigListener {
706
Andrea Campanella241896c2017-05-10 13:11:04 -0700707 @Override
708 public void event(NetworkConfigEvent event) {
Carmelo Cascone7044efd2018-07-06 13:01:36 +0200709 connectionExecutor.execute(() -> consumeConfigEvent(event));
710 }
711
712 @Override
713 public boolean isRelevant(NetworkConfigEvent event) {
714 return (event.configClass().equals(GeneralProviderDeviceConfig.class) ||
715 event.configClass().equals(BasicDeviceConfig.class) ||
716 event.configClass().equals(PiPipeconfConfig.class)) &&
717 (event.type() == NetworkConfigEvent.Type.CONFIG_ADDED ||
718 event.type() == NetworkConfigEvent.Type.CONFIG_UPDATED);
719 }
720
721 private void consumeConfigEvent(NetworkConfigEvent event) {
Andrea Campanella241896c2017-05-10 13:11:04 -0700722 DeviceId deviceId = (DeviceId) event.subject();
723 //Assuming that the deviceId comes with uri 'device:'
Carmelo Casconee5b28722018-06-22 17:28:28 +0200724 if (notMyScheme(deviceId)) {
Andrea Campanella241896c2017-05-10 13:11:04 -0700725 // not under my scheme, skipping
726 log.debug("{} is not my scheme, skipping", deviceId);
727 return;
728 }
Carmelo Cascone158b8c42018-07-04 19:42:37 +0200729 final boolean configComplete = withDeviceLock(
730 () -> isDeviceConfigComplete(event, deviceId), deviceId);
731 if (!configComplete) {
Carmelo Cascone96beb6f2018-06-27 18:07:12 +0200732 // Still waiting for some configuration.
Andrea Campanellabc112a92017-06-26 19:06:43 +0200733 return;
734 }
Carmelo Cascone96beb6f2018-06-27 18:07:12 +0200735 // Good to go.
Carmelo Cascone158b8c42018-07-04 19:42:37 +0200736 triggerConnect(deviceId);
Carmelo Cascone96beb6f2018-06-27 18:07:12 +0200737 cleanUpConfigInfo(deviceId);
Andrea Campanella14e196d2017-07-24 18:11:36 +0200738 }
739
Carmelo Cascone96beb6f2018-06-27 18:07:12 +0200740 private boolean isDeviceConfigComplete(NetworkConfigEvent event, DeviceId deviceId) {
741 // FIXME to be removed when netcfg will issue device events in a bundle or
Andrea Campanellabc112a92017-06-26 19:06:43 +0200742 // ensure all configuration needed is present
Carmelo Cascone158b8c42018-07-04 19:42:37 +0200743 if (event.configClass().equals(GeneralProviderDeviceConfig.class)) {
744 //FIXME we currently assume that p4runtime devices are pipeline configurable.
745 //If we want to connect a p4runtime device with no pipeline
746 if (event.config().isPresent()) {
747 deviceConfigured.add(deviceId);
748 final boolean isNotPipelineConfigurable = Collections.disjoint(
749 ImmutableSet.copyOf(event.config().get().node().fieldNames()),
750 PIPELINE_CONFIGURABLE_PROTOCOLS);
751 if (isNotPipelineConfigurable) {
752 // Skip waiting for a pipeline if we can't support it.
Andrea Campanellabc112a92017-06-26 19:06:43 +0200753 pipelineConfigured.add(deviceId);
754 }
755 }
Carmelo Cascone158b8c42018-07-04 19:42:37 +0200756 } else if (event.configClass().equals(BasicDeviceConfig.class)) {
757 if (event.config().isPresent() && event.config().get().node().has(DRIVER)) {
758 driverConfigured.add(deviceId);
Andrea Campanellabc112a92017-06-26 19:06:43 +0200759 }
Carmelo Cascone158b8c42018-07-04 19:42:37 +0200760 } else if (event.configClass().equals(PiPipeconfConfig.class)) {
761 if (event.config().isPresent()
762 && event.config().get().node().has(PiPipeconfConfig.PIPIPECONFID)) {
763 pipelineConfigured.add(deviceId);
764 }
Andrea Campanella241896c2017-05-10 13:11:04 -0700765 }
Carmelo Cascone158b8c42018-07-04 19:42:37 +0200766
767 if (deviceConfigured.contains(deviceId)
768 && driverConfigured.contains(deviceId)
769 && pipelineConfigured.contains(deviceId)) {
770 return true;
771 } else {
772 if (deviceConfigured.contains(deviceId) && driverConfigured.contains(deviceId)) {
773 log.debug("Waiting for pipeline configuration for device {}", deviceId);
774 } else if (pipelineConfigured.contains(deviceId) && driverConfigured.contains(deviceId)) {
775 log.debug("Waiting for device configuration for device {}", deviceId);
776 } else if (pipelineConfigured.contains(deviceId) && deviceConfigured.contains(deviceId)) {
777 log.debug("Waiting for driver configuration for device {}", deviceId);
778 } else if (driverConfigured.contains(deviceId)) {
779 log.debug("Only driver configuration for device {}", deviceId);
780 } else if (deviceConfigured.contains(deviceId)) {
781 log.debug("Only device configuration for device {}", deviceId);
782 }
783 }
784 return false;
Andrea Campanella241896c2017-05-10 13:11:04 -0700785 }
Andrea Campanella241896c2017-05-10 13:11:04 -0700786 }
787
Carmelo Cascone9e4972c2018-08-30 00:29:16 -0700788 private boolean isPipelineReady(DeviceId deviceId) {
789 final boolean isPipelineProg = isPipelineProgrammable(deviceId);
790 final boolean isPipeconfReady = pipeconfWatchdogService
791 .getStatus(deviceId)
792 .equals(PiPipeconfWatchdogService.PipelineStatus.READY);
793 return !isPipelineProg || isPipeconfReady;
Andrea Campanella14e196d2017-07-24 18:11:36 +0200794 }
795
796 private void cleanUpConfigInfo(DeviceId deviceId) {
Andrea Campanellabc112a92017-06-26 19:06:43 +0200797 deviceConfigured.remove(deviceId);
798 driverConfigured.remove(deviceId);
799 pipelineConfigured.remove(deviceId);
Andrea Campanellabc112a92017-06-26 19:06:43 +0200800 }
801
Carmelo Cascone158b8c42018-07-04 19:42:37 +0200802 private void startStatsPolling(DeviceId deviceId, boolean withRandomDelay) {
803 statsPollingTasks.compute(deviceId, (did, oldTask) -> {
804 if (oldTask != null) {
805 oldTask.cancel(false);
806 }
807 final int delay = withRandomDelay
808 ? new SecureRandom().nextInt(10) : 0;
809 return statsExecutor.scheduleAtFixedRate(
810 exceptionSafe(() -> updatePortStatistics(deviceId)),
811 delay, statsPollFrequency, TimeUnit.SECONDS);
812 });
Andrea Campanella19090322017-08-22 10:31:37 +0200813 }
814
Carmelo Cascone158b8c42018-07-04 19:42:37 +0200815 private void cancelStatsPolling(DeviceId deviceId) {
816 statsPollingTasks.computeIfPresent(deviceId, (did, task) -> {
817 task.cancel(false);
818 return null;
819 });
820 }
821
822 private void rescheduleStatsPollingTasks() {
823 statsPollingTasks.keySet().forEach(deviceId -> {
824 // startStatsPolling cancels old one if present.
825 startStatsPolling(deviceId, true);
826 });
827 }
828
829 private void triggerProbeAllDevices() {
830 // Async trigger a task for all devices in the cfg.
831 cfgService.getSubjects(DeviceId.class, GeneralProviderDeviceConfig.class)
Carmelo Cascone9e4972c2018-08-30 00:29:16 -0700832 .forEach(this::triggerDeviceProbe);
Andrea Campanella1e573442018-05-17 17:07:13 +0200833 }
834
Carmelo Cascone92044522018-06-29 19:00:59 +0200835 private PiPipeconfId getPipeconfFromCfg(DeviceId deviceId) {
836 PiPipeconfConfig config = cfgService.getConfig(
837 deviceId, PiPipeconfConfig.class);
838 if (config == null) {
839 return null;
840 }
841 return config.piPipeconfId();
842 }
843
Carmelo Cascone9e4972c2018-08-30 00:29:16 -0700844 private void triggerDeviceProbe(DeviceId deviceId) {
845 connectionExecutor.execute(withDeviceLock(
846 () -> doDeviceProbe(deviceId), deviceId));
847 }
848
Carmelo Cascone158b8c42018-07-04 19:42:37 +0200849 private void doDeviceProbe(DeviceId deviceId) {
Carmelo Cascone9e4972c2018-08-30 00:29:16 -0700850 log.debug("Probing device {}...", deviceId);
851 if (configIsMissing(deviceId)) {
Carmelo Cascone158b8c42018-07-04 19:42:37 +0200852 return;
853 }
Carmelo Cascone9e4972c2018-08-30 00:29:16 -0700854 if (!isConnected(deviceId)) {
855 if (deviceService.isAvailable(deviceId)) {
856 providerService.deviceDisconnected(deviceId);
Carmelo Cascone158b8c42018-07-04 19:42:37 +0200857 }
Carmelo Cascone158b8c42018-07-04 19:42:37 +0200858 triggerConnect(deviceId);
Andrea Campanella1e573442018-05-17 17:07:13 +0200859 }
860 }
861
Carmelo Cascone9e4972c2018-08-30 00:29:16 -0700862 private boolean configIsMissing(DeviceId deviceId) {
Carmelo Cascone96beb6f2018-06-27 18:07:12 +0200863 final boolean present =
864 cfgService.getConfig(deviceId, GeneralProviderDeviceConfig.class) != null
865 && cfgService.getConfig(deviceId, BasicDeviceConfig.class) != null;
Andrea Campanella1e573442018-05-17 17:07:13 +0200866 if (!present) {
867 log.warn("Configuration for device {} is not complete", deviceId);
868 }
Carmelo Cascone9e4972c2018-08-30 00:29:16 -0700869 return !present;
Carmelo Casconee5b28722018-06-22 17:28:28 +0200870 }
871
872 private void handleMastershipResponse(DeviceId deviceId, MastershipRole response) {
Carmelo Cascone9e4972c2018-08-30 00:29:16 -0700873 // Notify core about mastership response.
874 final MastershipRole request = requestedRoles.get(deviceId);
875 final boolean isAvailable = deviceService.isAvailable(deviceId);
876 if (request == null || !isAvailable) {
Carmelo Casconee5b28722018-06-22 17:28:28 +0200877 return;
878 }
Carmelo Cascone9e4972c2018-08-30 00:29:16 -0700879 log.debug("Device {} asserted role {} (requested was {})",
880 deviceId, response, request);
881 providerService.receivedRoleReply(deviceId, request, response);
882 // FIXME: this should be based on assigned mastership, not what returned by device
Carmelo Cascone158b8c42018-07-04 19:42:37 +0200883 if (response.equals(MastershipRole.MASTER)) {
884 startStatsPolling(deviceId, false);
885 } else {
886 cancelStatsPolling(deviceId);
Carmelo Casconee5b28722018-06-22 17:28:28 +0200887 }
888 }
889
Carmelo Casconede3b6842018-09-05 17:45:10 -0700890 private void handleNotMaster(DeviceId deviceId) {
891 log.warn("Device {} notified that this node is not master, " +
892 "relinquishing mastership...", deviceId);
893 mastershipService.relinquishMastership(deviceId);
894 }
895
Carmelo Cascone96beb6f2018-06-27 18:07:12 +0200896 private <U> U getFutureWithDeadline(CompletableFuture<U> future, String opDescription,
Carmelo Cascone158b8c42018-07-04 19:42:37 +0200897 DeviceId deviceId, U defaultValue, int timeout) {
Carmelo Cascone96beb6f2018-06-27 18:07:12 +0200898 try {
Carmelo Cascone158b8c42018-07-04 19:42:37 +0200899 return future.get(timeout, TimeUnit.SECONDS);
Carmelo Cascone96beb6f2018-06-27 18:07:12 +0200900 } catch (InterruptedException e) {
901 log.error("Thread interrupted while {} on {}", opDescription, deviceId);
902 Thread.currentThread().interrupt();
903 } catch (ExecutionException e) {
904 log.error("Exception while {} on {}", opDescription, deviceId, e.getCause());
905 } catch (TimeoutException e) {
906 log.error("Operation TIMEOUT while {} on {}", opDescription, deviceId);
907 }
908 return defaultValue;
909 }
910
Andrea Campanella241896c2017-05-10 13:11:04 -0700911 /**
912 * Listener for core device events.
913 */
914 private class InternalDeviceListener implements DeviceListener {
915 @Override
916 public void event(DeviceEvent event) {
Andrea Campanellace111932017-09-18 16:59:56 +0900917 DeviceId deviceId = event.subject().id();
Thomas Vachuska5b38dc02018-05-10 15:24:40 -0700918 // For now this is scheduled periodically, when streaming API will
919 // be available we check and base it on the streaming API (e.g. gNMI)
920 if (mastershipService.isLocalMaster(deviceId)) {
Carmelo Cascone158b8c42018-07-04 19:42:37 +0200921 startStatsPolling(deviceId, true);
Andrea Campanella241896c2017-05-10 13:11:04 -0700922 }
923 }
924
925 @Override
926 public boolean isRelevant(DeviceEvent event) {
Thomas Vachuska5b38dc02018-05-10 15:24:40 -0700927 return event.type() == Type.DEVICE_ADDED &&
928 event.subject().id().toString().startsWith(URI_SCHEME.toLowerCase());
Andrea Campanella241896c2017-05-10 13:11:04 -0700929 }
930 }
Andrea Campanella1e573442018-05-17 17:07:13 +0200931
932 /**
Carmelo Casconee5b28722018-06-22 17:28:28 +0200933 * Listener for device agent events.
Andrea Campanella1e573442018-05-17 17:07:13 +0200934 */
Carmelo Casconee5b28722018-06-22 17:28:28 +0200935 private class InternalDeviceAgentListener implements DeviceAgentListener {
Andrea Campanella1e573442018-05-17 17:07:13 +0200936
937 @Override
Carmelo Casconee5b28722018-06-22 17:28:28 +0200938 public void event(DeviceAgentEvent event) {
Andrea Campanella1e573442018-05-17 17:07:13 +0200939 DeviceId deviceId = event.subject();
Carmelo Casconee5b28722018-06-22 17:28:28 +0200940 switch (event.type()) {
941 case CHANNEL_OPEN:
Carmelo Cascone9e4972c2018-08-30 00:29:16 -0700942 triggerAdvertiseDevice(deviceId);
Carmelo Casconee5b28722018-06-22 17:28:28 +0200943 break;
944 case CHANNEL_CLOSED:
Carmelo Casconee5b28722018-06-22 17:28:28 +0200945 case CHANNEL_ERROR:
Carmelo Cascone9e4972c2018-08-30 00:29:16 -0700946 triggerDeviceProbe(deviceId);
Carmelo Casconee5b28722018-06-22 17:28:28 +0200947 break;
948 case ROLE_MASTER:
949 handleMastershipResponse(deviceId, MastershipRole.MASTER);
950 break;
951 case ROLE_STANDBY:
952 handleMastershipResponse(deviceId, MastershipRole.STANDBY);
953 break;
954 case ROLE_NONE:
955 handleMastershipResponse(deviceId, MastershipRole.NONE);
956 break;
Carmelo Casconede3b6842018-09-05 17:45:10 -0700957 case NOT_MASTER:
958 handleNotMaster(deviceId);
959 break;
Carmelo Casconee5b28722018-06-22 17:28:28 +0200960 default:
961 log.warn("Unrecognized device agent event {}", event.type());
Andrea Campanella1e573442018-05-17 17:07:13 +0200962 }
Andrea Campanella1e573442018-05-17 17:07:13 +0200963 }
964
965 }
Carmelo Cascone9e4972c2018-08-30 00:29:16 -0700966
967 private class InternalPipeconfWatchdogListener implements PiPipeconfWatchdogListener {
968 @Override
969 public void event(PiPipeconfWatchdogEvent event) {
970 triggerMarkAvailable(event.subject());
971 }
972
973 @Override
974 public boolean isRelevant(PiPipeconfWatchdogEvent event) {
975 return event.type().equals(PiPipeconfWatchdogEvent.Type.PIPELINE_READY);
976 }
977 }
978
979 private class InternalConfigFactory
980 extends ConfigFactory<DeviceId, GeneralProviderDeviceConfig> {
981
982 InternalConfigFactory() {
983 super(SubjectFactories.DEVICE_SUBJECT_FACTORY,
984 GeneralProviderDeviceConfig.class, CFG_SCHEME);
985 }
986
987 @Override
988 public GeneralProviderDeviceConfig createConfig() {
989 return new GeneralProviderDeviceConfig();
990 }
991 }
Andrea Campanella241896c2017-05-10 13:11:04 -0700992}