blob: 1b3c459660f34ea25607e4836e4f2952a53c9c2d [file] [log] [blame]
/*
 * Copyright 2017-present Open Networking Laboratory
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
16
17package org.onosproject.provider.general.device.impl;
18
import com.google.common.annotations.Beta;
import com.google.common.collect.ImmutableSet;
import com.google.common.collect.Maps;
import org.apache.felix.scr.annotations.Activate;
import org.apache.felix.scr.annotations.Component;
import org.apache.felix.scr.annotations.Deactivate;
import org.apache.felix.scr.annotations.Reference;
import org.apache.felix.scr.annotations.ReferenceCardinality;
import org.onlab.packet.ChassisId;
import org.onlab.util.ItemNotFoundException;
import org.onosproject.core.CoreService;
import org.onosproject.net.AnnotationKeys;
import org.onosproject.net.DefaultAnnotations;
import org.onosproject.net.Device;
import org.onosproject.net.DeviceId;
import org.onosproject.net.MastershipRole;
import org.onosproject.net.PortNumber;
import org.onosproject.net.SparseAnnotations;
import org.onosproject.net.behaviour.PortAdmin;
import org.onosproject.net.config.ConfigFactory;
import org.onosproject.net.config.NetworkConfigEvent;
import org.onosproject.net.config.NetworkConfigListener;
import org.onosproject.net.config.NetworkConfigRegistry;
import org.onosproject.net.config.basics.BasicDeviceConfig;
import org.onosproject.net.config.basics.SubjectFactories;
import org.onosproject.net.device.DefaultDeviceDescription;
import org.onosproject.net.device.DeviceDescription;
import org.onosproject.net.device.DeviceDescriptionDiscovery;
import org.onosproject.net.device.DeviceEvent;
import org.onosproject.net.device.DeviceHandshaker;
import org.onosproject.net.device.DeviceListener;
import org.onosproject.net.device.DeviceProvider;
import org.onosproject.net.device.DeviceProviderRegistry;
import org.onosproject.net.device.DeviceProviderService;
import org.onosproject.net.device.DeviceService;
import org.onosproject.net.device.PortDescription;
import org.onosproject.net.device.PortStatistics;
import org.onosproject.net.device.PortStatisticsDiscovery;
import org.onosproject.net.driver.Behaviour;
import org.onosproject.net.driver.DefaultDriverData;
import org.onosproject.net.driver.DefaultDriverHandler;
import org.onosproject.net.driver.Driver;
import org.onosproject.net.driver.DriverData;
import org.onosproject.net.driver.DriverService;
import org.onosproject.net.pi.model.PiPipeconfId;
import org.onosproject.net.pi.runtime.PiPipeconfConfig;
import org.onosproject.net.pi.runtime.PiPipeconfService;
import org.onosproject.net.provider.AbstractProvider;
import org.onosproject.net.provider.ProviderId;
import org.onosproject.provider.general.device.api.GeneralProviderDeviceConfig;
import org.slf4j.Logger;

import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.CopyOnWriteArraySet;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;

import static java.util.concurrent.Executors.newScheduledThreadPool;
import static org.onlab.util.Tools.groupedThreads;
import static org.onosproject.net.device.DeviceEvent.Type;
import static org.slf4j.LoggerFactory.getLogger;
91
92/**
93 * Provider which uses drivers to detect device and do initial handshake
94 * and channel establishment with devices. Any other provider specific operation
95 * is also delegated to the DeviceHandshaker driver.
96 */
97@Beta
98@Component(immediate = true)
99public class GeneralDeviceProvider extends AbstractProvider
100 implements DeviceProvider {
Andrea Campanellabc112a92017-06-26 19:06:43 +0200101 public static final String DRIVER = "driver";
Andrea Campanella241896c2017-05-10 13:11:04 -0700102 private final Logger log = getLogger(getClass());
103
104 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
105 protected DeviceProviderRegistry providerRegistry;
106
107 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
108 protected NetworkConfigRegistry cfgService;
109
110 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
111 protected CoreService coreService;
112
113 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
114 protected DeviceService deviceService;
115
116 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
117 protected DriverService driverService;
118
119 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
Andrea Campanellabc112a92017-06-26 19:06:43 +0200120 protected PiPipeconfService piPipeconfService;
Andrea Campanella241896c2017-05-10 13:11:04 -0700121
122 protected static final String APP_NAME = "org.onosproject.generaldeviceprovider";
123 protected static final String URI_SCHEME = "device";
124 protected static final String CFG_SCHEME = "generalprovider";
125 private static final String DEVICE_PROVIDER_PACKAGE = "org.onosproject.general.provider.device";
126 private static final int CORE_POOL_SIZE = 10;
127 private static final String UNKNOWN = "unknown";
128 private static final int PORT_STATS_PERIOD_SECONDS = 10;
Andrea Campanellabc112a92017-06-26 19:06:43 +0200129 //FIXME this will be removed when the configuration is synced at the source.
130 private static final Set<String> PIPELINE_CONFIGURABLE_PROTOCOLS = ImmutableSet.of("p4runtime");
131 private static final ConcurrentMap<DeviceId, Lock> ENTRY_LOCKS = Maps.newConcurrentMap();
132
133 //FIXME to be removed when netcfg will issue device events in a bundle or
134 //ensures all configuration needed is present
135 private Set<DeviceId> deviceConfigured = new CopyOnWriteArraySet<>();
136 private Set<DeviceId> driverConfigured = new CopyOnWriteArraySet<>();
137 private Set<DeviceId> pipelineConfigured = new CopyOnWriteArraySet<>();
Andrea Campanella241896c2017-05-10 13:11:04 -0700138
139
140 protected ScheduledExecutorService connectionExecutor
141 = newScheduledThreadPool(CORE_POOL_SIZE,
142 groupedThreads("onos/generaldeviceprovider-device",
143 "connection-executor-%d", log));
144 protected ScheduledExecutorService portStatsExecutor
145 = newScheduledThreadPool(CORE_POOL_SIZE,
146 groupedThreads("onos/generaldeviceprovider-port-stats",
147 "port-stats-executor-%d", log));
148
149 protected DeviceProviderService providerService;
150 private InternalDeviceListener deviceListener = new InternalDeviceListener();
151
152 protected final ConfigFactory factory =
153 new ConfigFactory<DeviceId, GeneralProviderDeviceConfig>(
154 SubjectFactories.DEVICE_SUBJECT_FACTORY,
155 GeneralProviderDeviceConfig.class, CFG_SCHEME) {
156 @Override
157 public GeneralProviderDeviceConfig createConfig() {
158 return new GeneralProviderDeviceConfig();
159 }
160 };
161
162 protected final NetworkConfigListener cfgListener = new InternalNetworkConfigListener();
163
164
165 @Activate
166 public void activate() {
167 providerService = providerRegistry.register(this);
168 coreService.registerApplication(APP_NAME);
169 cfgService.registerConfigFactory(factory);
170 cfgService.addListener(cfgListener);
171 deviceService.addListener(deviceListener);
172 //This will fail if ONOS has CFG and drivers which depend on this provider
173 // are activated, failing due to not finding the driver.
174 cfgService.getSubjects(DeviceId.class, GeneralProviderDeviceConfig.class)
175 .forEach(did -> connectionExecutor.execute(() -> connectDevice(did)));
176 log.info("Started");
177 }
178
179
180 @Deactivate
181 public void deactivate() {
182 portStatsExecutor.shutdown();
183 cfgService.removeListener(cfgListener);
184 //Not Removing the device so they can still be used from other driver providers
185 //cfgService.getSubjects(DeviceId.class, GeneralProviderDeviceConfig.class)
186 // .forEach(did -> connectionExecutor.execute(() -> disconnectDevice(did)));
187 connectionExecutor.shutdown();
188 deviceService.removeListener(deviceListener);
189 providerRegistry.unregister(this);
190 providerService = null;
191 cfgService.unregisterConfigFactory(factory);
192 log.info("Stopped");
193 }
194
195 public GeneralDeviceProvider() {
196 super(new ProviderId(URI_SCHEME, DEVICE_PROVIDER_PACKAGE));
197 }
198
199
200 @Override
201 public void triggerProbe(DeviceId deviceId) {
202 //TODO Really don't see the point of this in non OF Context,
203 // for now testing reachability, can be moved to no-op
204 log.debug("Triggering probe equals testing reachability on device {}", deviceId);
205 isReachable(deviceId);
206 }
207
208 @Override
209 public void roleChanged(DeviceId deviceId, MastershipRole newRole) {
210 log.debug("Received role {} for device {}", newRole, deviceId);
211 CompletableFuture<MastershipRole> roleReply = getHandshaker(deviceId).roleChanged(newRole);
212 roleReply.thenAcceptAsync(mastership -> providerService.receivedRoleReply(deviceId, newRole, mastership));
213 }
214
215 @Override
216 public boolean isReachable(DeviceId deviceId) {
217 log.debug("Testing rechability for device {}", deviceId);
218 CompletableFuture<Boolean> reachable = getHandshaker(deviceId).isReachable();
219 try {
220 return reachable.get(10, TimeUnit.SECONDS);
221 } catch (InterruptedException | ExecutionException | TimeoutException e) {
222 log.error("Device {} is not reachable", deviceId, e);
223 return false;
224 }
225 }
226
227 @Override
228 public void changePortState(DeviceId deviceId, PortNumber portNumber,
229 boolean enable) {
230 if (deviceService.getDevice(deviceId).is(PortAdmin.class)) {
231
232 PortAdmin portAdmin = getPortAdmin(deviceId);
233 CompletableFuture<Boolean> modified;
234 if (enable) {
235 modified = portAdmin.enable(portNumber);
236 } else {
237 modified = portAdmin.disable(portNumber);
238 }
239 modified.thenAcceptAsync(result -> {
240 if (!result) {
241 log.warn("Your device {} port {} status can't be changed to {}",
242 deviceId, portNumber, enable);
243 }
244 });
245
246 } else {
247 log.warn("Device {} does not support PortAdmin behaviour", deviceId);
248 }
249 }
250
251 private DeviceHandshaker getHandshaker(DeviceId deviceId) {
252 Driver driver = getDriver(deviceId);
253 return getBehaviour(driver, DeviceHandshaker.class,
254 new DefaultDriverData(driver, deviceId));
255 }
256
257 private PortAdmin getPortAdmin(DeviceId deviceId) {
258 Driver driver = getDriver(deviceId);
259 return getBehaviour(driver, PortAdmin.class,
260 new DefaultDriverData(driver, deviceId));
261
262 }
263
264 private Driver getDriver(DeviceId deviceId) {
265 Driver driver;
266 try {
267 driver = driverService.getDriver(deviceId);
268 } catch (ItemNotFoundException e) {
269 log.debug("Falling back to configuration to fetch driver " +
270 "for device {}", deviceId);
271 driver = driverService.getDriver(
272 cfgService.getConfig(deviceId, BasicDeviceConfig.class).driver());
273 }
274 return driver;
275 }
276
277 //needed since the device manager will not return the driver through implementation()
278 // method since the device is not pushed to the core so for the connectDevice
279 // we need to work around that in order to test before calling
280 // store.createOrUpdateDevice
281 private <T extends Behaviour> T getBehaviour(Driver driver, Class<T> type,
282 DriverData data) {
283 if (driver.hasBehaviour(type)) {
284 DefaultDriverHandler handler = new DefaultDriverHandler(data);
285 return driver.createBehaviour(handler, type);
286 } else {
287 return null;
288 }
289 }
290
291 //Connects a general device
292 private void connectDevice(DeviceId deviceId) {
293 //retrieve the configuration
294 GeneralProviderDeviceConfig providerConfig =
295 cfgService.getConfig(deviceId, GeneralProviderDeviceConfig.class);
296 BasicDeviceConfig basicDeviceConfig =
297 cfgService.getConfig(deviceId, BasicDeviceConfig.class);
298
299 if (providerConfig == null || basicDeviceConfig == null) {
300 log.error("Configuration is NULL: basic config {}, general provider " +
301 "config {}", basicDeviceConfig, providerConfig);
302 } else {
Andrea Campanellabc112a92017-06-26 19:06:43 +0200303 log.info("Connecting to device {} with driver {}", deviceId, basicDeviceConfig.driver());
Andrea Campanella241896c2017-05-10 13:11:04 -0700304
305 Driver driver = driverService.getDriver(basicDeviceConfig.driver());
306 DriverData driverData = new DefaultDriverData(driver, deviceId);
307
308 DeviceHandshaker handshaker =
309 getBehaviour(driver, DeviceHandshaker.class, driverData);
310
Andrea Campanellabc112a92017-06-26 19:06:43 +0200311 if (handshaker == null) {
Andrea Campanella241896c2017-05-10 13:11:04 -0700312 log.error("Device {}, with driver {} does not support DeviceHandshaker " +
313 "behaviour, {}", deviceId, driver.name(), driver.behaviours());
Andrea Campanellabc112a92017-06-26 19:06:43 +0200314 return;
Andrea Campanella241896c2017-05-10 13:11:04 -0700315 }
Andrea Campanellabc112a92017-06-26 19:06:43 +0200316 //Storing deviceKeyId and all other config values
317 // as data in the driver with protocol_<info>
318 // name as the key. e.g protocol_ip
319 providerConfig.protocolsInfo()
320 .forEach((protocol, deviceInfoConfig) -> {
321 deviceInfoConfig.configValues()
322 .forEach((k, v) -> driverData.set(protocol + "_" + k, v));
323 driverData.set(protocol + "_key", deviceInfoConfig.deviceKeyId());
324 });
325
326 //Connecting to the device
327 CompletableFuture<Boolean> connected = handshaker.connect();
328
329 connected.thenAcceptAsync(result -> {
330 if (result) {
331
332 //Populated with the default values obtained by the driver
333 ChassisId cid = new ChassisId();
334 SparseAnnotations annotations = DefaultAnnotations.builder()
335 .set(AnnotationKeys.PROTOCOL,
336 providerConfig.protocolsInfo().keySet().toString())
337 .build();
338 DeviceDescription description =
339 new DefaultDeviceDescription(deviceId.uri(), Device.Type.SWITCH,
340 driver.manufacturer(), driver.hwVersion(),
341 driver.swVersion(), UNKNOWN,
342 cid, false, annotations);
343 //Empty list of ports
344 List<PortDescription> ports = new ArrayList<>();
345
346 if (driver.hasBehaviour(DeviceDescriptionDiscovery.class)) {
347 DeviceDescriptionDiscovery deviceDiscovery = driver
348 .createBehaviour(driverData, DeviceDescriptionDiscovery.class);
349
350 DeviceDescription newdescription = deviceDiscovery.discoverDeviceDetails();
351 if (newdescription != null) {
352 description = newdescription;
353 }
354 ports = deviceDiscovery.discoverPortDetails();
355 }
356
357 Optional<PiPipeconfId> pipeconfId = piPipeconfService.ofDevice(deviceId);
358 //Apply the Pipeline configuration and then connect the device
359 if (pipeconfId.isPresent()) {
360 DeviceDescription finalDescription = description;
361 List<PortDescription> finalPorts = ports;
362 piPipeconfService.bindToDevice(pipeconfId.get(), deviceId).whenComplete((success, ex) -> {
363 if (success) {
364 advertiseDevice(deviceId, finalDescription, finalPorts);
365 } else {
366 log.error("Can't merge driver {} with pipeconf {} for device {}, " +
367 "not reporting it to the device manager",
368 driver.name(), pipeconfId.get(), deviceId);
369 }
370 }).exceptionally(ex -> {
371 throw new IllegalStateException(ex);
372 });
373 } else {
374 //No other operation is needed, advertise the device to the core.
375 advertiseDevice(deviceId, description, ports);
376 }
377
378 } else {
379 log.warn("Can't connect to device {}", deviceId);
380 }
381 });
Andrea Campanella241896c2017-05-10 13:11:04 -0700382 }
383 }
384
Andrea Campanellabc112a92017-06-26 19:06:43 +0200385 private void advertiseDevice(DeviceId deviceId, DeviceDescription description, List<PortDescription> ports) {
386 providerService.deviceConnected(deviceId, description);
387 providerService.updatePorts(deviceId, ports);
388 }
389
Andrea Campanella241896c2017-05-10 13:11:04 -0700390 private void disconnectDevice(DeviceId deviceId) {
391 log.info("Disconnecting for device {}", deviceId);
392 DeviceHandshaker handshaker = getHandshaker(deviceId);
393 if (handshaker != null) {
394 CompletableFuture<Boolean> disconnect = handshaker.disconnect();
395
396 disconnect.thenAcceptAsync(result -> {
397 if (result) {
398 log.info("Disconnected device {}", deviceId);
399 providerService.deviceDisconnected(deviceId);
400 } else {
401 log.warn("Device {} was unable to disconnect", deviceId);
402 }
403 });
404 } else {
405 //gracefully ignoring.
406 log.info("No DeviceHandshaker for device {}", deviceId);
407 }
408 }
409
410 //Needed to catch the exception in the executors since are not rethrown otherwise.
411 private Runnable exceptionSafe(Runnable runnable) {
412 return () -> {
413 try {
414 runnable.run();
415 } catch (Exception e) {
416 log.error("Unhandled Exception", e);
417 }
418 };
419 }
420
421 private void updatePortStatistics(DeviceId deviceId) {
422 Collection<PortStatistics> statistics = deviceService.getDevice(deviceId)
423 .as(PortStatisticsDiscovery.class)
424 .discoverPortStatistics();
425 providerService.updatePortStatistics(deviceId, statistics);
426 }
427
428 /**
429 * Listener for configuration events.
430 */
431 private class InternalNetworkConfigListener implements NetworkConfigListener {
432
433
434 @Override
435 public void event(NetworkConfigEvent event) {
436 DeviceId deviceId = (DeviceId) event.subject();
437 //Assuming that the deviceId comes with uri 'device:'
438 if (!deviceId.uri().getScheme().equals(URI_SCHEME)) {
439 // not under my scheme, skipping
440 log.debug("{} is not my scheme, skipping", deviceId);
441 return;
442 }
Andrea Campanellabc112a92017-06-26 19:06:43 +0200443 if (deviceService.getDevice(deviceId) != null || deviceService.isAvailable(deviceId)) {
Andrea Campanella241896c2017-05-10 13:11:04 -0700444 log.info("Device {} is already connected to ONOS and is available", deviceId);
Andrea Campanellabc112a92017-06-26 19:06:43 +0200445 return;
446 }
447 //FIXME to be removed when netcfg will issue device events in a bundle or
448 // ensure all configuration needed is present
449 Lock lock = ENTRY_LOCKS.computeIfAbsent(deviceId, key -> new ReentrantLock());
450 lock.lock();
451 try {
452 if (event.configClass().equals(GeneralProviderDeviceConfig.class)) {
453 //FIXME we currently assume that p4runtime devices are pipeline configurable.
454 //If we want to connect a p4runtime device with no pipeline
455 if (event.config().isPresent() &&
456 Collections.disjoint(ImmutableSet.copyOf(event.config().get().node().fieldNames()),
457 PIPELINE_CONFIGURABLE_PROTOCOLS)) {
458 pipelineConfigured.add(deviceId);
459 }
460 deviceConfigured.add(deviceId);
461 } else if (event.configClass().equals(BasicDeviceConfig.class)) {
462 if (event.config().isPresent() && event.config().get().node().has(DRIVER)) {
463 //TODO add check for pipeline and add it to the pipeline list if no
464 // p4runtime is present.
465 driverConfigured.add(deviceId);
466 }
467 } else if (event.configClass().equals(PiPipeconfConfig.class)) {
468 if (event.config().isPresent()
469 && event.config().get().node().has(PiPipeconfConfig.PIPIPECONFID)) {
470 pipelineConfigured.add(deviceId);
471 }
472 }
473 //if the device has no "pipeline configurable protocol it will be present
474 // in the pipelineConfigured
475 if (deviceConfigured.contains(deviceId) && driverConfigured.contains(deviceId)
476 && pipelineConfigured.contains(deviceId)) {
477 checkAndSubmitDeviceTask(deviceId);
478 } else {
479 if (deviceConfigured.contains(deviceId) && driverConfigured.contains(deviceId)) {
480 log.debug("Waiting for pipeline configuration for device {}", deviceId);
481 } else if (pipelineConfigured.contains(deviceId) && driverConfigured.contains(deviceId)) {
482 log.debug("Waiting for device configuration for device {}", deviceId);
483 } else if (pipelineConfigured.contains(deviceId) && deviceConfigured.contains(deviceId)) {
484 log.debug("Waiting for driver configuration for device {}", deviceId);
485 } else if (driverConfigured.contains(deviceId)) {
486 log.debug("Only driver configuration for device {}", deviceId);
487 } else if (deviceConfigured.contains(deviceId)) {
488 log.debug("Only device configuration for device {}", deviceId);
489 }
490 }
491 } finally {
492 lock.unlock();
Andrea Campanella241896c2017-05-10 13:11:04 -0700493 }
494 }
495
496 @Override
497 public boolean isRelevant(NetworkConfigEvent event) {
Andrea Campanellabc112a92017-06-26 19:06:43 +0200498 return (event.configClass().equals(GeneralProviderDeviceConfig.class) ||
499 event.configClass().equals(BasicDeviceConfig.class) ||
500 event.configClass().equals(PiPipeconfConfig.class)) &&
Andrea Campanella241896c2017-05-10 13:11:04 -0700501 (event.type() == NetworkConfigEvent.Type.CONFIG_ADDED ||
502 event.type() == NetworkConfigEvent.Type.CONFIG_UPDATED);
503 }
504 }
505
Andrea Campanellabc112a92017-06-26 19:06:43 +0200506 private void checkAndSubmitDeviceTask(DeviceId deviceId) {
507 connectionExecutor.submit(exceptionSafe(() -> connectDevice(deviceId)));
508 //FIXME this will be removed when configuration is synced.
509 deviceConfigured.remove(deviceId);
510 driverConfigured.remove(deviceId);
511 pipelineConfigured.remove(deviceId);
512
513 }
514
Andrea Campanella241896c2017-05-10 13:11:04 -0700515 /**
516 * Listener for core device events.
517 */
518 private class InternalDeviceListener implements DeviceListener {
519 @Override
520 public void event(DeviceEvent event) {
521 Type type = event.type();
522
523 if (type.equals((Type.DEVICE_ADDED))) {
524
525 //For now this is scheduled periodically, when streaming API will
526 // be available we check and base it on the streaming API (e.g. gNMI)
527 if (deviceService.getDevice(event.subject().id()).
528 is(PortStatisticsDiscovery.class)) {
529 portStatsExecutor.scheduleAtFixedRate(exceptionSafe(() ->
530 updatePortStatistics(event.subject().id())),
531 0, PORT_STATS_PERIOD_SECONDS, TimeUnit.SECONDS);
532 updatePortStatistics(event.subject().id());
533 }
534
535 } else if (type.equals(Type.DEVICE_REMOVED)) {
536 connectionExecutor.submit(exceptionSafe(() ->
537 disconnectDevice(event.subject().id())));
538 }
539 }
540
541 @Override
542 public boolean isRelevant(DeviceEvent event) {
543 return URI_SCHEME.toUpperCase()
544 .equals(event.subject().id().uri().toString());
545 }
546 }
547}