blob: 9918327671bd52a49101746900ed858231b25470 [file] [log] [blame]
Andrea Campanella241896c2017-05-10 13:11:04 -07001/*
Brian O'Connora09fe5b2017-08-03 21:12:30 -07002 * Copyright 2017-present Open Networking Foundation
Andrea Campanella241896c2017-05-10 13:11:04 -07003 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
16
17package org.onosproject.provider.general.device.impl;
18
19import com.google.common.annotations.Beta;
Andrea Campanellabc112a92017-06-26 19:06:43 +020020import com.google.common.collect.ImmutableSet;
21import com.google.common.collect.Maps;
Carmelo Cascone158b8c42018-07-04 19:42:37 +020022import com.google.common.util.concurrent.Striped;
Andrea Campanella241896c2017-05-10 13:11:04 -070023import org.onlab.packet.ChassisId;
24import org.onlab.util.ItemNotFoundException;
Andrea Campanella19090322017-08-22 10:31:37 +020025import org.onlab.util.Tools;
Andrea Campanella4929a812017-10-09 18:38:23 +020026import org.onosproject.cfg.ComponentConfigService;
Andrea Campanella241896c2017-05-10 13:11:04 -070027import org.onosproject.core.CoreService;
Andrea Campanella14e196d2017-07-24 18:11:36 +020028import org.onosproject.mastership.MastershipService;
Andrea Campanella241896c2017-05-10 13:11:04 -070029import org.onosproject.net.AnnotationKeys;
30import org.onosproject.net.DefaultAnnotations;
31import org.onosproject.net.Device;
32import org.onosproject.net.DeviceId;
33import org.onosproject.net.MastershipRole;
34import org.onosproject.net.PortNumber;
Carmelo Cascone87892e22017-11-13 16:01:29 -080035import org.onosproject.net.behaviour.PiPipelineProgrammable;
Andrea Campanella241896c2017-05-10 13:11:04 -070036import org.onosproject.net.behaviour.PortAdmin;
37import org.onosproject.net.config.ConfigFactory;
38import org.onosproject.net.config.NetworkConfigEvent;
39import org.onosproject.net.config.NetworkConfigListener;
40import org.onosproject.net.config.NetworkConfigRegistry;
41import org.onosproject.net.config.basics.BasicDeviceConfig;
42import org.onosproject.net.config.basics.SubjectFactories;
43import org.onosproject.net.device.DefaultDeviceDescription;
Carmelo Casconee5b28722018-06-22 17:28:28 +020044import org.onosproject.net.device.DeviceAgentEvent;
45import org.onosproject.net.device.DeviceAgentListener;
Andrea Campanella241896c2017-05-10 13:11:04 -070046import org.onosproject.net.device.DeviceDescription;
47import org.onosproject.net.device.DeviceDescriptionDiscovery;
48import org.onosproject.net.device.DeviceEvent;
49import org.onosproject.net.device.DeviceHandshaker;
50import org.onosproject.net.device.DeviceListener;
51import org.onosproject.net.device.DeviceProvider;
52import org.onosproject.net.device.DeviceProviderRegistry;
53import org.onosproject.net.device.DeviceProviderService;
54import org.onosproject.net.device.DeviceService;
55import org.onosproject.net.device.PortDescription;
56import org.onosproject.net.device.PortStatistics;
57import org.onosproject.net.device.PortStatisticsDiscovery;
58import org.onosproject.net.driver.Behaviour;
59import org.onosproject.net.driver.DefaultDriverData;
60import org.onosproject.net.driver.DefaultDriverHandler;
61import org.onosproject.net.driver.Driver;
62import org.onosproject.net.driver.DriverData;
63import org.onosproject.net.driver.DriverService;
Carmelo Cascone59f57de2017-07-11 19:55:09 -040064import org.onosproject.net.pi.model.PiPipeconf;
Andrea Campanellabc112a92017-06-26 19:06:43 +020065import org.onosproject.net.pi.model.PiPipeconfId;
Carmelo Cascone39c28ca2017-11-15 13:03:57 -080066import org.onosproject.net.pi.service.PiPipeconfConfig;
67import org.onosproject.net.pi.service.PiPipeconfService;
Carmelo Cascone9e4972c2018-08-30 00:29:16 -070068import org.onosproject.net.pi.service.PiPipeconfWatchdogEvent;
69import org.onosproject.net.pi.service.PiPipeconfWatchdogListener;
70import org.onosproject.net.pi.service.PiPipeconfWatchdogService;
Andrea Campanella241896c2017-05-10 13:11:04 -070071import org.onosproject.net.provider.AbstractProvider;
72import org.onosproject.net.provider.ProviderId;
73import org.onosproject.provider.general.device.api.GeneralProviderDeviceConfig;
Andrea Campanella19090322017-08-22 10:31:37 +020074import org.osgi.service.component.ComponentContext;
Ray Milkeyd84f89b2018-08-17 14:54:17 -070075import org.osgi.service.component.annotations.Activate;
76import org.osgi.service.component.annotations.Component;
77import org.osgi.service.component.annotations.Deactivate;
78import org.osgi.service.component.annotations.Modified;
79import org.osgi.service.component.annotations.Reference;
80import org.osgi.service.component.annotations.ReferenceCardinality;
Andrea Campanella241896c2017-05-10 13:11:04 -070081import org.slf4j.Logger;
82
Ray Milkeyb68bbbc2017-12-18 10:05:49 -080083import java.security.SecureRandom;
Andrea Campanella241896c2017-05-10 13:11:04 -070084import java.util.Collection;
Andrea Campanellabc112a92017-06-26 19:06:43 +020085import java.util.Collections;
Andrea Campanella19090322017-08-22 10:31:37 +020086import java.util.Dictionary;
Andrea Campanella241896c2017-05-10 13:11:04 -070087import java.util.List;
Thomas Vachuska5b38dc02018-05-10 15:24:40 -070088import java.util.Map;
Andrea Campanellabc112a92017-06-26 19:06:43 +020089import java.util.Set;
Carmelo Cascone9e4972c2018-08-30 00:29:16 -070090import java.util.StringJoiner;
Andrea Campanella241896c2017-05-10 13:11:04 -070091import java.util.concurrent.CompletableFuture;
Andrea Campanella19090322017-08-22 10:31:37 +020092import java.util.concurrent.ConcurrentHashMap;
Andrea Campanellabc112a92017-06-26 19:06:43 +020093import java.util.concurrent.ConcurrentMap;
94import java.util.concurrent.CopyOnWriteArraySet;
Andrea Campanella241896c2017-05-10 13:11:04 -070095import java.util.concurrent.ExecutionException;
Carmelo Casconee5b28722018-06-22 17:28:28 +020096import java.util.concurrent.ExecutorService;
Andrea Campanella241896c2017-05-10 13:11:04 -070097import java.util.concurrent.ScheduledExecutorService;
Andrea Campanella19090322017-08-22 10:31:37 +020098import java.util.concurrent.ScheduledFuture;
Andrea Campanella241896c2017-05-10 13:11:04 -070099import java.util.concurrent.TimeUnit;
100import java.util.concurrent.TimeoutException;
Andrea Campanellabc112a92017-06-26 19:06:43 +0200101import java.util.concurrent.locks.Lock;
Carmelo Cascone158b8c42018-07-04 19:42:37 +0200102import java.util.function.Supplier;
Andrea Campanella241896c2017-05-10 13:11:04 -0700103
Carmelo Casconee5b28722018-06-22 17:28:28 +0200104import static java.util.concurrent.Executors.newFixedThreadPool;
Andrea Campanella241896c2017-05-10 13:11:04 -0700105import static java.util.concurrent.Executors.newScheduledThreadPool;
Carmelo Cascone158b8c42018-07-04 19:42:37 +0200106import static java.util.concurrent.Executors.newSingleThreadScheduledExecutor;
Andrea Campanella241896c2017-05-10 13:11:04 -0700107import static org.onlab.util.Tools.groupedThreads;
108import static org.onosproject.net.device.DeviceEvent.Type;
109import static org.slf4j.LoggerFactory.getLogger;
110
111/**
Carmelo Cascone96beb6f2018-06-27 18:07:12 +0200112 * Provider which uses drivers to detect device and do initial handshake and
113 * channel establishment with devices. Any other provider specific operation is
114 * also delegated to the DeviceHandshaker driver.
Andrea Campanella241896c2017-05-10 13:11:04 -0700115 */
116@Beta
117@Component(immediate = true)
118public class GeneralDeviceProvider extends AbstractProvider
119 implements DeviceProvider {
Carmelo Casconee5b28722018-06-22 17:28:28 +0200120
Andrea Campanella241896c2017-05-10 13:11:04 -0700121 private final Logger log = getLogger(getClass());
122
Carmelo Cascone9e4972c2018-08-30 00:29:16 -0700123 private static final String APP_NAME = "org.onosproject.gdp";
124 private static final String URI_SCHEME = "device";
125 private static final String CFG_SCHEME = "generalprovider";
126 private static final String DEVICE_PROVIDER_PACKAGE =
127 "org.onosproject.general.provider.device";
128 private static final int CORE_POOL_SIZE = 10;
129 private static final String UNKNOWN = "unknown";
130 private static final String DRIVER = "driver";
131 private static final Set<String> PIPELINE_CONFIGURABLE_PROTOCOLS =
132 ImmutableSet.of("p4runtime");
133
Ray Milkeyd84f89b2018-08-17 14:54:17 -0700134 @Reference(cardinality = ReferenceCardinality.MANDATORY)
Carmelo Casconee5b28722018-06-22 17:28:28 +0200135 private DeviceProviderRegistry providerRegistry;
Andrea Campanella241896c2017-05-10 13:11:04 -0700136
Ray Milkeyd84f89b2018-08-17 14:54:17 -0700137 @Reference(cardinality = ReferenceCardinality.MANDATORY)
Carmelo Casconee5b28722018-06-22 17:28:28 +0200138 private ComponentConfigService componentConfigService;
Andrea Campanella4929a812017-10-09 18:38:23 +0200139
Ray Milkeyd84f89b2018-08-17 14:54:17 -0700140 @Reference(cardinality = ReferenceCardinality.MANDATORY)
Carmelo Casconee5b28722018-06-22 17:28:28 +0200141 private NetworkConfigRegistry cfgService;
Andrea Campanella241896c2017-05-10 13:11:04 -0700142
Ray Milkeyd84f89b2018-08-17 14:54:17 -0700143 @Reference(cardinality = ReferenceCardinality.MANDATORY)
Carmelo Casconee5b28722018-06-22 17:28:28 +0200144 private CoreService coreService;
Andrea Campanella241896c2017-05-10 13:11:04 -0700145
Ray Milkeyd84f89b2018-08-17 14:54:17 -0700146 @Reference(cardinality = ReferenceCardinality.MANDATORY)
Carmelo Casconee5b28722018-06-22 17:28:28 +0200147 private DeviceService deviceService;
Andrea Campanella241896c2017-05-10 13:11:04 -0700148
Ray Milkeyd84f89b2018-08-17 14:54:17 -0700149 @Reference(cardinality = ReferenceCardinality.MANDATORY)
Carmelo Casconee5b28722018-06-22 17:28:28 +0200150 private DriverService driverService;
Andrea Campanella241896c2017-05-10 13:11:04 -0700151
Ray Milkeyd84f89b2018-08-17 14:54:17 -0700152 @Reference(cardinality = ReferenceCardinality.MANDATORY)
Carmelo Casconee5b28722018-06-22 17:28:28 +0200153 private MastershipService mastershipService;
Andrea Campanella14e196d2017-07-24 18:11:36 +0200154
Ray Milkeyd84f89b2018-08-17 14:54:17 -0700155 @Reference(cardinality = ReferenceCardinality.MANDATORY)
Carmelo Cascone9e4972c2018-08-30 00:29:16 -0700156 private PiPipeconfService pipeconfService;
Andrea Campanella241896c2017-05-10 13:11:04 -0700157
Ray Milkeyd84f89b2018-08-17 14:54:17 -0700158 @Reference(cardinality = ReferenceCardinality.MANDATORY)
Carmelo Cascone9e4972c2018-08-30 00:29:16 -0700159 private PiPipeconfWatchdogService pipeconfWatchdogService;
Andrea Campanella14e196d2017-07-24 18:11:36 +0200160
Carmelo Cascone158b8c42018-07-04 19:42:37 +0200161 private static final String STATS_POLL_FREQUENCY = "deviceStatsPollFrequency";
162 private static final int DEFAULT_STATS_POLL_FREQUENCY = 10;
Ray Milkeyd84f89b2018-08-17 14:54:17 -0700163 //@Property(name = STATS_POLL_FREQUENCY, intValue = DEFAULT_STATS_POLL_FREQUENCY,
164 // label = "Configure poll frequency for port status and statistics; " +
165 // "default is 10 sec")
Carmelo Cascone158b8c42018-07-04 19:42:37 +0200166 private int statsPollFrequency = DEFAULT_STATS_POLL_FREQUENCY;
Andrea Campanella19090322017-08-22 10:31:37 +0200167
Carmelo Cascone158b8c42018-07-04 19:42:37 +0200168 private static final String PROBE_FREQUENCY = "deviceProbeFrequency";
169 private static final int DEFAULT_PROBE_FREQUENCY = 10;
Ray Milkeyd84f89b2018-08-17 14:54:17 -0700170 //@Property(name = PROBE_FREQUENCY, intValue = DEFAULT_PROBE_FREQUENCY,
171 // label = "Configure probe frequency for checking device availability; " +
172 // "default is 10 sec")
Carmelo Cascone158b8c42018-07-04 19:42:37 +0200173 private int probeFrequency = DEFAULT_PROBE_FREQUENCY;
174
175 private static final String OP_TIMEOUT_SHORT = "deviceOperationTimeoutShort";
176 private static final int DEFAULT_OP_TIMEOUT_SHORT = 10;
Ray Milkeyd84f89b2018-08-17 14:54:17 -0700177 //@Property(name = OP_TIMEOUT_SHORT, intValue = DEFAULT_OP_TIMEOUT_SHORT,
178 // label = "Configure timeout in seconds for device operations " +
179 // "that are supposed to take a short time " +
180 // "(e.g. checking device reachability); default is 10 seconds")
Carmelo Cascone158b8c42018-07-04 19:42:37 +0200181 private int opTimeoutShort = DEFAULT_OP_TIMEOUT_SHORT;
182
Andrea Campanellabc112a92017-06-26 19:06:43 +0200183 //FIXME to be removed when netcfg will issue device events in a bundle or
184 //ensures all configuration needed is present
Carmelo Cascone9e4972c2018-08-30 00:29:16 -0700185 private final Set<DeviceId> deviceConfigured = new CopyOnWriteArraySet<>();
186 private final Set<DeviceId> driverConfigured = new CopyOnWriteArraySet<>();
187 private final Set<DeviceId> pipelineConfigured = new CopyOnWriteArraySet<>();
Andrea Campanella241896c2017-05-10 13:11:04 -0700188
Carmelo Casconee5b28722018-06-22 17:28:28 +0200189 private final Map<DeviceId, MastershipRole> requestedRoles = Maps.newConcurrentMap();
Carmelo Cascone9e4972c2018-08-30 00:29:16 -0700190 private final ConcurrentMap<DeviceId, ScheduledFuture<?>> statsPollingTasks = new ConcurrentHashMap<>();
191 private final Map<DeviceId, DeviceHandshaker> handshakersWithListeners = Maps.newConcurrentMap();
192 private final InternalDeviceListener deviceListener = new InternalDeviceListener();
193 private final InternalPipeconfWatchdogListener pipeconfWatchdogListener = new InternalPipeconfWatchdogListener();
Carmelo Casconee5b28722018-06-22 17:28:28 +0200194 private final NetworkConfigListener cfgListener = new InternalNetworkConfigListener();
195 private final DeviceAgentListener deviceAgentListener = new InternalDeviceAgentListener();
Carmelo Cascone9e4972c2018-08-30 00:29:16 -0700196 private final ConfigFactory factory = new InternalConfigFactory();
197 private final Striped<Lock> deviceLocks = Striped.lock(30);
Andrea Campanella241896c2017-05-10 13:11:04 -0700198
Carmelo Cascone9e4972c2018-08-30 00:29:16 -0700199 private ExecutorService connectionExecutor;
200 private ScheduledExecutorService statsExecutor;
201 private ScheduledExecutorService probeExecutor;
202 private ScheduledFuture<?> probeTask;
203 private DeviceProviderService providerService;
204
205 public GeneralDeviceProvider() {
206 super(new ProviderId(URI_SCHEME, DEVICE_PROVIDER_PACKAGE));
207 }
Andrea Campanella241896c2017-05-10 13:11:04 -0700208
209 @Activate
Andrea Campanella1e573442018-05-17 17:07:13 +0200210 public void activate(ComponentContext context) {
Carmelo Cascone9e4972c2018-08-30 00:29:16 -0700211 connectionExecutor = newFixedThreadPool(CORE_POOL_SIZE, groupedThreads(
212 "onos/gdp-connect", "%d", log));
213 statsExecutor = newScheduledThreadPool(CORE_POOL_SIZE, groupedThreads(
214 "onos/gdp-stats", "%d", log));
215 probeExecutor = newSingleThreadScheduledExecutor(groupedThreads(
216 "onos/gdp-probe", "%d", log));
Andrea Campanella241896c2017-05-10 13:11:04 -0700217 providerService = providerRegistry.register(this);
Andrea Campanella4929a812017-10-09 18:38:23 +0200218 componentConfigService.registerProperties(getClass());
Andrea Campanella241896c2017-05-10 13:11:04 -0700219 coreService.registerApplication(APP_NAME);
220 cfgService.registerConfigFactory(factory);
221 cfgService.addListener(cfgListener);
222 deviceService.addListener(deviceListener);
Carmelo Cascone9e4972c2018-08-30 00:29:16 -0700223 pipeconfWatchdogService.addListener(pipeconfWatchdogListener);
224 rescheduleProbeTask(false);
Andrea Campanella1e573442018-05-17 17:07:13 +0200225 modified(context);
Andrea Campanella241896c2017-05-10 13:11:04 -0700226 log.info("Started");
227 }
228
Andrea Campanella19090322017-08-22 10:31:37 +0200229 @Modified
230 public void modified(ComponentContext context) {
Carmelo Cascone158b8c42018-07-04 19:42:37 +0200231 if (context == null) {
232 return;
Andrea Campanella19090322017-08-22 10:31:37 +0200233 }
234
Carmelo Cascone158b8c42018-07-04 19:42:37 +0200235 Dictionary<?, ?> properties = context.getProperties();
236 final int oldStatsPollFrequency = statsPollFrequency;
237 statsPollFrequency = Tools.getIntegerProperty(
238 properties, STATS_POLL_FREQUENCY, DEFAULT_STATS_POLL_FREQUENCY);
239 log.info("Configured. {} is configured to {} seconds",
240 STATS_POLL_FREQUENCY, statsPollFrequency);
241 final int oldProbeFrequency = probeFrequency;
242 probeFrequency = Tools.getIntegerProperty(
243 properties, PROBE_FREQUENCY, DEFAULT_PROBE_FREQUENCY);
244 log.info("Configured. {} is configured to {} seconds",
245 PROBE_FREQUENCY, probeFrequency);
246 opTimeoutShort = Tools.getIntegerProperty(
247 properties, OP_TIMEOUT_SHORT, DEFAULT_OP_TIMEOUT_SHORT);
248 log.info("Configured. {} is configured to {} seconds",
249 OP_TIMEOUT_SHORT, opTimeoutShort);
Carmelo Cascone158b8c42018-07-04 19:42:37 +0200250
251 if (oldStatsPollFrequency != statsPollFrequency) {
252 rescheduleStatsPollingTasks();
Andrea Campanella19090322017-08-22 10:31:37 +0200253 }
Carmelo Cascone158b8c42018-07-04 19:42:37 +0200254
255 if (oldProbeFrequency != probeFrequency) {
Carmelo Cascone9e4972c2018-08-30 00:29:16 -0700256 rescheduleProbeTask(true);
Carmelo Cascone158b8c42018-07-04 19:42:37 +0200257 }
258 }
259
Carmelo Cascone9e4972c2018-08-30 00:29:16 -0700260 private void rescheduleProbeTask(boolean deelay) {
261 synchronized (this) {
262 if (probeTask != null) {
263 probeTask.cancel(false);
264 }
265 probeTask = probeExecutor.scheduleAtFixedRate(
266 this::triggerProbeAllDevices,
267 deelay ? probeFrequency : 0,
268 probeFrequency,
269 TimeUnit.SECONDS);
Carmelo Cascone158b8c42018-07-04 19:42:37 +0200270 }
Andrea Campanella19090322017-08-22 10:31:37 +0200271 }
272
Andrea Campanella241896c2017-05-10 13:11:04 -0700273 @Deactivate
274 public void deactivate() {
Carmelo Cascone9e4972c2018-08-30 00:29:16 -0700275 // Shutdown stats polling tasks.
276 statsPollingTasks.keySet().forEach(this::cancelStatsPolling);
277 statsPollingTasks.clear();
278 statsExecutor.shutdownNow();
279 try {
280 statsExecutor.awaitTermination(5, TimeUnit.SECONDS);
281 } catch (InterruptedException e) {
282 log.warn("statsExecutor not terminated properly");
283 }
284 statsExecutor = null;
285 // Shutdown probe executor.
286 probeTask.cancel(true);
287 probeTask = null;
288 probeExecutor.shutdownNow();
289 try {
290 probeExecutor.awaitTermination(5, TimeUnit.SECONDS);
291 } catch (InterruptedException e) {
292 log.warn("probeExecutor not terminated properly");
293 }
294 probeExecutor = null;
295 // Shutdown connection executor.
296 connectionExecutor.shutdownNow();
297 try {
298 connectionExecutor.awaitTermination(5, TimeUnit.SECONDS);
299 } catch (InterruptedException e) {
300 log.warn("connectionExecutor not terminated properly");
301 }
302 connectionExecutor = null;
303 // Remove all device agent listeners
304 handshakersWithListeners.values().forEach(h -> h.removeDeviceAgentListener(id()));
305 handshakersWithListeners.clear();
306 // Other cleanup.
Andrea Campanella4929a812017-10-09 18:38:23 +0200307 componentConfigService.unregisterProperties(getClass(), false);
Andrea Campanella241896c2017-05-10 13:11:04 -0700308 cfgService.removeListener(cfgListener);
Andrea Campanella241896c2017-05-10 13:11:04 -0700309 deviceService.removeListener(deviceListener);
Carmelo Cascone9e4972c2018-08-30 00:29:16 -0700310 pipeconfWatchdogService.removeListener(pipeconfWatchdogListener);
Andrea Campanella241896c2017-05-10 13:11:04 -0700311 providerRegistry.unregister(this);
312 providerService = null;
313 cfgService.unregisterConfigFactory(factory);
314 log.info("Stopped");
315 }
316
Andrea Campanella241896c2017-05-10 13:11:04 -0700317
318 @Override
319 public void triggerProbe(DeviceId deviceId) {
Carmelo Cascone158b8c42018-07-04 19:42:37 +0200320 connectionExecutor.execute(withDeviceLock(
321 () -> doDeviceProbe(deviceId), deviceId));
Andrea Campanella241896c2017-05-10 13:11:04 -0700322 }
323
324 @Override
325 public void roleChanged(DeviceId deviceId, MastershipRole newRole) {
Carmelo Cascone9e4972c2018-08-30 00:29:16 -0700326 log.info("Notifying role {} to device {}", newRole, deviceId);
Carmelo Casconee5b28722018-06-22 17:28:28 +0200327 requestedRoles.put(deviceId, newRole);
Carmelo Cascone158b8c42018-07-04 19:42:37 +0200328 connectionExecutor.execute(() -> doRoleChanged(deviceId, newRole));
Carmelo Casconee5b28722018-06-22 17:28:28 +0200329 }
330
331 private void doRoleChanged(DeviceId deviceId, MastershipRole newRole) {
Carmelo Cascone9e4972c2018-08-30 00:29:16 -0700332 final DeviceHandshaker handshaker = getBehaviour(
333 deviceId, DeviceHandshaker.class);
Carmelo Casconee5b28722018-06-22 17:28:28 +0200334 if (handshaker == null) {
Carmelo Cascone96beb6f2018-06-27 18:07:12 +0200335 log.error("Null handshaker. Unable to notify new role {} to {}",
336 newRole, deviceId);
Carmelo Casconee5b28722018-06-22 17:28:28 +0200337 return;
338 }
339 handshaker.roleChanged(newRole);
Andrea Campanella241896c2017-05-10 13:11:04 -0700340 }
341
342 @Override
343 public boolean isReachable(DeviceId deviceId) {
Andrea Campanella14e196d2017-07-24 18:11:36 +0200344 log.debug("Testing reachability for device {}", deviceId);
Carmelo Cascone9e4972c2018-08-30 00:29:16 -0700345 final DeviceHandshaker handshaker = getBehaviour(
346 deviceId, DeviceHandshaker.class);
Andrea Campanellac1ecdd02018-01-12 12:48:24 +0100347 if (handshaker == null) {
348 return false;
349 }
Carmelo Cascone96beb6f2018-06-27 18:07:12 +0200350 return getFutureWithDeadline(
351 handshaker.isReachable(), "checking reachability",
Carmelo Cascone158b8c42018-07-04 19:42:37 +0200352 deviceId, false, opTimeoutShort);
Andrea Campanella241896c2017-05-10 13:11:04 -0700353 }
354
Carmelo Cascone9e4972c2018-08-30 00:29:16 -0700355 private boolean isConnected(DeviceId deviceId) {
356 log.debug("Testing connection to device {}", deviceId);
357 final DeviceHandshaker handshaker = getBehaviour(
358 deviceId, DeviceHandshaker.class);
359 if (handshaker == null) {
360 return false;
361 }
362 return handshaker.isConnected();
363 }
364
Andrea Campanella241896c2017-05-10 13:11:04 -0700365 @Override
366 public void changePortState(DeviceId deviceId, PortNumber portNumber,
367 boolean enable) {
Carmelo Cascone158b8c42018-07-04 19:42:37 +0200368 connectionExecutor.execute(
369 () -> doChangePortState(deviceId, portNumber, enable));
370 }
371
372 private void doChangePortState(DeviceId deviceId, PortNumber portNumber,
373 boolean enable) {
Carmelo Cascone96beb6f2018-06-27 18:07:12 +0200374 if (!deviceService.getDevice(deviceId).is(PortAdmin.class)) {
375 log.warn("Missing PortAdmin behaviour on {}, aborting port state change",
376 deviceId);
377 return;
Andrea Campanella241896c2017-05-10 13:11:04 -0700378 }
Carmelo Cascone158b8c42018-07-04 19:42:37 +0200379 final PortAdmin portAdmin = deviceService.getDevice(deviceId)
380 .as(PortAdmin.class);
381 final CompletableFuture<Boolean> modifyTask = enable
Carmelo Cascone96beb6f2018-06-27 18:07:12 +0200382 ? portAdmin.enable(portNumber)
383 : portAdmin.disable(portNumber);
Carmelo Cascone158b8c42018-07-04 19:42:37 +0200384 final String descr = (enable ? "enabling" : "disabling") + " port " + portNumber;
385 getFutureWithDeadline(
386 modifyTask, descr, deviceId, null, opTimeoutShort);
Andrea Campanella241896c2017-05-10 13:11:04 -0700387 }
388
Thomas Vachuska5b38dc02018-05-10 15:24:40 -0700389 @Override
390 public void triggerDisconnect(DeviceId deviceId) {
Andrea Campanella1e573442018-05-17 17:07:13 +0200391 log.debug("Triggering disconnection of device {}", deviceId);
Carmelo Cascone158b8c42018-07-04 19:42:37 +0200392 connectionExecutor.execute(withDeviceLock(
393 () -> doDisconnectDevice(deviceId), deviceId));
Thomas Vachuska5b38dc02018-05-10 15:24:40 -0700394 }
395
Andrea Campanella241896c2017-05-10 13:11:04 -0700396 private Driver getDriver(DeviceId deviceId) {
Andrea Campanella241896c2017-05-10 13:11:04 -0700397 try {
Carmelo Cascone158b8c42018-07-04 19:42:37 +0200398 // DriverManager checks first using basic device config.
399 return driverService.getDriver(deviceId);
Andrea Campanella241896c2017-05-10 13:11:04 -0700400 } catch (ItemNotFoundException e) {
Carmelo Cascone158b8c42018-07-04 19:42:37 +0200401 log.error("Driver not found for {}", deviceId);
402 return null;
Andrea Campanella241896c2017-05-10 13:11:04 -0700403 }
Andrea Campanella241896c2017-05-10 13:11:04 -0700404 }
405
Carmelo Cascone9e4972c2018-08-30 00:29:16 -0700406 private <T extends Behaviour> T getBehaviour(DeviceId deviceId, Class<T> type) {
407 // Get handshaker.
408
409 Driver driver = getDriver(deviceId);
410 if (driver == null) {
Andrea Campanella241896c2017-05-10 13:11:04 -0700411 return null;
412 }
Carmelo Cascone9e4972c2018-08-30 00:29:16 -0700413 if (!driver.hasBehaviour(type)) {
414 return null;
415 }
416 final DriverData data = new DefaultDriverData(driver, deviceId);
417 // Storing deviceKeyId and all other config values as data in the driver
418 // with protocol_<info> name as the key. e.g protocol_ip.
419 final GeneralProviderDeviceConfig providerConfig = cfgService.getConfig(
420 deviceId, GeneralProviderDeviceConfig.class);
421 if (providerConfig != null) {
422 providerConfig.protocolsInfo().forEach((protocol, info) -> {
423 info.configValues().forEach(
424 (k, v) -> data.set(protocol + "_" + k, v));
425 data.set(protocol + "_key", info.deviceKeyId());
426 });
427 }
428 final DefaultDriverHandler handler = new DefaultDriverHandler(data);
429 return driver.createBehaviour(handler, type);
Andrea Campanella241896c2017-05-10 13:11:04 -0700430 }
431
Carmelo Cascone158b8c42018-07-04 19:42:37 +0200432 private void doConnectDevice(DeviceId deviceId) {
Carmelo Cascone9e4972c2018-08-30 00:29:16 -0700433 log.debug("Initiating connection to device {}...", deviceId);
Carmelo Cascone96beb6f2018-06-27 18:07:12 +0200434 // Retrieve config
Carmelo Cascone9e4972c2018-08-30 00:29:16 -0700435 if (configIsMissing(deviceId)) {
Carmelo Cascone96beb6f2018-06-27 18:07:12 +0200436 return;
437 }
Carmelo Cascone9e4972c2018-08-30 00:29:16 -0700438 // Bind pipeconf (if any and if device is capable).
439 if (!bindPipeconfIfRequired(deviceId)) {
440 // We already logged the error.
441 return;
442 }
443 // Get handshaker.
444 final DeviceHandshaker handshaker = getBehaviour(
445 deviceId, DeviceHandshaker.class);
Carmelo Cascone96beb6f2018-06-27 18:07:12 +0200446 if (handshaker == null) {
Carmelo Cascone9e4972c2018-08-30 00:29:16 -0700447 log.error("Missing handshaker behavior for {}, aborting connection",
Carmelo Cascone96beb6f2018-06-27 18:07:12 +0200448 deviceId);
449 return;
450 }
Carmelo Cascone9e4972c2018-08-30 00:29:16 -0700451 // Add device agent listener.
452 handshaker.addDeviceAgentListener(id(), deviceAgentListener);
453 handshakersWithListeners.put(deviceId, handshaker);
Carmelo Cascone96beb6f2018-06-27 18:07:12 +0200454 // Start connection via handshaker.
455 final Boolean connectSuccess = getFutureWithDeadline(
456 handshaker.connect(), "initiating connection",
Carmelo Casconede3b6842018-09-05 17:45:10 -0700457 deviceId, false, opTimeoutShort);
Carmelo Cascone9e4972c2018-08-30 00:29:16 -0700458 if (!connectSuccess) {
Carmelo Cascone96beb6f2018-06-27 18:07:12 +0200459 log.warn("Unable to connect to {}", deviceId);
Carmelo Cascone9e4972c2018-08-30 00:29:16 -0700460 }
461 }
462
463 private void triggerAdvertiseDevice(DeviceId deviceId) {
464 connectionExecutor.execute(withDeviceLock(
465 () -> doAdvertiseDevice(deviceId), deviceId));
466 }
467
468 private void doAdvertiseDevice(DeviceId deviceId) {
469 // Retrieve config
470 if (configIsMissing(deviceId)) {
Carmelo Cascone96beb6f2018-06-27 18:07:12 +0200471 return;
472 }
Carmelo Cascone9e4972c2018-08-30 00:29:16 -0700473 // Obtain device and port description.
474 final boolean isPipelineReady = isPipelineReady(deviceId);
475 final DeviceDescription description = getDeviceDescription(
476 deviceId, isPipelineReady);
477 final List<PortDescription> ports = getPortDetails(deviceId);
478 // Advertise to core.
479 if (deviceService.getDevice(deviceId) == null ||
480 (description.isDefaultAvailable() &&
481 !deviceService.isAvailable(deviceId))) {
482 if (!isPipelineReady) {
483 log.info("Advertising device to core with available={} as " +
484 "device pipeline is not ready yet",
485 description.isDefaultAvailable());
Carmelo Cascone59f57de2017-07-11 19:55:09 -0400486 }
Carmelo Cascone9e4972c2018-08-30 00:29:16 -0700487 providerService.deviceConnected(deviceId, description);
Carmelo Cascone1fb27d32017-08-25 20:40:20 +0200488 }
Carmelo Cascone9e4972c2018-08-30 00:29:16 -0700489 providerService.updatePorts(deviceId, ports);
490 // If pipeline is not ready, encourage watchdog to perform probe ASAP.
491 if (!isPipelineReady) {
492 pipeconfWatchdogService.triggerProbe(deviceId);
493 }
494 }
495
496 private DeviceDescription getDeviceDescription(
497 DeviceId deviceId, boolean defaultAvailable) {
498 // Get one from driver or forge.
499 final DeviceDescriptionDiscovery deviceDiscovery = getBehaviour(
500 deviceId, DeviceDescriptionDiscovery.class);
501 if (deviceDiscovery != null) {
502 // Enforce defaultAvailable flag over the one obtained from driver.
503 final DeviceDescription d = deviceDiscovery.discoverDeviceDetails();
504 return new DefaultDeviceDescription(d, defaultAvailable, d.annotations());
505 } else {
506 return forgeDeviceDescription(deviceId, defaultAvailable);
507 }
508 }
509
510 private List<PortDescription> getPortDetails(DeviceId deviceId) {
511 final DeviceDescriptionDiscovery deviceDiscovery = getBehaviour(
512 deviceId, DeviceDescriptionDiscovery.class);
513 if (deviceDiscovery != null) {
514 return deviceDiscovery.discoverPortDetails();
515 } else {
516 return Collections.emptyList();
517 }
518 }
519
520 private DeviceDescription forgeDeviceDescription(
521 DeviceId deviceId, boolean defaultAvailable) {
522 // Uses handshaker and provider config to get driver data.
523 final DeviceHandshaker handshaker = getBehaviour(
524 deviceId, DeviceHandshaker.class);
525 final Driver driver = handshaker != null
526 ? handshaker.handler().driver() : null;
527 final GeneralProviderDeviceConfig cfg = cfgService.getConfig(
528 deviceId, GeneralProviderDeviceConfig.class);
529 final DefaultAnnotations.Builder annBuilder = DefaultAnnotations.builder();
530 // If device is pipeline programmable, let this provider decide when the
531 // device can be marked online.
532 annBuilder.set(AnnotationKeys.PROVIDER_MARK_ONLINE,
533 String.valueOf(isPipelineProgrammable(deviceId)));
534 if (cfg != null) {
535 StringJoiner protoStringBuilder = new StringJoiner(", ");
536 cfg.protocolsInfo().keySet().forEach(protoStringBuilder::add);
537 annBuilder.set(AnnotationKeys.PROTOCOL, protoStringBuilder.toString());
538 }
539 return new DefaultDeviceDescription(
540 deviceId.uri(),
541 Device.Type.SWITCH,
542 driver != null ? driver.manufacturer() : UNKNOWN,
543 driver != null ? driver.hwVersion() : UNKNOWN,
544 driver != null ? driver.swVersion() : UNKNOWN,
545 UNKNOWN,
546 new ChassisId(),
547 defaultAvailable,
548 annBuilder.build());
549 }
550
551 private void triggerMarkAvailable(DeviceId deviceId) {
552 connectionExecutor.execute(withDeviceLock(
553 () -> doMarkAvailable(deviceId), deviceId));
554 }
555
556 private void doMarkAvailable(DeviceId deviceId) {
557 if (deviceService.isAvailable(deviceId)) {
558 return;
559 }
560 final DeviceDescription descr = getDeviceDescription(deviceId, true);
561 // It has been observed that devices that were marked offline (e.g.
562 // after device disconnection) might end up with no master. Here we
563 // trigger a new master election (if device has no master).
564 mastershipService.requestRoleForSync(deviceId);
565 providerService.deviceConnected(deviceId, descr);
566 }
567
568 private boolean bindPipeconfIfRequired(DeviceId deviceId) {
569 if (pipeconfService.ofDevice(deviceId).isPresent()
570 || !isPipelineProgrammable(deviceId)) {
571 // Nothing to do, all good.
572 return true;
573 }
574 // Get pipeconf from netcfg or driver (default one).
575 final PiPipelineProgrammable pipelineProg = getBehaviour(
576 deviceId, PiPipelineProgrammable.class);
577 final PiPipeconfId pipeconfId = getPipeconfId(deviceId, pipelineProg);
578 if (pipeconfId == null) {
579 return false;
580 }
581 // Store binding in pipeconf service.
582 pipeconfService.bindToDevice(pipeconfId, deviceId);
583 return true;
584 }
585
586 private PiPipeconfId getPipeconfId(DeviceId deviceId, PiPipelineProgrammable pipelineProg) {
587 // Places to look for a pipeconf ID (in priority order)):
588 // 1) netcfg
589 // 2) device driver (default one)
590 final PiPipeconfId pipeconfId = getPipeconfFromCfg(deviceId);
591 if (pipeconfId != null && !pipeconfId.id().isEmpty()) {
592 return pipeconfId;
593 }
594 if (pipelineProg != null
595 && pipelineProg.getDefaultPipeconf().isPresent()) {
596 final PiPipeconf defaultPipeconf = pipelineProg.getDefaultPipeconf().get();
597 log.info("Using default pipeconf {} for {}", defaultPipeconf.id(), deviceId);
598 return defaultPipeconf.id();
599 } else {
600 log.warn("Unable to associate a pipeconf to {}", deviceId);
Andrea Campanella14e196d2017-07-24 18:11:36 +0200601 return null;
Carmelo Cascone59f57de2017-07-11 19:55:09 -0400602 }
Carmelo Cascone59f57de2017-07-11 19:55:09 -0400603 }
604
Carmelo Cascone158b8c42018-07-04 19:42:37 +0200605 private void doDisconnectDevice(DeviceId deviceId) {
Carmelo Cascone7044efd2018-07-06 13:01:36 +0200606 log.debug("Initiating disconnection from {}...", deviceId);
Carmelo Cascone9e4972c2018-08-30 00:29:16 -0700607 final DeviceHandshaker handshaker = getBehaviour(
608 deviceId, DeviceHandshaker.class);
Carmelo Cascone7044efd2018-07-06 13:01:36 +0200609 final boolean isAvailable = deviceService.isAvailable(deviceId);
610 // Signal disconnection to core (if master).
611 if (isAvailable && mastershipService.isLocalMaster(deviceId)) {
Carmelo Cascone96beb6f2018-06-27 18:07:12 +0200612 providerService.deviceDisconnected(deviceId);
613 }
Carmelo Cascone7044efd2018-07-06 13:01:36 +0200614 // Cancel tasks.
Carmelo Cascone158b8c42018-07-04 19:42:37 +0200615 cancelStatsPolling(deviceId);
Carmelo Cascone7044efd2018-07-06 13:01:36 +0200616 // Disconnect device.
Carmelo Casconee5b28722018-06-22 17:28:28 +0200617 if (handshaker == null) {
Carmelo Cascone7044efd2018-07-06 13:01:36 +0200618 if (isAvailable) {
619 // If not available don't bother logging. We are probably
620 // invoking this method multiple times for the same device.
621 log.warn("Missing DeviceHandshaker behavior for {}, " +
622 "no guarantees of complete disconnection",
623 deviceId);
624 }
Carmelo Cascone158b8c42018-07-04 19:42:37 +0200625 return;
Andrea Campanella241896c2017-05-10 13:11:04 -0700626 }
Carmelo Cascone9e4972c2018-08-30 00:29:16 -0700627 handshaker.removeDeviceAgentListener(id());
628 handshakersWithListeners.remove(deviceId);
Carmelo Cascone158b8c42018-07-04 19:42:37 +0200629 final boolean disconnectSuccess = getFutureWithDeadline(
630 handshaker.disconnect(), "performing disconnection",
631 deviceId, false, opTimeoutShort);
632 if (!disconnectSuccess) {
633 log.warn("Unable to disconnect from {}", deviceId);
634 }
Andrea Campanella241896c2017-05-10 13:11:04 -0700635 }
636
Carmelo Cascone96beb6f2018-06-27 18:07:12 +0200637 // Needed to catch the exception in the executors since are not rethrown otherwise.
Andrea Campanella241896c2017-05-10 13:11:04 -0700638 private Runnable exceptionSafe(Runnable runnable) {
639 return () -> {
640 try {
641 runnable.run();
642 } catch (Exception e) {
643 log.error("Unhandled Exception", e);
644 }
645 };
646 }
647
Carmelo Cascone158b8c42018-07-04 19:42:37 +0200648 private <U> U withDeviceLock(Supplier<U> task, DeviceId deviceId) {
649 final Lock lock = deviceLocks.get(deviceId);
650 lock.lock();
651 try {
652 return task.get();
653 } finally {
654 lock.unlock();
655 }
656 }
657
658 private Runnable withDeviceLock(Runnable task, DeviceId deviceId) {
659 // Wrapper of withDeviceLock(Supplier, ...) for void tasks.
660 return () -> withDeviceLock(() -> {
661 task.run();
662 return null;
663 }, deviceId);
Carmelo Cascone96beb6f2018-06-27 18:07:12 +0200664 }
665
Andrea Campanella241896c2017-05-10 13:11:04 -0700666 private void updatePortStatistics(DeviceId deviceId) {
Andrea Campanellace111932017-09-18 16:59:56 +0900667 Device device = deviceService.getDevice(deviceId);
Carmelo Cascone158b8c42018-07-04 19:42:37 +0200668 if (device != null && deviceService.isAvailable(deviceId) &&
Andrea Campanellace111932017-09-18 16:59:56 +0900669 device.is(PortStatisticsDiscovery.class)) {
670 Collection<PortStatistics> statistics = device.as(PortStatisticsDiscovery.class)
671 .discoverPortStatistics();
672 //updating statistcs only if not empty
673 if (!statistics.isEmpty()) {
674 providerService.updatePortStatistics(deviceId, statistics);
675 }
676 } else {
677 log.debug("Can't update port statistics for device {}", deviceId);
Andrea Campanella19090322017-08-22 10:31:37 +0200678 }
679 }
680
Carmelo Casconee5b28722018-06-22 17:28:28 +0200681 private boolean notMyScheme(DeviceId deviceId) {
682 return !deviceId.uri().getScheme().equals(URI_SCHEME);
Andrea Campanella241896c2017-05-10 13:11:04 -0700683 }
684
Carmelo Cascone158b8c42018-07-04 19:42:37 +0200685 private void triggerConnect(DeviceId deviceId) {
686 connectionExecutor.execute(withDeviceLock(
687 () -> doConnectDevice(deviceId), deviceId));
Carmelo Cascone96beb6f2018-06-27 18:07:12 +0200688 }
689
Carmelo Cascone9e4972c2018-08-30 00:29:16 -0700690 private boolean isPipelineProgrammable(DeviceId deviceId) {
691 final GeneralProviderDeviceConfig providerConfig = cfgService.getConfig(
692 deviceId, GeneralProviderDeviceConfig.class);
693 if (providerConfig == null) {
694 return false;
695 }
696 return !Collections.disjoint(
697 ImmutableSet.copyOf(providerConfig.node().fieldNames()),
698 PIPELINE_CONFIGURABLE_PROTOCOLS);
699 }
700
Andrea Campanella241896c2017-05-10 13:11:04 -0700701 /**
702 * Listener for configuration events.
703 */
704 private class InternalNetworkConfigListener implements NetworkConfigListener {
705
Andrea Campanella241896c2017-05-10 13:11:04 -0700706 @Override
707 public void event(NetworkConfigEvent event) {
Carmelo Cascone7044efd2018-07-06 13:01:36 +0200708 connectionExecutor.execute(() -> consumeConfigEvent(event));
709 }
710
711 @Override
712 public boolean isRelevant(NetworkConfigEvent event) {
713 return (event.configClass().equals(GeneralProviderDeviceConfig.class) ||
714 event.configClass().equals(BasicDeviceConfig.class) ||
715 event.configClass().equals(PiPipeconfConfig.class)) &&
716 (event.type() == NetworkConfigEvent.Type.CONFIG_ADDED ||
717 event.type() == NetworkConfigEvent.Type.CONFIG_UPDATED);
718 }
719
720 private void consumeConfigEvent(NetworkConfigEvent event) {
Andrea Campanella241896c2017-05-10 13:11:04 -0700721 DeviceId deviceId = (DeviceId) event.subject();
722 //Assuming that the deviceId comes with uri 'device:'
Carmelo Casconee5b28722018-06-22 17:28:28 +0200723 if (notMyScheme(deviceId)) {
Andrea Campanella241896c2017-05-10 13:11:04 -0700724 // not under my scheme, skipping
725 log.debug("{} is not my scheme, skipping", deviceId);
726 return;
727 }
Carmelo Cascone158b8c42018-07-04 19:42:37 +0200728 final boolean configComplete = withDeviceLock(
729 () -> isDeviceConfigComplete(event, deviceId), deviceId);
730 if (!configComplete) {
Carmelo Cascone96beb6f2018-06-27 18:07:12 +0200731 // Still waiting for some configuration.
Andrea Campanellabc112a92017-06-26 19:06:43 +0200732 return;
733 }
Carmelo Cascone96beb6f2018-06-27 18:07:12 +0200734 // Good to go.
Carmelo Cascone158b8c42018-07-04 19:42:37 +0200735 triggerConnect(deviceId);
Carmelo Cascone96beb6f2018-06-27 18:07:12 +0200736 cleanUpConfigInfo(deviceId);
Andrea Campanella14e196d2017-07-24 18:11:36 +0200737 }
738
Carmelo Cascone96beb6f2018-06-27 18:07:12 +0200739 private boolean isDeviceConfigComplete(NetworkConfigEvent event, DeviceId deviceId) {
740 // FIXME to be removed when netcfg will issue device events in a bundle or
Andrea Campanellabc112a92017-06-26 19:06:43 +0200741 // ensure all configuration needed is present
Carmelo Cascone158b8c42018-07-04 19:42:37 +0200742 if (event.configClass().equals(GeneralProviderDeviceConfig.class)) {
743 //FIXME we currently assume that p4runtime devices are pipeline configurable.
744 //If we want to connect a p4runtime device with no pipeline
745 if (event.config().isPresent()) {
746 deviceConfigured.add(deviceId);
747 final boolean isNotPipelineConfigurable = Collections.disjoint(
748 ImmutableSet.copyOf(event.config().get().node().fieldNames()),
749 PIPELINE_CONFIGURABLE_PROTOCOLS);
750 if (isNotPipelineConfigurable) {
751 // Skip waiting for a pipeline if we can't support it.
Andrea Campanellabc112a92017-06-26 19:06:43 +0200752 pipelineConfigured.add(deviceId);
753 }
754 }
Carmelo Cascone158b8c42018-07-04 19:42:37 +0200755 } else if (event.configClass().equals(BasicDeviceConfig.class)) {
756 if (event.config().isPresent() && event.config().get().node().has(DRIVER)) {
757 driverConfigured.add(deviceId);
Andrea Campanellabc112a92017-06-26 19:06:43 +0200758 }
Carmelo Cascone158b8c42018-07-04 19:42:37 +0200759 } else if (event.configClass().equals(PiPipeconfConfig.class)) {
760 if (event.config().isPresent()
761 && event.config().get().node().has(PiPipeconfConfig.PIPIPECONFID)) {
762 pipelineConfigured.add(deviceId);
763 }
Andrea Campanella241896c2017-05-10 13:11:04 -0700764 }
Carmelo Cascone158b8c42018-07-04 19:42:37 +0200765
766 if (deviceConfigured.contains(deviceId)
767 && driverConfigured.contains(deviceId)
768 && pipelineConfigured.contains(deviceId)) {
769 return true;
770 } else {
771 if (deviceConfigured.contains(deviceId) && driverConfigured.contains(deviceId)) {
772 log.debug("Waiting for pipeline configuration for device {}", deviceId);
773 } else if (pipelineConfigured.contains(deviceId) && driverConfigured.contains(deviceId)) {
774 log.debug("Waiting for device configuration for device {}", deviceId);
775 } else if (pipelineConfigured.contains(deviceId) && deviceConfigured.contains(deviceId)) {
776 log.debug("Waiting for driver configuration for device {}", deviceId);
777 } else if (driverConfigured.contains(deviceId)) {
778 log.debug("Only driver configuration for device {}", deviceId);
779 } else if (deviceConfigured.contains(deviceId)) {
780 log.debug("Only device configuration for device {}", deviceId);
781 }
782 }
783 return false;
Andrea Campanella241896c2017-05-10 13:11:04 -0700784 }
Andrea Campanella241896c2017-05-10 13:11:04 -0700785 }
786
Carmelo Cascone9e4972c2018-08-30 00:29:16 -0700787 private boolean isPipelineReady(DeviceId deviceId) {
788 final boolean isPipelineProg = isPipelineProgrammable(deviceId);
789 final boolean isPipeconfReady = pipeconfWatchdogService
790 .getStatus(deviceId)
791 .equals(PiPipeconfWatchdogService.PipelineStatus.READY);
792 return !isPipelineProg || isPipeconfReady;
Andrea Campanella14e196d2017-07-24 18:11:36 +0200793 }
794
795 private void cleanUpConfigInfo(DeviceId deviceId) {
Andrea Campanellabc112a92017-06-26 19:06:43 +0200796 deviceConfigured.remove(deviceId);
797 driverConfigured.remove(deviceId);
798 pipelineConfigured.remove(deviceId);
Andrea Campanellabc112a92017-06-26 19:06:43 +0200799 }
800
Carmelo Cascone158b8c42018-07-04 19:42:37 +0200801 private void startStatsPolling(DeviceId deviceId, boolean withRandomDelay) {
802 statsPollingTasks.compute(deviceId, (did, oldTask) -> {
803 if (oldTask != null) {
804 oldTask.cancel(false);
805 }
806 final int delay = withRandomDelay
807 ? new SecureRandom().nextInt(10) : 0;
808 return statsExecutor.scheduleAtFixedRate(
809 exceptionSafe(() -> updatePortStatistics(deviceId)),
810 delay, statsPollFrequency, TimeUnit.SECONDS);
811 });
Andrea Campanella19090322017-08-22 10:31:37 +0200812 }
813
Carmelo Cascone158b8c42018-07-04 19:42:37 +0200814 private void cancelStatsPolling(DeviceId deviceId) {
815 statsPollingTasks.computeIfPresent(deviceId, (did, task) -> {
816 task.cancel(false);
817 return null;
818 });
819 }
820
821 private void rescheduleStatsPollingTasks() {
822 statsPollingTasks.keySet().forEach(deviceId -> {
823 // startStatsPolling cancels old one if present.
824 startStatsPolling(deviceId, true);
825 });
826 }
827
828 private void triggerProbeAllDevices() {
829 // Async trigger a task for all devices in the cfg.
830 cfgService.getSubjects(DeviceId.class, GeneralProviderDeviceConfig.class)
Carmelo Cascone9e4972c2018-08-30 00:29:16 -0700831 .forEach(this::triggerDeviceProbe);
Andrea Campanella1e573442018-05-17 17:07:13 +0200832 }
833
Carmelo Cascone92044522018-06-29 19:00:59 +0200834 private PiPipeconfId getPipeconfFromCfg(DeviceId deviceId) {
835 PiPipeconfConfig config = cfgService.getConfig(
836 deviceId, PiPipeconfConfig.class);
837 if (config == null) {
838 return null;
839 }
840 return config.piPipeconfId();
841 }
842
Carmelo Cascone9e4972c2018-08-30 00:29:16 -0700843 private void triggerDeviceProbe(DeviceId deviceId) {
844 connectionExecutor.execute(withDeviceLock(
845 () -> doDeviceProbe(deviceId), deviceId));
846 }
847
Carmelo Cascone158b8c42018-07-04 19:42:37 +0200848 private void doDeviceProbe(DeviceId deviceId) {
Carmelo Cascone9e4972c2018-08-30 00:29:16 -0700849 log.debug("Probing device {}...", deviceId);
850 if (configIsMissing(deviceId)) {
Carmelo Cascone158b8c42018-07-04 19:42:37 +0200851 return;
852 }
Carmelo Cascone9e4972c2018-08-30 00:29:16 -0700853 if (!isConnected(deviceId)) {
854 if (deviceService.isAvailable(deviceId)) {
855 providerService.deviceDisconnected(deviceId);
Carmelo Cascone158b8c42018-07-04 19:42:37 +0200856 }
Carmelo Cascone158b8c42018-07-04 19:42:37 +0200857 triggerConnect(deviceId);
Andrea Campanella1e573442018-05-17 17:07:13 +0200858 }
859 }
860
Carmelo Cascone9e4972c2018-08-30 00:29:16 -0700861 private boolean configIsMissing(DeviceId deviceId) {
Carmelo Cascone96beb6f2018-06-27 18:07:12 +0200862 final boolean present =
863 cfgService.getConfig(deviceId, GeneralProviderDeviceConfig.class) != null
864 && cfgService.getConfig(deviceId, BasicDeviceConfig.class) != null;
Andrea Campanella1e573442018-05-17 17:07:13 +0200865 if (!present) {
866 log.warn("Configuration for device {} is not complete", deviceId);
867 }
Carmelo Cascone9e4972c2018-08-30 00:29:16 -0700868 return !present;
Carmelo Casconee5b28722018-06-22 17:28:28 +0200869 }
870
871 private void handleMastershipResponse(DeviceId deviceId, MastershipRole response) {
Carmelo Cascone9e4972c2018-08-30 00:29:16 -0700872 // Notify core about mastership response.
873 final MastershipRole request = requestedRoles.get(deviceId);
874 final boolean isAvailable = deviceService.isAvailable(deviceId);
875 if (request == null || !isAvailable) {
Carmelo Casconee5b28722018-06-22 17:28:28 +0200876 return;
877 }
Carmelo Cascone9e4972c2018-08-30 00:29:16 -0700878 log.debug("Device {} asserted role {} (requested was {})",
879 deviceId, response, request);
880 providerService.receivedRoleReply(deviceId, request, response);
881 // FIXME: this should be based on assigned mastership, not what returned by device
Carmelo Cascone158b8c42018-07-04 19:42:37 +0200882 if (response.equals(MastershipRole.MASTER)) {
883 startStatsPolling(deviceId, false);
884 } else {
885 cancelStatsPolling(deviceId);
Carmelo Casconee5b28722018-06-22 17:28:28 +0200886 }
887 }
888
Carmelo Casconede3b6842018-09-05 17:45:10 -0700889 private void handleNotMaster(DeviceId deviceId) {
890 log.warn("Device {} notified that this node is not master, " +
891 "relinquishing mastership...", deviceId);
892 mastershipService.relinquishMastership(deviceId);
893 }
894
Carmelo Cascone96beb6f2018-06-27 18:07:12 +0200895 private <U> U getFutureWithDeadline(CompletableFuture<U> future, String opDescription,
Carmelo Cascone158b8c42018-07-04 19:42:37 +0200896 DeviceId deviceId, U defaultValue, int timeout) {
Carmelo Cascone96beb6f2018-06-27 18:07:12 +0200897 try {
Carmelo Cascone158b8c42018-07-04 19:42:37 +0200898 return future.get(timeout, TimeUnit.SECONDS);
Carmelo Cascone96beb6f2018-06-27 18:07:12 +0200899 } catch (InterruptedException e) {
900 log.error("Thread interrupted while {} on {}", opDescription, deviceId);
901 Thread.currentThread().interrupt();
902 } catch (ExecutionException e) {
903 log.error("Exception while {} on {}", opDescription, deviceId, e.getCause());
904 } catch (TimeoutException e) {
905 log.error("Operation TIMEOUT while {} on {}", opDescription, deviceId);
906 }
907 return defaultValue;
908 }
909
Andrea Campanella241896c2017-05-10 13:11:04 -0700910 /**
911 * Listener for core device events.
912 */
913 private class InternalDeviceListener implements DeviceListener {
914 @Override
915 public void event(DeviceEvent event) {
Andrea Campanellace111932017-09-18 16:59:56 +0900916 DeviceId deviceId = event.subject().id();
Thomas Vachuska5b38dc02018-05-10 15:24:40 -0700917 // For now this is scheduled periodically, when streaming API will
918 // be available we check and base it on the streaming API (e.g. gNMI)
919 if (mastershipService.isLocalMaster(deviceId)) {
Carmelo Cascone158b8c42018-07-04 19:42:37 +0200920 startStatsPolling(deviceId, true);
Andrea Campanella241896c2017-05-10 13:11:04 -0700921 }
922 }
923
924 @Override
925 public boolean isRelevant(DeviceEvent event) {
Thomas Vachuska5b38dc02018-05-10 15:24:40 -0700926 return event.type() == Type.DEVICE_ADDED &&
927 event.subject().id().toString().startsWith(URI_SCHEME.toLowerCase());
Andrea Campanella241896c2017-05-10 13:11:04 -0700928 }
929 }
Andrea Campanella1e573442018-05-17 17:07:13 +0200930
931 /**
Carmelo Casconee5b28722018-06-22 17:28:28 +0200932 * Listener for device agent events.
Andrea Campanella1e573442018-05-17 17:07:13 +0200933 */
Carmelo Casconee5b28722018-06-22 17:28:28 +0200934 private class InternalDeviceAgentListener implements DeviceAgentListener {
Andrea Campanella1e573442018-05-17 17:07:13 +0200935
936 @Override
Carmelo Casconee5b28722018-06-22 17:28:28 +0200937 public void event(DeviceAgentEvent event) {
Andrea Campanella1e573442018-05-17 17:07:13 +0200938 DeviceId deviceId = event.subject();
Carmelo Casconee5b28722018-06-22 17:28:28 +0200939 switch (event.type()) {
940 case CHANNEL_OPEN:
Carmelo Cascone9e4972c2018-08-30 00:29:16 -0700941 triggerAdvertiseDevice(deviceId);
Carmelo Casconee5b28722018-06-22 17:28:28 +0200942 break;
943 case CHANNEL_CLOSED:
Carmelo Casconee5b28722018-06-22 17:28:28 +0200944 case CHANNEL_ERROR:
Carmelo Cascone9e4972c2018-08-30 00:29:16 -0700945 triggerDeviceProbe(deviceId);
Carmelo Casconee5b28722018-06-22 17:28:28 +0200946 break;
947 case ROLE_MASTER:
948 handleMastershipResponse(deviceId, MastershipRole.MASTER);
949 break;
950 case ROLE_STANDBY:
951 handleMastershipResponse(deviceId, MastershipRole.STANDBY);
952 break;
953 case ROLE_NONE:
954 handleMastershipResponse(deviceId, MastershipRole.NONE);
955 break;
Carmelo Casconede3b6842018-09-05 17:45:10 -0700956 case NOT_MASTER:
957 handleNotMaster(deviceId);
958 break;
Carmelo Casconee5b28722018-06-22 17:28:28 +0200959 default:
960 log.warn("Unrecognized device agent event {}", event.type());
Andrea Campanella1e573442018-05-17 17:07:13 +0200961 }
Andrea Campanella1e573442018-05-17 17:07:13 +0200962 }
963
964 }
Carmelo Cascone9e4972c2018-08-30 00:29:16 -0700965
966 private class InternalPipeconfWatchdogListener implements PiPipeconfWatchdogListener {
967 @Override
968 public void event(PiPipeconfWatchdogEvent event) {
969 triggerMarkAvailable(event.subject());
970 }
971
972 @Override
973 public boolean isRelevant(PiPipeconfWatchdogEvent event) {
974 return event.type().equals(PiPipeconfWatchdogEvent.Type.PIPELINE_READY);
975 }
976 }
977
978 private class InternalConfigFactory
979 extends ConfigFactory<DeviceId, GeneralProviderDeviceConfig> {
980
981 InternalConfigFactory() {
982 super(SubjectFactories.DEVICE_SUBJECT_FACTORY,
983 GeneralProviderDeviceConfig.class, CFG_SCHEME);
984 }
985
986 @Override
987 public GeneralProviderDeviceConfig createConfig() {
988 return new GeneralProviderDeviceConfig();
989 }
990 }
Andrea Campanella241896c2017-05-10 13:11:04 -0700991}