blob: 6e8fe1df5d1affb066664aa06b346a87b1bd02b2 [file] [log] [blame]
Andrea Campanella241896c2017-05-10 13:11:04 -07001/*
Brian O'Connora09fe5b2017-08-03 21:12:30 -07002 * Copyright 2017-present Open Networking Foundation
Andrea Campanella241896c2017-05-10 13:11:04 -07003 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
16
17package org.onosproject.provider.general.device.impl;
18
19import com.google.common.annotations.Beta;
Andrea Campanellabc112a92017-06-26 19:06:43 +020020import com.google.common.collect.ImmutableSet;
21import com.google.common.collect.Maps;
Carmelo Cascone158b8c42018-07-04 19:42:37 +020022import com.google.common.util.concurrent.Striped;
Andrea Campanella241896c2017-05-10 13:11:04 -070023import org.onlab.packet.ChassisId;
24import org.onlab.util.ItemNotFoundException;
Andrea Campanella19090322017-08-22 10:31:37 +020025import org.onlab.util.Tools;
Andrea Campanella4929a812017-10-09 18:38:23 +020026import org.onosproject.cfg.ComponentConfigService;
Andrea Campanella241896c2017-05-10 13:11:04 -070027import org.onosproject.core.CoreService;
Andrea Campanella14e196d2017-07-24 18:11:36 +020028import org.onosproject.mastership.MastershipService;
Andrea Campanella241896c2017-05-10 13:11:04 -070029import org.onosproject.net.AnnotationKeys;
30import org.onosproject.net.DefaultAnnotations;
31import org.onosproject.net.Device;
32import org.onosproject.net.DeviceId;
33import org.onosproject.net.MastershipRole;
34import org.onosproject.net.PortNumber;
Carmelo Cascone87892e22017-11-13 16:01:29 -080035import org.onosproject.net.behaviour.PiPipelineProgrammable;
Andrea Campanella241896c2017-05-10 13:11:04 -070036import org.onosproject.net.behaviour.PortAdmin;
37import org.onosproject.net.config.ConfigFactory;
38import org.onosproject.net.config.NetworkConfigEvent;
39import org.onosproject.net.config.NetworkConfigListener;
40import org.onosproject.net.config.NetworkConfigRegistry;
41import org.onosproject.net.config.basics.BasicDeviceConfig;
42import org.onosproject.net.config.basics.SubjectFactories;
43import org.onosproject.net.device.DefaultDeviceDescription;
Carmelo Casconee5b28722018-06-22 17:28:28 +020044import org.onosproject.net.device.DeviceAgentEvent;
45import org.onosproject.net.device.DeviceAgentListener;
Andrea Campanella241896c2017-05-10 13:11:04 -070046import org.onosproject.net.device.DeviceDescription;
47import org.onosproject.net.device.DeviceDescriptionDiscovery;
48import org.onosproject.net.device.DeviceEvent;
49import org.onosproject.net.device.DeviceHandshaker;
50import org.onosproject.net.device.DeviceListener;
51import org.onosproject.net.device.DeviceProvider;
52import org.onosproject.net.device.DeviceProviderRegistry;
53import org.onosproject.net.device.DeviceProviderService;
54import org.onosproject.net.device.DeviceService;
55import org.onosproject.net.device.PortDescription;
56import org.onosproject.net.device.PortStatistics;
57import org.onosproject.net.device.PortStatisticsDiscovery;
58import org.onosproject.net.driver.Behaviour;
59import org.onosproject.net.driver.DefaultDriverData;
60import org.onosproject.net.driver.DefaultDriverHandler;
61import org.onosproject.net.driver.Driver;
62import org.onosproject.net.driver.DriverData;
63import org.onosproject.net.driver.DriverService;
Carmelo Cascone59f57de2017-07-11 19:55:09 -040064import org.onosproject.net.pi.model.PiPipeconf;
Andrea Campanellabc112a92017-06-26 19:06:43 +020065import org.onosproject.net.pi.model.PiPipeconfId;
Carmelo Cascone39c28ca2017-11-15 13:03:57 -080066import org.onosproject.net.pi.service.PiPipeconfConfig;
67import org.onosproject.net.pi.service.PiPipeconfService;
Carmelo Cascone9e4972c2018-08-30 00:29:16 -070068import org.onosproject.net.pi.service.PiPipeconfWatchdogEvent;
69import org.onosproject.net.pi.service.PiPipeconfWatchdogListener;
70import org.onosproject.net.pi.service.PiPipeconfWatchdogService;
Andrea Campanella241896c2017-05-10 13:11:04 -070071import org.onosproject.net.provider.AbstractProvider;
72import org.onosproject.net.provider.ProviderId;
73import org.onosproject.provider.general.device.api.GeneralProviderDeviceConfig;
Andrea Campanella19090322017-08-22 10:31:37 +020074import org.osgi.service.component.ComponentContext;
Ray Milkeyd84f89b2018-08-17 14:54:17 -070075import org.osgi.service.component.annotations.Activate;
76import org.osgi.service.component.annotations.Component;
77import org.osgi.service.component.annotations.Deactivate;
78import org.osgi.service.component.annotations.Modified;
79import org.osgi.service.component.annotations.Reference;
80import org.osgi.service.component.annotations.ReferenceCardinality;
Andrea Campanella241896c2017-05-10 13:11:04 -070081import org.slf4j.Logger;
82
Ray Milkeyb68bbbc2017-12-18 10:05:49 -080083import java.security.SecureRandom;
Andrea Campanella241896c2017-05-10 13:11:04 -070084import java.util.Collection;
Andrea Campanellabc112a92017-06-26 19:06:43 +020085import java.util.Collections;
Andrea Campanella19090322017-08-22 10:31:37 +020086import java.util.Dictionary;
Andrea Campanella241896c2017-05-10 13:11:04 -070087import java.util.List;
Thomas Vachuska5b38dc02018-05-10 15:24:40 -070088import java.util.Map;
Andrea Campanellabc112a92017-06-26 19:06:43 +020089import java.util.Set;
Carmelo Cascone9e4972c2018-08-30 00:29:16 -070090import java.util.StringJoiner;
Andrea Campanella241896c2017-05-10 13:11:04 -070091import java.util.concurrent.CompletableFuture;
Andrea Campanella19090322017-08-22 10:31:37 +020092import java.util.concurrent.ConcurrentHashMap;
Andrea Campanellabc112a92017-06-26 19:06:43 +020093import java.util.concurrent.ConcurrentMap;
94import java.util.concurrent.CopyOnWriteArraySet;
Andrea Campanella241896c2017-05-10 13:11:04 -070095import java.util.concurrent.ExecutionException;
Carmelo Casconee5b28722018-06-22 17:28:28 +020096import java.util.concurrent.ExecutorService;
Andrea Campanella241896c2017-05-10 13:11:04 -070097import java.util.concurrent.ScheduledExecutorService;
Andrea Campanella19090322017-08-22 10:31:37 +020098import java.util.concurrent.ScheduledFuture;
Andrea Campanella241896c2017-05-10 13:11:04 -070099import java.util.concurrent.TimeUnit;
100import java.util.concurrent.TimeoutException;
Andrea Campanellabc112a92017-06-26 19:06:43 +0200101import java.util.concurrent.locks.Lock;
Carmelo Cascone158b8c42018-07-04 19:42:37 +0200102import java.util.function.Supplier;
Andrea Campanella241896c2017-05-10 13:11:04 -0700103
Carmelo Casconee5b28722018-06-22 17:28:28 +0200104import static java.util.concurrent.Executors.newFixedThreadPool;
Andrea Campanella241896c2017-05-10 13:11:04 -0700105import static java.util.concurrent.Executors.newScheduledThreadPool;
Carmelo Cascone158b8c42018-07-04 19:42:37 +0200106import static java.util.concurrent.Executors.newSingleThreadScheduledExecutor;
Andrea Campanella241896c2017-05-10 13:11:04 -0700107import static org.onlab.util.Tools.groupedThreads;
108import static org.onosproject.net.device.DeviceEvent.Type;
Thomas Vachuska4167c3f2018-10-16 07:16:31 -0700109import static org.onosproject.provider.general.device.impl.OsgiPropertyDefaults.OP_TIMEOUT_SHORT_DEFAULT;
110import static org.onosproject.provider.general.device.impl.OsgiPropertyDefaults.PROBE_FREQUENCY_DEFAULT;
111import static org.onosproject.provider.general.device.impl.OsgiPropertyDefaults.STATS_POLL_FREQUENCY_DEFAULT;
Andrea Campanella241896c2017-05-10 13:11:04 -0700112import static org.slf4j.LoggerFactory.getLogger;
113
114/**
Carmelo Cascone96beb6f2018-06-27 18:07:12 +0200115 * Provider which uses drivers to detect device and do initial handshake and
116 * channel establishment with devices. Any other provider specific operation is
117 * also delegated to the DeviceHandshaker driver.
Andrea Campanella241896c2017-05-10 13:11:04 -0700118 */
119@Beta
Thomas Vachuska4167c3f2018-10-16 07:16:31 -0700120@Component(immediate = true,
121 property = {
122 "deviceStatsPollFrequency:Integer=" + STATS_POLL_FREQUENCY_DEFAULT,
123 "deviceProbeFrequency:Integer=" + PROBE_FREQUENCY_DEFAULT,
124 "deviceOperationTimeoutShort:Integer=" + OP_TIMEOUT_SHORT_DEFAULT,
125 })
Andrea Campanella241896c2017-05-10 13:11:04 -0700126public class GeneralDeviceProvider extends AbstractProvider
127 implements DeviceProvider {
Carmelo Casconee5b28722018-06-22 17:28:28 +0200128
Andrea Campanella241896c2017-05-10 13:11:04 -0700129 private final Logger log = getLogger(getClass());
130
Carmelo Cascone9e4972c2018-08-30 00:29:16 -0700131 private static final String APP_NAME = "org.onosproject.gdp";
132 private static final String URI_SCHEME = "device";
133 private static final String CFG_SCHEME = "generalprovider";
134 private static final String DEVICE_PROVIDER_PACKAGE =
135 "org.onosproject.general.provider.device";
136 private static final int CORE_POOL_SIZE = 10;
137 private static final String UNKNOWN = "unknown";
138 private static final String DRIVER = "driver";
139 private static final Set<String> PIPELINE_CONFIGURABLE_PROTOCOLS =
140 ImmutableSet.of("p4runtime");
141
Ray Milkeyd84f89b2018-08-17 14:54:17 -0700142 @Reference(cardinality = ReferenceCardinality.MANDATORY)
Carmelo Casconee5b28722018-06-22 17:28:28 +0200143 private DeviceProviderRegistry providerRegistry;
Andrea Campanella241896c2017-05-10 13:11:04 -0700144
Ray Milkeyd84f89b2018-08-17 14:54:17 -0700145 @Reference(cardinality = ReferenceCardinality.MANDATORY)
Carmelo Casconee5b28722018-06-22 17:28:28 +0200146 private ComponentConfigService componentConfigService;
Andrea Campanella4929a812017-10-09 18:38:23 +0200147
Ray Milkeyd84f89b2018-08-17 14:54:17 -0700148 @Reference(cardinality = ReferenceCardinality.MANDATORY)
Carmelo Casconee5b28722018-06-22 17:28:28 +0200149 private NetworkConfigRegistry cfgService;
Andrea Campanella241896c2017-05-10 13:11:04 -0700150
Ray Milkeyd84f89b2018-08-17 14:54:17 -0700151 @Reference(cardinality = ReferenceCardinality.MANDATORY)
Carmelo Casconee5b28722018-06-22 17:28:28 +0200152 private CoreService coreService;
Andrea Campanella241896c2017-05-10 13:11:04 -0700153
Ray Milkeyd84f89b2018-08-17 14:54:17 -0700154 @Reference(cardinality = ReferenceCardinality.MANDATORY)
Carmelo Casconee5b28722018-06-22 17:28:28 +0200155 private DeviceService deviceService;
Andrea Campanella241896c2017-05-10 13:11:04 -0700156
Ray Milkeyd84f89b2018-08-17 14:54:17 -0700157 @Reference(cardinality = ReferenceCardinality.MANDATORY)
Carmelo Casconee5b28722018-06-22 17:28:28 +0200158 private DriverService driverService;
Andrea Campanella241896c2017-05-10 13:11:04 -0700159
Ray Milkeyd84f89b2018-08-17 14:54:17 -0700160 @Reference(cardinality = ReferenceCardinality.MANDATORY)
Carmelo Casconee5b28722018-06-22 17:28:28 +0200161 private MastershipService mastershipService;
Andrea Campanella14e196d2017-07-24 18:11:36 +0200162
Ray Milkeyd84f89b2018-08-17 14:54:17 -0700163 @Reference(cardinality = ReferenceCardinality.MANDATORY)
Carmelo Cascone9e4972c2018-08-30 00:29:16 -0700164 private PiPipeconfService pipeconfService;
Andrea Campanella241896c2017-05-10 13:11:04 -0700165
Ray Milkeyd84f89b2018-08-17 14:54:17 -0700166 @Reference(cardinality = ReferenceCardinality.MANDATORY)
Carmelo Cascone9e4972c2018-08-30 00:29:16 -0700167 private PiPipeconfWatchdogService pipeconfWatchdogService;
Andrea Campanella14e196d2017-07-24 18:11:36 +0200168
Carmelo Cascone158b8c42018-07-04 19:42:37 +0200169 private static final String STATS_POLL_FREQUENCY = "deviceStatsPollFrequency";
Ray Milkeyd84f89b2018-08-17 14:54:17 -0700170 //@Property(name = STATS_POLL_FREQUENCY, intValue = DEFAULT_STATS_POLL_FREQUENCY,
171 // label = "Configure poll frequency for port status and statistics; " +
172 // "default is 10 sec")
Thomas Vachuska4167c3f2018-10-16 07:16:31 -0700173 private int statsPollFrequency = STATS_POLL_FREQUENCY_DEFAULT;
Andrea Campanella19090322017-08-22 10:31:37 +0200174
Carmelo Cascone158b8c42018-07-04 19:42:37 +0200175 private static final String PROBE_FREQUENCY = "deviceProbeFrequency";
Ray Milkeyd84f89b2018-08-17 14:54:17 -0700176 //@Property(name = PROBE_FREQUENCY, intValue = DEFAULT_PROBE_FREQUENCY,
177 // label = "Configure probe frequency for checking device availability; " +
178 // "default is 10 sec")
Thomas Vachuska4167c3f2018-10-16 07:16:31 -0700179 private int probeFrequency = PROBE_FREQUENCY_DEFAULT;
Carmelo Cascone158b8c42018-07-04 19:42:37 +0200180
181 private static final String OP_TIMEOUT_SHORT = "deviceOperationTimeoutShort";
Ray Milkeyd84f89b2018-08-17 14:54:17 -0700182 //@Property(name = OP_TIMEOUT_SHORT, intValue = DEFAULT_OP_TIMEOUT_SHORT,
183 // label = "Configure timeout in seconds for device operations " +
184 // "that are supposed to take a short time " +
185 // "(e.g. checking device reachability); default is 10 seconds")
Thomas Vachuska4167c3f2018-10-16 07:16:31 -0700186 private int opTimeoutShort = OP_TIMEOUT_SHORT_DEFAULT;
Carmelo Cascone158b8c42018-07-04 19:42:37 +0200187
Andrea Campanellabc112a92017-06-26 19:06:43 +0200188 //FIXME to be removed when netcfg will issue device events in a bundle or
189 //ensures all configuration needed is present
Carmelo Cascone9e4972c2018-08-30 00:29:16 -0700190 private final Set<DeviceId> deviceConfigured = new CopyOnWriteArraySet<>();
191 private final Set<DeviceId> driverConfigured = new CopyOnWriteArraySet<>();
192 private final Set<DeviceId> pipelineConfigured = new CopyOnWriteArraySet<>();
Andrea Campanella241896c2017-05-10 13:11:04 -0700193
Carmelo Casconee5b28722018-06-22 17:28:28 +0200194 private final Map<DeviceId, MastershipRole> requestedRoles = Maps.newConcurrentMap();
Carmelo Cascone9e4972c2018-08-30 00:29:16 -0700195 private final ConcurrentMap<DeviceId, ScheduledFuture<?>> statsPollingTasks = new ConcurrentHashMap<>();
196 private final Map<DeviceId, DeviceHandshaker> handshakersWithListeners = Maps.newConcurrentMap();
197 private final InternalDeviceListener deviceListener = new InternalDeviceListener();
198 private final InternalPipeconfWatchdogListener pipeconfWatchdogListener = new InternalPipeconfWatchdogListener();
Carmelo Casconee5b28722018-06-22 17:28:28 +0200199 private final NetworkConfigListener cfgListener = new InternalNetworkConfigListener();
200 private final DeviceAgentListener deviceAgentListener = new InternalDeviceAgentListener();
Carmelo Cascone9e4972c2018-08-30 00:29:16 -0700201 private final ConfigFactory factory = new InternalConfigFactory();
202 private final Striped<Lock> deviceLocks = Striped.lock(30);
Andrea Campanella241896c2017-05-10 13:11:04 -0700203
Carmelo Cascone9e4972c2018-08-30 00:29:16 -0700204 private ExecutorService connectionExecutor;
205 private ScheduledExecutorService statsExecutor;
206 private ScheduledExecutorService probeExecutor;
207 private ScheduledFuture<?> probeTask;
208 private DeviceProviderService providerService;
209
/**
 * Creates the provider with a {@link ProviderId} built from
 * {@code URI_SCHEME} and {@code DEVICE_PROVIDER_PACKAGE}.
 */
public GeneralDeviceProvider() {
    super(new ProviderId(URI_SCHEME, DEVICE_PROVIDER_PACKAGE));
}
Andrea Campanella241896c2017-05-10 13:11:04 -0700213
214 @Activate
Andrea Campanella1e573442018-05-17 17:07:13 +0200215 public void activate(ComponentContext context) {
Carmelo Cascone9e4972c2018-08-30 00:29:16 -0700216 connectionExecutor = newFixedThreadPool(CORE_POOL_SIZE, groupedThreads(
217 "onos/gdp-connect", "%d", log));
218 statsExecutor = newScheduledThreadPool(CORE_POOL_SIZE, groupedThreads(
219 "onos/gdp-stats", "%d", log));
220 probeExecutor = newSingleThreadScheduledExecutor(groupedThreads(
221 "onos/gdp-probe", "%d", log));
Andrea Campanella241896c2017-05-10 13:11:04 -0700222 providerService = providerRegistry.register(this);
Andrea Campanella4929a812017-10-09 18:38:23 +0200223 componentConfigService.registerProperties(getClass());
Andrea Campanella241896c2017-05-10 13:11:04 -0700224 coreService.registerApplication(APP_NAME);
225 cfgService.registerConfigFactory(factory);
226 cfgService.addListener(cfgListener);
227 deviceService.addListener(deviceListener);
Carmelo Cascone9e4972c2018-08-30 00:29:16 -0700228 pipeconfWatchdogService.addListener(pipeconfWatchdogListener);
229 rescheduleProbeTask(false);
Andrea Campanella1e573442018-05-17 17:07:13 +0200230 modified(context);
Andrea Campanella241896c2017-05-10 13:11:04 -0700231 log.info("Started");
232 }
233
Andrea Campanella19090322017-08-22 10:31:37 +0200234 @Modified
235 public void modified(ComponentContext context) {
Carmelo Cascone158b8c42018-07-04 19:42:37 +0200236 if (context == null) {
237 return;
Andrea Campanella19090322017-08-22 10:31:37 +0200238 }
239
Carmelo Cascone158b8c42018-07-04 19:42:37 +0200240 Dictionary<?, ?> properties = context.getProperties();
241 final int oldStatsPollFrequency = statsPollFrequency;
242 statsPollFrequency = Tools.getIntegerProperty(
Thomas Vachuska4167c3f2018-10-16 07:16:31 -0700243 properties, STATS_POLL_FREQUENCY, STATS_POLL_FREQUENCY_DEFAULT);
Carmelo Cascone158b8c42018-07-04 19:42:37 +0200244 log.info("Configured. {} is configured to {} seconds",
245 STATS_POLL_FREQUENCY, statsPollFrequency);
246 final int oldProbeFrequency = probeFrequency;
247 probeFrequency = Tools.getIntegerProperty(
Thomas Vachuska4167c3f2018-10-16 07:16:31 -0700248 properties, PROBE_FREQUENCY, PROBE_FREQUENCY_DEFAULT);
Carmelo Cascone158b8c42018-07-04 19:42:37 +0200249 log.info("Configured. {} is configured to {} seconds",
250 PROBE_FREQUENCY, probeFrequency);
251 opTimeoutShort = Tools.getIntegerProperty(
Thomas Vachuska4167c3f2018-10-16 07:16:31 -0700252 properties, OP_TIMEOUT_SHORT, OP_TIMEOUT_SHORT_DEFAULT);
Carmelo Cascone158b8c42018-07-04 19:42:37 +0200253 log.info("Configured. {} is configured to {} seconds",
254 OP_TIMEOUT_SHORT, opTimeoutShort);
Carmelo Cascone158b8c42018-07-04 19:42:37 +0200255
256 if (oldStatsPollFrequency != statsPollFrequency) {
257 rescheduleStatsPollingTasks();
Andrea Campanella19090322017-08-22 10:31:37 +0200258 }
Carmelo Cascone158b8c42018-07-04 19:42:37 +0200259
260 if (oldProbeFrequency != probeFrequency) {
Carmelo Cascone9e4972c2018-08-30 00:29:16 -0700261 rescheduleProbeTask(true);
Carmelo Cascone158b8c42018-07-04 19:42:37 +0200262 }
263 }
264
Carmelo Cascone9e4972c2018-08-30 00:29:16 -0700265 private void rescheduleProbeTask(boolean deelay) {
266 synchronized (this) {
267 if (probeTask != null) {
268 probeTask.cancel(false);
269 }
270 probeTask = probeExecutor.scheduleAtFixedRate(
271 this::triggerProbeAllDevices,
272 deelay ? probeFrequency : 0,
273 probeFrequency,
274 TimeUnit.SECONDS);
Carmelo Cascone158b8c42018-07-04 19:42:37 +0200275 }
Andrea Campanella19090322017-08-22 10:31:37 +0200276 }
277
Andrea Campanella241896c2017-05-10 13:11:04 -0700278 @Deactivate
279 public void deactivate() {
Carmelo Cascone9e4972c2018-08-30 00:29:16 -0700280 // Shutdown stats polling tasks.
281 statsPollingTasks.keySet().forEach(this::cancelStatsPolling);
282 statsPollingTasks.clear();
283 statsExecutor.shutdownNow();
284 try {
285 statsExecutor.awaitTermination(5, TimeUnit.SECONDS);
286 } catch (InterruptedException e) {
287 log.warn("statsExecutor not terminated properly");
288 }
289 statsExecutor = null;
290 // Shutdown probe executor.
291 probeTask.cancel(true);
292 probeTask = null;
293 probeExecutor.shutdownNow();
294 try {
295 probeExecutor.awaitTermination(5, TimeUnit.SECONDS);
296 } catch (InterruptedException e) {
297 log.warn("probeExecutor not terminated properly");
298 }
299 probeExecutor = null;
300 // Shutdown connection executor.
301 connectionExecutor.shutdownNow();
302 try {
303 connectionExecutor.awaitTermination(5, TimeUnit.SECONDS);
304 } catch (InterruptedException e) {
305 log.warn("connectionExecutor not terminated properly");
306 }
307 connectionExecutor = null;
308 // Remove all device agent listeners
309 handshakersWithListeners.values().forEach(h -> h.removeDeviceAgentListener(id()));
310 handshakersWithListeners.clear();
311 // Other cleanup.
Andrea Campanella4929a812017-10-09 18:38:23 +0200312 componentConfigService.unregisterProperties(getClass(), false);
Andrea Campanella241896c2017-05-10 13:11:04 -0700313 cfgService.removeListener(cfgListener);
Andrea Campanella241896c2017-05-10 13:11:04 -0700314 deviceService.removeListener(deviceListener);
Carmelo Cascone9e4972c2018-08-30 00:29:16 -0700315 pipeconfWatchdogService.removeListener(pipeconfWatchdogListener);
Andrea Campanella241896c2017-05-10 13:11:04 -0700316 providerRegistry.unregister(this);
317 providerService = null;
318 cfgService.unregisterConfigFactory(factory);
319 log.info("Stopped");
320 }
321
Andrea Campanella241896c2017-05-10 13:11:04 -0700322
323 @Override
324 public void triggerProbe(DeviceId deviceId) {
Carmelo Cascone158b8c42018-07-04 19:42:37 +0200325 connectionExecutor.execute(withDeviceLock(
326 () -> doDeviceProbe(deviceId), deviceId));
Andrea Campanella241896c2017-05-10 13:11:04 -0700327 }
328
329 @Override
330 public void roleChanged(DeviceId deviceId, MastershipRole newRole) {
Carmelo Cascone9e4972c2018-08-30 00:29:16 -0700331 log.info("Notifying role {} to device {}", newRole, deviceId);
Carmelo Casconee5b28722018-06-22 17:28:28 +0200332 requestedRoles.put(deviceId, newRole);
Carmelo Cascone158b8c42018-07-04 19:42:37 +0200333 connectionExecutor.execute(() -> doRoleChanged(deviceId, newRole));
Carmelo Casconee5b28722018-06-22 17:28:28 +0200334 }
335
336 private void doRoleChanged(DeviceId deviceId, MastershipRole newRole) {
Carmelo Cascone9e4972c2018-08-30 00:29:16 -0700337 final DeviceHandshaker handshaker = getBehaviour(
338 deviceId, DeviceHandshaker.class);
Carmelo Casconee5b28722018-06-22 17:28:28 +0200339 if (handshaker == null) {
Carmelo Cascone96beb6f2018-06-27 18:07:12 +0200340 log.error("Null handshaker. Unable to notify new role {} to {}",
341 newRole, deviceId);
Carmelo Casconee5b28722018-06-22 17:28:28 +0200342 return;
343 }
344 handshaker.roleChanged(newRole);
Andrea Campanella241896c2017-05-10 13:11:04 -0700345 }
346
347 @Override
348 public boolean isReachable(DeviceId deviceId) {
Andrea Campanella14e196d2017-07-24 18:11:36 +0200349 log.debug("Testing reachability for device {}", deviceId);
Carmelo Cascone9e4972c2018-08-30 00:29:16 -0700350 final DeviceHandshaker handshaker = getBehaviour(
351 deviceId, DeviceHandshaker.class);
Andrea Campanellac1ecdd02018-01-12 12:48:24 +0100352 if (handshaker == null) {
353 return false;
354 }
Carmelo Cascone96beb6f2018-06-27 18:07:12 +0200355 return getFutureWithDeadline(
356 handshaker.isReachable(), "checking reachability",
Carmelo Cascone158b8c42018-07-04 19:42:37 +0200357 deviceId, false, opTimeoutShort);
Andrea Campanella241896c2017-05-10 13:11:04 -0700358 }
359
Carmelo Cascone9e4972c2018-08-30 00:29:16 -0700360 private boolean isConnected(DeviceId deviceId) {
361 log.debug("Testing connection to device {}", deviceId);
362 final DeviceHandshaker handshaker = getBehaviour(
363 deviceId, DeviceHandshaker.class);
364 if (handshaker == null) {
365 return false;
366 }
367 return handshaker.isConnected();
368 }
369
Andrea Campanella241896c2017-05-10 13:11:04 -0700370 @Override
371 public void changePortState(DeviceId deviceId, PortNumber portNumber,
372 boolean enable) {
Carmelo Cascone158b8c42018-07-04 19:42:37 +0200373 connectionExecutor.execute(
374 () -> doChangePortState(deviceId, portNumber, enable));
375 }
376
377 private void doChangePortState(DeviceId deviceId, PortNumber portNumber,
378 boolean enable) {
Carmelo Cascone96beb6f2018-06-27 18:07:12 +0200379 if (!deviceService.getDevice(deviceId).is(PortAdmin.class)) {
380 log.warn("Missing PortAdmin behaviour on {}, aborting port state change",
381 deviceId);
382 return;
Andrea Campanella241896c2017-05-10 13:11:04 -0700383 }
Carmelo Cascone158b8c42018-07-04 19:42:37 +0200384 final PortAdmin portAdmin = deviceService.getDevice(deviceId)
385 .as(PortAdmin.class);
386 final CompletableFuture<Boolean> modifyTask = enable
Carmelo Cascone96beb6f2018-06-27 18:07:12 +0200387 ? portAdmin.enable(portNumber)
388 : portAdmin.disable(portNumber);
Carmelo Cascone158b8c42018-07-04 19:42:37 +0200389 final String descr = (enable ? "enabling" : "disabling") + " port " + portNumber;
390 getFutureWithDeadline(
391 modifyTask, descr, deviceId, null, opTimeoutShort);
Andrea Campanella241896c2017-05-10 13:11:04 -0700392 }
393
Thomas Vachuska5b38dc02018-05-10 15:24:40 -0700394 @Override
395 public void triggerDisconnect(DeviceId deviceId) {
Andrea Campanella1e573442018-05-17 17:07:13 +0200396 log.debug("Triggering disconnection of device {}", deviceId);
Carmelo Cascone158b8c42018-07-04 19:42:37 +0200397 connectionExecutor.execute(withDeviceLock(
398 () -> doDisconnectDevice(deviceId), deviceId));
Thomas Vachuska5b38dc02018-05-10 15:24:40 -0700399 }
400
Andrea Campanella241896c2017-05-10 13:11:04 -0700401 private Driver getDriver(DeviceId deviceId) {
Andrea Campanella241896c2017-05-10 13:11:04 -0700402 try {
Carmelo Cascone158b8c42018-07-04 19:42:37 +0200403 // DriverManager checks first using basic device config.
404 return driverService.getDriver(deviceId);
Andrea Campanella241896c2017-05-10 13:11:04 -0700405 } catch (ItemNotFoundException e) {
Carmelo Cascone158b8c42018-07-04 19:42:37 +0200406 log.error("Driver not found for {}", deviceId);
407 return null;
Andrea Campanella241896c2017-05-10 13:11:04 -0700408 }
Andrea Campanella241896c2017-05-10 13:11:04 -0700409 }
410
Carmelo Cascone9e4972c2018-08-30 00:29:16 -0700411 private <T extends Behaviour> T getBehaviour(DeviceId deviceId, Class<T> type) {
412 // Get handshaker.
413
414 Driver driver = getDriver(deviceId);
415 if (driver == null) {
Andrea Campanella241896c2017-05-10 13:11:04 -0700416 return null;
417 }
Carmelo Cascone9e4972c2018-08-30 00:29:16 -0700418 if (!driver.hasBehaviour(type)) {
419 return null;
420 }
421 final DriverData data = new DefaultDriverData(driver, deviceId);
422 // Storing deviceKeyId and all other config values as data in the driver
423 // with protocol_<info> name as the key. e.g protocol_ip.
424 final GeneralProviderDeviceConfig providerConfig = cfgService.getConfig(
425 deviceId, GeneralProviderDeviceConfig.class);
426 if (providerConfig != null) {
427 providerConfig.protocolsInfo().forEach((protocol, info) -> {
428 info.configValues().forEach(
429 (k, v) -> data.set(protocol + "_" + k, v));
430 data.set(protocol + "_key", info.deviceKeyId());
431 });
432 }
433 final DefaultDriverHandler handler = new DefaultDriverHandler(data);
434 return driver.createBehaviour(handler, type);
Andrea Campanella241896c2017-05-10 13:11:04 -0700435 }
436
Carmelo Cascone158b8c42018-07-04 19:42:37 +0200437 private void doConnectDevice(DeviceId deviceId) {
Carmelo Cascone9e4972c2018-08-30 00:29:16 -0700438 log.debug("Initiating connection to device {}...", deviceId);
Carmelo Cascone96beb6f2018-06-27 18:07:12 +0200439 // Retrieve config
Carmelo Cascone9e4972c2018-08-30 00:29:16 -0700440 if (configIsMissing(deviceId)) {
Carmelo Cascone96beb6f2018-06-27 18:07:12 +0200441 return;
442 }
Carmelo Cascone9e4972c2018-08-30 00:29:16 -0700443 // Bind pipeconf (if any and if device is capable).
444 if (!bindPipeconfIfRequired(deviceId)) {
445 // We already logged the error.
446 return;
447 }
448 // Get handshaker.
449 final DeviceHandshaker handshaker = getBehaviour(
450 deviceId, DeviceHandshaker.class);
Carmelo Cascone96beb6f2018-06-27 18:07:12 +0200451 if (handshaker == null) {
Carmelo Cascone9e4972c2018-08-30 00:29:16 -0700452 log.error("Missing handshaker behavior for {}, aborting connection",
Carmelo Cascone96beb6f2018-06-27 18:07:12 +0200453 deviceId);
454 return;
455 }
Carmelo Cascone9e4972c2018-08-30 00:29:16 -0700456 // Add device agent listener.
457 handshaker.addDeviceAgentListener(id(), deviceAgentListener);
458 handshakersWithListeners.put(deviceId, handshaker);
Carmelo Cascone96beb6f2018-06-27 18:07:12 +0200459 // Start connection via handshaker.
460 final Boolean connectSuccess = getFutureWithDeadline(
461 handshaker.connect(), "initiating connection",
Carmelo Casconede3b6842018-09-05 17:45:10 -0700462 deviceId, false, opTimeoutShort);
Carmelo Cascone9e4972c2018-08-30 00:29:16 -0700463 if (!connectSuccess) {
Carmelo Cascone96beb6f2018-06-27 18:07:12 +0200464 log.warn("Unable to connect to {}", deviceId);
Carmelo Cascone9e4972c2018-08-30 00:29:16 -0700465 }
466 }
467
468 private void triggerAdvertiseDevice(DeviceId deviceId) {
469 connectionExecutor.execute(withDeviceLock(
470 () -> doAdvertiseDevice(deviceId), deviceId));
471 }
472
473 private void doAdvertiseDevice(DeviceId deviceId) {
474 // Retrieve config
475 if (configIsMissing(deviceId)) {
Carmelo Cascone96beb6f2018-06-27 18:07:12 +0200476 return;
477 }
Carmelo Cascone9e4972c2018-08-30 00:29:16 -0700478 // Obtain device and port description.
479 final boolean isPipelineReady = isPipelineReady(deviceId);
480 final DeviceDescription description = getDeviceDescription(
481 deviceId, isPipelineReady);
482 final List<PortDescription> ports = getPortDetails(deviceId);
483 // Advertise to core.
484 if (deviceService.getDevice(deviceId) == null ||
485 (description.isDefaultAvailable() &&
486 !deviceService.isAvailable(deviceId))) {
487 if (!isPipelineReady) {
488 log.info("Advertising device to core with available={} as " +
489 "device pipeline is not ready yet",
490 description.isDefaultAvailable());
Carmelo Cascone59f57de2017-07-11 19:55:09 -0400491 }
Carmelo Cascone9e4972c2018-08-30 00:29:16 -0700492 providerService.deviceConnected(deviceId, description);
Carmelo Cascone1fb27d32017-08-25 20:40:20 +0200493 }
Carmelo Cascone9e4972c2018-08-30 00:29:16 -0700494 providerService.updatePorts(deviceId, ports);
495 // If pipeline is not ready, encourage watchdog to perform probe ASAP.
496 if (!isPipelineReady) {
497 pipeconfWatchdogService.triggerProbe(deviceId);
498 }
499 }
500
501 private DeviceDescription getDeviceDescription(
502 DeviceId deviceId, boolean defaultAvailable) {
503 // Get one from driver or forge.
504 final DeviceDescriptionDiscovery deviceDiscovery = getBehaviour(
505 deviceId, DeviceDescriptionDiscovery.class);
506 if (deviceDiscovery != null) {
507 // Enforce defaultAvailable flag over the one obtained from driver.
508 final DeviceDescription d = deviceDiscovery.discoverDeviceDetails();
509 return new DefaultDeviceDescription(d, defaultAvailable, d.annotations());
510 } else {
511 return forgeDeviceDescription(deviceId, defaultAvailable);
512 }
513 }
514
515 private List<PortDescription> getPortDetails(DeviceId deviceId) {
516 final DeviceDescriptionDiscovery deviceDiscovery = getBehaviour(
517 deviceId, DeviceDescriptionDiscovery.class);
518 if (deviceDiscovery != null) {
519 return deviceDiscovery.discoverPortDetails();
520 } else {
521 return Collections.emptyList();
522 }
523 }
524
525 private DeviceDescription forgeDeviceDescription(
526 DeviceId deviceId, boolean defaultAvailable) {
527 // Uses handshaker and provider config to get driver data.
528 final DeviceHandshaker handshaker = getBehaviour(
529 deviceId, DeviceHandshaker.class);
530 final Driver driver = handshaker != null
531 ? handshaker.handler().driver() : null;
532 final GeneralProviderDeviceConfig cfg = cfgService.getConfig(
533 deviceId, GeneralProviderDeviceConfig.class);
534 final DefaultAnnotations.Builder annBuilder = DefaultAnnotations.builder();
535 // If device is pipeline programmable, let this provider decide when the
536 // device can be marked online.
537 annBuilder.set(AnnotationKeys.PROVIDER_MARK_ONLINE,
538 String.valueOf(isPipelineProgrammable(deviceId)));
539 if (cfg != null) {
540 StringJoiner protoStringBuilder = new StringJoiner(", ");
541 cfg.protocolsInfo().keySet().forEach(protoStringBuilder::add);
542 annBuilder.set(AnnotationKeys.PROTOCOL, protoStringBuilder.toString());
543 }
544 return new DefaultDeviceDescription(
545 deviceId.uri(),
546 Device.Type.SWITCH,
547 driver != null ? driver.manufacturer() : UNKNOWN,
548 driver != null ? driver.hwVersion() : UNKNOWN,
549 driver != null ? driver.swVersion() : UNKNOWN,
550 UNKNOWN,
551 new ChassisId(),
552 defaultAvailable,
553 annBuilder.build());
554 }
555
556 private void triggerMarkAvailable(DeviceId deviceId) {
557 connectionExecutor.execute(withDeviceLock(
558 () -> doMarkAvailable(deviceId), deviceId));
559 }
560
561 private void doMarkAvailable(DeviceId deviceId) {
562 if (deviceService.isAvailable(deviceId)) {
563 return;
564 }
565 final DeviceDescription descr = getDeviceDescription(deviceId, true);
566 // It has been observed that devices that were marked offline (e.g.
567 // after device disconnection) might end up with no master. Here we
568 // trigger a new master election (if device has no master).
569 mastershipService.requestRoleForSync(deviceId);
570 providerService.deviceConnected(deviceId, descr);
571 }
572
573 private boolean bindPipeconfIfRequired(DeviceId deviceId) {
574 if (pipeconfService.ofDevice(deviceId).isPresent()
575 || !isPipelineProgrammable(deviceId)) {
576 // Nothing to do, all good.
577 return true;
578 }
579 // Get pipeconf from netcfg or driver (default one).
580 final PiPipelineProgrammable pipelineProg = getBehaviour(
581 deviceId, PiPipelineProgrammable.class);
582 final PiPipeconfId pipeconfId = getPipeconfId(deviceId, pipelineProg);
583 if (pipeconfId == null) {
584 return false;
585 }
586 // Store binding in pipeconf service.
587 pipeconfService.bindToDevice(pipeconfId, deviceId);
588 return true;
589 }
590
591 private PiPipeconfId getPipeconfId(DeviceId deviceId, PiPipelineProgrammable pipelineProg) {
592 // Places to look for a pipeconf ID (in priority order)):
593 // 1) netcfg
594 // 2) device driver (default one)
595 final PiPipeconfId pipeconfId = getPipeconfFromCfg(deviceId);
596 if (pipeconfId != null && !pipeconfId.id().isEmpty()) {
597 return pipeconfId;
598 }
599 if (pipelineProg != null
600 && pipelineProg.getDefaultPipeconf().isPresent()) {
601 final PiPipeconf defaultPipeconf = pipelineProg.getDefaultPipeconf().get();
602 log.info("Using default pipeconf {} for {}", defaultPipeconf.id(), deviceId);
603 return defaultPipeconf.id();
604 } else {
605 log.warn("Unable to associate a pipeconf to {}", deviceId);
Andrea Campanella14e196d2017-07-24 18:11:36 +0200606 return null;
Carmelo Cascone59f57de2017-07-11 19:55:09 -0400607 }
Carmelo Cascone59f57de2017-07-11 19:55:09 -0400608 }
609
Carmelo Cascone158b8c42018-07-04 19:42:37 +0200610 private void doDisconnectDevice(DeviceId deviceId) {
Carmelo Cascone7044efd2018-07-06 13:01:36 +0200611 log.debug("Initiating disconnection from {}...", deviceId);
Carmelo Cascone9e4972c2018-08-30 00:29:16 -0700612 final DeviceHandshaker handshaker = getBehaviour(
613 deviceId, DeviceHandshaker.class);
Carmelo Cascone7044efd2018-07-06 13:01:36 +0200614 final boolean isAvailable = deviceService.isAvailable(deviceId);
615 // Signal disconnection to core (if master).
616 if (isAvailable && mastershipService.isLocalMaster(deviceId)) {
Carmelo Cascone96beb6f2018-06-27 18:07:12 +0200617 providerService.deviceDisconnected(deviceId);
618 }
Carmelo Cascone7044efd2018-07-06 13:01:36 +0200619 // Cancel tasks.
Carmelo Cascone158b8c42018-07-04 19:42:37 +0200620 cancelStatsPolling(deviceId);
Carmelo Cascone7044efd2018-07-06 13:01:36 +0200621 // Disconnect device.
Carmelo Casconee5b28722018-06-22 17:28:28 +0200622 if (handshaker == null) {
Carmelo Cascone7044efd2018-07-06 13:01:36 +0200623 if (isAvailable) {
624 // If not available don't bother logging. We are probably
625 // invoking this method multiple times for the same device.
626 log.warn("Missing DeviceHandshaker behavior for {}, " +
627 "no guarantees of complete disconnection",
628 deviceId);
629 }
Carmelo Cascone158b8c42018-07-04 19:42:37 +0200630 return;
Andrea Campanella241896c2017-05-10 13:11:04 -0700631 }
Carmelo Cascone9e4972c2018-08-30 00:29:16 -0700632 handshaker.removeDeviceAgentListener(id());
633 handshakersWithListeners.remove(deviceId);
Carmelo Cascone158b8c42018-07-04 19:42:37 +0200634 final boolean disconnectSuccess = getFutureWithDeadline(
635 handshaker.disconnect(), "performing disconnection",
636 deviceId, false, opTimeoutShort);
637 if (!disconnectSuccess) {
638 log.warn("Unable to disconnect from {}", deviceId);
639 }
Andrea Campanella241896c2017-05-10 13:11:04 -0700640 }
641
Carmelo Cascone96beb6f2018-06-27 18:07:12 +0200642 // Needed to catch the exception in the executors since are not rethrown otherwise.
Andrea Campanella241896c2017-05-10 13:11:04 -0700643 private Runnable exceptionSafe(Runnable runnable) {
644 return () -> {
645 try {
646 runnable.run();
647 } catch (Exception e) {
648 log.error("Unhandled Exception", e);
649 }
650 };
651 }
652
Carmelo Cascone158b8c42018-07-04 19:42:37 +0200653 private <U> U withDeviceLock(Supplier<U> task, DeviceId deviceId) {
654 final Lock lock = deviceLocks.get(deviceId);
655 lock.lock();
656 try {
657 return task.get();
658 } finally {
659 lock.unlock();
660 }
661 }
662
663 private Runnable withDeviceLock(Runnable task, DeviceId deviceId) {
664 // Wrapper of withDeviceLock(Supplier, ...) for void tasks.
665 return () -> withDeviceLock(() -> {
666 task.run();
667 return null;
668 }, deviceId);
Carmelo Cascone96beb6f2018-06-27 18:07:12 +0200669 }
670
Andrea Campanella241896c2017-05-10 13:11:04 -0700671 private void updatePortStatistics(DeviceId deviceId) {
Andrea Campanellace111932017-09-18 16:59:56 +0900672 Device device = deviceService.getDevice(deviceId);
Carmelo Cascone158b8c42018-07-04 19:42:37 +0200673 if (device != null && deviceService.isAvailable(deviceId) &&
Andrea Campanellace111932017-09-18 16:59:56 +0900674 device.is(PortStatisticsDiscovery.class)) {
675 Collection<PortStatistics> statistics = device.as(PortStatisticsDiscovery.class)
676 .discoverPortStatistics();
677 //updating statistcs only if not empty
678 if (!statistics.isEmpty()) {
679 providerService.updatePortStatistics(deviceId, statistics);
680 }
681 } else {
682 log.debug("Can't update port statistics for device {}", deviceId);
Andrea Campanella19090322017-08-22 10:31:37 +0200683 }
684 }
685
Carmelo Casconee5b28722018-06-22 17:28:28 +0200686 private boolean notMyScheme(DeviceId deviceId) {
687 return !deviceId.uri().getScheme().equals(URI_SCHEME);
Andrea Campanella241896c2017-05-10 13:11:04 -0700688 }
689
Carmelo Cascone158b8c42018-07-04 19:42:37 +0200690 private void triggerConnect(DeviceId deviceId) {
691 connectionExecutor.execute(withDeviceLock(
692 () -> doConnectDevice(deviceId), deviceId));
Carmelo Cascone96beb6f2018-06-27 18:07:12 +0200693 }
694
Carmelo Cascone9e4972c2018-08-30 00:29:16 -0700695 private boolean isPipelineProgrammable(DeviceId deviceId) {
696 final GeneralProviderDeviceConfig providerConfig = cfgService.getConfig(
697 deviceId, GeneralProviderDeviceConfig.class);
698 if (providerConfig == null) {
699 return false;
700 }
701 return !Collections.disjoint(
702 ImmutableSet.copyOf(providerConfig.node().fieldNames()),
703 PIPELINE_CONFIGURABLE_PROTOCOLS);
704 }
705
Andrea Campanella241896c2017-05-10 13:11:04 -0700706 /**
707 * Listener for configuration events.
708 */
709 private class InternalNetworkConfigListener implements NetworkConfigListener {
710
Andrea Campanella241896c2017-05-10 13:11:04 -0700711 @Override
712 public void event(NetworkConfigEvent event) {
Carmelo Cascone7044efd2018-07-06 13:01:36 +0200713 connectionExecutor.execute(() -> consumeConfigEvent(event));
714 }
715
716 @Override
717 public boolean isRelevant(NetworkConfigEvent event) {
718 return (event.configClass().equals(GeneralProviderDeviceConfig.class) ||
719 event.configClass().equals(BasicDeviceConfig.class) ||
720 event.configClass().equals(PiPipeconfConfig.class)) &&
721 (event.type() == NetworkConfigEvent.Type.CONFIG_ADDED ||
722 event.type() == NetworkConfigEvent.Type.CONFIG_UPDATED);
723 }
724
725 private void consumeConfigEvent(NetworkConfigEvent event) {
Andrea Campanella241896c2017-05-10 13:11:04 -0700726 DeviceId deviceId = (DeviceId) event.subject();
727 //Assuming that the deviceId comes with uri 'device:'
Carmelo Casconee5b28722018-06-22 17:28:28 +0200728 if (notMyScheme(deviceId)) {
Andrea Campanella241896c2017-05-10 13:11:04 -0700729 // not under my scheme, skipping
730 log.debug("{} is not my scheme, skipping", deviceId);
731 return;
732 }
Carmelo Cascone158b8c42018-07-04 19:42:37 +0200733 final boolean configComplete = withDeviceLock(
734 () -> isDeviceConfigComplete(event, deviceId), deviceId);
735 if (!configComplete) {
Carmelo Cascone96beb6f2018-06-27 18:07:12 +0200736 // Still waiting for some configuration.
Andrea Campanellabc112a92017-06-26 19:06:43 +0200737 return;
738 }
Carmelo Cascone96beb6f2018-06-27 18:07:12 +0200739 // Good to go.
Carmelo Cascone158b8c42018-07-04 19:42:37 +0200740 triggerConnect(deviceId);
Carmelo Cascone96beb6f2018-06-27 18:07:12 +0200741 cleanUpConfigInfo(deviceId);
Andrea Campanella14e196d2017-07-24 18:11:36 +0200742 }
743
Carmelo Cascone96beb6f2018-06-27 18:07:12 +0200744 private boolean isDeviceConfigComplete(NetworkConfigEvent event, DeviceId deviceId) {
745 // FIXME to be removed when netcfg will issue device events in a bundle or
Andrea Campanellabc112a92017-06-26 19:06:43 +0200746 // ensure all configuration needed is present
Carmelo Cascone158b8c42018-07-04 19:42:37 +0200747 if (event.configClass().equals(GeneralProviderDeviceConfig.class)) {
748 //FIXME we currently assume that p4runtime devices are pipeline configurable.
749 //If we want to connect a p4runtime device with no pipeline
750 if (event.config().isPresent()) {
751 deviceConfigured.add(deviceId);
752 final boolean isNotPipelineConfigurable = Collections.disjoint(
753 ImmutableSet.copyOf(event.config().get().node().fieldNames()),
754 PIPELINE_CONFIGURABLE_PROTOCOLS);
755 if (isNotPipelineConfigurable) {
756 // Skip waiting for a pipeline if we can't support it.
Andrea Campanellabc112a92017-06-26 19:06:43 +0200757 pipelineConfigured.add(deviceId);
758 }
759 }
Carmelo Cascone158b8c42018-07-04 19:42:37 +0200760 } else if (event.configClass().equals(BasicDeviceConfig.class)) {
761 if (event.config().isPresent() && event.config().get().node().has(DRIVER)) {
762 driverConfigured.add(deviceId);
Andrea Campanellabc112a92017-06-26 19:06:43 +0200763 }
Carmelo Cascone158b8c42018-07-04 19:42:37 +0200764 } else if (event.configClass().equals(PiPipeconfConfig.class)) {
765 if (event.config().isPresent()
766 && event.config().get().node().has(PiPipeconfConfig.PIPIPECONFID)) {
767 pipelineConfigured.add(deviceId);
768 }
Andrea Campanella241896c2017-05-10 13:11:04 -0700769 }
Carmelo Cascone158b8c42018-07-04 19:42:37 +0200770
771 if (deviceConfigured.contains(deviceId)
772 && driverConfigured.contains(deviceId)
773 && pipelineConfigured.contains(deviceId)) {
774 return true;
775 } else {
776 if (deviceConfigured.contains(deviceId) && driverConfigured.contains(deviceId)) {
777 log.debug("Waiting for pipeline configuration for device {}", deviceId);
778 } else if (pipelineConfigured.contains(deviceId) && driverConfigured.contains(deviceId)) {
779 log.debug("Waiting for device configuration for device {}", deviceId);
780 } else if (pipelineConfigured.contains(deviceId) && deviceConfigured.contains(deviceId)) {
781 log.debug("Waiting for driver configuration for device {}", deviceId);
782 } else if (driverConfigured.contains(deviceId)) {
783 log.debug("Only driver configuration for device {}", deviceId);
784 } else if (deviceConfigured.contains(deviceId)) {
785 log.debug("Only device configuration for device {}", deviceId);
786 }
787 }
788 return false;
Andrea Campanella241896c2017-05-10 13:11:04 -0700789 }
Andrea Campanella241896c2017-05-10 13:11:04 -0700790 }
791
Carmelo Cascone9e4972c2018-08-30 00:29:16 -0700792 private boolean isPipelineReady(DeviceId deviceId) {
793 final boolean isPipelineProg = isPipelineProgrammable(deviceId);
794 final boolean isPipeconfReady = pipeconfWatchdogService
795 .getStatus(deviceId)
796 .equals(PiPipeconfWatchdogService.PipelineStatus.READY);
797 return !isPipelineProg || isPipeconfReady;
Andrea Campanella14e196d2017-07-24 18:11:36 +0200798 }
799
800 private void cleanUpConfigInfo(DeviceId deviceId) {
Andrea Campanellabc112a92017-06-26 19:06:43 +0200801 deviceConfigured.remove(deviceId);
802 driverConfigured.remove(deviceId);
803 pipelineConfigured.remove(deviceId);
Andrea Campanellabc112a92017-06-26 19:06:43 +0200804 }
805
Carmelo Cascone158b8c42018-07-04 19:42:37 +0200806 private void startStatsPolling(DeviceId deviceId, boolean withRandomDelay) {
807 statsPollingTasks.compute(deviceId, (did, oldTask) -> {
808 if (oldTask != null) {
809 oldTask.cancel(false);
810 }
811 final int delay = withRandomDelay
812 ? new SecureRandom().nextInt(10) : 0;
813 return statsExecutor.scheduleAtFixedRate(
814 exceptionSafe(() -> updatePortStatistics(deviceId)),
815 delay, statsPollFrequency, TimeUnit.SECONDS);
816 });
Andrea Campanella19090322017-08-22 10:31:37 +0200817 }
818
Carmelo Cascone158b8c42018-07-04 19:42:37 +0200819 private void cancelStatsPolling(DeviceId deviceId) {
820 statsPollingTasks.computeIfPresent(deviceId, (did, task) -> {
821 task.cancel(false);
822 return null;
823 });
824 }
825
826 private void rescheduleStatsPollingTasks() {
827 statsPollingTasks.keySet().forEach(deviceId -> {
828 // startStatsPolling cancels old one if present.
829 startStatsPolling(deviceId, true);
830 });
831 }
832
833 private void triggerProbeAllDevices() {
834 // Async trigger a task for all devices in the cfg.
835 cfgService.getSubjects(DeviceId.class, GeneralProviderDeviceConfig.class)
Carmelo Cascone9e4972c2018-08-30 00:29:16 -0700836 .forEach(this::triggerDeviceProbe);
Andrea Campanella1e573442018-05-17 17:07:13 +0200837 }
838
Carmelo Cascone92044522018-06-29 19:00:59 +0200839 private PiPipeconfId getPipeconfFromCfg(DeviceId deviceId) {
840 PiPipeconfConfig config = cfgService.getConfig(
841 deviceId, PiPipeconfConfig.class);
842 if (config == null) {
843 return null;
844 }
845 return config.piPipeconfId();
846 }
847
Carmelo Cascone9e4972c2018-08-30 00:29:16 -0700848 private void triggerDeviceProbe(DeviceId deviceId) {
849 connectionExecutor.execute(withDeviceLock(
850 () -> doDeviceProbe(deviceId), deviceId));
851 }
852
Carmelo Cascone158b8c42018-07-04 19:42:37 +0200853 private void doDeviceProbe(DeviceId deviceId) {
Carmelo Cascone9e4972c2018-08-30 00:29:16 -0700854 log.debug("Probing device {}...", deviceId);
855 if (configIsMissing(deviceId)) {
Carmelo Cascone158b8c42018-07-04 19:42:37 +0200856 return;
857 }
Carmelo Cascone9e4972c2018-08-30 00:29:16 -0700858 if (!isConnected(deviceId)) {
859 if (deviceService.isAvailable(deviceId)) {
860 providerService.deviceDisconnected(deviceId);
Carmelo Cascone158b8c42018-07-04 19:42:37 +0200861 }
Carmelo Cascone158b8c42018-07-04 19:42:37 +0200862 triggerConnect(deviceId);
Andrea Campanella1e573442018-05-17 17:07:13 +0200863 }
864 }
865
Carmelo Cascone9e4972c2018-08-30 00:29:16 -0700866 private boolean configIsMissing(DeviceId deviceId) {
Carmelo Cascone96beb6f2018-06-27 18:07:12 +0200867 final boolean present =
868 cfgService.getConfig(deviceId, GeneralProviderDeviceConfig.class) != null
869 && cfgService.getConfig(deviceId, BasicDeviceConfig.class) != null;
Andrea Campanella1e573442018-05-17 17:07:13 +0200870 if (!present) {
871 log.warn("Configuration for device {} is not complete", deviceId);
872 }
Carmelo Cascone9e4972c2018-08-30 00:29:16 -0700873 return !present;
Carmelo Casconee5b28722018-06-22 17:28:28 +0200874 }
875
876 private void handleMastershipResponse(DeviceId deviceId, MastershipRole response) {
Carmelo Cascone9e4972c2018-08-30 00:29:16 -0700877 // Notify core about mastership response.
878 final MastershipRole request = requestedRoles.get(deviceId);
879 final boolean isAvailable = deviceService.isAvailable(deviceId);
880 if (request == null || !isAvailable) {
Carmelo Casconee5b28722018-06-22 17:28:28 +0200881 return;
882 }
Carmelo Cascone9e4972c2018-08-30 00:29:16 -0700883 log.debug("Device {} asserted role {} (requested was {})",
884 deviceId, response, request);
885 providerService.receivedRoleReply(deviceId, request, response);
886 // FIXME: this should be based on assigned mastership, not what returned by device
Carmelo Cascone158b8c42018-07-04 19:42:37 +0200887 if (response.equals(MastershipRole.MASTER)) {
888 startStatsPolling(deviceId, false);
889 } else {
890 cancelStatsPolling(deviceId);
Carmelo Casconee5b28722018-06-22 17:28:28 +0200891 }
892 }
893
Carmelo Casconede3b6842018-09-05 17:45:10 -0700894 private void handleNotMaster(DeviceId deviceId) {
895 log.warn("Device {} notified that this node is not master, " +
896 "relinquishing mastership...", deviceId);
897 mastershipService.relinquishMastership(deviceId);
898 }
899
Carmelo Cascone96beb6f2018-06-27 18:07:12 +0200900 private <U> U getFutureWithDeadline(CompletableFuture<U> future, String opDescription,
Carmelo Cascone158b8c42018-07-04 19:42:37 +0200901 DeviceId deviceId, U defaultValue, int timeout) {
Carmelo Cascone96beb6f2018-06-27 18:07:12 +0200902 try {
Carmelo Cascone158b8c42018-07-04 19:42:37 +0200903 return future.get(timeout, TimeUnit.SECONDS);
Carmelo Cascone96beb6f2018-06-27 18:07:12 +0200904 } catch (InterruptedException e) {
905 log.error("Thread interrupted while {} on {}", opDescription, deviceId);
906 Thread.currentThread().interrupt();
907 } catch (ExecutionException e) {
908 log.error("Exception while {} on {}", opDescription, deviceId, e.getCause());
909 } catch (TimeoutException e) {
910 log.error("Operation TIMEOUT while {} on {}", opDescription, deviceId);
911 }
912 return defaultValue;
913 }
914
Andrea Campanella241896c2017-05-10 13:11:04 -0700915 /**
916 * Listener for core device events.
917 */
918 private class InternalDeviceListener implements DeviceListener {
919 @Override
920 public void event(DeviceEvent event) {
Andrea Campanellace111932017-09-18 16:59:56 +0900921 DeviceId deviceId = event.subject().id();
Thomas Vachuska5b38dc02018-05-10 15:24:40 -0700922 // For now this is scheduled periodically, when streaming API will
923 // be available we check and base it on the streaming API (e.g. gNMI)
924 if (mastershipService.isLocalMaster(deviceId)) {
Carmelo Cascone158b8c42018-07-04 19:42:37 +0200925 startStatsPolling(deviceId, true);
Andrea Campanella241896c2017-05-10 13:11:04 -0700926 }
927 }
928
929 @Override
930 public boolean isRelevant(DeviceEvent event) {
Thomas Vachuska5b38dc02018-05-10 15:24:40 -0700931 return event.type() == Type.DEVICE_ADDED &&
932 event.subject().id().toString().startsWith(URI_SCHEME.toLowerCase());
Andrea Campanella241896c2017-05-10 13:11:04 -0700933 }
934 }
Andrea Campanella1e573442018-05-17 17:07:13 +0200935
936 /**
Carmelo Casconee5b28722018-06-22 17:28:28 +0200937 * Listener for device agent events.
Andrea Campanella1e573442018-05-17 17:07:13 +0200938 */
Carmelo Casconee5b28722018-06-22 17:28:28 +0200939 private class InternalDeviceAgentListener implements DeviceAgentListener {
Andrea Campanella1e573442018-05-17 17:07:13 +0200940
941 @Override
Carmelo Casconee5b28722018-06-22 17:28:28 +0200942 public void event(DeviceAgentEvent event) {
Andrea Campanella1e573442018-05-17 17:07:13 +0200943 DeviceId deviceId = event.subject();
Carmelo Casconee5b28722018-06-22 17:28:28 +0200944 switch (event.type()) {
945 case CHANNEL_OPEN:
Carmelo Cascone9e4972c2018-08-30 00:29:16 -0700946 triggerAdvertiseDevice(deviceId);
Carmelo Casconee5b28722018-06-22 17:28:28 +0200947 break;
948 case CHANNEL_CLOSED:
Carmelo Casconee5b28722018-06-22 17:28:28 +0200949 case CHANNEL_ERROR:
Carmelo Cascone9e4972c2018-08-30 00:29:16 -0700950 triggerDeviceProbe(deviceId);
Carmelo Casconee5b28722018-06-22 17:28:28 +0200951 break;
952 case ROLE_MASTER:
953 handleMastershipResponse(deviceId, MastershipRole.MASTER);
954 break;
955 case ROLE_STANDBY:
956 handleMastershipResponse(deviceId, MastershipRole.STANDBY);
957 break;
958 case ROLE_NONE:
959 handleMastershipResponse(deviceId, MastershipRole.NONE);
960 break;
Carmelo Casconede3b6842018-09-05 17:45:10 -0700961 case NOT_MASTER:
962 handleNotMaster(deviceId);
963 break;
Carmelo Casconee5b28722018-06-22 17:28:28 +0200964 default:
965 log.warn("Unrecognized device agent event {}", event.type());
Andrea Campanella1e573442018-05-17 17:07:13 +0200966 }
Andrea Campanella1e573442018-05-17 17:07:13 +0200967 }
968
969 }
Carmelo Cascone9e4972c2018-08-30 00:29:16 -0700970
971 private class InternalPipeconfWatchdogListener implements PiPipeconfWatchdogListener {
972 @Override
973 public void event(PiPipeconfWatchdogEvent event) {
974 triggerMarkAvailable(event.subject());
975 }
976
977 @Override
978 public boolean isRelevant(PiPipeconfWatchdogEvent event) {
979 return event.type().equals(PiPipeconfWatchdogEvent.Type.PIPELINE_READY);
980 }
981 }
982
983 private class InternalConfigFactory
984 extends ConfigFactory<DeviceId, GeneralProviderDeviceConfig> {
985
986 InternalConfigFactory() {
987 super(SubjectFactories.DEVICE_SUBJECT_FACTORY,
988 GeneralProviderDeviceConfig.class, CFG_SCHEME);
989 }
990
991 @Override
992 public GeneralProviderDeviceConfig createConfig() {
993 return new GeneralProviderDeviceConfig();
994 }
995 }
Andrea Campanella241896c2017-05-10 13:11:04 -0700996}