blob: 77a2f9c8d5d85747f56420762461ec54d2af5c48 [file] [log] [blame]
Andrea Campanella241896c2017-05-10 13:11:04 -07001/*
Brian O'Connora09fe5b2017-08-03 21:12:30 -07002 * Copyright 2017-present Open Networking Foundation
Andrea Campanella241896c2017-05-10 13:11:04 -07003 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
16
17package org.onosproject.provider.general.device.impl;
18
19import com.google.common.annotations.Beta;
Andrea Campanellabc112a92017-06-26 19:06:43 +020020import com.google.common.collect.ImmutableSet;
21import com.google.common.collect.Maps;
Andrea Campanella241896c2017-05-10 13:11:04 -070022import org.apache.felix.scr.annotations.Activate;
23import org.apache.felix.scr.annotations.Component;
24import org.apache.felix.scr.annotations.Deactivate;
Andrea Campanella19090322017-08-22 10:31:37 +020025import org.apache.felix.scr.annotations.Modified;
26import org.apache.felix.scr.annotations.Property;
Andrea Campanella241896c2017-05-10 13:11:04 -070027import org.apache.felix.scr.annotations.Reference;
28import org.apache.felix.scr.annotations.ReferenceCardinality;
29import org.onlab.packet.ChassisId;
30import org.onlab.util.ItemNotFoundException;
Andrea Campanella19090322017-08-22 10:31:37 +020031import org.onlab.util.Tools;
Andrea Campanella241896c2017-05-10 13:11:04 -070032import org.onosproject.core.CoreService;
33import org.onosproject.net.AnnotationKeys;
34import org.onosproject.net.DefaultAnnotations;
35import org.onosproject.net.Device;
36import org.onosproject.net.DeviceId;
37import org.onosproject.net.MastershipRole;
38import org.onosproject.net.PortNumber;
39import org.onosproject.net.SparseAnnotations;
40import org.onosproject.net.behaviour.PortAdmin;
41import org.onosproject.net.config.ConfigFactory;
42import org.onosproject.net.config.NetworkConfigEvent;
43import org.onosproject.net.config.NetworkConfigListener;
44import org.onosproject.net.config.NetworkConfigRegistry;
45import org.onosproject.net.config.basics.BasicDeviceConfig;
46import org.onosproject.net.config.basics.SubjectFactories;
47import org.onosproject.net.device.DefaultDeviceDescription;
48import org.onosproject.net.device.DeviceDescription;
49import org.onosproject.net.device.DeviceDescriptionDiscovery;
50import org.onosproject.net.device.DeviceEvent;
51import org.onosproject.net.device.DeviceHandshaker;
52import org.onosproject.net.device.DeviceListener;
53import org.onosproject.net.device.DeviceProvider;
54import org.onosproject.net.device.DeviceProviderRegistry;
55import org.onosproject.net.device.DeviceProviderService;
56import org.onosproject.net.device.DeviceService;
57import org.onosproject.net.device.PortDescription;
58import org.onosproject.net.device.PortStatistics;
59import org.onosproject.net.device.PortStatisticsDiscovery;
60import org.onosproject.net.driver.Behaviour;
61import org.onosproject.net.driver.DefaultDriverData;
62import org.onosproject.net.driver.DefaultDriverHandler;
63import org.onosproject.net.driver.Driver;
64import org.onosproject.net.driver.DriverData;
65import org.onosproject.net.driver.DriverService;
Carmelo Cascone59f57de2017-07-11 19:55:09 -040066import org.onosproject.net.pi.model.PiPipeconf;
Andrea Campanellabc112a92017-06-26 19:06:43 +020067import org.onosproject.net.pi.model.PiPipeconfId;
Carmelo Cascone59f57de2017-07-11 19:55:09 -040068import org.onosproject.net.pi.model.PiPipelineProgrammable;
Andrea Campanellabc112a92017-06-26 19:06:43 +020069import org.onosproject.net.pi.runtime.PiPipeconfConfig;
70import org.onosproject.net.pi.runtime.PiPipeconfService;
Andrea Campanella241896c2017-05-10 13:11:04 -070071import org.onosproject.net.provider.AbstractProvider;
72import org.onosproject.net.provider.ProviderId;
73import org.onosproject.provider.general.device.api.GeneralProviderDeviceConfig;
Andrea Campanella19090322017-08-22 10:31:37 +020074import org.osgi.service.component.ComponentContext;
Andrea Campanella241896c2017-05-10 13:11:04 -070075import org.slf4j.Logger;
76
77import java.util.ArrayList;
78import java.util.Collection;
Andrea Campanellabc112a92017-06-26 19:06:43 +020079import java.util.Collections;
Andrea Campanella19090322017-08-22 10:31:37 +020080import java.util.Dictionary;
Andrea Campanella241896c2017-05-10 13:11:04 -070081import java.util.List;
Andrea Campanella19090322017-08-22 10:31:37 +020082import java.util.Random;
Andrea Campanellabc112a92017-06-26 19:06:43 +020083import java.util.Set;
Andrea Campanella241896c2017-05-10 13:11:04 -070084import java.util.concurrent.CompletableFuture;
Andrea Campanella19090322017-08-22 10:31:37 +020085import java.util.concurrent.ConcurrentHashMap;
Andrea Campanellabc112a92017-06-26 19:06:43 +020086import java.util.concurrent.ConcurrentMap;
87import java.util.concurrent.CopyOnWriteArraySet;
Andrea Campanella241896c2017-05-10 13:11:04 -070088import java.util.concurrent.ExecutionException;
89import java.util.concurrent.ScheduledExecutorService;
Andrea Campanella19090322017-08-22 10:31:37 +020090import java.util.concurrent.ScheduledFuture;
Andrea Campanella241896c2017-05-10 13:11:04 -070091import java.util.concurrent.TimeUnit;
92import java.util.concurrent.TimeoutException;
Andrea Campanellabc112a92017-06-26 19:06:43 +020093import java.util.concurrent.locks.Lock;
94import java.util.concurrent.locks.ReentrantLock;
Andrea Campanella241896c2017-05-10 13:11:04 -070095
96import static java.util.concurrent.Executors.newScheduledThreadPool;
97import static org.onlab.util.Tools.groupedThreads;
98import static org.onosproject.net.device.DeviceEvent.Type;
99import static org.slf4j.LoggerFactory.getLogger;
100
101/**
102 * Provider which uses drivers to detect device and do initial handshake
103 * and channel establishment with devices. Any other provider specific operation
104 * is also delegated to the DeviceHandshaker driver.
105 */
106@Beta
107@Component(immediate = true)
108public class GeneralDeviceProvider extends AbstractProvider
109 implements DeviceProvider {
Andrea Campanellabc112a92017-06-26 19:06:43 +0200110 public static final String DRIVER = "driver";
Andrea Campanella241896c2017-05-10 13:11:04 -0700111 private final Logger log = getLogger(getClass());
112
113 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
114 protected DeviceProviderRegistry providerRegistry;
115
116 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
117 protected NetworkConfigRegistry cfgService;
118
119 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
120 protected CoreService coreService;
121
122 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
123 protected DeviceService deviceService;
124
125 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
126 protected DriverService driverService;
127
128 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
Andrea Campanellabc112a92017-06-26 19:06:43 +0200129 protected PiPipeconfService piPipeconfService;
Andrea Campanella241896c2017-05-10 13:11:04 -0700130
Andrea Campanella19090322017-08-22 10:31:37 +0200131 private static final int DEFAULT_POLL_FREQUENCY_SECONDS = 10;
132 @Property(name = "pollFrequency", intValue = DEFAULT_POLL_FREQUENCY_SECONDS,
133 label = "Configure poll frequency for port status and statistics; " +
134 "default is 10 sec")
135 private int pollFrequency = DEFAULT_POLL_FREQUENCY_SECONDS;
136
Andrea Campanella241896c2017-05-10 13:11:04 -0700137 protected static final String APP_NAME = "org.onosproject.generaldeviceprovider";
138 protected static final String URI_SCHEME = "device";
139 protected static final String CFG_SCHEME = "generalprovider";
140 private static final String DEVICE_PROVIDER_PACKAGE = "org.onosproject.general.provider.device";
141 private static final int CORE_POOL_SIZE = 10;
142 private static final String UNKNOWN = "unknown";
Andrea Campanella19090322017-08-22 10:31:37 +0200143
Andrea Campanellabc112a92017-06-26 19:06:43 +0200144 //FIXME this will be removed when the configuration is synced at the source.
145 private static final Set<String> PIPELINE_CONFIGURABLE_PROTOCOLS = ImmutableSet.of("p4runtime");
Andrea Campanellabc112a92017-06-26 19:06:43 +0200146
Andrea Campanella19090322017-08-22 10:31:37 +0200147 private static final ConcurrentMap<DeviceId, Lock> ENTRY_LOCKS = Maps.newConcurrentMap();
Andrea Campanellabc112a92017-06-26 19:06:43 +0200148 //FIXME to be removed when netcfg will issue device events in a bundle or
149 //ensures all configuration needed is present
150 private Set<DeviceId> deviceConfigured = new CopyOnWriteArraySet<>();
151 private Set<DeviceId> driverConfigured = new CopyOnWriteArraySet<>();
152 private Set<DeviceId> pipelineConfigured = new CopyOnWriteArraySet<>();
Andrea Campanella241896c2017-05-10 13:11:04 -0700153
154
155 protected ScheduledExecutorService connectionExecutor
156 = newScheduledThreadPool(CORE_POOL_SIZE,
Andrea Campanella19090322017-08-22 10:31:37 +0200157 groupedThreads("onos/generaldeviceprovider-device",
158 "connection-executor-%d", log));
Andrea Campanella241896c2017-05-10 13:11:04 -0700159 protected ScheduledExecutorService portStatsExecutor
160 = newScheduledThreadPool(CORE_POOL_SIZE,
Andrea Campanella19090322017-08-22 10:31:37 +0200161 groupedThreads("onos/generaldeviceprovider-port-stats",
162 "port-stats-executor-%d", log));
163 protected ConcurrentMap<DeviceId, ScheduledFuture<?>> scheduledTasks = new ConcurrentHashMap<>();
Andrea Campanella241896c2017-05-10 13:11:04 -0700164
165 protected DeviceProviderService providerService;
166 private InternalDeviceListener deviceListener = new InternalDeviceListener();
167
168 protected final ConfigFactory factory =
169 new ConfigFactory<DeviceId, GeneralProviderDeviceConfig>(
170 SubjectFactories.DEVICE_SUBJECT_FACTORY,
171 GeneralProviderDeviceConfig.class, CFG_SCHEME) {
172 @Override
173 public GeneralProviderDeviceConfig createConfig() {
174 return new GeneralProviderDeviceConfig();
175 }
176 };
177
178 protected final NetworkConfigListener cfgListener = new InternalNetworkConfigListener();
179
180
181 @Activate
182 public void activate() {
183 providerService = providerRegistry.register(this);
184 coreService.registerApplication(APP_NAME);
185 cfgService.registerConfigFactory(factory);
186 cfgService.addListener(cfgListener);
187 deviceService.addListener(deviceListener);
188 //This will fail if ONOS has CFG and drivers which depend on this provider
189 // are activated, failing due to not finding the driver.
190 cfgService.getSubjects(DeviceId.class, GeneralProviderDeviceConfig.class)
191 .forEach(did -> connectionExecutor.execute(() -> connectDevice(did)));
192 log.info("Started");
193 }
194
Andrea Campanella19090322017-08-22 10:31:37 +0200195 @Modified
196 public void modified(ComponentContext context) {
197 if (context != null) {
198 Dictionary<?, ?> properties = context.getProperties();
199 pollFrequency = Tools.getIntegerProperty(properties, "pollFrequency",
200 DEFAULT_POLL_FREQUENCY_SECONDS);
201 log.info("Configured. Poll frequency is configured to {} seconds", pollFrequency);
202 }
203
204 if (!scheduledTasks.isEmpty()) {
205 //cancel all previous tasks
206 scheduledTasks.values().forEach(task -> task.cancel(false));
207 //resubmit task with new timeout.
208 Set<DeviceId> deviceSubjects =
209 cfgService.getSubjects(DeviceId.class, GeneralProviderDeviceConfig.class);
210 deviceSubjects.forEach(deviceId -> {
211 if (!compareScheme(deviceId)) {
212 // not under my scheme, skipping
213 log.debug("{} is not my scheme, skipping", deviceId);
214 return;
215 }
216 scheduledTasks.put(deviceId, schedulePolling(deviceId, true));
217 });
218 }
219
220 }
221
Andrea Campanella241896c2017-05-10 13:11:04 -0700222
223 @Deactivate
224 public void deactivate() {
225 portStatsExecutor.shutdown();
226 cfgService.removeListener(cfgListener);
227 //Not Removing the device so they can still be used from other driver providers
228 //cfgService.getSubjects(DeviceId.class, GeneralProviderDeviceConfig.class)
229 // .forEach(did -> connectionExecutor.execute(() -> disconnectDevice(did)));
230 connectionExecutor.shutdown();
231 deviceService.removeListener(deviceListener);
232 providerRegistry.unregister(this);
233 providerService = null;
234 cfgService.unregisterConfigFactory(factory);
235 log.info("Stopped");
236 }
237
238 public GeneralDeviceProvider() {
239 super(new ProviderId(URI_SCHEME, DEVICE_PROVIDER_PACKAGE));
240 }
241
242
243 @Override
244 public void triggerProbe(DeviceId deviceId) {
245 //TODO Really don't see the point of this in non OF Context,
246 // for now testing reachability, can be moved to no-op
247 log.debug("Triggering probe equals testing reachability on device {}", deviceId);
248 isReachable(deviceId);
249 }
250
251 @Override
252 public void roleChanged(DeviceId deviceId, MastershipRole newRole) {
253 log.debug("Received role {} for device {}", newRole, deviceId);
254 CompletableFuture<MastershipRole> roleReply = getHandshaker(deviceId).roleChanged(newRole);
255 roleReply.thenAcceptAsync(mastership -> providerService.receivedRoleReply(deviceId, newRole, mastership));
256 }
257
258 @Override
259 public boolean isReachable(DeviceId deviceId) {
260 log.debug("Testing rechability for device {}", deviceId);
261 CompletableFuture<Boolean> reachable = getHandshaker(deviceId).isReachable();
262 try {
263 return reachable.get(10, TimeUnit.SECONDS);
264 } catch (InterruptedException | ExecutionException | TimeoutException e) {
265 log.error("Device {} is not reachable", deviceId, e);
266 return false;
267 }
268 }
269
270 @Override
271 public void changePortState(DeviceId deviceId, PortNumber portNumber,
272 boolean enable) {
273 if (deviceService.getDevice(deviceId).is(PortAdmin.class)) {
274
275 PortAdmin portAdmin = getPortAdmin(deviceId);
276 CompletableFuture<Boolean> modified;
277 if (enable) {
278 modified = portAdmin.enable(portNumber);
279 } else {
280 modified = portAdmin.disable(portNumber);
281 }
282 modified.thenAcceptAsync(result -> {
283 if (!result) {
284 log.warn("Your device {} port {} status can't be changed to {}",
Andrea Campanella19090322017-08-22 10:31:37 +0200285 deviceId, portNumber, enable);
Andrea Campanella241896c2017-05-10 13:11:04 -0700286 }
287 });
288
289 } else {
290 log.warn("Device {} does not support PortAdmin behaviour", deviceId);
291 }
292 }
293
294 private DeviceHandshaker getHandshaker(DeviceId deviceId) {
295 Driver driver = getDriver(deviceId);
296 return getBehaviour(driver, DeviceHandshaker.class,
Andrea Campanella19090322017-08-22 10:31:37 +0200297 new DefaultDriverData(driver, deviceId));
Andrea Campanella241896c2017-05-10 13:11:04 -0700298 }
299
300 private PortAdmin getPortAdmin(DeviceId deviceId) {
301 Driver driver = getDriver(deviceId);
302 return getBehaviour(driver, PortAdmin.class,
Andrea Campanella19090322017-08-22 10:31:37 +0200303 new DefaultDriverData(driver, deviceId));
Andrea Campanella241896c2017-05-10 13:11:04 -0700304
305 }
306
307 private Driver getDriver(DeviceId deviceId) {
308 Driver driver;
309 try {
310 driver = driverService.getDriver(deviceId);
311 } catch (ItemNotFoundException e) {
312 log.debug("Falling back to configuration to fetch driver " +
Andrea Campanella19090322017-08-22 10:31:37 +0200313 "for device {}", deviceId);
Andrea Campanella241896c2017-05-10 13:11:04 -0700314 driver = driverService.getDriver(
315 cfgService.getConfig(deviceId, BasicDeviceConfig.class).driver());
316 }
317 return driver;
318 }
319
320 //needed since the device manager will not return the driver through implementation()
321 // method since the device is not pushed to the core so for the connectDevice
322 // we need to work around that in order to test before calling
323 // store.createOrUpdateDevice
324 private <T extends Behaviour> T getBehaviour(Driver driver, Class<T> type,
325 DriverData data) {
326 if (driver.hasBehaviour(type)) {
327 DefaultDriverHandler handler = new DefaultDriverHandler(data);
328 return driver.createBehaviour(handler, type);
329 } else {
330 return null;
331 }
332 }
333
334 //Connects a general device
335 private void connectDevice(DeviceId deviceId) {
336 //retrieve the configuration
337 GeneralProviderDeviceConfig providerConfig =
338 cfgService.getConfig(deviceId, GeneralProviderDeviceConfig.class);
339 BasicDeviceConfig basicDeviceConfig =
340 cfgService.getConfig(deviceId, BasicDeviceConfig.class);
341
342 if (providerConfig == null || basicDeviceConfig == null) {
343 log.error("Configuration is NULL: basic config {}, general provider " +
Andrea Campanella19090322017-08-22 10:31:37 +0200344 "config {}", basicDeviceConfig, providerConfig);
Andrea Campanella241896c2017-05-10 13:11:04 -0700345 } else {
Andrea Campanellabc112a92017-06-26 19:06:43 +0200346 log.info("Connecting to device {} with driver {}", deviceId, basicDeviceConfig.driver());
Andrea Campanella241896c2017-05-10 13:11:04 -0700347
Frank Wang554ce972017-09-06 09:56:43 +0800348 Driver driver;
349 try {
350 driver = driverService.getDriver(basicDeviceConfig.driver());
351 } catch (ItemNotFoundException e) {
352 log.warn("The driver of {} is not found : {}", deviceId, e.getMessage());
353 return;
354 }
355
Andrea Campanella241896c2017-05-10 13:11:04 -0700356 DriverData driverData = new DefaultDriverData(driver, deviceId);
Frank Wang554ce972017-09-06 09:56:43 +0800357 DeviceHandshaker handshaker = getBehaviour(driver, DeviceHandshaker.class, driverData);
Andrea Campanellabc112a92017-06-26 19:06:43 +0200358 if (handshaker == null) {
Andrea Campanella241896c2017-05-10 13:11:04 -0700359 log.error("Device {}, with driver {} does not support DeviceHandshaker " +
Andrea Campanella19090322017-08-22 10:31:37 +0200360 "behaviour, {}", deviceId, driver.name(), driver.behaviours());
Andrea Campanellabc112a92017-06-26 19:06:43 +0200361 return;
Andrea Campanella241896c2017-05-10 13:11:04 -0700362 }
Frank Wang554ce972017-09-06 09:56:43 +0800363
Andrea Campanellabc112a92017-06-26 19:06:43 +0200364 //Storing deviceKeyId and all other config values
365 // as data in the driver with protocol_<info>
366 // name as the key. e.g protocol_ip
367 providerConfig.protocolsInfo()
368 .forEach((protocol, deviceInfoConfig) -> {
369 deviceInfoConfig.configValues()
370 .forEach((k, v) -> driverData.set(protocol + "_" + k, v));
371 driverData.set(protocol + "_key", deviceInfoConfig.deviceKeyId());
372 });
373
374 //Connecting to the device
375 CompletableFuture<Boolean> connected = handshaker.connect();
376
377 connected.thenAcceptAsync(result -> {
378 if (result) {
379
380 //Populated with the default values obtained by the driver
381 ChassisId cid = new ChassisId();
382 SparseAnnotations annotations = DefaultAnnotations.builder()
383 .set(AnnotationKeys.PROTOCOL,
Andrea Campanella19090322017-08-22 10:31:37 +0200384 providerConfig.protocolsInfo().keySet().toString())
Andrea Campanellabc112a92017-06-26 19:06:43 +0200385 .build();
386 DeviceDescription description =
387 new DefaultDeviceDescription(deviceId.uri(), Device.Type.SWITCH,
Andrea Campanella19090322017-08-22 10:31:37 +0200388 driver.manufacturer(), driver.hwVersion(),
389 driver.swVersion(), UNKNOWN,
390 cid, false, annotations);
Andrea Campanellabc112a92017-06-26 19:06:43 +0200391 //Empty list of ports
392 List<PortDescription> ports = new ArrayList<>();
393
394 if (driver.hasBehaviour(DeviceDescriptionDiscovery.class)) {
395 DeviceDescriptionDiscovery deviceDiscovery = driver
396 .createBehaviour(driverData, DeviceDescriptionDiscovery.class);
397
398 DeviceDescription newdescription = deviceDiscovery.discoverDeviceDetails();
399 if (newdescription != null) {
400 description = newdescription;
401 }
402 ports = deviceDiscovery.discoverPortDetails();
403 }
404
Carmelo Cascone59f57de2017-07-11 19:55:09 -0400405 if (!handlePipeconf(deviceId, driver, driverData)) {
406 // Something went wrong during handling of pipeconf.
407 // We already logged the error.
408 handshaker.disconnect();
409 return;
Andrea Campanellabc112a92017-06-26 19:06:43 +0200410 }
411
Carmelo Cascone59f57de2017-07-11 19:55:09 -0400412 advertiseDevice(deviceId, description, ports);
413
Andrea Campanellabc112a92017-06-26 19:06:43 +0200414 } else {
415 log.warn("Can't connect to device {}", deviceId);
416 }
417 });
Andrea Campanella241896c2017-05-10 13:11:04 -0700418 }
419 }
420
Carmelo Cascone59f57de2017-07-11 19:55:09 -0400421 /**
422 * Handles the case of a device that is pipeline programmable. Returns true if the operation wa successful and the
423 * device can be registered to the core, false otherwise.
424 */
425 private boolean handlePipeconf(DeviceId deviceId, Driver driver, DriverData driverData) {
426
427 PiPipelineProgrammable pipelineProg = getBehaviour(driver, PiPipelineProgrammable.class,
Andrea Campanella19090322017-08-22 10:31:37 +0200428 driverData);
Carmelo Cascone59f57de2017-07-11 19:55:09 -0400429
430 if (pipelineProg == null) {
431 // Device is not pipeline programmable.
432 return true;
433 }
434
435 PiPipeconfId pipeconfId = piPipeconfService.ofDevice(deviceId).orElseGet(() -> {
436 // No pipeconf has been associated with this device.
437 // Check if device driver provides a default one.
438 if (pipelineProg.getDefaultPipeconf().isPresent()) {
439 PiPipeconf defaultPipeconf = pipelineProg.getDefaultPipeconf().get();
440 log.info("Using default pipeconf {} for {}", defaultPipeconf.id(), deviceId);
Carmelo Cascone59f57de2017-07-11 19:55:09 -0400441 return defaultPipeconf.id();
442 } else {
443 return null;
444 }
445 });
446
447 if (pipeconfId == null) {
448 log.warn("Device {} is pipeline programmable but no pipeconf can be associated to it.", deviceId);
Carmelo Cascone1fb27d32017-08-25 20:40:20 +0200449 return false;
450 }
451
452 if (!piPipeconfService.getPipeconf(pipeconfId).isPresent()) {
453 log.warn("Pipeconf {} is not registered", pipeconfId);
454 return false;
Carmelo Cascone59f57de2017-07-11 19:55:09 -0400455 }
456
457
Carmelo Cascone1fb27d32017-08-25 20:40:20 +0200458 PiPipeconf pipeconf = piPipeconfService.getPipeconf(pipeconfId).get();
Carmelo Cascone59f57de2017-07-11 19:55:09 -0400459
460 try {
461 if (!pipelineProg.deployPipeconf(pipeconf).get()) {
462 log.error("Unable to deploy pipeconf {} to {}, aborting device discovery", pipeconfId, deviceId);
463 return false;
464 }
465
466 if (!piPipeconfService.bindToDevice(pipeconfId, deviceId).get()) {
467 log.error("Unable to merge driver {} for device {} with pipeconf {}, aborting device discovery",
Andrea Campanella19090322017-08-22 10:31:37 +0200468 driver.name(), deviceId, pipeconfId);
Carmelo Cascone59f57de2017-07-11 19:55:09 -0400469 return false;
470 }
471 } catch (InterruptedException | ExecutionException e) {
472 throw new IllegalStateException(e);
473 }
474
475 return true;
476 }
477
Andrea Campanellabc112a92017-06-26 19:06:43 +0200478 private void advertiseDevice(DeviceId deviceId, DeviceDescription description, List<PortDescription> ports) {
479 providerService.deviceConnected(deviceId, description);
480 providerService.updatePorts(deviceId, ports);
481 }
482
Andrea Campanella241896c2017-05-10 13:11:04 -0700483 private void disconnectDevice(DeviceId deviceId) {
484 log.info("Disconnecting for device {}", deviceId);
485 DeviceHandshaker handshaker = getHandshaker(deviceId);
486 if (handshaker != null) {
487 CompletableFuture<Boolean> disconnect = handshaker.disconnect();
488
489 disconnect.thenAcceptAsync(result -> {
490 if (result) {
491 log.info("Disconnected device {}", deviceId);
492 providerService.deviceDisconnected(deviceId);
493 } else {
494 log.warn("Device {} was unable to disconnect", deviceId);
495 }
496 });
497 } else {
498 //gracefully ignoring.
499 log.info("No DeviceHandshaker for device {}", deviceId);
500 }
Andrea Campanella19090322017-08-22 10:31:37 +0200501 ScheduledFuture<?> pollingStatisticsTask = scheduledTasks.get(deviceId);
502 if (pollingStatisticsTask != null) {
503 pollingStatisticsTask.cancel(true);
504 }
Andrea Campanella241896c2017-05-10 13:11:04 -0700505 }
506
507 //Needed to catch the exception in the executors since are not rethrown otherwise.
508 private Runnable exceptionSafe(Runnable runnable) {
509 return () -> {
510 try {
511 runnable.run();
512 } catch (Exception e) {
513 log.error("Unhandled Exception", e);
514 }
515 };
516 }
517
518 private void updatePortStatistics(DeviceId deviceId) {
519 Collection<PortStatistics> statistics = deviceService.getDevice(deviceId)
520 .as(PortStatisticsDiscovery.class)
521 .discoverPortStatistics();
Andrea Campanella19090322017-08-22 10:31:37 +0200522 //updating statistcs only if not empty
523 if (!statistics.isEmpty()) {
524 providerService.updatePortStatistics(deviceId, statistics);
525 }
526 }
527
528 private boolean compareScheme(DeviceId deviceId) {
529 return deviceId.uri().getScheme().equals(URI_SCHEME);
Andrea Campanella241896c2017-05-10 13:11:04 -0700530 }
531
532 /**
533 * Listener for configuration events.
534 */
535 private class InternalNetworkConfigListener implements NetworkConfigListener {
536
537
538 @Override
539 public void event(NetworkConfigEvent event) {
540 DeviceId deviceId = (DeviceId) event.subject();
541 //Assuming that the deviceId comes with uri 'device:'
Andrea Campanella19090322017-08-22 10:31:37 +0200542 if (!compareScheme(deviceId)) {
Andrea Campanella241896c2017-05-10 13:11:04 -0700543 // not under my scheme, skipping
544 log.debug("{} is not my scheme, skipping", deviceId);
545 return;
546 }
Andrea Campanellabc112a92017-06-26 19:06:43 +0200547 if (deviceService.getDevice(deviceId) != null || deviceService.isAvailable(deviceId)) {
Andrea Campanella241896c2017-05-10 13:11:04 -0700548 log.info("Device {} is already connected to ONOS and is available", deviceId);
Andrea Campanellabc112a92017-06-26 19:06:43 +0200549 return;
550 }
551 //FIXME to be removed when netcfg will issue device events in a bundle or
552 // ensure all configuration needed is present
553 Lock lock = ENTRY_LOCKS.computeIfAbsent(deviceId, key -> new ReentrantLock());
554 lock.lock();
555 try {
556 if (event.configClass().equals(GeneralProviderDeviceConfig.class)) {
557 //FIXME we currently assume that p4runtime devices are pipeline configurable.
558 //If we want to connect a p4runtime device with no pipeline
559 if (event.config().isPresent() &&
560 Collections.disjoint(ImmutableSet.copyOf(event.config().get().node().fieldNames()),
Andrea Campanella19090322017-08-22 10:31:37 +0200561 PIPELINE_CONFIGURABLE_PROTOCOLS)) {
Andrea Campanellabc112a92017-06-26 19:06:43 +0200562 pipelineConfigured.add(deviceId);
563 }
564 deviceConfigured.add(deviceId);
565 } else if (event.configClass().equals(BasicDeviceConfig.class)) {
566 if (event.config().isPresent() && event.config().get().node().has(DRIVER)) {
567 //TODO add check for pipeline and add it to the pipeline list if no
568 // p4runtime is present.
569 driverConfigured.add(deviceId);
570 }
571 } else if (event.configClass().equals(PiPipeconfConfig.class)) {
572 if (event.config().isPresent()
573 && event.config().get().node().has(PiPipeconfConfig.PIPIPECONFID)) {
574 pipelineConfigured.add(deviceId);
575 }
576 }
577 //if the device has no "pipeline configurable protocol it will be present
578 // in the pipelineConfigured
579 if (deviceConfigured.contains(deviceId) && driverConfigured.contains(deviceId)
580 && pipelineConfigured.contains(deviceId)) {
581 checkAndSubmitDeviceTask(deviceId);
582 } else {
583 if (deviceConfigured.contains(deviceId) && driverConfigured.contains(deviceId)) {
584 log.debug("Waiting for pipeline configuration for device {}", deviceId);
585 } else if (pipelineConfigured.contains(deviceId) && driverConfigured.contains(deviceId)) {
586 log.debug("Waiting for device configuration for device {}", deviceId);
587 } else if (pipelineConfigured.contains(deviceId) && deviceConfigured.contains(deviceId)) {
588 log.debug("Waiting for driver configuration for device {}", deviceId);
589 } else if (driverConfigured.contains(deviceId)) {
590 log.debug("Only driver configuration for device {}", deviceId);
591 } else if (deviceConfigured.contains(deviceId)) {
592 log.debug("Only device configuration for device {}", deviceId);
593 }
594 }
595 } finally {
596 lock.unlock();
Andrea Campanella241896c2017-05-10 13:11:04 -0700597 }
598 }
599
600 @Override
601 public boolean isRelevant(NetworkConfigEvent event) {
Andrea Campanellabc112a92017-06-26 19:06:43 +0200602 return (event.configClass().equals(GeneralProviderDeviceConfig.class) ||
603 event.configClass().equals(BasicDeviceConfig.class) ||
604 event.configClass().equals(PiPipeconfConfig.class)) &&
Andrea Campanella241896c2017-05-10 13:11:04 -0700605 (event.type() == NetworkConfigEvent.Type.CONFIG_ADDED ||
606 event.type() == NetworkConfigEvent.Type.CONFIG_UPDATED);
607 }
608 }
609
Andrea Campanellabc112a92017-06-26 19:06:43 +0200610 private void checkAndSubmitDeviceTask(DeviceId deviceId) {
611 connectionExecutor.submit(exceptionSafe(() -> connectDevice(deviceId)));
612 //FIXME this will be removed when configuration is synced.
613 deviceConfigured.remove(deviceId);
614 driverConfigured.remove(deviceId);
615 pipelineConfigured.remove(deviceId);
616
617 }
618
Andrea Campanella19090322017-08-22 10:31:37 +0200619 private ScheduledFuture<?> schedulePolling(DeviceId deviceId, boolean randomize) {
620 int delay = 0;
621 if (randomize) {
622 delay = new Random().nextInt(10);
623 }
624 return portStatsExecutor.scheduleAtFixedRate(
625 exceptionSafe(() -> updatePortStatistics(deviceId)),
626 delay, pollFrequency, TimeUnit.SECONDS);
627 }
628
Andrea Campanella241896c2017-05-10 13:11:04 -0700629 /**
630 * Listener for core device events.
631 */
632 private class InternalDeviceListener implements DeviceListener {
633 @Override
634 public void event(DeviceEvent event) {
635 Type type = event.type();
Andrea Campanella241896c2017-05-10 13:11:04 -0700636 if (type.equals((Type.DEVICE_ADDED))) {
637
638 //For now this is scheduled periodically, when streaming API will
639 // be available we check and base it on the streaming API (e.g. gNMI)
640 if (deviceService.getDevice(event.subject().id()).
641 is(PortStatisticsDiscovery.class)) {
Andrea Campanella19090322017-08-22 10:31:37 +0200642 scheduledTasks.put(event.subject().id(), schedulePolling(event.subject().id(), false));
Andrea Campanella241896c2017-05-10 13:11:04 -0700643 updatePortStatistics(event.subject().id());
644 }
645
646 } else if (type.equals(Type.DEVICE_REMOVED)) {
647 connectionExecutor.submit(exceptionSafe(() ->
Andrea Campanella19090322017-08-22 10:31:37 +0200648 disconnectDevice(event.subject().id())));
Andrea Campanella241896c2017-05-10 13:11:04 -0700649 }
650 }
651
652 @Override
653 public boolean isRelevant(DeviceEvent event) {
Andrea Campanella19090322017-08-22 10:31:37 +0200654 return event.subject().id().toString().startsWith(URI_SCHEME.toLowerCase());
Andrea Campanella241896c2017-05-10 13:11:04 -0700655 }
656 }
657}