blob: 8f6f4faf6fbc7618e3ddb7c29b671af7c6e62b1c [file] [log] [blame]
Andrea Campanella241896c2017-05-10 13:11:04 -07001/*
Brian O'Connora09fe5b2017-08-03 21:12:30 -07002 * Copyright 2017-present Open Networking Foundation
Andrea Campanella241896c2017-05-10 13:11:04 -07003 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
16
17package org.onosproject.provider.general.device.impl;
18
19import com.google.common.annotations.Beta;
Andrea Campanellabc112a92017-06-26 19:06:43 +020020import com.google.common.collect.ImmutableSet;
21import com.google.common.collect.Maps;
Andrea Campanella241896c2017-05-10 13:11:04 -070022import org.apache.felix.scr.annotations.Activate;
23import org.apache.felix.scr.annotations.Component;
24import org.apache.felix.scr.annotations.Deactivate;
Andrea Campanella19090322017-08-22 10:31:37 +020025import org.apache.felix.scr.annotations.Modified;
26import org.apache.felix.scr.annotations.Property;
Andrea Campanella241896c2017-05-10 13:11:04 -070027import org.apache.felix.scr.annotations.Reference;
28import org.apache.felix.scr.annotations.ReferenceCardinality;
29import org.onlab.packet.ChassisId;
30import org.onlab.util.ItemNotFoundException;
Andrea Campanella19090322017-08-22 10:31:37 +020031import org.onlab.util.Tools;
Andrea Campanella4929a812017-10-09 18:38:23 +020032import org.onosproject.cfg.ComponentConfigService;
Andrea Campanella14e196d2017-07-24 18:11:36 +020033import org.onosproject.cluster.ClusterService;
34import org.onosproject.cluster.LeadershipService;
35import org.onosproject.cluster.NodeId;
Andrea Campanella241896c2017-05-10 13:11:04 -070036import org.onosproject.core.CoreService;
Andrea Campanella14e196d2017-07-24 18:11:36 +020037import org.onosproject.mastership.MastershipService;
Andrea Campanella241896c2017-05-10 13:11:04 -070038import org.onosproject.net.AnnotationKeys;
39import org.onosproject.net.DefaultAnnotations;
40import org.onosproject.net.Device;
41import org.onosproject.net.DeviceId;
42import org.onosproject.net.MastershipRole;
43import org.onosproject.net.PortNumber;
44import org.onosproject.net.SparseAnnotations;
Carmelo Cascone87892e22017-11-13 16:01:29 -080045import org.onosproject.net.behaviour.PiPipelineProgrammable;
Andrea Campanella241896c2017-05-10 13:11:04 -070046import org.onosproject.net.behaviour.PortAdmin;
47import org.onosproject.net.config.ConfigFactory;
48import org.onosproject.net.config.NetworkConfigEvent;
49import org.onosproject.net.config.NetworkConfigListener;
50import org.onosproject.net.config.NetworkConfigRegistry;
51import org.onosproject.net.config.basics.BasicDeviceConfig;
52import org.onosproject.net.config.basics.SubjectFactories;
53import org.onosproject.net.device.DefaultDeviceDescription;
54import org.onosproject.net.device.DeviceDescription;
55import org.onosproject.net.device.DeviceDescriptionDiscovery;
56import org.onosproject.net.device.DeviceEvent;
57import org.onosproject.net.device.DeviceHandshaker;
58import org.onosproject.net.device.DeviceListener;
59import org.onosproject.net.device.DeviceProvider;
60import org.onosproject.net.device.DeviceProviderRegistry;
61import org.onosproject.net.device.DeviceProviderService;
62import org.onosproject.net.device.DeviceService;
63import org.onosproject.net.device.PortDescription;
64import org.onosproject.net.device.PortStatistics;
65import org.onosproject.net.device.PortStatisticsDiscovery;
66import org.onosproject.net.driver.Behaviour;
67import org.onosproject.net.driver.DefaultDriverData;
68import org.onosproject.net.driver.DefaultDriverHandler;
69import org.onosproject.net.driver.Driver;
70import org.onosproject.net.driver.DriverData;
71import org.onosproject.net.driver.DriverService;
Carmelo Cascone59f57de2017-07-11 19:55:09 -040072import org.onosproject.net.pi.model.PiPipeconf;
Andrea Campanellabc112a92017-06-26 19:06:43 +020073import org.onosproject.net.pi.model.PiPipeconfId;
Carmelo Cascone39c28ca2017-11-15 13:03:57 -080074import org.onosproject.net.pi.service.PiPipeconfConfig;
75import org.onosproject.net.pi.service.PiPipeconfService;
Andrea Campanella241896c2017-05-10 13:11:04 -070076import org.onosproject.net.provider.AbstractProvider;
77import org.onosproject.net.provider.ProviderId;
78import org.onosproject.provider.general.device.api.GeneralProviderDeviceConfig;
Andrea Campanella19090322017-08-22 10:31:37 +020079import org.osgi.service.component.ComponentContext;
Andrea Campanella241896c2017-05-10 13:11:04 -070080import org.slf4j.Logger;
81
Ray Milkeyb68bbbc2017-12-18 10:05:49 -080082import java.security.SecureRandom;
Andrea Campanella241896c2017-05-10 13:11:04 -070083import java.util.ArrayList;
84import java.util.Collection;
Andrea Campanellabc112a92017-06-26 19:06:43 +020085import java.util.Collections;
Andrea Campanella19090322017-08-22 10:31:37 +020086import java.util.Dictionary;
Andrea Campanella241896c2017-05-10 13:11:04 -070087import java.util.List;
Andrea Campanellace111932017-09-18 16:59:56 +090088import java.util.Objects;
Andrea Campanellabc112a92017-06-26 19:06:43 +020089import java.util.Set;
Andrea Campanella241896c2017-05-10 13:11:04 -070090import java.util.concurrent.CompletableFuture;
Andrea Campanella19090322017-08-22 10:31:37 +020091import java.util.concurrent.ConcurrentHashMap;
Andrea Campanellabc112a92017-06-26 19:06:43 +020092import java.util.concurrent.ConcurrentMap;
93import java.util.concurrent.CopyOnWriteArraySet;
Andrea Campanella241896c2017-05-10 13:11:04 -070094import java.util.concurrent.ExecutionException;
95import java.util.concurrent.ScheduledExecutorService;
Andrea Campanella19090322017-08-22 10:31:37 +020096import java.util.concurrent.ScheduledFuture;
Andrea Campanella241896c2017-05-10 13:11:04 -070097import java.util.concurrent.TimeUnit;
98import java.util.concurrent.TimeoutException;
Andrea Campanellabc112a92017-06-26 19:06:43 +020099import java.util.concurrent.locks.Lock;
100import java.util.concurrent.locks.ReentrantLock;
Andrea Campanella241896c2017-05-10 13:11:04 -0700101
102import static java.util.concurrent.Executors.newScheduledThreadPool;
103import static org.onlab.util.Tools.groupedThreads;
104import static org.onosproject.net.device.DeviceEvent.Type;
105import static org.slf4j.LoggerFactory.getLogger;
106
107/**
108 * Provider which uses drivers to detect device and do initial handshake
109 * and channel establishment with devices. Any other provider specific operation
110 * is also delegated to the DeviceHandshaker driver.
111 */
112@Beta
113@Component(immediate = true)
114public class GeneralDeviceProvider extends AbstractProvider
115 implements DeviceProvider {
Andrea Campanellabc112a92017-06-26 19:06:43 +0200116 public static final String DRIVER = "driver";
Andrea Campanella14e196d2017-07-24 18:11:36 +0200117 public static final int REACHABILITY_TIMEOUT = 10;
118 public static final String DEPLOY = "deploy-";
119 public static final String PIPECONF_TOPIC = "-pipeconf";
120
Andrea Campanella241896c2017-05-10 13:11:04 -0700121 private final Logger log = getLogger(getClass());
122
123 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
124 protected DeviceProviderRegistry providerRegistry;
125
126 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
Andrea Campanella4929a812017-10-09 18:38:23 +0200127 protected ComponentConfigService componentConfigService;
128
129 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
Andrea Campanella241896c2017-05-10 13:11:04 -0700130 protected NetworkConfigRegistry cfgService;
131
132 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
133 protected CoreService coreService;
134
135 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
136 protected DeviceService deviceService;
137
138 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
139 protected DriverService driverService;
140
141 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
Andrea Campanella14e196d2017-07-24 18:11:36 +0200142 protected MastershipService mastershipService;
143
144 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
Andrea Campanellabc112a92017-06-26 19:06:43 +0200145 protected PiPipeconfService piPipeconfService;
Andrea Campanella241896c2017-05-10 13:11:04 -0700146
Andrea Campanella14e196d2017-07-24 18:11:36 +0200147 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
148 protected ClusterService clusterService;
149
150 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
151 protected LeadershipService leadershipService;
152
Andrea Campanella19090322017-08-22 10:31:37 +0200153 private static final int DEFAULT_POLL_FREQUENCY_SECONDS = 10;
154 @Property(name = "pollFrequency", intValue = DEFAULT_POLL_FREQUENCY_SECONDS,
155 label = "Configure poll frequency for port status and statistics; " +
156 "default is 10 sec")
157 private int pollFrequency = DEFAULT_POLL_FREQUENCY_SECONDS;
158
Andrea Campanella241896c2017-05-10 13:11:04 -0700159 protected static final String APP_NAME = "org.onosproject.generaldeviceprovider";
160 protected static final String URI_SCHEME = "device";
161 protected static final String CFG_SCHEME = "generalprovider";
162 private static final String DEVICE_PROVIDER_PACKAGE = "org.onosproject.general.provider.device";
163 private static final int CORE_POOL_SIZE = 10;
164 private static final String UNKNOWN = "unknown";
Andrea Campanella19090322017-08-22 10:31:37 +0200165
Andrea Campanellabc112a92017-06-26 19:06:43 +0200166 //FIXME this will be removed when the configuration is synced at the source.
167 private static final Set<String> PIPELINE_CONFIGURABLE_PROTOCOLS = ImmutableSet.of("p4runtime");
Andrea Campanellabc112a92017-06-26 19:06:43 +0200168
Andrea Campanella19090322017-08-22 10:31:37 +0200169 private static final ConcurrentMap<DeviceId, Lock> ENTRY_LOCKS = Maps.newConcurrentMap();
Andrea Campanellabc112a92017-06-26 19:06:43 +0200170 //FIXME to be removed when netcfg will issue device events in a bundle or
171 //ensures all configuration needed is present
172 private Set<DeviceId> deviceConfigured = new CopyOnWriteArraySet<>();
173 private Set<DeviceId> driverConfigured = new CopyOnWriteArraySet<>();
174 private Set<DeviceId> pipelineConfigured = new CopyOnWriteArraySet<>();
Andrea Campanella241896c2017-05-10 13:11:04 -0700175
176
177 protected ScheduledExecutorService connectionExecutor
178 = newScheduledThreadPool(CORE_POOL_SIZE,
Andrea Campanella19090322017-08-22 10:31:37 +0200179 groupedThreads("onos/generaldeviceprovider-device",
180 "connection-executor-%d", log));
Andrea Campanella241896c2017-05-10 13:11:04 -0700181 protected ScheduledExecutorService portStatsExecutor
182 = newScheduledThreadPool(CORE_POOL_SIZE,
Andrea Campanella19090322017-08-22 10:31:37 +0200183 groupedThreads("onos/generaldeviceprovider-port-stats",
184 "port-stats-executor-%d", log));
185 protected ConcurrentMap<DeviceId, ScheduledFuture<?>> scheduledTasks = new ConcurrentHashMap<>();
Andrea Campanella241896c2017-05-10 13:11:04 -0700186
187 protected DeviceProviderService providerService;
188 private InternalDeviceListener deviceListener = new InternalDeviceListener();
189
190 protected final ConfigFactory factory =
191 new ConfigFactory<DeviceId, GeneralProviderDeviceConfig>(
192 SubjectFactories.DEVICE_SUBJECT_FACTORY,
193 GeneralProviderDeviceConfig.class, CFG_SCHEME) {
194 @Override
195 public GeneralProviderDeviceConfig createConfig() {
196 return new GeneralProviderDeviceConfig();
197 }
198 };
199
200 protected final NetworkConfigListener cfgListener = new InternalNetworkConfigListener();
201
202
203 @Activate
204 public void activate() {
205 providerService = providerRegistry.register(this);
Andrea Campanella4929a812017-10-09 18:38:23 +0200206 componentConfigService.registerProperties(getClass());
Andrea Campanella241896c2017-05-10 13:11:04 -0700207 coreService.registerApplication(APP_NAME);
208 cfgService.registerConfigFactory(factory);
209 cfgService.addListener(cfgListener);
210 deviceService.addListener(deviceListener);
211 //This will fail if ONOS has CFG and drivers which depend on this provider
212 // are activated, failing due to not finding the driver.
213 cfgService.getSubjects(DeviceId.class, GeneralProviderDeviceConfig.class)
214 .forEach(did -> connectionExecutor.execute(() -> connectDevice(did)));
215 log.info("Started");
216 }
217
Andrea Campanella19090322017-08-22 10:31:37 +0200218 @Modified
219 public void modified(ComponentContext context) {
220 if (context != null) {
221 Dictionary<?, ?> properties = context.getProperties();
222 pollFrequency = Tools.getIntegerProperty(properties, "pollFrequency",
223 DEFAULT_POLL_FREQUENCY_SECONDS);
224 log.info("Configured. Poll frequency is configured to {} seconds", pollFrequency);
225 }
226
227 if (!scheduledTasks.isEmpty()) {
228 //cancel all previous tasks
229 scheduledTasks.values().forEach(task -> task.cancel(false));
230 //resubmit task with new timeout.
231 Set<DeviceId> deviceSubjects =
232 cfgService.getSubjects(DeviceId.class, GeneralProviderDeviceConfig.class);
233 deviceSubjects.forEach(deviceId -> {
234 if (!compareScheme(deviceId)) {
235 // not under my scheme, skipping
236 log.debug("{} is not my scheme, skipping", deviceId);
237 return;
238 }
239 scheduledTasks.put(deviceId, schedulePolling(deviceId, true));
240 });
241 }
242
243 }
244
Andrea Campanella241896c2017-05-10 13:11:04 -0700245
246 @Deactivate
247 public void deactivate() {
248 portStatsExecutor.shutdown();
Andrea Campanella4929a812017-10-09 18:38:23 +0200249 componentConfigService.unregisterProperties(getClass(), false);
Andrea Campanella241896c2017-05-10 13:11:04 -0700250 cfgService.removeListener(cfgListener);
251 //Not Removing the device so they can still be used from other driver providers
252 //cfgService.getSubjects(DeviceId.class, GeneralProviderDeviceConfig.class)
253 // .forEach(did -> connectionExecutor.execute(() -> disconnectDevice(did)));
254 connectionExecutor.shutdown();
255 deviceService.removeListener(deviceListener);
256 providerRegistry.unregister(this);
257 providerService = null;
258 cfgService.unregisterConfigFactory(factory);
259 log.info("Stopped");
260 }
261
262 public GeneralDeviceProvider() {
263 super(new ProviderId(URI_SCHEME, DEVICE_PROVIDER_PACKAGE));
264 }
265
266
267 @Override
268 public void triggerProbe(DeviceId deviceId) {
269 //TODO Really don't see the point of this in non OF Context,
270 // for now testing reachability, can be moved to no-op
271 log.debug("Triggering probe equals testing reachability on device {}", deviceId);
272 isReachable(deviceId);
273 }
274
275 @Override
276 public void roleChanged(DeviceId deviceId, MastershipRole newRole) {
Andrea Campanella14e196d2017-07-24 18:11:36 +0200277 log.info("Received role {} for device {}", newRole, deviceId);
Andrea Campanella241896c2017-05-10 13:11:04 -0700278 CompletableFuture<MastershipRole> roleReply = getHandshaker(deviceId).roleChanged(newRole);
Andrea Campanella14e196d2017-07-24 18:11:36 +0200279 roleReply.thenAcceptAsync(mastership -> {
280 providerService.receivedRoleReply(deviceId, newRole, mastership);
281 if (!mastership.equals(MastershipRole.MASTER) && scheduledTasks.get(deviceId) != null) {
282 scheduledTasks.get(deviceId).cancel(false);
283 scheduledTasks.remove(deviceId);
284 } else if (mastership.equals(MastershipRole.MASTER) && scheduledTasks.get(deviceId) == null) {
285 scheduledTasks.put(deviceId, schedulePolling(deviceId, false));
286 updatePortStatistics(deviceId);
287 }
288 });
Andrea Campanella241896c2017-05-10 13:11:04 -0700289 }
290
291 @Override
292 public boolean isReachable(DeviceId deviceId) {
Andrea Campanella14e196d2017-07-24 18:11:36 +0200293 log.debug("Testing reachability for device {}", deviceId);
Andrea Campanella241896c2017-05-10 13:11:04 -0700294 CompletableFuture<Boolean> reachable = getHandshaker(deviceId).isReachable();
295 try {
Andrea Campanella14e196d2017-07-24 18:11:36 +0200296 return reachable.get(REACHABILITY_TIMEOUT, TimeUnit.SECONDS);
Andrea Campanella241896c2017-05-10 13:11:04 -0700297 } catch (InterruptedException | ExecutionException | TimeoutException e) {
298 log.error("Device {} is not reachable", deviceId, e);
299 return false;
300 }
301 }
302
303 @Override
304 public void changePortState(DeviceId deviceId, PortNumber portNumber,
305 boolean enable) {
306 if (deviceService.getDevice(deviceId).is(PortAdmin.class)) {
307
308 PortAdmin portAdmin = getPortAdmin(deviceId);
309 CompletableFuture<Boolean> modified;
310 if (enable) {
311 modified = portAdmin.enable(portNumber);
312 } else {
313 modified = portAdmin.disable(portNumber);
314 }
315 modified.thenAcceptAsync(result -> {
316 if (!result) {
317 log.warn("Your device {} port {} status can't be changed to {}",
Andrea Campanella19090322017-08-22 10:31:37 +0200318 deviceId, portNumber, enable);
Andrea Campanella241896c2017-05-10 13:11:04 -0700319 }
320 });
321
322 } else {
323 log.warn("Device {} does not support PortAdmin behaviour", deviceId);
324 }
325 }
326
327 private DeviceHandshaker getHandshaker(DeviceId deviceId) {
328 Driver driver = getDriver(deviceId);
329 return getBehaviour(driver, DeviceHandshaker.class,
Andrea Campanella19090322017-08-22 10:31:37 +0200330 new DefaultDriverData(driver, deviceId));
Andrea Campanella241896c2017-05-10 13:11:04 -0700331 }
332
333 private PortAdmin getPortAdmin(DeviceId deviceId) {
334 Driver driver = getDriver(deviceId);
335 return getBehaviour(driver, PortAdmin.class,
Andrea Campanella19090322017-08-22 10:31:37 +0200336 new DefaultDriverData(driver, deviceId));
Andrea Campanella241896c2017-05-10 13:11:04 -0700337
338 }
339
340 private Driver getDriver(DeviceId deviceId) {
341 Driver driver;
342 try {
343 driver = driverService.getDriver(deviceId);
344 } catch (ItemNotFoundException e) {
345 log.debug("Falling back to configuration to fetch driver " +
Andrea Campanella19090322017-08-22 10:31:37 +0200346 "for device {}", deviceId);
Andrea Campanella241896c2017-05-10 13:11:04 -0700347 driver = driverService.getDriver(
348 cfgService.getConfig(deviceId, BasicDeviceConfig.class).driver());
349 }
350 return driver;
351 }
352
353 //needed since the device manager will not return the driver through implementation()
354 // method since the device is not pushed to the core so for the connectDevice
355 // we need to work around that in order to test before calling
356 // store.createOrUpdateDevice
357 private <T extends Behaviour> T getBehaviour(Driver driver, Class<T> type,
358 DriverData data) {
359 if (driver.hasBehaviour(type)) {
360 DefaultDriverHandler handler = new DefaultDriverHandler(data);
361 return driver.createBehaviour(handler, type);
362 } else {
363 return null;
364 }
365 }
366
367 //Connects a general device
368 private void connectDevice(DeviceId deviceId) {
369 //retrieve the configuration
370 GeneralProviderDeviceConfig providerConfig =
371 cfgService.getConfig(deviceId, GeneralProviderDeviceConfig.class);
372 BasicDeviceConfig basicDeviceConfig =
373 cfgService.getConfig(deviceId, BasicDeviceConfig.class);
374
375 if (providerConfig == null || basicDeviceConfig == null) {
376 log.error("Configuration is NULL: basic config {}, general provider " +
Andrea Campanella19090322017-08-22 10:31:37 +0200377 "config {}", basicDeviceConfig, providerConfig);
Andrea Campanella241896c2017-05-10 13:11:04 -0700378 } else {
Andrea Campanellabc112a92017-06-26 19:06:43 +0200379 log.info("Connecting to device {} with driver {}", deviceId, basicDeviceConfig.driver());
Andrea Campanella241896c2017-05-10 13:11:04 -0700380
Frank Wang554ce972017-09-06 09:56:43 +0800381 Driver driver;
382 try {
383 driver = driverService.getDriver(basicDeviceConfig.driver());
384 } catch (ItemNotFoundException e) {
385 log.warn("The driver of {} is not found : {}", deviceId, e.getMessage());
386 return;
387 }
388
Andrea Campanella241896c2017-05-10 13:11:04 -0700389 DriverData driverData = new DefaultDriverData(driver, deviceId);
Frank Wang554ce972017-09-06 09:56:43 +0800390 DeviceHandshaker handshaker = getBehaviour(driver, DeviceHandshaker.class, driverData);
Andrea Campanellabc112a92017-06-26 19:06:43 +0200391 if (handshaker == null) {
Andrea Campanella241896c2017-05-10 13:11:04 -0700392 log.error("Device {}, with driver {} does not support DeviceHandshaker " +
Andrea Campanella19090322017-08-22 10:31:37 +0200393 "behaviour, {}", deviceId, driver.name(), driver.behaviours());
Andrea Campanellabc112a92017-06-26 19:06:43 +0200394 return;
Andrea Campanella241896c2017-05-10 13:11:04 -0700395 }
Frank Wang554ce972017-09-06 09:56:43 +0800396
Andrea Campanella14e196d2017-07-24 18:11:36 +0200397 addConfigData(providerConfig, driverData);
Andrea Campanellabc112a92017-06-26 19:06:43 +0200398
399 //Connecting to the device
400 CompletableFuture<Boolean> connected = handshaker.connect();
401
402 connected.thenAcceptAsync(result -> {
403 if (result) {
404
405 //Populated with the default values obtained by the driver
406 ChassisId cid = new ChassisId();
407 SparseAnnotations annotations = DefaultAnnotations.builder()
408 .set(AnnotationKeys.PROTOCOL,
Andrea Campanella19090322017-08-22 10:31:37 +0200409 providerConfig.protocolsInfo().keySet().toString())
Andrea Campanellabc112a92017-06-26 19:06:43 +0200410 .build();
411 DeviceDescription description =
412 new DefaultDeviceDescription(deviceId.uri(), Device.Type.SWITCH,
Andrea Campanella19090322017-08-22 10:31:37 +0200413 driver.manufacturer(), driver.hwVersion(),
414 driver.swVersion(), UNKNOWN,
Yi Tseng92494fb2017-12-05 15:14:53 -0800415 cid, true, annotations);
Andrea Campanellabc112a92017-06-26 19:06:43 +0200416 //Empty list of ports
417 List<PortDescription> ports = new ArrayList<>();
418
419 if (driver.hasBehaviour(DeviceDescriptionDiscovery.class)) {
420 DeviceDescriptionDiscovery deviceDiscovery = driver
421 .createBehaviour(driverData, DeviceDescriptionDiscovery.class);
422
423 DeviceDescription newdescription = deviceDiscovery.discoverDeviceDetails();
424 if (newdescription != null) {
425 description = newdescription;
426 }
427 ports = deviceDiscovery.discoverPortDetails();
428 }
429
Andrea Campanella14e196d2017-07-24 18:11:36 +0200430 if (!handlePipeconf(deviceId, driver, driverData, true)) {
Carmelo Cascone59f57de2017-07-11 19:55:09 -0400431 // Something went wrong during handling of pipeconf.
432 // We already logged the error.
433 handshaker.disconnect();
434 return;
Andrea Campanellabc112a92017-06-26 19:06:43 +0200435 }
436
Carmelo Cascone59f57de2017-07-11 19:55:09 -0400437 advertiseDevice(deviceId, description, ports);
438
Andrea Campanellabc112a92017-06-26 19:06:43 +0200439 } else {
440 log.warn("Can't connect to device {}", deviceId);
441 }
442 });
Andrea Campanella241896c2017-05-10 13:11:04 -0700443 }
444 }
445
Andrea Campanella14e196d2017-07-24 18:11:36 +0200446 private void connectStandbyDevice(DeviceId deviceId) {
447
448 //if device is pipeline programmable we merge pipeconf + base driver for every other role
449 GeneralProviderDeviceConfig providerConfig =
450 cfgService.getConfig(deviceId, GeneralProviderDeviceConfig.class);
451
452 Driver driver = getDriver(deviceId);
453
454
455 DriverData driverData = new DefaultDriverData(driver, deviceId);
456 DeviceHandshaker handshaker = getBehaviour(driver, DeviceHandshaker.class, driverData);
457 if (handshaker == null) {
458 log.error("Device {}, with driver {} does not support DeviceHandshaker " +
459 "behaviour, supported behaviours={}", deviceId, driver.name(), driver.behaviours());
460 return;
461 }
462 addConfigData(providerConfig, driverData);
463
464 //Connecting to the device
465 handshaker.connect().thenAcceptAsync(result -> {
466 if (result) {
467 handlePipeconf(deviceId, driver, driverData, false);
468 }
469 });
470 }
471
Carmelo Cascone59f57de2017-07-11 19:55:09 -0400472 /**
473 * Handles the case of a device that is pipeline programmable. Returns true if the operation wa successful and the
474 * device can be registered to the core, false otherwise.
475 */
Andrea Campanella14e196d2017-07-24 18:11:36 +0200476 private boolean handlePipeconf(DeviceId deviceId, Driver driver, DriverData driverData, boolean deployPipeconf) {
Carmelo Cascone59f57de2017-07-11 19:55:09 -0400477
478 PiPipelineProgrammable pipelineProg = getBehaviour(driver, PiPipelineProgrammable.class,
Andrea Campanella19090322017-08-22 10:31:37 +0200479 driverData);
Carmelo Cascone59f57de2017-07-11 19:55:09 -0400480
481 if (pipelineProg == null) {
482 // Device is not pipeline programmable.
483 return true;
484 }
485
Andrea Campanella14e196d2017-07-24 18:11:36 +0200486 PiPipeconf pipeconf = getPipeconf(deviceId, pipelineProg);
487
488 if (pipeconf != null) {
489
490 PiPipeconfId pipeconfId = pipeconf.id();
491
492 try {
493 if (deployPipeconf) {
494 if (!pipelineProg.deployPipeconf(pipeconf).get()) {
495 log.error("Unable to deploy pipeconf {} to {}, aborting device discovery",
496 pipeconfId, deviceId);
497 return false;
498 }
499 }
500 } catch (InterruptedException | ExecutionException e) {
501 log.warn("Exception occurred while deploying pipeconf {} to device {}", pipeconf.id(), deviceId, e);
502 return false;
503 }
504 try {
505 if (!piPipeconfService.bindToDevice(pipeconfId, deviceId).get()) {
506 log.error("Unable to merge driver {} for device {} with pipeconf {}, aborting device discovery",
507 driver.name(), deviceId, pipeconfId);
508 return false;
509 }
510 } catch (InterruptedException | ExecutionException e) {
511 log.warn("Exception occurred while binding pipeconf {} to device {}", pipeconf.id(), deviceId, e);
512 return false;
513 }
514 } else {
515 return false;
516 }
517
518 return true;
519 }
520
521 private PiPipeconf getPipeconf(DeviceId deviceId, PiPipelineProgrammable pipelineProg) {
Carmelo Cascone59f57de2017-07-11 19:55:09 -0400522 PiPipeconfId pipeconfId = piPipeconfService.ofDevice(deviceId).orElseGet(() -> {
523 // No pipeconf has been associated with this device.
524 // Check if device driver provides a default one.
525 if (pipelineProg.getDefaultPipeconf().isPresent()) {
526 PiPipeconf defaultPipeconf = pipelineProg.getDefaultPipeconf().get();
527 log.info("Using default pipeconf {} for {}", defaultPipeconf.id(), deviceId);
Carmelo Cascone59f57de2017-07-11 19:55:09 -0400528 return defaultPipeconf.id();
529 } else {
530 return null;
531 }
532 });
533
534 if (pipeconfId == null) {
535 log.warn("Device {} is pipeline programmable but no pipeconf can be associated to it.", deviceId);
Andrea Campanella14e196d2017-07-24 18:11:36 +0200536 return null;
Carmelo Cascone1fb27d32017-08-25 20:40:20 +0200537 }
538
539 if (!piPipeconfService.getPipeconf(pipeconfId).isPresent()) {
540 log.warn("Pipeconf {} is not registered", pipeconfId);
Andrea Campanella14e196d2017-07-24 18:11:36 +0200541 return null;
Carmelo Cascone59f57de2017-07-11 19:55:09 -0400542 }
543
544
Andrea Campanella14e196d2017-07-24 18:11:36 +0200545 return piPipeconfService.getPipeconf(pipeconfId).get();
Carmelo Cascone59f57de2017-07-11 19:55:09 -0400546 }
547
Andrea Campanellabc112a92017-06-26 19:06:43 +0200548 private void advertiseDevice(DeviceId deviceId, DeviceDescription description, List<PortDescription> ports) {
549 providerService.deviceConnected(deviceId, description);
550 providerService.updatePorts(deviceId, ports);
551 }
552
Andrea Campanella241896c2017-05-10 13:11:04 -0700553 private void disconnectDevice(DeviceId deviceId) {
554 log.info("Disconnecting for device {}", deviceId);
555 DeviceHandshaker handshaker = getHandshaker(deviceId);
556 if (handshaker != null) {
557 CompletableFuture<Boolean> disconnect = handshaker.disconnect();
Andrea Campanella241896c2017-05-10 13:11:04 -0700558 disconnect.thenAcceptAsync(result -> {
559 if (result) {
560 log.info("Disconnected device {}", deviceId);
561 providerService.deviceDisconnected(deviceId);
562 } else {
563 log.warn("Device {} was unable to disconnect", deviceId);
564 }
565 });
566 } else {
567 //gracefully ignoring.
568 log.info("No DeviceHandshaker for device {}", deviceId);
569 }
Andrea Campanella19090322017-08-22 10:31:37 +0200570 ScheduledFuture<?> pollingStatisticsTask = scheduledTasks.get(deviceId);
571 if (pollingStatisticsTask != null) {
572 pollingStatisticsTask.cancel(true);
573 }
Andrea Campanella241896c2017-05-10 13:11:04 -0700574 }
575
576 //Needed to catch the exception in the executors since are not rethrown otherwise.
577 private Runnable exceptionSafe(Runnable runnable) {
578 return () -> {
579 try {
580 runnable.run();
581 } catch (Exception e) {
582 log.error("Unhandled Exception", e);
583 }
584 };
585 }
586
587 private void updatePortStatistics(DeviceId deviceId) {
Andrea Campanellace111932017-09-18 16:59:56 +0900588 Device device = deviceService.getDevice(deviceId);
589 if (!Objects.isNull(device) && deviceService.isAvailable(deviceId) &&
590 device.is(PortStatisticsDiscovery.class)) {
591 Collection<PortStatistics> statistics = device.as(PortStatisticsDiscovery.class)
592 .discoverPortStatistics();
593 //updating statistcs only if not empty
594 if (!statistics.isEmpty()) {
595 providerService.updatePortStatistics(deviceId, statistics);
596 }
597 } else {
598 log.debug("Can't update port statistics for device {}", deviceId);
Andrea Campanella19090322017-08-22 10:31:37 +0200599 }
600 }
601
602 private boolean compareScheme(DeviceId deviceId) {
603 return deviceId.uri().getScheme().equals(URI_SCHEME);
Andrea Campanella241896c2017-05-10 13:11:04 -0700604 }
605
606 /**
607 * Listener for configuration events.
608 */
609 private class InternalNetworkConfigListener implements NetworkConfigListener {
610
611
612 @Override
613 public void event(NetworkConfigEvent event) {
614 DeviceId deviceId = (DeviceId) event.subject();
615 //Assuming that the deviceId comes with uri 'device:'
Andrea Campanella19090322017-08-22 10:31:37 +0200616 if (!compareScheme(deviceId)) {
Andrea Campanella241896c2017-05-10 13:11:04 -0700617 // not under my scheme, skipping
618 log.debug("{} is not my scheme, skipping", deviceId);
619 return;
620 }
Andrea Campanellace111932017-09-18 16:59:56 +0900621 if (deviceService.getDevice(deviceId) != null && deviceService.isAvailable(deviceId)) {
Andrea Campanella241896c2017-05-10 13:11:04 -0700622 log.info("Device {} is already connected to ONOS and is available", deviceId);
Andrea Campanellabc112a92017-06-26 19:06:43 +0200623 return;
624 }
Andrea Campanella14e196d2017-07-24 18:11:36 +0200625 NodeId leaderNodeId = leadershipService.runForLeadership(DEPLOY + deviceId.toString() + PIPECONF_TOPIC)
626 .leader().nodeId();
627 NodeId localNodeId = clusterService.getLocalNode().id();
628 if (localNodeId.equals(leaderNodeId)) {
629 if (processEvent(event, deviceId)) {
630 log.debug("{} is leader for {}, initiating the connection and deploying pipeline", leaderNodeId,
631 deviceId);
632 checkAndSubmitDeviceTask(deviceId);
633 }
634 } else {
635 if (processEvent(event, deviceId)) {
636 log.debug("{} is not leader for {}, initiating connection but not deploying pipeline, {} is LEADER",
637 localNodeId, deviceId, leaderNodeId);
638 connectionExecutor.submit(exceptionSafe(() -> connectStandbyDevice(deviceId)));
639 //FIXME this will be removed when config is synced
640 cleanUpConfigInfo(deviceId);
641 }
642 }
643
644 }
645
646 private boolean processEvent(NetworkConfigEvent event, DeviceId deviceId) {
Andrea Campanellabc112a92017-06-26 19:06:43 +0200647 //FIXME to be removed when netcfg will issue device events in a bundle or
648 // ensure all configuration needed is present
649 Lock lock = ENTRY_LOCKS.computeIfAbsent(deviceId, key -> new ReentrantLock());
650 lock.lock();
651 try {
652 if (event.configClass().equals(GeneralProviderDeviceConfig.class)) {
653 //FIXME we currently assume that p4runtime devices are pipeline configurable.
654 //If we want to connect a p4runtime device with no pipeline
655 if (event.config().isPresent() &&
656 Collections.disjoint(ImmutableSet.copyOf(event.config().get().node().fieldNames()),
Andrea Campanella19090322017-08-22 10:31:37 +0200657 PIPELINE_CONFIGURABLE_PROTOCOLS)) {
Andrea Campanellabc112a92017-06-26 19:06:43 +0200658 pipelineConfigured.add(deviceId);
659 }
660 deviceConfigured.add(deviceId);
661 } else if (event.configClass().equals(BasicDeviceConfig.class)) {
662 if (event.config().isPresent() && event.config().get().node().has(DRIVER)) {
663 //TODO add check for pipeline and add it to the pipeline list if no
664 // p4runtime is present.
665 driverConfigured.add(deviceId);
666 }
667 } else if (event.configClass().equals(PiPipeconfConfig.class)) {
668 if (event.config().isPresent()
669 && event.config().get().node().has(PiPipeconfConfig.PIPIPECONFID)) {
670 pipelineConfigured.add(deviceId);
671 }
672 }
673 //if the device has no "pipeline configurable protocol it will be present
674 // in the pipelineConfigured
675 if (deviceConfigured.contains(deviceId) && driverConfigured.contains(deviceId)
676 && pipelineConfigured.contains(deviceId)) {
Andrea Campanella14e196d2017-07-24 18:11:36 +0200677 return true;
Andrea Campanellabc112a92017-06-26 19:06:43 +0200678 } else {
679 if (deviceConfigured.contains(deviceId) && driverConfigured.contains(deviceId)) {
680 log.debug("Waiting for pipeline configuration for device {}", deviceId);
681 } else if (pipelineConfigured.contains(deviceId) && driverConfigured.contains(deviceId)) {
682 log.debug("Waiting for device configuration for device {}", deviceId);
683 } else if (pipelineConfigured.contains(deviceId) && deviceConfigured.contains(deviceId)) {
684 log.debug("Waiting for driver configuration for device {}", deviceId);
685 } else if (driverConfigured.contains(deviceId)) {
686 log.debug("Only driver configuration for device {}", deviceId);
687 } else if (deviceConfigured.contains(deviceId)) {
688 log.debug("Only device configuration for device {}", deviceId);
689 }
690 }
Andrea Campanella14e196d2017-07-24 18:11:36 +0200691 return false;
Andrea Campanellabc112a92017-06-26 19:06:43 +0200692 } finally {
693 lock.unlock();
Andrea Campanella241896c2017-05-10 13:11:04 -0700694 }
695 }
696
697 @Override
698 public boolean isRelevant(NetworkConfigEvent event) {
Andrea Campanellabc112a92017-06-26 19:06:43 +0200699 return (event.configClass().equals(GeneralProviderDeviceConfig.class) ||
700 event.configClass().equals(BasicDeviceConfig.class) ||
701 event.configClass().equals(PiPipeconfConfig.class)) &&
Andrea Campanella241896c2017-05-10 13:11:04 -0700702 (event.type() == NetworkConfigEvent.Type.CONFIG_ADDED ||
703 event.type() == NetworkConfigEvent.Type.CONFIG_UPDATED);
704 }
705 }
706
Andrea Campanellabc112a92017-06-26 19:06:43 +0200707 private void checkAndSubmitDeviceTask(DeviceId deviceId) {
708 connectionExecutor.submit(exceptionSafe(() -> connectDevice(deviceId)));
709 //FIXME this will be removed when configuration is synced.
Andrea Campanella14e196d2017-07-24 18:11:36 +0200710 cleanUpConfigInfo(deviceId);
711
712 }
713
714 private void addConfigData(GeneralProviderDeviceConfig providerConfig, DriverData driverData) {
715 //Storing deviceKeyId and all other config values
716 // as data in the driver with protocol_<info>
717 // name as the key. e.g protocol_ip
718 providerConfig.protocolsInfo()
719 .forEach((protocol, deviceInfoConfig) -> {
720 deviceInfoConfig.configValues()
721 .forEach((k, v) -> driverData.set(protocol + "_" + k, v));
722 driverData.set(protocol + "_key", deviceInfoConfig.deviceKeyId());
723 });
724 }
725
726 private void cleanUpConfigInfo(DeviceId deviceId) {
Andrea Campanellabc112a92017-06-26 19:06:43 +0200727 deviceConfigured.remove(deviceId);
728 driverConfigured.remove(deviceId);
729 pipelineConfigured.remove(deviceId);
Andrea Campanellabc112a92017-06-26 19:06:43 +0200730 }
731
Andrea Campanella19090322017-08-22 10:31:37 +0200732 private ScheduledFuture<?> schedulePolling(DeviceId deviceId, boolean randomize) {
733 int delay = 0;
734 if (randomize) {
Ray Milkeyb68bbbc2017-12-18 10:05:49 -0800735 delay = new SecureRandom().nextInt(10);
Andrea Campanella19090322017-08-22 10:31:37 +0200736 }
737 return portStatsExecutor.scheduleAtFixedRate(
738 exceptionSafe(() -> updatePortStatistics(deviceId)),
739 delay, pollFrequency, TimeUnit.SECONDS);
740 }
741
Andrea Campanella241896c2017-05-10 13:11:04 -0700742 /**
743 * Listener for core device events.
744 */
745 private class InternalDeviceListener implements DeviceListener {
746 @Override
747 public void event(DeviceEvent event) {
748 Type type = event.type();
Andrea Campanellace111932017-09-18 16:59:56 +0900749 DeviceId deviceId = event.subject().id();
Andrea Campanella241896c2017-05-10 13:11:04 -0700750 if (type.equals((Type.DEVICE_ADDED))) {
751
752 //For now this is scheduled periodically, when streaming API will
753 // be available we check and base it on the streaming API (e.g. gNMI)
Andrea Campanella14e196d2017-07-24 18:11:36 +0200754 if (mastershipService.isLocalMaster(deviceId)) {
755 scheduledTasks.put(deviceId, schedulePolling(deviceId, false));
756 updatePortStatistics(deviceId);
757 }
Andrea Campanella241896c2017-05-10 13:11:04 -0700758
759 } else if (type.equals(Type.DEVICE_REMOVED)) {
Andrea Campanella14e196d2017-07-24 18:11:36 +0200760
Andrea Campanella241896c2017-05-10 13:11:04 -0700761 connectionExecutor.submit(exceptionSafe(() ->
Andrea Campanellace111932017-09-18 16:59:56 +0900762 disconnectDevice(deviceId)));
Andrea Campanella241896c2017-05-10 13:11:04 -0700763 }
764 }
765
766 @Override
767 public boolean isRelevant(DeviceEvent event) {
Andrea Campanella19090322017-08-22 10:31:37 +0200768 return event.subject().id().toString().startsWith(URI_SCHEME.toLowerCase());
Andrea Campanella241896c2017-05-10 13:11:04 -0700769 }
770 }
771}