blob: 47ba6c42530c03ac1458762ca38ffb584a190f5c [file] [log] [blame]
Andrea Campanella241896c2017-05-10 13:11:04 -07001/*
Brian O'Connora09fe5b2017-08-03 21:12:30 -07002 * Copyright 2017-present Open Networking Foundation
Andrea Campanella241896c2017-05-10 13:11:04 -07003 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
16
17package org.onosproject.provider.general.device.impl;
18
19import com.google.common.annotations.Beta;
Andrea Campanellabc112a92017-06-26 19:06:43 +020020import com.google.common.collect.ImmutableSet;
21import com.google.common.collect.Maps;
Andrea Campanella241896c2017-05-10 13:11:04 -070022import org.apache.felix.scr.annotations.Activate;
23import org.apache.felix.scr.annotations.Component;
24import org.apache.felix.scr.annotations.Deactivate;
Andrea Campanella19090322017-08-22 10:31:37 +020025import org.apache.felix.scr.annotations.Modified;
26import org.apache.felix.scr.annotations.Property;
Andrea Campanella241896c2017-05-10 13:11:04 -070027import org.apache.felix.scr.annotations.Reference;
28import org.apache.felix.scr.annotations.ReferenceCardinality;
29import org.onlab.packet.ChassisId;
30import org.onlab.util.ItemNotFoundException;
Andrea Campanella19090322017-08-22 10:31:37 +020031import org.onlab.util.Tools;
Andrea Campanella4929a812017-10-09 18:38:23 +020032import org.onosproject.cfg.ComponentConfigService;
Andrea Campanella241896c2017-05-10 13:11:04 -070033import org.onosproject.core.CoreService;
34import org.onosproject.net.AnnotationKeys;
35import org.onosproject.net.DefaultAnnotations;
36import org.onosproject.net.Device;
37import org.onosproject.net.DeviceId;
38import org.onosproject.net.MastershipRole;
39import org.onosproject.net.PortNumber;
40import org.onosproject.net.SparseAnnotations;
Carmelo Cascone87892e22017-11-13 16:01:29 -080041import org.onosproject.net.behaviour.PiPipelineProgrammable;
Andrea Campanella241896c2017-05-10 13:11:04 -070042import org.onosproject.net.behaviour.PortAdmin;
43import org.onosproject.net.config.ConfigFactory;
44import org.onosproject.net.config.NetworkConfigEvent;
45import org.onosproject.net.config.NetworkConfigListener;
46import org.onosproject.net.config.NetworkConfigRegistry;
47import org.onosproject.net.config.basics.BasicDeviceConfig;
48import org.onosproject.net.config.basics.SubjectFactories;
49import org.onosproject.net.device.DefaultDeviceDescription;
50import org.onosproject.net.device.DeviceDescription;
51import org.onosproject.net.device.DeviceDescriptionDiscovery;
52import org.onosproject.net.device.DeviceEvent;
53import org.onosproject.net.device.DeviceHandshaker;
54import org.onosproject.net.device.DeviceListener;
55import org.onosproject.net.device.DeviceProvider;
56import org.onosproject.net.device.DeviceProviderRegistry;
57import org.onosproject.net.device.DeviceProviderService;
58import org.onosproject.net.device.DeviceService;
59import org.onosproject.net.device.PortDescription;
60import org.onosproject.net.device.PortStatistics;
61import org.onosproject.net.device.PortStatisticsDiscovery;
62import org.onosproject.net.driver.Behaviour;
63import org.onosproject.net.driver.DefaultDriverData;
64import org.onosproject.net.driver.DefaultDriverHandler;
65import org.onosproject.net.driver.Driver;
66import org.onosproject.net.driver.DriverData;
67import org.onosproject.net.driver.DriverService;
Carmelo Cascone59f57de2017-07-11 19:55:09 -040068import org.onosproject.net.pi.model.PiPipeconf;
Andrea Campanellabc112a92017-06-26 19:06:43 +020069import org.onosproject.net.pi.model.PiPipeconfId;
70import org.onosproject.net.pi.runtime.PiPipeconfConfig;
71import org.onosproject.net.pi.runtime.PiPipeconfService;
Andrea Campanella241896c2017-05-10 13:11:04 -070072import org.onosproject.net.provider.AbstractProvider;
73import org.onosproject.net.provider.ProviderId;
74import org.onosproject.provider.general.device.api.GeneralProviderDeviceConfig;
Andrea Campanella19090322017-08-22 10:31:37 +020075import org.osgi.service.component.ComponentContext;
Andrea Campanella241896c2017-05-10 13:11:04 -070076import org.slf4j.Logger;
77
78import java.util.ArrayList;
79import java.util.Collection;
Andrea Campanellabc112a92017-06-26 19:06:43 +020080import java.util.Collections;
Andrea Campanella19090322017-08-22 10:31:37 +020081import java.util.Dictionary;
Andrea Campanella241896c2017-05-10 13:11:04 -070082import java.util.List;
Andrea Campanellace111932017-09-18 16:59:56 +090083import java.util.Objects;
Andrea Campanella19090322017-08-22 10:31:37 +020084import java.util.Random;
Andrea Campanellabc112a92017-06-26 19:06:43 +020085import java.util.Set;
Andrea Campanella241896c2017-05-10 13:11:04 -070086import java.util.concurrent.CompletableFuture;
Andrea Campanella19090322017-08-22 10:31:37 +020087import java.util.concurrent.ConcurrentHashMap;
Andrea Campanellabc112a92017-06-26 19:06:43 +020088import java.util.concurrent.ConcurrentMap;
89import java.util.concurrent.CopyOnWriteArraySet;
Andrea Campanella241896c2017-05-10 13:11:04 -070090import java.util.concurrent.ExecutionException;
91import java.util.concurrent.ScheduledExecutorService;
Andrea Campanella19090322017-08-22 10:31:37 +020092import java.util.concurrent.ScheduledFuture;
Andrea Campanella241896c2017-05-10 13:11:04 -070093import java.util.concurrent.TimeUnit;
94import java.util.concurrent.TimeoutException;
Andrea Campanellabc112a92017-06-26 19:06:43 +020095import java.util.concurrent.locks.Lock;
96import java.util.concurrent.locks.ReentrantLock;
Andrea Campanella241896c2017-05-10 13:11:04 -070097
98import static java.util.concurrent.Executors.newScheduledThreadPool;
99import static org.onlab.util.Tools.groupedThreads;
100import static org.onosproject.net.device.DeviceEvent.Type;
101import static org.slf4j.LoggerFactory.getLogger;
102
103/**
104 * Provider which uses drivers to detect device and do initial handshake
105 * and channel establishment with devices. Any other provider specific operation
106 * is also delegated to the DeviceHandshaker driver.
107 */
108@Beta
109@Component(immediate = true)
110public class GeneralDeviceProvider extends AbstractProvider
111 implements DeviceProvider {
Andrea Campanellabc112a92017-06-26 19:06:43 +0200112 public static final String DRIVER = "driver";
Andrea Campanella241896c2017-05-10 13:11:04 -0700113 private final Logger log = getLogger(getClass());
114
115 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
116 protected DeviceProviderRegistry providerRegistry;
117
118 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
Andrea Campanella4929a812017-10-09 18:38:23 +0200119 protected ComponentConfigService componentConfigService;
120
121 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
Andrea Campanella241896c2017-05-10 13:11:04 -0700122 protected NetworkConfigRegistry cfgService;
123
124 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
125 protected CoreService coreService;
126
127 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
128 protected DeviceService deviceService;
129
130 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
131 protected DriverService driverService;
132
133 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
Andrea Campanellabc112a92017-06-26 19:06:43 +0200134 protected PiPipeconfService piPipeconfService;
Andrea Campanella241896c2017-05-10 13:11:04 -0700135
Andrea Campanella19090322017-08-22 10:31:37 +0200136 private static final int DEFAULT_POLL_FREQUENCY_SECONDS = 10;
137 @Property(name = "pollFrequency", intValue = DEFAULT_POLL_FREQUENCY_SECONDS,
138 label = "Configure poll frequency for port status and statistics; " +
139 "default is 10 sec")
140 private int pollFrequency = DEFAULT_POLL_FREQUENCY_SECONDS;
141
Andrea Campanella241896c2017-05-10 13:11:04 -0700142 protected static final String APP_NAME = "org.onosproject.generaldeviceprovider";
143 protected static final String URI_SCHEME = "device";
144 protected static final String CFG_SCHEME = "generalprovider";
145 private static final String DEVICE_PROVIDER_PACKAGE = "org.onosproject.general.provider.device";
146 private static final int CORE_POOL_SIZE = 10;
147 private static final String UNKNOWN = "unknown";
Andrea Campanella19090322017-08-22 10:31:37 +0200148
Andrea Campanellabc112a92017-06-26 19:06:43 +0200149 //FIXME this will be removed when the configuration is synced at the source.
150 private static final Set<String> PIPELINE_CONFIGURABLE_PROTOCOLS = ImmutableSet.of("p4runtime");
Andrea Campanellabc112a92017-06-26 19:06:43 +0200151
Andrea Campanella19090322017-08-22 10:31:37 +0200152 private static final ConcurrentMap<DeviceId, Lock> ENTRY_LOCKS = Maps.newConcurrentMap();
Andrea Campanellabc112a92017-06-26 19:06:43 +0200153 //FIXME to be removed when netcfg will issue device events in a bundle or
154 //ensures all configuration needed is present
155 private Set<DeviceId> deviceConfigured = new CopyOnWriteArraySet<>();
156 private Set<DeviceId> driverConfigured = new CopyOnWriteArraySet<>();
157 private Set<DeviceId> pipelineConfigured = new CopyOnWriteArraySet<>();
Andrea Campanella241896c2017-05-10 13:11:04 -0700158
159
160 protected ScheduledExecutorService connectionExecutor
161 = newScheduledThreadPool(CORE_POOL_SIZE,
Andrea Campanella19090322017-08-22 10:31:37 +0200162 groupedThreads("onos/generaldeviceprovider-device",
163 "connection-executor-%d", log));
Andrea Campanella241896c2017-05-10 13:11:04 -0700164 protected ScheduledExecutorService portStatsExecutor
165 = newScheduledThreadPool(CORE_POOL_SIZE,
Andrea Campanella19090322017-08-22 10:31:37 +0200166 groupedThreads("onos/generaldeviceprovider-port-stats",
167 "port-stats-executor-%d", log));
168 protected ConcurrentMap<DeviceId, ScheduledFuture<?>> scheduledTasks = new ConcurrentHashMap<>();
Andrea Campanella241896c2017-05-10 13:11:04 -0700169
170 protected DeviceProviderService providerService;
171 private InternalDeviceListener deviceListener = new InternalDeviceListener();
172
173 protected final ConfigFactory factory =
174 new ConfigFactory<DeviceId, GeneralProviderDeviceConfig>(
175 SubjectFactories.DEVICE_SUBJECT_FACTORY,
176 GeneralProviderDeviceConfig.class, CFG_SCHEME) {
177 @Override
178 public GeneralProviderDeviceConfig createConfig() {
179 return new GeneralProviderDeviceConfig();
180 }
181 };
182
183 protected final NetworkConfigListener cfgListener = new InternalNetworkConfigListener();
184
185
186 @Activate
187 public void activate() {
188 providerService = providerRegistry.register(this);
Andrea Campanella4929a812017-10-09 18:38:23 +0200189 componentConfigService.registerProperties(getClass());
Andrea Campanella241896c2017-05-10 13:11:04 -0700190 coreService.registerApplication(APP_NAME);
191 cfgService.registerConfigFactory(factory);
192 cfgService.addListener(cfgListener);
193 deviceService.addListener(deviceListener);
194 //This will fail if ONOS has CFG and drivers which depend on this provider
195 // are activated, failing due to not finding the driver.
196 cfgService.getSubjects(DeviceId.class, GeneralProviderDeviceConfig.class)
197 .forEach(did -> connectionExecutor.execute(() -> connectDevice(did)));
198 log.info("Started");
199 }
200
Andrea Campanella19090322017-08-22 10:31:37 +0200201 @Modified
202 public void modified(ComponentContext context) {
203 if (context != null) {
204 Dictionary<?, ?> properties = context.getProperties();
205 pollFrequency = Tools.getIntegerProperty(properties, "pollFrequency",
206 DEFAULT_POLL_FREQUENCY_SECONDS);
207 log.info("Configured. Poll frequency is configured to {} seconds", pollFrequency);
208 }
209
210 if (!scheduledTasks.isEmpty()) {
211 //cancel all previous tasks
212 scheduledTasks.values().forEach(task -> task.cancel(false));
213 //resubmit task with new timeout.
214 Set<DeviceId> deviceSubjects =
215 cfgService.getSubjects(DeviceId.class, GeneralProviderDeviceConfig.class);
216 deviceSubjects.forEach(deviceId -> {
217 if (!compareScheme(deviceId)) {
218 // not under my scheme, skipping
219 log.debug("{} is not my scheme, skipping", deviceId);
220 return;
221 }
222 scheduledTasks.put(deviceId, schedulePolling(deviceId, true));
223 });
224 }
225
226 }
227
Andrea Campanella241896c2017-05-10 13:11:04 -0700228
229 @Deactivate
230 public void deactivate() {
231 portStatsExecutor.shutdown();
Andrea Campanella4929a812017-10-09 18:38:23 +0200232 componentConfigService.unregisterProperties(getClass(), false);
Andrea Campanella241896c2017-05-10 13:11:04 -0700233 cfgService.removeListener(cfgListener);
234 //Not Removing the device so they can still be used from other driver providers
235 //cfgService.getSubjects(DeviceId.class, GeneralProviderDeviceConfig.class)
236 // .forEach(did -> connectionExecutor.execute(() -> disconnectDevice(did)));
237 connectionExecutor.shutdown();
238 deviceService.removeListener(deviceListener);
239 providerRegistry.unregister(this);
240 providerService = null;
241 cfgService.unregisterConfigFactory(factory);
242 log.info("Stopped");
243 }
244
245 public GeneralDeviceProvider() {
246 super(new ProviderId(URI_SCHEME, DEVICE_PROVIDER_PACKAGE));
247 }
248
249
250 @Override
251 public void triggerProbe(DeviceId deviceId) {
252 //TODO Really don't see the point of this in non OF Context,
253 // for now testing reachability, can be moved to no-op
254 log.debug("Triggering probe equals testing reachability on device {}", deviceId);
255 isReachable(deviceId);
256 }
257
258 @Override
259 public void roleChanged(DeviceId deviceId, MastershipRole newRole) {
260 log.debug("Received role {} for device {}", newRole, deviceId);
261 CompletableFuture<MastershipRole> roleReply = getHandshaker(deviceId).roleChanged(newRole);
262 roleReply.thenAcceptAsync(mastership -> providerService.receivedRoleReply(deviceId, newRole, mastership));
263 }
264
265 @Override
266 public boolean isReachable(DeviceId deviceId) {
267 log.debug("Testing rechability for device {}", deviceId);
268 CompletableFuture<Boolean> reachable = getHandshaker(deviceId).isReachable();
269 try {
270 return reachable.get(10, TimeUnit.SECONDS);
271 } catch (InterruptedException | ExecutionException | TimeoutException e) {
272 log.error("Device {} is not reachable", deviceId, e);
273 return false;
274 }
275 }
276
277 @Override
278 public void changePortState(DeviceId deviceId, PortNumber portNumber,
279 boolean enable) {
280 if (deviceService.getDevice(deviceId).is(PortAdmin.class)) {
281
282 PortAdmin portAdmin = getPortAdmin(deviceId);
283 CompletableFuture<Boolean> modified;
284 if (enable) {
285 modified = portAdmin.enable(portNumber);
286 } else {
287 modified = portAdmin.disable(portNumber);
288 }
289 modified.thenAcceptAsync(result -> {
290 if (!result) {
291 log.warn("Your device {} port {} status can't be changed to {}",
Andrea Campanella19090322017-08-22 10:31:37 +0200292 deviceId, portNumber, enable);
Andrea Campanella241896c2017-05-10 13:11:04 -0700293 }
294 });
295
296 } else {
297 log.warn("Device {} does not support PortAdmin behaviour", deviceId);
298 }
299 }
300
301 private DeviceHandshaker getHandshaker(DeviceId deviceId) {
302 Driver driver = getDriver(deviceId);
303 return getBehaviour(driver, DeviceHandshaker.class,
Andrea Campanella19090322017-08-22 10:31:37 +0200304 new DefaultDriverData(driver, deviceId));
Andrea Campanella241896c2017-05-10 13:11:04 -0700305 }
306
307 private PortAdmin getPortAdmin(DeviceId deviceId) {
308 Driver driver = getDriver(deviceId);
309 return getBehaviour(driver, PortAdmin.class,
Andrea Campanella19090322017-08-22 10:31:37 +0200310 new DefaultDriverData(driver, deviceId));
Andrea Campanella241896c2017-05-10 13:11:04 -0700311
312 }
313
314 private Driver getDriver(DeviceId deviceId) {
315 Driver driver;
316 try {
317 driver = driverService.getDriver(deviceId);
318 } catch (ItemNotFoundException e) {
319 log.debug("Falling back to configuration to fetch driver " +
Andrea Campanella19090322017-08-22 10:31:37 +0200320 "for device {}", deviceId);
Andrea Campanella241896c2017-05-10 13:11:04 -0700321 driver = driverService.getDriver(
322 cfgService.getConfig(deviceId, BasicDeviceConfig.class).driver());
323 }
324 return driver;
325 }
326
327 //needed since the device manager will not return the driver through implementation()
328 // method since the device is not pushed to the core so for the connectDevice
329 // we need to work around that in order to test before calling
330 // store.createOrUpdateDevice
331 private <T extends Behaviour> T getBehaviour(Driver driver, Class<T> type,
332 DriverData data) {
333 if (driver.hasBehaviour(type)) {
334 DefaultDriverHandler handler = new DefaultDriverHandler(data);
335 return driver.createBehaviour(handler, type);
336 } else {
337 return null;
338 }
339 }
340
341 //Connects a general device
342 private void connectDevice(DeviceId deviceId) {
343 //retrieve the configuration
344 GeneralProviderDeviceConfig providerConfig =
345 cfgService.getConfig(deviceId, GeneralProviderDeviceConfig.class);
346 BasicDeviceConfig basicDeviceConfig =
347 cfgService.getConfig(deviceId, BasicDeviceConfig.class);
348
349 if (providerConfig == null || basicDeviceConfig == null) {
350 log.error("Configuration is NULL: basic config {}, general provider " +
Andrea Campanella19090322017-08-22 10:31:37 +0200351 "config {}", basicDeviceConfig, providerConfig);
Andrea Campanella241896c2017-05-10 13:11:04 -0700352 } else {
Andrea Campanellabc112a92017-06-26 19:06:43 +0200353 log.info("Connecting to device {} with driver {}", deviceId, basicDeviceConfig.driver());
Andrea Campanella241896c2017-05-10 13:11:04 -0700354
Frank Wang554ce972017-09-06 09:56:43 +0800355 Driver driver;
356 try {
357 driver = driverService.getDriver(basicDeviceConfig.driver());
358 } catch (ItemNotFoundException e) {
359 log.warn("The driver of {} is not found : {}", deviceId, e.getMessage());
360 return;
361 }
362
Andrea Campanella241896c2017-05-10 13:11:04 -0700363 DriverData driverData = new DefaultDriverData(driver, deviceId);
Frank Wang554ce972017-09-06 09:56:43 +0800364 DeviceHandshaker handshaker = getBehaviour(driver, DeviceHandshaker.class, driverData);
Andrea Campanellabc112a92017-06-26 19:06:43 +0200365 if (handshaker == null) {
Andrea Campanella241896c2017-05-10 13:11:04 -0700366 log.error("Device {}, with driver {} does not support DeviceHandshaker " +
Andrea Campanella19090322017-08-22 10:31:37 +0200367 "behaviour, {}", deviceId, driver.name(), driver.behaviours());
Andrea Campanellabc112a92017-06-26 19:06:43 +0200368 return;
Andrea Campanella241896c2017-05-10 13:11:04 -0700369 }
Frank Wang554ce972017-09-06 09:56:43 +0800370
Andrea Campanellabc112a92017-06-26 19:06:43 +0200371 //Storing deviceKeyId and all other config values
372 // as data in the driver with protocol_<info>
373 // name as the key. e.g protocol_ip
374 providerConfig.protocolsInfo()
375 .forEach((protocol, deviceInfoConfig) -> {
376 deviceInfoConfig.configValues()
377 .forEach((k, v) -> driverData.set(protocol + "_" + k, v));
378 driverData.set(protocol + "_key", deviceInfoConfig.deviceKeyId());
379 });
380
381 //Connecting to the device
382 CompletableFuture<Boolean> connected = handshaker.connect();
383
384 connected.thenAcceptAsync(result -> {
385 if (result) {
386
387 //Populated with the default values obtained by the driver
388 ChassisId cid = new ChassisId();
389 SparseAnnotations annotations = DefaultAnnotations.builder()
390 .set(AnnotationKeys.PROTOCOL,
Andrea Campanella19090322017-08-22 10:31:37 +0200391 providerConfig.protocolsInfo().keySet().toString())
Andrea Campanellabc112a92017-06-26 19:06:43 +0200392 .build();
393 DeviceDescription description =
394 new DefaultDeviceDescription(deviceId.uri(), Device.Type.SWITCH,
Andrea Campanella19090322017-08-22 10:31:37 +0200395 driver.manufacturer(), driver.hwVersion(),
396 driver.swVersion(), UNKNOWN,
397 cid, false, annotations);
Andrea Campanellabc112a92017-06-26 19:06:43 +0200398 //Empty list of ports
399 List<PortDescription> ports = new ArrayList<>();
400
401 if (driver.hasBehaviour(DeviceDescriptionDiscovery.class)) {
402 DeviceDescriptionDiscovery deviceDiscovery = driver
403 .createBehaviour(driverData, DeviceDescriptionDiscovery.class);
404
405 DeviceDescription newdescription = deviceDiscovery.discoverDeviceDetails();
406 if (newdescription != null) {
407 description = newdescription;
408 }
409 ports = deviceDiscovery.discoverPortDetails();
410 }
411
Carmelo Cascone59f57de2017-07-11 19:55:09 -0400412 if (!handlePipeconf(deviceId, driver, driverData)) {
413 // Something went wrong during handling of pipeconf.
414 // We already logged the error.
415 handshaker.disconnect();
416 return;
Andrea Campanellabc112a92017-06-26 19:06:43 +0200417 }
418
Carmelo Cascone59f57de2017-07-11 19:55:09 -0400419 advertiseDevice(deviceId, description, ports);
420
Andrea Campanellabc112a92017-06-26 19:06:43 +0200421 } else {
422 log.warn("Can't connect to device {}", deviceId);
423 }
424 });
Andrea Campanella241896c2017-05-10 13:11:04 -0700425 }
426 }
427
Carmelo Cascone59f57de2017-07-11 19:55:09 -0400428 /**
429 * Handles the case of a device that is pipeline programmable. Returns true if the operation wa successful and the
430 * device can be registered to the core, false otherwise.
431 */
432 private boolean handlePipeconf(DeviceId deviceId, Driver driver, DriverData driverData) {
433
434 PiPipelineProgrammable pipelineProg = getBehaviour(driver, PiPipelineProgrammable.class,
Andrea Campanella19090322017-08-22 10:31:37 +0200435 driverData);
Carmelo Cascone59f57de2017-07-11 19:55:09 -0400436
437 if (pipelineProg == null) {
438 // Device is not pipeline programmable.
439 return true;
440 }
441
442 PiPipeconfId pipeconfId = piPipeconfService.ofDevice(deviceId).orElseGet(() -> {
443 // No pipeconf has been associated with this device.
444 // Check if device driver provides a default one.
445 if (pipelineProg.getDefaultPipeconf().isPresent()) {
446 PiPipeconf defaultPipeconf = pipelineProg.getDefaultPipeconf().get();
447 log.info("Using default pipeconf {} for {}", defaultPipeconf.id(), deviceId);
Carmelo Cascone59f57de2017-07-11 19:55:09 -0400448 return defaultPipeconf.id();
449 } else {
450 return null;
451 }
452 });
453
454 if (pipeconfId == null) {
455 log.warn("Device {} is pipeline programmable but no pipeconf can be associated to it.", deviceId);
Carmelo Cascone1fb27d32017-08-25 20:40:20 +0200456 return false;
457 }
458
459 if (!piPipeconfService.getPipeconf(pipeconfId).isPresent()) {
460 log.warn("Pipeconf {} is not registered", pipeconfId);
461 return false;
Carmelo Cascone59f57de2017-07-11 19:55:09 -0400462 }
463
464
Carmelo Cascone1fb27d32017-08-25 20:40:20 +0200465 PiPipeconf pipeconf = piPipeconfService.getPipeconf(pipeconfId).get();
Carmelo Cascone59f57de2017-07-11 19:55:09 -0400466
467 try {
468 if (!pipelineProg.deployPipeconf(pipeconf).get()) {
469 log.error("Unable to deploy pipeconf {} to {}, aborting device discovery", pipeconfId, deviceId);
470 return false;
471 }
472
473 if (!piPipeconfService.bindToDevice(pipeconfId, deviceId).get()) {
474 log.error("Unable to merge driver {} for device {} with pipeconf {}, aborting device discovery",
Andrea Campanella19090322017-08-22 10:31:37 +0200475 driver.name(), deviceId, pipeconfId);
Carmelo Cascone59f57de2017-07-11 19:55:09 -0400476 return false;
477 }
478 } catch (InterruptedException | ExecutionException e) {
479 throw new IllegalStateException(e);
480 }
481
482 return true;
483 }
484
Andrea Campanellabc112a92017-06-26 19:06:43 +0200485 private void advertiseDevice(DeviceId deviceId, DeviceDescription description, List<PortDescription> ports) {
486 providerService.deviceConnected(deviceId, description);
487 providerService.updatePorts(deviceId, ports);
488 }
489
Andrea Campanella241896c2017-05-10 13:11:04 -0700490 private void disconnectDevice(DeviceId deviceId) {
491 log.info("Disconnecting for device {}", deviceId);
492 DeviceHandshaker handshaker = getHandshaker(deviceId);
493 if (handshaker != null) {
494 CompletableFuture<Boolean> disconnect = handshaker.disconnect();
495
496 disconnect.thenAcceptAsync(result -> {
497 if (result) {
498 log.info("Disconnected device {}", deviceId);
499 providerService.deviceDisconnected(deviceId);
500 } else {
501 log.warn("Device {} was unable to disconnect", deviceId);
502 }
503 });
504 } else {
505 //gracefully ignoring.
506 log.info("No DeviceHandshaker for device {}", deviceId);
507 }
Andrea Campanella19090322017-08-22 10:31:37 +0200508 ScheduledFuture<?> pollingStatisticsTask = scheduledTasks.get(deviceId);
509 if (pollingStatisticsTask != null) {
510 pollingStatisticsTask.cancel(true);
511 }
Andrea Campanella241896c2017-05-10 13:11:04 -0700512 }
513
514 //Needed to catch the exception in the executors since are not rethrown otherwise.
515 private Runnable exceptionSafe(Runnable runnable) {
516 return () -> {
517 try {
518 runnable.run();
519 } catch (Exception e) {
520 log.error("Unhandled Exception", e);
521 }
522 };
523 }
524
525 private void updatePortStatistics(DeviceId deviceId) {
Andrea Campanellace111932017-09-18 16:59:56 +0900526 Device device = deviceService.getDevice(deviceId);
527 if (!Objects.isNull(device) && deviceService.isAvailable(deviceId) &&
528 device.is(PortStatisticsDiscovery.class)) {
529 Collection<PortStatistics> statistics = device.as(PortStatisticsDiscovery.class)
530 .discoverPortStatistics();
531 //updating statistcs only if not empty
532 if (!statistics.isEmpty()) {
533 providerService.updatePortStatistics(deviceId, statistics);
534 }
535 } else {
536 log.debug("Can't update port statistics for device {}", deviceId);
Andrea Campanella19090322017-08-22 10:31:37 +0200537 }
538 }
539
540 private boolean compareScheme(DeviceId deviceId) {
541 return deviceId.uri().getScheme().equals(URI_SCHEME);
Andrea Campanella241896c2017-05-10 13:11:04 -0700542 }
543
544 /**
545 * Listener for configuration events.
546 */
547 private class InternalNetworkConfigListener implements NetworkConfigListener {
548
549
550 @Override
551 public void event(NetworkConfigEvent event) {
552 DeviceId deviceId = (DeviceId) event.subject();
553 //Assuming that the deviceId comes with uri 'device:'
Andrea Campanella19090322017-08-22 10:31:37 +0200554 if (!compareScheme(deviceId)) {
Andrea Campanella241896c2017-05-10 13:11:04 -0700555 // not under my scheme, skipping
556 log.debug("{} is not my scheme, skipping", deviceId);
557 return;
558 }
Andrea Campanellace111932017-09-18 16:59:56 +0900559 if (deviceService.getDevice(deviceId) != null && deviceService.isAvailable(deviceId)) {
Andrea Campanella241896c2017-05-10 13:11:04 -0700560 log.info("Device {} is already connected to ONOS and is available", deviceId);
Andrea Campanellabc112a92017-06-26 19:06:43 +0200561 return;
562 }
563 //FIXME to be removed when netcfg will issue device events in a bundle or
564 // ensure all configuration needed is present
565 Lock lock = ENTRY_LOCKS.computeIfAbsent(deviceId, key -> new ReentrantLock());
566 lock.lock();
567 try {
568 if (event.configClass().equals(GeneralProviderDeviceConfig.class)) {
569 //FIXME we currently assume that p4runtime devices are pipeline configurable.
570 //If we want to connect a p4runtime device with no pipeline
571 if (event.config().isPresent() &&
572 Collections.disjoint(ImmutableSet.copyOf(event.config().get().node().fieldNames()),
Andrea Campanella19090322017-08-22 10:31:37 +0200573 PIPELINE_CONFIGURABLE_PROTOCOLS)) {
Andrea Campanellabc112a92017-06-26 19:06:43 +0200574 pipelineConfigured.add(deviceId);
575 }
576 deviceConfigured.add(deviceId);
577 } else if (event.configClass().equals(BasicDeviceConfig.class)) {
578 if (event.config().isPresent() && event.config().get().node().has(DRIVER)) {
579 //TODO add check for pipeline and add it to the pipeline list if no
580 // p4runtime is present.
581 driverConfigured.add(deviceId);
582 }
583 } else if (event.configClass().equals(PiPipeconfConfig.class)) {
584 if (event.config().isPresent()
585 && event.config().get().node().has(PiPipeconfConfig.PIPIPECONFID)) {
586 pipelineConfigured.add(deviceId);
587 }
588 }
589 //if the device has no "pipeline configurable protocol it will be present
590 // in the pipelineConfigured
591 if (deviceConfigured.contains(deviceId) && driverConfigured.contains(deviceId)
592 && pipelineConfigured.contains(deviceId)) {
593 checkAndSubmitDeviceTask(deviceId);
594 } else {
595 if (deviceConfigured.contains(deviceId) && driverConfigured.contains(deviceId)) {
596 log.debug("Waiting for pipeline configuration for device {}", deviceId);
597 } else if (pipelineConfigured.contains(deviceId) && driverConfigured.contains(deviceId)) {
598 log.debug("Waiting for device configuration for device {}", deviceId);
599 } else if (pipelineConfigured.contains(deviceId) && deviceConfigured.contains(deviceId)) {
600 log.debug("Waiting for driver configuration for device {}", deviceId);
601 } else if (driverConfigured.contains(deviceId)) {
602 log.debug("Only driver configuration for device {}", deviceId);
603 } else if (deviceConfigured.contains(deviceId)) {
604 log.debug("Only device configuration for device {}", deviceId);
605 }
606 }
607 } finally {
608 lock.unlock();
Andrea Campanella241896c2017-05-10 13:11:04 -0700609 }
610 }
611
612 @Override
613 public boolean isRelevant(NetworkConfigEvent event) {
Andrea Campanellabc112a92017-06-26 19:06:43 +0200614 return (event.configClass().equals(GeneralProviderDeviceConfig.class) ||
615 event.configClass().equals(BasicDeviceConfig.class) ||
616 event.configClass().equals(PiPipeconfConfig.class)) &&
Andrea Campanella241896c2017-05-10 13:11:04 -0700617 (event.type() == NetworkConfigEvent.Type.CONFIG_ADDED ||
618 event.type() == NetworkConfigEvent.Type.CONFIG_UPDATED);
619 }
620 }
621
Andrea Campanellabc112a92017-06-26 19:06:43 +0200622 private void checkAndSubmitDeviceTask(DeviceId deviceId) {
623 connectionExecutor.submit(exceptionSafe(() -> connectDevice(deviceId)));
624 //FIXME this will be removed when configuration is synced.
625 deviceConfigured.remove(deviceId);
626 driverConfigured.remove(deviceId);
627 pipelineConfigured.remove(deviceId);
628
629 }
630
Andrea Campanella19090322017-08-22 10:31:37 +0200631 private ScheduledFuture<?> schedulePolling(DeviceId deviceId, boolean randomize) {
632 int delay = 0;
633 if (randomize) {
634 delay = new Random().nextInt(10);
635 }
636 return portStatsExecutor.scheduleAtFixedRate(
637 exceptionSafe(() -> updatePortStatistics(deviceId)),
638 delay, pollFrequency, TimeUnit.SECONDS);
639 }
640
Andrea Campanella241896c2017-05-10 13:11:04 -0700641 /**
642 * Listener for core device events.
643 */
644 private class InternalDeviceListener implements DeviceListener {
645 @Override
646 public void event(DeviceEvent event) {
647 Type type = event.type();
Andrea Campanellace111932017-09-18 16:59:56 +0900648 DeviceId deviceId = event.subject().id();
Andrea Campanella241896c2017-05-10 13:11:04 -0700649 if (type.equals((Type.DEVICE_ADDED))) {
650
651 //For now this is scheduled periodically, when streaming API will
652 // be available we check and base it on the streaming API (e.g. gNMI)
Andrea Campanellace111932017-09-18 16:59:56 +0900653 scheduledTasks.put(deviceId, schedulePolling(deviceId, false));
654 updatePortStatistics(deviceId);
Andrea Campanella241896c2017-05-10 13:11:04 -0700655
656 } else if (type.equals(Type.DEVICE_REMOVED)) {
657 connectionExecutor.submit(exceptionSafe(() ->
Andrea Campanellace111932017-09-18 16:59:56 +0900658 disconnectDevice(deviceId)));
Andrea Campanella241896c2017-05-10 13:11:04 -0700659 }
660 }
661
662 @Override
663 public boolean isRelevant(DeviceEvent event) {
Andrea Campanella19090322017-08-22 10:31:37 +0200664 return event.subject().id().toString().startsWith(URI_SCHEME.toLowerCase());
Andrea Campanella241896c2017-05-10 13:11:04 -0700665 }
666 }
667}