blob: 6fd0ff6d15e343e4bde58c3715ef047d10438665 [file] [log] [blame]
Andrea Campanella241896c2017-05-10 13:11:04 -07001/*
Brian O'Connora09fe5b2017-08-03 21:12:30 -07002 * Copyright 2017-present Open Networking Foundation
Andrea Campanella241896c2017-05-10 13:11:04 -07003 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
16
17package org.onosproject.provider.general.device.impl;
18
19import com.google.common.annotations.Beta;
Andrea Campanellabc112a92017-06-26 19:06:43 +020020import com.google.common.collect.ImmutableSet;
21import com.google.common.collect.Maps;
Andrea Campanella241896c2017-05-10 13:11:04 -070022import org.apache.felix.scr.annotations.Activate;
23import org.apache.felix.scr.annotations.Component;
24import org.apache.felix.scr.annotations.Deactivate;
Andrea Campanella19090322017-08-22 10:31:37 +020025import org.apache.felix.scr.annotations.Modified;
26import org.apache.felix.scr.annotations.Property;
Andrea Campanella241896c2017-05-10 13:11:04 -070027import org.apache.felix.scr.annotations.Reference;
28import org.apache.felix.scr.annotations.ReferenceCardinality;
29import org.onlab.packet.ChassisId;
30import org.onlab.util.ItemNotFoundException;
Andrea Campanella19090322017-08-22 10:31:37 +020031import org.onlab.util.Tools;
Andrea Campanella241896c2017-05-10 13:11:04 -070032import org.onosproject.core.CoreService;
33import org.onosproject.net.AnnotationKeys;
34import org.onosproject.net.DefaultAnnotations;
35import org.onosproject.net.Device;
36import org.onosproject.net.DeviceId;
37import org.onosproject.net.MastershipRole;
38import org.onosproject.net.PortNumber;
39import org.onosproject.net.SparseAnnotations;
40import org.onosproject.net.behaviour.PortAdmin;
41import org.onosproject.net.config.ConfigFactory;
42import org.onosproject.net.config.NetworkConfigEvent;
43import org.onosproject.net.config.NetworkConfigListener;
44import org.onosproject.net.config.NetworkConfigRegistry;
45import org.onosproject.net.config.basics.BasicDeviceConfig;
46import org.onosproject.net.config.basics.SubjectFactories;
47import org.onosproject.net.device.DefaultDeviceDescription;
48import org.onosproject.net.device.DeviceDescription;
49import org.onosproject.net.device.DeviceDescriptionDiscovery;
50import org.onosproject.net.device.DeviceEvent;
51import org.onosproject.net.device.DeviceHandshaker;
52import org.onosproject.net.device.DeviceListener;
53import org.onosproject.net.device.DeviceProvider;
54import org.onosproject.net.device.DeviceProviderRegistry;
55import org.onosproject.net.device.DeviceProviderService;
56import org.onosproject.net.device.DeviceService;
57import org.onosproject.net.device.PortDescription;
58import org.onosproject.net.device.PortStatistics;
59import org.onosproject.net.device.PortStatisticsDiscovery;
60import org.onosproject.net.driver.Behaviour;
61import org.onosproject.net.driver.DefaultDriverData;
62import org.onosproject.net.driver.DefaultDriverHandler;
63import org.onosproject.net.driver.Driver;
64import org.onosproject.net.driver.DriverData;
65import org.onosproject.net.driver.DriverService;
Carmelo Cascone59f57de2017-07-11 19:55:09 -040066import org.onosproject.net.pi.model.PiPipeconf;
Andrea Campanellabc112a92017-06-26 19:06:43 +020067import org.onosproject.net.pi.model.PiPipeconfId;
Carmelo Cascone59f57de2017-07-11 19:55:09 -040068import org.onosproject.net.pi.model.PiPipelineProgrammable;
Andrea Campanellabc112a92017-06-26 19:06:43 +020069import org.onosproject.net.pi.runtime.PiPipeconfConfig;
70import org.onosproject.net.pi.runtime.PiPipeconfService;
Andrea Campanella241896c2017-05-10 13:11:04 -070071import org.onosproject.net.provider.AbstractProvider;
72import org.onosproject.net.provider.ProviderId;
73import org.onosproject.provider.general.device.api.GeneralProviderDeviceConfig;
Andrea Campanella19090322017-08-22 10:31:37 +020074import org.osgi.service.component.ComponentContext;
Andrea Campanella241896c2017-05-10 13:11:04 -070075import org.slf4j.Logger;
76
77import java.util.ArrayList;
78import java.util.Collection;
Andrea Campanellabc112a92017-06-26 19:06:43 +020079import java.util.Collections;
Andrea Campanella19090322017-08-22 10:31:37 +020080import java.util.Dictionary;
Andrea Campanella241896c2017-05-10 13:11:04 -070081import java.util.List;
Andrea Campanellace111932017-09-18 16:59:56 +090082import java.util.Objects;
Andrea Campanella19090322017-08-22 10:31:37 +020083import java.util.Random;
Andrea Campanellabc112a92017-06-26 19:06:43 +020084import java.util.Set;
Andrea Campanella241896c2017-05-10 13:11:04 -070085import java.util.concurrent.CompletableFuture;
Andrea Campanella19090322017-08-22 10:31:37 +020086import java.util.concurrent.ConcurrentHashMap;
Andrea Campanellabc112a92017-06-26 19:06:43 +020087import java.util.concurrent.ConcurrentMap;
88import java.util.concurrent.CopyOnWriteArraySet;
Andrea Campanella241896c2017-05-10 13:11:04 -070089import java.util.concurrent.ExecutionException;
90import java.util.concurrent.ScheduledExecutorService;
Andrea Campanella19090322017-08-22 10:31:37 +020091import java.util.concurrent.ScheduledFuture;
Andrea Campanella241896c2017-05-10 13:11:04 -070092import java.util.concurrent.TimeUnit;
93import java.util.concurrent.TimeoutException;
Andrea Campanellabc112a92017-06-26 19:06:43 +020094import java.util.concurrent.locks.Lock;
95import java.util.concurrent.locks.ReentrantLock;
Andrea Campanella241896c2017-05-10 13:11:04 -070096
97import static java.util.concurrent.Executors.newScheduledThreadPool;
98import static org.onlab.util.Tools.groupedThreads;
99import static org.onosproject.net.device.DeviceEvent.Type;
100import static org.slf4j.LoggerFactory.getLogger;
101
102/**
103 * Provider which uses drivers to detect device and do initial handshake
104 * and channel establishment with devices. Any other provider specific operation
105 * is also delegated to the DeviceHandshaker driver.
106 */
107@Beta
108@Component(immediate = true)
109public class GeneralDeviceProvider extends AbstractProvider
110 implements DeviceProvider {
Andrea Campanellabc112a92017-06-26 19:06:43 +0200111 public static final String DRIVER = "driver";
Andrea Campanella241896c2017-05-10 13:11:04 -0700112 private final Logger log = getLogger(getClass());
113
114 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
115 protected DeviceProviderRegistry providerRegistry;
116
117 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
118 protected NetworkConfigRegistry cfgService;
119
120 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
121 protected CoreService coreService;
122
123 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
124 protected DeviceService deviceService;
125
126 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
127 protected DriverService driverService;
128
129 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
Andrea Campanellabc112a92017-06-26 19:06:43 +0200130 protected PiPipeconfService piPipeconfService;
Andrea Campanella241896c2017-05-10 13:11:04 -0700131
Andrea Campanella19090322017-08-22 10:31:37 +0200132 private static final int DEFAULT_POLL_FREQUENCY_SECONDS = 10;
133 @Property(name = "pollFrequency", intValue = DEFAULT_POLL_FREQUENCY_SECONDS,
134 label = "Configure poll frequency for port status and statistics; " +
135 "default is 10 sec")
136 private int pollFrequency = DEFAULT_POLL_FREQUENCY_SECONDS;
137
Andrea Campanella241896c2017-05-10 13:11:04 -0700138 protected static final String APP_NAME = "org.onosproject.generaldeviceprovider";
139 protected static final String URI_SCHEME = "device";
140 protected static final String CFG_SCHEME = "generalprovider";
141 private static final String DEVICE_PROVIDER_PACKAGE = "org.onosproject.general.provider.device";
142 private static final int CORE_POOL_SIZE = 10;
143 private static final String UNKNOWN = "unknown";
Andrea Campanella19090322017-08-22 10:31:37 +0200144
Andrea Campanellabc112a92017-06-26 19:06:43 +0200145 //FIXME this will be removed when the configuration is synced at the source.
146 private static final Set<String> PIPELINE_CONFIGURABLE_PROTOCOLS = ImmutableSet.of("p4runtime");
Andrea Campanellabc112a92017-06-26 19:06:43 +0200147
Andrea Campanella19090322017-08-22 10:31:37 +0200148 private static final ConcurrentMap<DeviceId, Lock> ENTRY_LOCKS = Maps.newConcurrentMap();
Andrea Campanellabc112a92017-06-26 19:06:43 +0200149 //FIXME to be removed when netcfg will issue device events in a bundle or
150 //ensures all configuration needed is present
151 private Set<DeviceId> deviceConfigured = new CopyOnWriteArraySet<>();
152 private Set<DeviceId> driverConfigured = new CopyOnWriteArraySet<>();
153 private Set<DeviceId> pipelineConfigured = new CopyOnWriteArraySet<>();
Andrea Campanella241896c2017-05-10 13:11:04 -0700154
155
156 protected ScheduledExecutorService connectionExecutor
157 = newScheduledThreadPool(CORE_POOL_SIZE,
Andrea Campanella19090322017-08-22 10:31:37 +0200158 groupedThreads("onos/generaldeviceprovider-device",
159 "connection-executor-%d", log));
Andrea Campanella241896c2017-05-10 13:11:04 -0700160 protected ScheduledExecutorService portStatsExecutor
161 = newScheduledThreadPool(CORE_POOL_SIZE,
Andrea Campanella19090322017-08-22 10:31:37 +0200162 groupedThreads("onos/generaldeviceprovider-port-stats",
163 "port-stats-executor-%d", log));
164 protected ConcurrentMap<DeviceId, ScheduledFuture<?>> scheduledTasks = new ConcurrentHashMap<>();
Andrea Campanella241896c2017-05-10 13:11:04 -0700165
166 protected DeviceProviderService providerService;
167 private InternalDeviceListener deviceListener = new InternalDeviceListener();
168
169 protected final ConfigFactory factory =
170 new ConfigFactory<DeviceId, GeneralProviderDeviceConfig>(
171 SubjectFactories.DEVICE_SUBJECT_FACTORY,
172 GeneralProviderDeviceConfig.class, CFG_SCHEME) {
173 @Override
174 public GeneralProviderDeviceConfig createConfig() {
175 return new GeneralProviderDeviceConfig();
176 }
177 };
178
179 protected final NetworkConfigListener cfgListener = new InternalNetworkConfigListener();
180
181
182 @Activate
183 public void activate() {
184 providerService = providerRegistry.register(this);
185 coreService.registerApplication(APP_NAME);
186 cfgService.registerConfigFactory(factory);
187 cfgService.addListener(cfgListener);
188 deviceService.addListener(deviceListener);
189 //This will fail if ONOS has CFG and drivers which depend on this provider
190 // are activated, failing due to not finding the driver.
191 cfgService.getSubjects(DeviceId.class, GeneralProviderDeviceConfig.class)
192 .forEach(did -> connectionExecutor.execute(() -> connectDevice(did)));
193 log.info("Started");
194 }
195
Andrea Campanella19090322017-08-22 10:31:37 +0200196 @Modified
197 public void modified(ComponentContext context) {
198 if (context != null) {
199 Dictionary<?, ?> properties = context.getProperties();
200 pollFrequency = Tools.getIntegerProperty(properties, "pollFrequency",
201 DEFAULT_POLL_FREQUENCY_SECONDS);
202 log.info("Configured. Poll frequency is configured to {} seconds", pollFrequency);
203 }
204
205 if (!scheduledTasks.isEmpty()) {
206 //cancel all previous tasks
207 scheduledTasks.values().forEach(task -> task.cancel(false));
208 //resubmit task with new timeout.
209 Set<DeviceId> deviceSubjects =
210 cfgService.getSubjects(DeviceId.class, GeneralProviderDeviceConfig.class);
211 deviceSubjects.forEach(deviceId -> {
212 if (!compareScheme(deviceId)) {
213 // not under my scheme, skipping
214 log.debug("{} is not my scheme, skipping", deviceId);
215 return;
216 }
217 scheduledTasks.put(deviceId, schedulePolling(deviceId, true));
218 });
219 }
220
221 }
222
Andrea Campanella241896c2017-05-10 13:11:04 -0700223
224 @Deactivate
225 public void deactivate() {
226 portStatsExecutor.shutdown();
227 cfgService.removeListener(cfgListener);
228 //Not Removing the device so they can still be used from other driver providers
229 //cfgService.getSubjects(DeviceId.class, GeneralProviderDeviceConfig.class)
230 // .forEach(did -> connectionExecutor.execute(() -> disconnectDevice(did)));
231 connectionExecutor.shutdown();
232 deviceService.removeListener(deviceListener);
233 providerRegistry.unregister(this);
234 providerService = null;
235 cfgService.unregisterConfigFactory(factory);
236 log.info("Stopped");
237 }
238
239 public GeneralDeviceProvider() {
240 super(new ProviderId(URI_SCHEME, DEVICE_PROVIDER_PACKAGE));
241 }
242
243
244 @Override
245 public void triggerProbe(DeviceId deviceId) {
246 //TODO Really don't see the point of this in non OF Context,
247 // for now testing reachability, can be moved to no-op
248 log.debug("Triggering probe equals testing reachability on device {}", deviceId);
249 isReachable(deviceId);
250 }
251
252 @Override
253 public void roleChanged(DeviceId deviceId, MastershipRole newRole) {
254 log.debug("Received role {} for device {}", newRole, deviceId);
255 CompletableFuture<MastershipRole> roleReply = getHandshaker(deviceId).roleChanged(newRole);
256 roleReply.thenAcceptAsync(mastership -> providerService.receivedRoleReply(deviceId, newRole, mastership));
257 }
258
259 @Override
260 public boolean isReachable(DeviceId deviceId) {
261 log.debug("Testing rechability for device {}", deviceId);
262 CompletableFuture<Boolean> reachable = getHandshaker(deviceId).isReachable();
263 try {
264 return reachable.get(10, TimeUnit.SECONDS);
265 } catch (InterruptedException | ExecutionException | TimeoutException e) {
266 log.error("Device {} is not reachable", deviceId, e);
267 return false;
268 }
269 }
270
271 @Override
272 public void changePortState(DeviceId deviceId, PortNumber portNumber,
273 boolean enable) {
274 if (deviceService.getDevice(deviceId).is(PortAdmin.class)) {
275
276 PortAdmin portAdmin = getPortAdmin(deviceId);
277 CompletableFuture<Boolean> modified;
278 if (enable) {
279 modified = portAdmin.enable(portNumber);
280 } else {
281 modified = portAdmin.disable(portNumber);
282 }
283 modified.thenAcceptAsync(result -> {
284 if (!result) {
285 log.warn("Your device {} port {} status can't be changed to {}",
Andrea Campanella19090322017-08-22 10:31:37 +0200286 deviceId, portNumber, enable);
Andrea Campanella241896c2017-05-10 13:11:04 -0700287 }
288 });
289
290 } else {
291 log.warn("Device {} does not support PortAdmin behaviour", deviceId);
292 }
293 }
294
295 private DeviceHandshaker getHandshaker(DeviceId deviceId) {
296 Driver driver = getDriver(deviceId);
297 return getBehaviour(driver, DeviceHandshaker.class,
Andrea Campanella19090322017-08-22 10:31:37 +0200298 new DefaultDriverData(driver, deviceId));
Andrea Campanella241896c2017-05-10 13:11:04 -0700299 }
300
301 private PortAdmin getPortAdmin(DeviceId deviceId) {
302 Driver driver = getDriver(deviceId);
303 return getBehaviour(driver, PortAdmin.class,
Andrea Campanella19090322017-08-22 10:31:37 +0200304 new DefaultDriverData(driver, deviceId));
Andrea Campanella241896c2017-05-10 13:11:04 -0700305
306 }
307
308 private Driver getDriver(DeviceId deviceId) {
309 Driver driver;
310 try {
311 driver = driverService.getDriver(deviceId);
312 } catch (ItemNotFoundException e) {
313 log.debug("Falling back to configuration to fetch driver " +
Andrea Campanella19090322017-08-22 10:31:37 +0200314 "for device {}", deviceId);
Andrea Campanella241896c2017-05-10 13:11:04 -0700315 driver = driverService.getDriver(
316 cfgService.getConfig(deviceId, BasicDeviceConfig.class).driver());
317 }
318 return driver;
319 }
320
321 //needed since the device manager will not return the driver through implementation()
322 // method since the device is not pushed to the core so for the connectDevice
323 // we need to work around that in order to test before calling
324 // store.createOrUpdateDevice
325 private <T extends Behaviour> T getBehaviour(Driver driver, Class<T> type,
326 DriverData data) {
327 if (driver.hasBehaviour(type)) {
328 DefaultDriverHandler handler = new DefaultDriverHandler(data);
329 return driver.createBehaviour(handler, type);
330 } else {
331 return null;
332 }
333 }
334
335 //Connects a general device
336 private void connectDevice(DeviceId deviceId) {
337 //retrieve the configuration
338 GeneralProviderDeviceConfig providerConfig =
339 cfgService.getConfig(deviceId, GeneralProviderDeviceConfig.class);
340 BasicDeviceConfig basicDeviceConfig =
341 cfgService.getConfig(deviceId, BasicDeviceConfig.class);
342
343 if (providerConfig == null || basicDeviceConfig == null) {
344 log.error("Configuration is NULL: basic config {}, general provider " +
Andrea Campanella19090322017-08-22 10:31:37 +0200345 "config {}", basicDeviceConfig, providerConfig);
Andrea Campanella241896c2017-05-10 13:11:04 -0700346 } else {
Andrea Campanellabc112a92017-06-26 19:06:43 +0200347 log.info("Connecting to device {} with driver {}", deviceId, basicDeviceConfig.driver());
Andrea Campanella241896c2017-05-10 13:11:04 -0700348
Frank Wang554ce972017-09-06 09:56:43 +0800349 Driver driver;
350 try {
351 driver = driverService.getDriver(basicDeviceConfig.driver());
352 } catch (ItemNotFoundException e) {
353 log.warn("The driver of {} is not found : {}", deviceId, e.getMessage());
354 return;
355 }
356
Andrea Campanella241896c2017-05-10 13:11:04 -0700357 DriverData driverData = new DefaultDriverData(driver, deviceId);
Frank Wang554ce972017-09-06 09:56:43 +0800358 DeviceHandshaker handshaker = getBehaviour(driver, DeviceHandshaker.class, driverData);
Andrea Campanellabc112a92017-06-26 19:06:43 +0200359 if (handshaker == null) {
Andrea Campanella241896c2017-05-10 13:11:04 -0700360 log.error("Device {}, with driver {} does not support DeviceHandshaker " +
Andrea Campanella19090322017-08-22 10:31:37 +0200361 "behaviour, {}", deviceId, driver.name(), driver.behaviours());
Andrea Campanellabc112a92017-06-26 19:06:43 +0200362 return;
Andrea Campanella241896c2017-05-10 13:11:04 -0700363 }
Frank Wang554ce972017-09-06 09:56:43 +0800364
Andrea Campanellabc112a92017-06-26 19:06:43 +0200365 //Storing deviceKeyId and all other config values
366 // as data in the driver with protocol_<info>
367 // name as the key. e.g protocol_ip
368 providerConfig.protocolsInfo()
369 .forEach((protocol, deviceInfoConfig) -> {
370 deviceInfoConfig.configValues()
371 .forEach((k, v) -> driverData.set(protocol + "_" + k, v));
372 driverData.set(protocol + "_key", deviceInfoConfig.deviceKeyId());
373 });
374
375 //Connecting to the device
376 CompletableFuture<Boolean> connected = handshaker.connect();
377
378 connected.thenAcceptAsync(result -> {
379 if (result) {
380
381 //Populated with the default values obtained by the driver
382 ChassisId cid = new ChassisId();
383 SparseAnnotations annotations = DefaultAnnotations.builder()
384 .set(AnnotationKeys.PROTOCOL,
Andrea Campanella19090322017-08-22 10:31:37 +0200385 providerConfig.protocolsInfo().keySet().toString())
Andrea Campanellabc112a92017-06-26 19:06:43 +0200386 .build();
387 DeviceDescription description =
388 new DefaultDeviceDescription(deviceId.uri(), Device.Type.SWITCH,
Andrea Campanella19090322017-08-22 10:31:37 +0200389 driver.manufacturer(), driver.hwVersion(),
390 driver.swVersion(), UNKNOWN,
391 cid, false, annotations);
Andrea Campanellabc112a92017-06-26 19:06:43 +0200392 //Empty list of ports
393 List<PortDescription> ports = new ArrayList<>();
394
395 if (driver.hasBehaviour(DeviceDescriptionDiscovery.class)) {
396 DeviceDescriptionDiscovery deviceDiscovery = driver
397 .createBehaviour(driverData, DeviceDescriptionDiscovery.class);
398
399 DeviceDescription newdescription = deviceDiscovery.discoverDeviceDetails();
400 if (newdescription != null) {
401 description = newdescription;
402 }
403 ports = deviceDiscovery.discoverPortDetails();
404 }
405
Carmelo Cascone59f57de2017-07-11 19:55:09 -0400406 if (!handlePipeconf(deviceId, driver, driverData)) {
407 // Something went wrong during handling of pipeconf.
408 // We already logged the error.
409 handshaker.disconnect();
410 return;
Andrea Campanellabc112a92017-06-26 19:06:43 +0200411 }
412
Carmelo Cascone59f57de2017-07-11 19:55:09 -0400413 advertiseDevice(deviceId, description, ports);
414
Andrea Campanellabc112a92017-06-26 19:06:43 +0200415 } else {
416 log.warn("Can't connect to device {}", deviceId);
417 }
418 });
Andrea Campanella241896c2017-05-10 13:11:04 -0700419 }
420 }
421
Carmelo Cascone59f57de2017-07-11 19:55:09 -0400422 /**
423 * Handles the case of a device that is pipeline programmable. Returns true if the operation wa successful and the
424 * device can be registered to the core, false otherwise.
425 */
426 private boolean handlePipeconf(DeviceId deviceId, Driver driver, DriverData driverData) {
427
428 PiPipelineProgrammable pipelineProg = getBehaviour(driver, PiPipelineProgrammable.class,
Andrea Campanella19090322017-08-22 10:31:37 +0200429 driverData);
Carmelo Cascone59f57de2017-07-11 19:55:09 -0400430
431 if (pipelineProg == null) {
432 // Device is not pipeline programmable.
433 return true;
434 }
435
436 PiPipeconfId pipeconfId = piPipeconfService.ofDevice(deviceId).orElseGet(() -> {
437 // No pipeconf has been associated with this device.
438 // Check if device driver provides a default one.
439 if (pipelineProg.getDefaultPipeconf().isPresent()) {
440 PiPipeconf defaultPipeconf = pipelineProg.getDefaultPipeconf().get();
441 log.info("Using default pipeconf {} for {}", defaultPipeconf.id(), deviceId);
Carmelo Cascone59f57de2017-07-11 19:55:09 -0400442 return defaultPipeconf.id();
443 } else {
444 return null;
445 }
446 });
447
448 if (pipeconfId == null) {
449 log.warn("Device {} is pipeline programmable but no pipeconf can be associated to it.", deviceId);
Carmelo Cascone1fb27d32017-08-25 20:40:20 +0200450 return false;
451 }
452
453 if (!piPipeconfService.getPipeconf(pipeconfId).isPresent()) {
454 log.warn("Pipeconf {} is not registered", pipeconfId);
455 return false;
Carmelo Cascone59f57de2017-07-11 19:55:09 -0400456 }
457
458
Carmelo Cascone1fb27d32017-08-25 20:40:20 +0200459 PiPipeconf pipeconf = piPipeconfService.getPipeconf(pipeconfId).get();
Carmelo Cascone59f57de2017-07-11 19:55:09 -0400460
461 try {
462 if (!pipelineProg.deployPipeconf(pipeconf).get()) {
463 log.error("Unable to deploy pipeconf {} to {}, aborting device discovery", pipeconfId, deviceId);
464 return false;
465 }
466
467 if (!piPipeconfService.bindToDevice(pipeconfId, deviceId).get()) {
468 log.error("Unable to merge driver {} for device {} with pipeconf {}, aborting device discovery",
Andrea Campanella19090322017-08-22 10:31:37 +0200469 driver.name(), deviceId, pipeconfId);
Carmelo Cascone59f57de2017-07-11 19:55:09 -0400470 return false;
471 }
472 } catch (InterruptedException | ExecutionException e) {
473 throw new IllegalStateException(e);
474 }
475
476 return true;
477 }
478
Andrea Campanellabc112a92017-06-26 19:06:43 +0200479 private void advertiseDevice(DeviceId deviceId, DeviceDescription description, List<PortDescription> ports) {
480 providerService.deviceConnected(deviceId, description);
481 providerService.updatePorts(deviceId, ports);
482 }
483
Andrea Campanella241896c2017-05-10 13:11:04 -0700484 private void disconnectDevice(DeviceId deviceId) {
485 log.info("Disconnecting for device {}", deviceId);
486 DeviceHandshaker handshaker = getHandshaker(deviceId);
487 if (handshaker != null) {
488 CompletableFuture<Boolean> disconnect = handshaker.disconnect();
489
490 disconnect.thenAcceptAsync(result -> {
491 if (result) {
492 log.info("Disconnected device {}", deviceId);
493 providerService.deviceDisconnected(deviceId);
494 } else {
495 log.warn("Device {} was unable to disconnect", deviceId);
496 }
497 });
498 } else {
499 //gracefully ignoring.
500 log.info("No DeviceHandshaker for device {}", deviceId);
501 }
Andrea Campanella19090322017-08-22 10:31:37 +0200502 ScheduledFuture<?> pollingStatisticsTask = scheduledTasks.get(deviceId);
503 if (pollingStatisticsTask != null) {
504 pollingStatisticsTask.cancel(true);
505 }
Andrea Campanella241896c2017-05-10 13:11:04 -0700506 }
507
508 //Needed to catch the exception in the executors since are not rethrown otherwise.
509 private Runnable exceptionSafe(Runnable runnable) {
510 return () -> {
511 try {
512 runnable.run();
513 } catch (Exception e) {
514 log.error("Unhandled Exception", e);
515 }
516 };
517 }
518
519 private void updatePortStatistics(DeviceId deviceId) {
Andrea Campanellace111932017-09-18 16:59:56 +0900520 Device device = deviceService.getDevice(deviceId);
521 if (!Objects.isNull(device) && deviceService.isAvailable(deviceId) &&
522 device.is(PortStatisticsDiscovery.class)) {
523 Collection<PortStatistics> statistics = device.as(PortStatisticsDiscovery.class)
524 .discoverPortStatistics();
525 //updating statistcs only if not empty
526 if (!statistics.isEmpty()) {
527 providerService.updatePortStatistics(deviceId, statistics);
528 }
529 } else {
530 log.debug("Can't update port statistics for device {}", deviceId);
Andrea Campanella19090322017-08-22 10:31:37 +0200531 }
532 }
533
534 private boolean compareScheme(DeviceId deviceId) {
535 return deviceId.uri().getScheme().equals(URI_SCHEME);
Andrea Campanella241896c2017-05-10 13:11:04 -0700536 }
537
538 /**
539 * Listener for configuration events.
540 */
541 private class InternalNetworkConfigListener implements NetworkConfigListener {
542
543
544 @Override
545 public void event(NetworkConfigEvent event) {
546 DeviceId deviceId = (DeviceId) event.subject();
547 //Assuming that the deviceId comes with uri 'device:'
Andrea Campanella19090322017-08-22 10:31:37 +0200548 if (!compareScheme(deviceId)) {
Andrea Campanella241896c2017-05-10 13:11:04 -0700549 // not under my scheme, skipping
550 log.debug("{} is not my scheme, skipping", deviceId);
551 return;
552 }
Andrea Campanellace111932017-09-18 16:59:56 +0900553 if (deviceService.getDevice(deviceId) != null && deviceService.isAvailable(deviceId)) {
Andrea Campanella241896c2017-05-10 13:11:04 -0700554 log.info("Device {} is already connected to ONOS and is available", deviceId);
Andrea Campanellabc112a92017-06-26 19:06:43 +0200555 return;
556 }
557 //FIXME to be removed when netcfg will issue device events in a bundle or
558 // ensure all configuration needed is present
559 Lock lock = ENTRY_LOCKS.computeIfAbsent(deviceId, key -> new ReentrantLock());
560 lock.lock();
561 try {
562 if (event.configClass().equals(GeneralProviderDeviceConfig.class)) {
563 //FIXME we currently assume that p4runtime devices are pipeline configurable.
564 //If we want to connect a p4runtime device with no pipeline
565 if (event.config().isPresent() &&
566 Collections.disjoint(ImmutableSet.copyOf(event.config().get().node().fieldNames()),
Andrea Campanella19090322017-08-22 10:31:37 +0200567 PIPELINE_CONFIGURABLE_PROTOCOLS)) {
Andrea Campanellabc112a92017-06-26 19:06:43 +0200568 pipelineConfigured.add(deviceId);
569 }
570 deviceConfigured.add(deviceId);
571 } else if (event.configClass().equals(BasicDeviceConfig.class)) {
572 if (event.config().isPresent() && event.config().get().node().has(DRIVER)) {
573 //TODO add check for pipeline and add it to the pipeline list if no
574 // p4runtime is present.
575 driverConfigured.add(deviceId);
576 }
577 } else if (event.configClass().equals(PiPipeconfConfig.class)) {
578 if (event.config().isPresent()
579 && event.config().get().node().has(PiPipeconfConfig.PIPIPECONFID)) {
580 pipelineConfigured.add(deviceId);
581 }
582 }
583 //if the device has no "pipeline configurable protocol it will be present
584 // in the pipelineConfigured
585 if (deviceConfigured.contains(deviceId) && driverConfigured.contains(deviceId)
586 && pipelineConfigured.contains(deviceId)) {
587 checkAndSubmitDeviceTask(deviceId);
588 } else {
589 if (deviceConfigured.contains(deviceId) && driverConfigured.contains(deviceId)) {
590 log.debug("Waiting for pipeline configuration for device {}", deviceId);
591 } else if (pipelineConfigured.contains(deviceId) && driverConfigured.contains(deviceId)) {
592 log.debug("Waiting for device configuration for device {}", deviceId);
593 } else if (pipelineConfigured.contains(deviceId) && deviceConfigured.contains(deviceId)) {
594 log.debug("Waiting for driver configuration for device {}", deviceId);
595 } else if (driverConfigured.contains(deviceId)) {
596 log.debug("Only driver configuration for device {}", deviceId);
597 } else if (deviceConfigured.contains(deviceId)) {
598 log.debug("Only device configuration for device {}", deviceId);
599 }
600 }
601 } finally {
602 lock.unlock();
Andrea Campanella241896c2017-05-10 13:11:04 -0700603 }
604 }
605
606 @Override
607 public boolean isRelevant(NetworkConfigEvent event) {
Andrea Campanellabc112a92017-06-26 19:06:43 +0200608 return (event.configClass().equals(GeneralProviderDeviceConfig.class) ||
609 event.configClass().equals(BasicDeviceConfig.class) ||
610 event.configClass().equals(PiPipeconfConfig.class)) &&
Andrea Campanella241896c2017-05-10 13:11:04 -0700611 (event.type() == NetworkConfigEvent.Type.CONFIG_ADDED ||
612 event.type() == NetworkConfigEvent.Type.CONFIG_UPDATED);
613 }
614 }
615
Andrea Campanellabc112a92017-06-26 19:06:43 +0200616 private void checkAndSubmitDeviceTask(DeviceId deviceId) {
617 connectionExecutor.submit(exceptionSafe(() -> connectDevice(deviceId)));
618 //FIXME this will be removed when configuration is synced.
619 deviceConfigured.remove(deviceId);
620 driverConfigured.remove(deviceId);
621 pipelineConfigured.remove(deviceId);
622
623 }
624
Andrea Campanella19090322017-08-22 10:31:37 +0200625 private ScheduledFuture<?> schedulePolling(DeviceId deviceId, boolean randomize) {
626 int delay = 0;
627 if (randomize) {
628 delay = new Random().nextInt(10);
629 }
630 return portStatsExecutor.scheduleAtFixedRate(
631 exceptionSafe(() -> updatePortStatistics(deviceId)),
632 delay, pollFrequency, TimeUnit.SECONDS);
633 }
634
Andrea Campanella241896c2017-05-10 13:11:04 -0700635 /**
636 * Listener for core device events.
637 */
638 private class InternalDeviceListener implements DeviceListener {
639 @Override
640 public void event(DeviceEvent event) {
641 Type type = event.type();
Andrea Campanellace111932017-09-18 16:59:56 +0900642 DeviceId deviceId = event.subject().id();
Andrea Campanella241896c2017-05-10 13:11:04 -0700643 if (type.equals((Type.DEVICE_ADDED))) {
644
645 //For now this is scheduled periodically, when streaming API will
646 // be available we check and base it on the streaming API (e.g. gNMI)
Andrea Campanellace111932017-09-18 16:59:56 +0900647 scheduledTasks.put(deviceId, schedulePolling(deviceId, false));
648 updatePortStatistics(deviceId);
Andrea Campanella241896c2017-05-10 13:11:04 -0700649
650 } else if (type.equals(Type.DEVICE_REMOVED)) {
651 connectionExecutor.submit(exceptionSafe(() ->
Andrea Campanellace111932017-09-18 16:59:56 +0900652 disconnectDevice(deviceId)));
Andrea Campanella241896c2017-05-10 13:11:04 -0700653 }
654 }
655
656 @Override
657 public boolean isRelevant(DeviceEvent event) {
Andrea Campanella19090322017-08-22 10:31:37 +0200658 return event.subject().id().toString().startsWith(URI_SCHEME.toLowerCase());
Andrea Campanella241896c2017-05-10 13:11:04 -0700659 }
660 }
661}