blob: c4390946ae28f0dd2af7324ab13bf8173f1e2202 [file] [log] [blame]
Andrea Campanella241896c2017-05-10 13:11:04 -07001/*
Brian O'Connora09fe5b2017-08-03 21:12:30 -07002 * Copyright 2017-present Open Networking Foundation
Andrea Campanella241896c2017-05-10 13:11:04 -07003 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
16
17package org.onosproject.provider.general.device.impl;
18
19import com.google.common.annotations.Beta;
Andrea Campanellabc112a92017-06-26 19:06:43 +020020import com.google.common.collect.ImmutableSet;
21import com.google.common.collect.Maps;
Andrea Campanella241896c2017-05-10 13:11:04 -070022import org.apache.felix.scr.annotations.Activate;
23import org.apache.felix.scr.annotations.Component;
24import org.apache.felix.scr.annotations.Deactivate;
Andrea Campanella19090322017-08-22 10:31:37 +020025import org.apache.felix.scr.annotations.Modified;
26import org.apache.felix.scr.annotations.Property;
Andrea Campanella241896c2017-05-10 13:11:04 -070027import org.apache.felix.scr.annotations.Reference;
28import org.apache.felix.scr.annotations.ReferenceCardinality;
29import org.onlab.packet.ChassisId;
30import org.onlab.util.ItemNotFoundException;
Andrea Campanella19090322017-08-22 10:31:37 +020031import org.onlab.util.Tools;
Andrea Campanella4929a812017-10-09 18:38:23 +020032import org.onosproject.cfg.ComponentConfigService;
Andrea Campanella14e196d2017-07-24 18:11:36 +020033import org.onosproject.cluster.ClusterService;
34import org.onosproject.cluster.LeadershipService;
35import org.onosproject.cluster.NodeId;
Andrea Campanella241896c2017-05-10 13:11:04 -070036import org.onosproject.core.CoreService;
Andrea Campanella14e196d2017-07-24 18:11:36 +020037import org.onosproject.mastership.MastershipService;
Andrea Campanella241896c2017-05-10 13:11:04 -070038import org.onosproject.net.AnnotationKeys;
39import org.onosproject.net.DefaultAnnotations;
40import org.onosproject.net.Device;
41import org.onosproject.net.DeviceId;
42import org.onosproject.net.MastershipRole;
43import org.onosproject.net.PortNumber;
44import org.onosproject.net.SparseAnnotations;
Carmelo Cascone87892e22017-11-13 16:01:29 -080045import org.onosproject.net.behaviour.PiPipelineProgrammable;
Andrea Campanella241896c2017-05-10 13:11:04 -070046import org.onosproject.net.behaviour.PortAdmin;
47import org.onosproject.net.config.ConfigFactory;
48import org.onosproject.net.config.NetworkConfigEvent;
49import org.onosproject.net.config.NetworkConfigListener;
50import org.onosproject.net.config.NetworkConfigRegistry;
51import org.onosproject.net.config.basics.BasicDeviceConfig;
52import org.onosproject.net.config.basics.SubjectFactories;
53import org.onosproject.net.device.DefaultDeviceDescription;
54import org.onosproject.net.device.DeviceDescription;
55import org.onosproject.net.device.DeviceDescriptionDiscovery;
56import org.onosproject.net.device.DeviceEvent;
57import org.onosproject.net.device.DeviceHandshaker;
58import org.onosproject.net.device.DeviceListener;
59import org.onosproject.net.device.DeviceProvider;
60import org.onosproject.net.device.DeviceProviderRegistry;
61import org.onosproject.net.device.DeviceProviderService;
62import org.onosproject.net.device.DeviceService;
63import org.onosproject.net.device.PortDescription;
64import org.onosproject.net.device.PortStatistics;
65import org.onosproject.net.device.PortStatisticsDiscovery;
66import org.onosproject.net.driver.Behaviour;
67import org.onosproject.net.driver.DefaultDriverData;
68import org.onosproject.net.driver.DefaultDriverHandler;
69import org.onosproject.net.driver.Driver;
70import org.onosproject.net.driver.DriverData;
71import org.onosproject.net.driver.DriverService;
Carmelo Cascone59f57de2017-07-11 19:55:09 -040072import org.onosproject.net.pi.model.PiPipeconf;
Andrea Campanellabc112a92017-06-26 19:06:43 +020073import org.onosproject.net.pi.model.PiPipeconfId;
Carmelo Cascone39c28ca2017-11-15 13:03:57 -080074import org.onosproject.net.pi.service.PiPipeconfConfig;
75import org.onosproject.net.pi.service.PiPipeconfService;
Andrea Campanella241896c2017-05-10 13:11:04 -070076import org.onosproject.net.provider.AbstractProvider;
77import org.onosproject.net.provider.ProviderId;
78import org.onosproject.provider.general.device.api.GeneralProviderDeviceConfig;
Andrea Campanella19090322017-08-22 10:31:37 +020079import org.osgi.service.component.ComponentContext;
Andrea Campanella241896c2017-05-10 13:11:04 -070080import org.slf4j.Logger;
81
Ray Milkeyb68bbbc2017-12-18 10:05:49 -080082import java.security.SecureRandom;
Andrea Campanella241896c2017-05-10 13:11:04 -070083import java.util.ArrayList;
84import java.util.Collection;
Andrea Campanellabc112a92017-06-26 19:06:43 +020085import java.util.Collections;
Andrea Campanella19090322017-08-22 10:31:37 +020086import java.util.Dictionary;
Andrea Campanella241896c2017-05-10 13:11:04 -070087import java.util.List;
Andrea Campanellace111932017-09-18 16:59:56 +090088import java.util.Objects;
Andrea Campanellabc112a92017-06-26 19:06:43 +020089import java.util.Set;
Andrea Campanella241896c2017-05-10 13:11:04 -070090import java.util.concurrent.CompletableFuture;
Andrea Campanella19090322017-08-22 10:31:37 +020091import java.util.concurrent.ConcurrentHashMap;
Andrea Campanellabc112a92017-06-26 19:06:43 +020092import java.util.concurrent.ConcurrentMap;
93import java.util.concurrent.CopyOnWriteArraySet;
Andrea Campanella241896c2017-05-10 13:11:04 -070094import java.util.concurrent.ExecutionException;
95import java.util.concurrent.ScheduledExecutorService;
Andrea Campanella19090322017-08-22 10:31:37 +020096import java.util.concurrent.ScheduledFuture;
Andrea Campanella241896c2017-05-10 13:11:04 -070097import java.util.concurrent.TimeUnit;
98import java.util.concurrent.TimeoutException;
Andrea Campanellabc112a92017-06-26 19:06:43 +020099import java.util.concurrent.locks.Lock;
100import java.util.concurrent.locks.ReentrantLock;
Andrea Campanella241896c2017-05-10 13:11:04 -0700101
102import static java.util.concurrent.Executors.newScheduledThreadPool;
103import static org.onlab.util.Tools.groupedThreads;
104import static org.onosproject.net.device.DeviceEvent.Type;
105import static org.slf4j.LoggerFactory.getLogger;
106
107/**
108 * Provider which uses drivers to detect device and do initial handshake
109 * and channel establishment with devices. Any other provider specific operation
110 * is also delegated to the DeviceHandshaker driver.
111 */
112@Beta
113@Component(immediate = true)
114public class GeneralDeviceProvider extends AbstractProvider
115 implements DeviceProvider {
Andrea Campanellabc112a92017-06-26 19:06:43 +0200116 public static final String DRIVER = "driver";
Andrea Campanella14e196d2017-07-24 18:11:36 +0200117 public static final int REACHABILITY_TIMEOUT = 10;
118 public static final String DEPLOY = "deploy-";
119 public static final String PIPECONF_TOPIC = "-pipeconf";
120
Andrea Campanella241896c2017-05-10 13:11:04 -0700121 private final Logger log = getLogger(getClass());
122
123 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
124 protected DeviceProviderRegistry providerRegistry;
125
126 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
Andrea Campanella4929a812017-10-09 18:38:23 +0200127 protected ComponentConfigService componentConfigService;
128
129 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
Andrea Campanella241896c2017-05-10 13:11:04 -0700130 protected NetworkConfigRegistry cfgService;
131
132 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
133 protected CoreService coreService;
134
135 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
136 protected DeviceService deviceService;
137
138 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
139 protected DriverService driverService;
140
141 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
Andrea Campanella14e196d2017-07-24 18:11:36 +0200142 protected MastershipService mastershipService;
143
144 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
Andrea Campanellabc112a92017-06-26 19:06:43 +0200145 protected PiPipeconfService piPipeconfService;
Andrea Campanella241896c2017-05-10 13:11:04 -0700146
Andrea Campanella14e196d2017-07-24 18:11:36 +0200147 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
148 protected ClusterService clusterService;
149
150 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
151 protected LeadershipService leadershipService;
152
Andrea Campanella19090322017-08-22 10:31:37 +0200153 private static final int DEFAULT_POLL_FREQUENCY_SECONDS = 10;
154 @Property(name = "pollFrequency", intValue = DEFAULT_POLL_FREQUENCY_SECONDS,
155 label = "Configure poll frequency for port status and statistics; " +
156 "default is 10 sec")
157 private int pollFrequency = DEFAULT_POLL_FREQUENCY_SECONDS;
158
Andrea Campanella241896c2017-05-10 13:11:04 -0700159 protected static final String APP_NAME = "org.onosproject.generaldeviceprovider";
160 protected static final String URI_SCHEME = "device";
161 protected static final String CFG_SCHEME = "generalprovider";
162 private static final String DEVICE_PROVIDER_PACKAGE = "org.onosproject.general.provider.device";
163 private static final int CORE_POOL_SIZE = 10;
164 private static final String UNKNOWN = "unknown";
Andrea Campanella19090322017-08-22 10:31:37 +0200165
Andrea Campanellabc112a92017-06-26 19:06:43 +0200166 //FIXME this will be removed when the configuration is synced at the source.
167 private static final Set<String> PIPELINE_CONFIGURABLE_PROTOCOLS = ImmutableSet.of("p4runtime");
Andrea Campanellabc112a92017-06-26 19:06:43 +0200168
Andrea Campanella19090322017-08-22 10:31:37 +0200169 private static final ConcurrentMap<DeviceId, Lock> ENTRY_LOCKS = Maps.newConcurrentMap();
Andrea Campanellabc112a92017-06-26 19:06:43 +0200170 //FIXME to be removed when netcfg will issue device events in a bundle or
171 //ensures all configuration needed is present
172 private Set<DeviceId> deviceConfigured = new CopyOnWriteArraySet<>();
173 private Set<DeviceId> driverConfigured = new CopyOnWriteArraySet<>();
174 private Set<DeviceId> pipelineConfigured = new CopyOnWriteArraySet<>();
Andrea Campanella241896c2017-05-10 13:11:04 -0700175
176
177 protected ScheduledExecutorService connectionExecutor
178 = newScheduledThreadPool(CORE_POOL_SIZE,
Andrea Campanella19090322017-08-22 10:31:37 +0200179 groupedThreads("onos/generaldeviceprovider-device",
180 "connection-executor-%d", log));
Andrea Campanella241896c2017-05-10 13:11:04 -0700181 protected ScheduledExecutorService portStatsExecutor
182 = newScheduledThreadPool(CORE_POOL_SIZE,
Andrea Campanella19090322017-08-22 10:31:37 +0200183 groupedThreads("onos/generaldeviceprovider-port-stats",
184 "port-stats-executor-%d", log));
185 protected ConcurrentMap<DeviceId, ScheduledFuture<?>> scheduledTasks = new ConcurrentHashMap<>();
Andrea Campanella241896c2017-05-10 13:11:04 -0700186
187 protected DeviceProviderService providerService;
188 private InternalDeviceListener deviceListener = new InternalDeviceListener();
189
190 protected final ConfigFactory factory =
191 new ConfigFactory<DeviceId, GeneralProviderDeviceConfig>(
192 SubjectFactories.DEVICE_SUBJECT_FACTORY,
193 GeneralProviderDeviceConfig.class, CFG_SCHEME) {
194 @Override
195 public GeneralProviderDeviceConfig createConfig() {
196 return new GeneralProviderDeviceConfig();
197 }
198 };
199
200 protected final NetworkConfigListener cfgListener = new InternalNetworkConfigListener();
201
202
203 @Activate
204 public void activate() {
205 providerService = providerRegistry.register(this);
Andrea Campanella4929a812017-10-09 18:38:23 +0200206 componentConfigService.registerProperties(getClass());
Andrea Campanella241896c2017-05-10 13:11:04 -0700207 coreService.registerApplication(APP_NAME);
208 cfgService.registerConfigFactory(factory);
209 cfgService.addListener(cfgListener);
210 deviceService.addListener(deviceListener);
211 //This will fail if ONOS has CFG and drivers which depend on this provider
212 // are activated, failing due to not finding the driver.
213 cfgService.getSubjects(DeviceId.class, GeneralProviderDeviceConfig.class)
214 .forEach(did -> connectionExecutor.execute(() -> connectDevice(did)));
215 log.info("Started");
216 }
217
Andrea Campanella19090322017-08-22 10:31:37 +0200218 @Modified
219 public void modified(ComponentContext context) {
220 if (context != null) {
221 Dictionary<?, ?> properties = context.getProperties();
222 pollFrequency = Tools.getIntegerProperty(properties, "pollFrequency",
223 DEFAULT_POLL_FREQUENCY_SECONDS);
224 log.info("Configured. Poll frequency is configured to {} seconds", pollFrequency);
225 }
226
227 if (!scheduledTasks.isEmpty()) {
228 //cancel all previous tasks
229 scheduledTasks.values().forEach(task -> task.cancel(false));
230 //resubmit task with new timeout.
231 Set<DeviceId> deviceSubjects =
232 cfgService.getSubjects(DeviceId.class, GeneralProviderDeviceConfig.class);
233 deviceSubjects.forEach(deviceId -> {
234 if (!compareScheme(deviceId)) {
235 // not under my scheme, skipping
236 log.debug("{} is not my scheme, skipping", deviceId);
237 return;
238 }
239 scheduledTasks.put(deviceId, schedulePolling(deviceId, true));
240 });
241 }
242
243 }
244
Andrea Campanella241896c2017-05-10 13:11:04 -0700245
246 @Deactivate
247 public void deactivate() {
248 portStatsExecutor.shutdown();
Andrea Campanella4929a812017-10-09 18:38:23 +0200249 componentConfigService.unregisterProperties(getClass(), false);
Andrea Campanella241896c2017-05-10 13:11:04 -0700250 cfgService.removeListener(cfgListener);
251 //Not Removing the device so they can still be used from other driver providers
252 //cfgService.getSubjects(DeviceId.class, GeneralProviderDeviceConfig.class)
253 // .forEach(did -> connectionExecutor.execute(() -> disconnectDevice(did)));
254 connectionExecutor.shutdown();
255 deviceService.removeListener(deviceListener);
256 providerRegistry.unregister(this);
257 providerService = null;
258 cfgService.unregisterConfigFactory(factory);
259 log.info("Stopped");
260 }
261
/**
 * Creates the provider, identified by the {@code device} URI scheme and
 * this provider's package id.
 */
public GeneralDeviceProvider() {
    super(new ProviderId(URI_SCHEME, DEVICE_PROVIDER_PACKAGE));
}
265
266
267 @Override
268 public void triggerProbe(DeviceId deviceId) {
269 //TODO Really don't see the point of this in non OF Context,
270 // for now testing reachability, can be moved to no-op
271 log.debug("Triggering probe equals testing reachability on device {}", deviceId);
272 isReachable(deviceId);
273 }
274
275 @Override
276 public void roleChanged(DeviceId deviceId, MastershipRole newRole) {
Andrea Campanella14e196d2017-07-24 18:11:36 +0200277 log.info("Received role {} for device {}", newRole, deviceId);
Andrea Campanella241896c2017-05-10 13:11:04 -0700278 CompletableFuture<MastershipRole> roleReply = getHandshaker(deviceId).roleChanged(newRole);
Andrea Campanella14e196d2017-07-24 18:11:36 +0200279 roleReply.thenAcceptAsync(mastership -> {
280 providerService.receivedRoleReply(deviceId, newRole, mastership);
281 if (!mastership.equals(MastershipRole.MASTER) && scheduledTasks.get(deviceId) != null) {
282 scheduledTasks.get(deviceId).cancel(false);
283 scheduledTasks.remove(deviceId);
284 } else if (mastership.equals(MastershipRole.MASTER) && scheduledTasks.get(deviceId) == null) {
285 scheduledTasks.put(deviceId, schedulePolling(deviceId, false));
286 updatePortStatistics(deviceId);
287 }
288 });
Andrea Campanella241896c2017-05-10 13:11:04 -0700289 }
290
291 @Override
292 public boolean isReachable(DeviceId deviceId) {
Andrea Campanella14e196d2017-07-24 18:11:36 +0200293 log.debug("Testing reachability for device {}", deviceId);
Andrea Campanellac1ecdd02018-01-12 12:48:24 +0100294
295 DeviceHandshaker handshaker = getHandshaker(deviceId);
296 if (handshaker == null) {
297 return false;
298 }
299
300 CompletableFuture<Boolean> reachable = handshaker.isReachable();
Andrea Campanella241896c2017-05-10 13:11:04 -0700301 try {
Andrea Campanella14e196d2017-07-24 18:11:36 +0200302 return reachable.get(REACHABILITY_TIMEOUT, TimeUnit.SECONDS);
Andrea Campanella241896c2017-05-10 13:11:04 -0700303 } catch (InterruptedException | ExecutionException | TimeoutException e) {
304 log.error("Device {} is not reachable", deviceId, e);
305 return false;
306 }
307 }
308
309 @Override
310 public void changePortState(DeviceId deviceId, PortNumber portNumber,
311 boolean enable) {
312 if (deviceService.getDevice(deviceId).is(PortAdmin.class)) {
313
314 PortAdmin portAdmin = getPortAdmin(deviceId);
315 CompletableFuture<Boolean> modified;
316 if (enable) {
317 modified = portAdmin.enable(portNumber);
318 } else {
319 modified = portAdmin.disable(portNumber);
320 }
321 modified.thenAcceptAsync(result -> {
322 if (!result) {
323 log.warn("Your device {} port {} status can't be changed to {}",
Andrea Campanella19090322017-08-22 10:31:37 +0200324 deviceId, portNumber, enable);
Andrea Campanella241896c2017-05-10 13:11:04 -0700325 }
326 });
327
328 } else {
329 log.warn("Device {} does not support PortAdmin behaviour", deviceId);
330 }
331 }
332
333 private DeviceHandshaker getHandshaker(DeviceId deviceId) {
334 Driver driver = getDriver(deviceId);
335 return getBehaviour(driver, DeviceHandshaker.class,
Andrea Campanella19090322017-08-22 10:31:37 +0200336 new DefaultDriverData(driver, deviceId));
Andrea Campanella241896c2017-05-10 13:11:04 -0700337 }
338
339 private PortAdmin getPortAdmin(DeviceId deviceId) {
340 Driver driver = getDriver(deviceId);
341 return getBehaviour(driver, PortAdmin.class,
Andrea Campanella19090322017-08-22 10:31:37 +0200342 new DefaultDriverData(driver, deviceId));
Andrea Campanella241896c2017-05-10 13:11:04 -0700343
344 }
345
346 private Driver getDriver(DeviceId deviceId) {
Andrea Campanellac1ecdd02018-01-12 12:48:24 +0100347 Driver driver = null;
Andrea Campanella241896c2017-05-10 13:11:04 -0700348 try {
349 driver = driverService.getDriver(deviceId);
350 } catch (ItemNotFoundException e) {
351 log.debug("Falling back to configuration to fetch driver " +
Andrea Campanella19090322017-08-22 10:31:37 +0200352 "for device {}", deviceId);
Andrea Campanellac1ecdd02018-01-12 12:48:24 +0100353 BasicDeviceConfig cfg = cfgService.getConfig(deviceId, BasicDeviceConfig.class);
354 if (cfg != null) {
355 driver = driverService.getDriver(cfg.driver());
356 }
Andrea Campanella241896c2017-05-10 13:11:04 -0700357 }
358 return driver;
359 }
360
Andrea Campanellac1ecdd02018-01-12 12:48:24 +0100361 //Distinguishing from getDriver to not impose everywhere the overhead to get the whole device.
362 // This is what the driverService does with the getDriver(deviceId) method.
363 // A redundant method here is needed because the driverService returns null when the device is not in the store
364 // as happens during disconnection.
365 // The whole device object is needed only in disconnection.
366 private Driver getDriverFromAnnotations(Device device) {
367 String driverName = device.annotations().value(DRIVER);
368 if (driverName != null) {
369 try {
370 return driverService.getDriver(driverName);
371 } catch (ItemNotFoundException e) {
372 log.warn("Specified driver {} not found, falling back.", driverName);
373 }
374 }
375 return null;
376 }
377
Andrea Campanella241896c2017-05-10 13:11:04 -0700378 //needed since the device manager will not return the driver through implementation()
379 // method since the device is not pushed to the core so for the connectDevice
380 // we need to work around that in order to test before calling
381 // store.createOrUpdateDevice
382 private <T extends Behaviour> T getBehaviour(Driver driver, Class<T> type,
383 DriverData data) {
Andrea Campanellac1ecdd02018-01-12 12:48:24 +0100384 if (driver != null && driver.hasBehaviour(type)) {
Andrea Campanella241896c2017-05-10 13:11:04 -0700385 DefaultDriverHandler handler = new DefaultDriverHandler(data);
386 return driver.createBehaviour(handler, type);
387 } else {
388 return null;
389 }
390 }
391
392 //Connects a general device
393 private void connectDevice(DeviceId deviceId) {
394 //retrieve the configuration
395 GeneralProviderDeviceConfig providerConfig =
396 cfgService.getConfig(deviceId, GeneralProviderDeviceConfig.class);
397 BasicDeviceConfig basicDeviceConfig =
398 cfgService.getConfig(deviceId, BasicDeviceConfig.class);
399
400 if (providerConfig == null || basicDeviceConfig == null) {
401 log.error("Configuration is NULL: basic config {}, general provider " +
Andrea Campanella19090322017-08-22 10:31:37 +0200402 "config {}", basicDeviceConfig, providerConfig);
Andrea Campanella241896c2017-05-10 13:11:04 -0700403 } else {
Andrea Campanellabc112a92017-06-26 19:06:43 +0200404 log.info("Connecting to device {} with driver {}", deviceId, basicDeviceConfig.driver());
Andrea Campanella241896c2017-05-10 13:11:04 -0700405
Frank Wang554ce972017-09-06 09:56:43 +0800406 Driver driver;
407 try {
408 driver = driverService.getDriver(basicDeviceConfig.driver());
409 } catch (ItemNotFoundException e) {
410 log.warn("The driver of {} is not found : {}", deviceId, e.getMessage());
411 return;
412 }
413
Andrea Campanella241896c2017-05-10 13:11:04 -0700414 DriverData driverData = new DefaultDriverData(driver, deviceId);
Frank Wang554ce972017-09-06 09:56:43 +0800415 DeviceHandshaker handshaker = getBehaviour(driver, DeviceHandshaker.class, driverData);
Andrea Campanellabc112a92017-06-26 19:06:43 +0200416 if (handshaker == null) {
Andrea Campanella241896c2017-05-10 13:11:04 -0700417 log.error("Device {}, with driver {} does not support DeviceHandshaker " +
Andrea Campanella19090322017-08-22 10:31:37 +0200418 "behaviour, {}", deviceId, driver.name(), driver.behaviours());
Andrea Campanellabc112a92017-06-26 19:06:43 +0200419 return;
Andrea Campanella241896c2017-05-10 13:11:04 -0700420 }
Frank Wang554ce972017-09-06 09:56:43 +0800421
Andrea Campanella14e196d2017-07-24 18:11:36 +0200422 addConfigData(providerConfig, driverData);
Andrea Campanellabc112a92017-06-26 19:06:43 +0200423
424 //Connecting to the device
425 CompletableFuture<Boolean> connected = handshaker.connect();
426
427 connected.thenAcceptAsync(result -> {
428 if (result) {
429
430 //Populated with the default values obtained by the driver
431 ChassisId cid = new ChassisId();
432 SparseAnnotations annotations = DefaultAnnotations.builder()
433 .set(AnnotationKeys.PROTOCOL,
Andrea Campanella19090322017-08-22 10:31:37 +0200434 providerConfig.protocolsInfo().keySet().toString())
Andrea Campanellabc112a92017-06-26 19:06:43 +0200435 .build();
436 DeviceDescription description =
437 new DefaultDeviceDescription(deviceId.uri(), Device.Type.SWITCH,
Andrea Campanella19090322017-08-22 10:31:37 +0200438 driver.manufacturer(), driver.hwVersion(),
439 driver.swVersion(), UNKNOWN,
Yi Tseng92494fb2017-12-05 15:14:53 -0800440 cid, true, annotations);
Andrea Campanellabc112a92017-06-26 19:06:43 +0200441 //Empty list of ports
442 List<PortDescription> ports = new ArrayList<>();
443
Andrea Campanellabf9e5ce2017-12-06 14:26:36 +0100444 DeviceDescriptionDiscovery deviceDiscovery = getBehaviour(driver,
445 DeviceDescriptionDiscovery.class, driverData);
446 if (deviceDiscovery != null) {
Andrea Campanellabc112a92017-06-26 19:06:43 +0200447 DeviceDescription newdescription = deviceDiscovery.discoverDeviceDetails();
448 if (newdescription != null) {
449 description = newdescription;
450 }
451 ports = deviceDiscovery.discoverPortDetails();
Andrea Campanellabf9e5ce2017-12-06 14:26:36 +0100452 } else {
453 log.info("No Device Description Discovery for device {}, no update for " +
454 "description or ports.", deviceId);
Andrea Campanellabc112a92017-06-26 19:06:43 +0200455 }
456
Andrea Campanella14e196d2017-07-24 18:11:36 +0200457 if (!handlePipeconf(deviceId, driver, driverData, true)) {
Carmelo Cascone59f57de2017-07-11 19:55:09 -0400458 // Something went wrong during handling of pipeconf.
459 // We already logged the error.
460 handshaker.disconnect();
461 return;
Andrea Campanellabc112a92017-06-26 19:06:43 +0200462 }
463
Carmelo Cascone59f57de2017-07-11 19:55:09 -0400464 advertiseDevice(deviceId, description, ports);
465
Andrea Campanellabc112a92017-06-26 19:06:43 +0200466 } else {
467 log.warn("Can't connect to device {}", deviceId);
468 }
469 });
Andrea Campanella241896c2017-05-10 13:11:04 -0700470 }
471 }
472
Andrea Campanella14e196d2017-07-24 18:11:36 +0200473 private void connectStandbyDevice(DeviceId deviceId) {
474
475 //if device is pipeline programmable we merge pipeconf + base driver for every other role
476 GeneralProviderDeviceConfig providerConfig =
477 cfgService.getConfig(deviceId, GeneralProviderDeviceConfig.class);
478
479 Driver driver = getDriver(deviceId);
480
481
482 DriverData driverData = new DefaultDriverData(driver, deviceId);
483 DeviceHandshaker handshaker = getBehaviour(driver, DeviceHandshaker.class, driverData);
484 if (handshaker == null) {
485 log.error("Device {}, with driver {} does not support DeviceHandshaker " +
486 "behaviour, supported behaviours={}", deviceId, driver.name(), driver.behaviours());
487 return;
488 }
489 addConfigData(providerConfig, driverData);
490
491 //Connecting to the device
492 handshaker.connect().thenAcceptAsync(result -> {
493 if (result) {
494 handlePipeconf(deviceId, driver, driverData, false);
495 }
496 });
497 }
498
Carmelo Cascone59f57de2017-07-11 19:55:09 -0400499 /**
500 * Handles the case of a device that is pipeline programmable. Returns true if the operation wa successful and the
501 * device can be registered to the core, false otherwise.
502 */
Andrea Campanella14e196d2017-07-24 18:11:36 +0200503 private boolean handlePipeconf(DeviceId deviceId, Driver driver, DriverData driverData, boolean deployPipeconf) {
Carmelo Cascone59f57de2017-07-11 19:55:09 -0400504
505 PiPipelineProgrammable pipelineProg = getBehaviour(driver, PiPipelineProgrammable.class,
Andrea Campanella19090322017-08-22 10:31:37 +0200506 driverData);
Carmelo Cascone59f57de2017-07-11 19:55:09 -0400507
508 if (pipelineProg == null) {
509 // Device is not pipeline programmable.
510 return true;
511 }
512
Andrea Campanella14e196d2017-07-24 18:11:36 +0200513 PiPipeconf pipeconf = getPipeconf(deviceId, pipelineProg);
514
515 if (pipeconf != null) {
516
517 PiPipeconfId pipeconfId = pipeconf.id();
518
519 try {
520 if (deployPipeconf) {
521 if (!pipelineProg.deployPipeconf(pipeconf).get()) {
522 log.error("Unable to deploy pipeconf {} to {}, aborting device discovery",
523 pipeconfId, deviceId);
524 return false;
525 }
526 }
527 } catch (InterruptedException | ExecutionException e) {
528 log.warn("Exception occurred while deploying pipeconf {} to device {}", pipeconf.id(), deviceId, e);
529 return false;
530 }
531 try {
532 if (!piPipeconfService.bindToDevice(pipeconfId, deviceId).get()) {
533 log.error("Unable to merge driver {} for device {} with pipeconf {}, aborting device discovery",
534 driver.name(), deviceId, pipeconfId);
535 return false;
536 }
537 } catch (InterruptedException | ExecutionException e) {
538 log.warn("Exception occurred while binding pipeconf {} to device {}", pipeconf.id(), deviceId, e);
539 return false;
540 }
541 } else {
542 return false;
543 }
544
545 return true;
546 }
547
548 private PiPipeconf getPipeconf(DeviceId deviceId, PiPipelineProgrammable pipelineProg) {
Carmelo Cascone59f57de2017-07-11 19:55:09 -0400549 PiPipeconfId pipeconfId = piPipeconfService.ofDevice(deviceId).orElseGet(() -> {
550 // No pipeconf has been associated with this device.
551 // Check if device driver provides a default one.
552 if (pipelineProg.getDefaultPipeconf().isPresent()) {
553 PiPipeconf defaultPipeconf = pipelineProg.getDefaultPipeconf().get();
554 log.info("Using default pipeconf {} for {}", defaultPipeconf.id(), deviceId);
Carmelo Cascone59f57de2017-07-11 19:55:09 -0400555 return defaultPipeconf.id();
556 } else {
557 return null;
558 }
559 });
560
561 if (pipeconfId == null) {
562 log.warn("Device {} is pipeline programmable but no pipeconf can be associated to it.", deviceId);
Andrea Campanella14e196d2017-07-24 18:11:36 +0200563 return null;
Carmelo Cascone1fb27d32017-08-25 20:40:20 +0200564 }
565
566 if (!piPipeconfService.getPipeconf(pipeconfId).isPresent()) {
567 log.warn("Pipeconf {} is not registered", pipeconfId);
Andrea Campanella14e196d2017-07-24 18:11:36 +0200568 return null;
Carmelo Cascone59f57de2017-07-11 19:55:09 -0400569 }
570
571
Andrea Campanella14e196d2017-07-24 18:11:36 +0200572 return piPipeconfService.getPipeconf(pipeconfId).get();
Carmelo Cascone59f57de2017-07-11 19:55:09 -0400573 }
574
Andrea Campanellabc112a92017-06-26 19:06:43 +0200575 private void advertiseDevice(DeviceId deviceId, DeviceDescription description, List<PortDescription> ports) {
576 providerService.deviceConnected(deviceId, description);
577 providerService.updatePorts(deviceId, ports);
578 }
579
Andrea Campanellac1ecdd02018-01-12 12:48:24 +0100580 private void disconnectDevice(Device device) {
581 DeviceId deviceId = device.id();
Andrea Campanella241896c2017-05-10 13:11:04 -0700582 log.info("Disconnecting for device {}", deviceId);
Andrea Campanellac1ecdd02018-01-12 12:48:24 +0100583
584 //The driver service will return a null driver for the given deviceId
585 //since it's already removed form the device store, we leverage the device object from the DEVICE_REMOVED
586 //event to get the driver.
587 Driver driver = getDriverFromAnnotations(device);
588 if (driver != null) {
589 DeviceHandshaker handshaker = getBehaviour(driver, DeviceHandshaker.class,
590 new DefaultDriverData(driver, deviceId));
591 if (handshaker != null) {
592 CompletableFuture<Boolean> disconnect = handshaker.disconnect();
593 disconnect.thenAcceptAsync(result -> {
594 if (result) {
595 log.info("Disconnected device {}", deviceId);
596 providerService.deviceDisconnected(deviceId);
597 } else {
598 log.warn("Device {} was unable to disconnect", deviceId);
599 }
600 });
601 } else {
602 //gracefully ignoring.
603 log.warn("No DeviceHandshaker for device {}, no guarantees of complete " +
604 "shutdown of communication", deviceId);
605 }
Andrea Campanella241896c2017-05-10 13:11:04 -0700606 } else {
607 //gracefully ignoring.
Andrea Campanellac1ecdd02018-01-12 12:48:24 +0100608 log.warn("Can't find driver for device {}, no guarantees of complete shutdown of communication", deviceId);
Andrea Campanella241896c2017-05-10 13:11:04 -0700609 }
Andrea Campanella19090322017-08-22 10:31:37 +0200610 ScheduledFuture<?> pollingStatisticsTask = scheduledTasks.get(deviceId);
611 if (pollingStatisticsTask != null) {
612 pollingStatisticsTask.cancel(true);
613 }
Andrea Campanellac1ecdd02018-01-12 12:48:24 +0100614
Andrea Campanella241896c2017-05-10 13:11:04 -0700615 }
616
617 //Needed to catch the exception in the executors since are not rethrown otherwise.
618 private Runnable exceptionSafe(Runnable runnable) {
619 return () -> {
620 try {
621 runnable.run();
622 } catch (Exception e) {
623 log.error("Unhandled Exception", e);
624 }
625 };
626 }
627
628 private void updatePortStatistics(DeviceId deviceId) {
Andrea Campanellace111932017-09-18 16:59:56 +0900629 Device device = deviceService.getDevice(deviceId);
630 if (!Objects.isNull(device) && deviceService.isAvailable(deviceId) &&
631 device.is(PortStatisticsDiscovery.class)) {
632 Collection<PortStatistics> statistics = device.as(PortStatisticsDiscovery.class)
633 .discoverPortStatistics();
634 //updating statistcs only if not empty
635 if (!statistics.isEmpty()) {
636 providerService.updatePortStatistics(deviceId, statistics);
637 }
638 } else {
639 log.debug("Can't update port statistics for device {}", deviceId);
Andrea Campanella19090322017-08-22 10:31:37 +0200640 }
641 }
642
643 private boolean compareScheme(DeviceId deviceId) {
644 return deviceId.uri().getScheme().equals(URI_SCHEME);
Andrea Campanella241896c2017-05-10 13:11:04 -0700645 }
646
647 /**
648 * Listener for configuration events.
649 */
650 private class InternalNetworkConfigListener implements NetworkConfigListener {
651
652
653 @Override
654 public void event(NetworkConfigEvent event) {
655 DeviceId deviceId = (DeviceId) event.subject();
656 //Assuming that the deviceId comes with uri 'device:'
Andrea Campanella19090322017-08-22 10:31:37 +0200657 if (!compareScheme(deviceId)) {
Andrea Campanella241896c2017-05-10 13:11:04 -0700658 // not under my scheme, skipping
659 log.debug("{} is not my scheme, skipping", deviceId);
660 return;
661 }
Andrea Campanellace111932017-09-18 16:59:56 +0900662 if (deviceService.getDevice(deviceId) != null && deviceService.isAvailable(deviceId)) {
Andrea Campanella241896c2017-05-10 13:11:04 -0700663 log.info("Device {} is already connected to ONOS and is available", deviceId);
Andrea Campanellabc112a92017-06-26 19:06:43 +0200664 return;
665 }
Andrea Campanella14e196d2017-07-24 18:11:36 +0200666 NodeId leaderNodeId = leadershipService.runForLeadership(DEPLOY + deviceId.toString() + PIPECONF_TOPIC)
667 .leader().nodeId();
668 NodeId localNodeId = clusterService.getLocalNode().id();
669 if (localNodeId.equals(leaderNodeId)) {
670 if (processEvent(event, deviceId)) {
671 log.debug("{} is leader for {}, initiating the connection and deploying pipeline", leaderNodeId,
672 deviceId);
673 checkAndSubmitDeviceTask(deviceId);
674 }
675 } else {
676 if (processEvent(event, deviceId)) {
677 log.debug("{} is not leader for {}, initiating connection but not deploying pipeline, {} is LEADER",
678 localNodeId, deviceId, leaderNodeId);
679 connectionExecutor.submit(exceptionSafe(() -> connectStandbyDevice(deviceId)));
680 //FIXME this will be removed when config is synced
681 cleanUpConfigInfo(deviceId);
682 }
683 }
684
685 }
686
687 private boolean processEvent(NetworkConfigEvent event, DeviceId deviceId) {
Andrea Campanellabc112a92017-06-26 19:06:43 +0200688 //FIXME to be removed when netcfg will issue device events in a bundle or
689 // ensure all configuration needed is present
690 Lock lock = ENTRY_LOCKS.computeIfAbsent(deviceId, key -> new ReentrantLock());
691 lock.lock();
692 try {
693 if (event.configClass().equals(GeneralProviderDeviceConfig.class)) {
694 //FIXME we currently assume that p4runtime devices are pipeline configurable.
695 //If we want to connect a p4runtime device with no pipeline
696 if (event.config().isPresent() &&
697 Collections.disjoint(ImmutableSet.copyOf(event.config().get().node().fieldNames()),
Andrea Campanella19090322017-08-22 10:31:37 +0200698 PIPELINE_CONFIGURABLE_PROTOCOLS)) {
Andrea Campanellabc112a92017-06-26 19:06:43 +0200699 pipelineConfigured.add(deviceId);
700 }
701 deviceConfigured.add(deviceId);
702 } else if (event.configClass().equals(BasicDeviceConfig.class)) {
703 if (event.config().isPresent() && event.config().get().node().has(DRIVER)) {
704 //TODO add check for pipeline and add it to the pipeline list if no
705 // p4runtime is present.
706 driverConfigured.add(deviceId);
707 }
708 } else if (event.configClass().equals(PiPipeconfConfig.class)) {
709 if (event.config().isPresent()
710 && event.config().get().node().has(PiPipeconfConfig.PIPIPECONFID)) {
711 pipelineConfigured.add(deviceId);
712 }
713 }
714 //if the device has no "pipeline configurable protocol it will be present
715 // in the pipelineConfigured
716 if (deviceConfigured.contains(deviceId) && driverConfigured.contains(deviceId)
717 && pipelineConfigured.contains(deviceId)) {
Andrea Campanella14e196d2017-07-24 18:11:36 +0200718 return true;
Andrea Campanellabc112a92017-06-26 19:06:43 +0200719 } else {
720 if (deviceConfigured.contains(deviceId) && driverConfigured.contains(deviceId)) {
721 log.debug("Waiting for pipeline configuration for device {}", deviceId);
722 } else if (pipelineConfigured.contains(deviceId) && driverConfigured.contains(deviceId)) {
723 log.debug("Waiting for device configuration for device {}", deviceId);
724 } else if (pipelineConfigured.contains(deviceId) && deviceConfigured.contains(deviceId)) {
725 log.debug("Waiting for driver configuration for device {}", deviceId);
726 } else if (driverConfigured.contains(deviceId)) {
727 log.debug("Only driver configuration for device {}", deviceId);
728 } else if (deviceConfigured.contains(deviceId)) {
729 log.debug("Only device configuration for device {}", deviceId);
730 }
731 }
Andrea Campanella14e196d2017-07-24 18:11:36 +0200732 return false;
Andrea Campanellabc112a92017-06-26 19:06:43 +0200733 } finally {
734 lock.unlock();
Andrea Campanella241896c2017-05-10 13:11:04 -0700735 }
736 }
737
738 @Override
739 public boolean isRelevant(NetworkConfigEvent event) {
Andrea Campanellabc112a92017-06-26 19:06:43 +0200740 return (event.configClass().equals(GeneralProviderDeviceConfig.class) ||
741 event.configClass().equals(BasicDeviceConfig.class) ||
742 event.configClass().equals(PiPipeconfConfig.class)) &&
Andrea Campanella241896c2017-05-10 13:11:04 -0700743 (event.type() == NetworkConfigEvent.Type.CONFIG_ADDED ||
744 event.type() == NetworkConfigEvent.Type.CONFIG_UPDATED);
745 }
746 }
747
Andrea Campanellabc112a92017-06-26 19:06:43 +0200748 private void checkAndSubmitDeviceTask(DeviceId deviceId) {
749 connectionExecutor.submit(exceptionSafe(() -> connectDevice(deviceId)));
750 //FIXME this will be removed when configuration is synced.
Andrea Campanella14e196d2017-07-24 18:11:36 +0200751 cleanUpConfigInfo(deviceId);
752
753 }
754
755 private void addConfigData(GeneralProviderDeviceConfig providerConfig, DriverData driverData) {
756 //Storing deviceKeyId and all other config values
757 // as data in the driver with protocol_<info>
758 // name as the key. e.g protocol_ip
759 providerConfig.protocolsInfo()
760 .forEach((protocol, deviceInfoConfig) -> {
761 deviceInfoConfig.configValues()
762 .forEach((k, v) -> driverData.set(protocol + "_" + k, v));
763 driverData.set(protocol + "_key", deviceInfoConfig.deviceKeyId());
764 });
765 }
766
767 private void cleanUpConfigInfo(DeviceId deviceId) {
Andrea Campanellabc112a92017-06-26 19:06:43 +0200768 deviceConfigured.remove(deviceId);
769 driverConfigured.remove(deviceId);
770 pipelineConfigured.remove(deviceId);
Andrea Campanellabc112a92017-06-26 19:06:43 +0200771 }
772
Andrea Campanella19090322017-08-22 10:31:37 +0200773 private ScheduledFuture<?> schedulePolling(DeviceId deviceId, boolean randomize) {
774 int delay = 0;
775 if (randomize) {
Ray Milkeyb68bbbc2017-12-18 10:05:49 -0800776 delay = new SecureRandom().nextInt(10);
Andrea Campanella19090322017-08-22 10:31:37 +0200777 }
778 return portStatsExecutor.scheduleAtFixedRate(
779 exceptionSafe(() -> updatePortStatistics(deviceId)),
780 delay, pollFrequency, TimeUnit.SECONDS);
781 }
782
Andrea Campanella241896c2017-05-10 13:11:04 -0700783 /**
784 * Listener for core device events.
785 */
786 private class InternalDeviceListener implements DeviceListener {
787 @Override
788 public void event(DeviceEvent event) {
789 Type type = event.type();
Andrea Campanellace111932017-09-18 16:59:56 +0900790 DeviceId deviceId = event.subject().id();
Andrea Campanella241896c2017-05-10 13:11:04 -0700791 if (type.equals((Type.DEVICE_ADDED))) {
792
793 //For now this is scheduled periodically, when streaming API will
794 // be available we check and base it on the streaming API (e.g. gNMI)
Andrea Campanella14e196d2017-07-24 18:11:36 +0200795 if (mastershipService.isLocalMaster(deviceId)) {
796 scheduledTasks.put(deviceId, schedulePolling(deviceId, false));
797 updatePortStatistics(deviceId);
798 }
Andrea Campanella241896c2017-05-10 13:11:04 -0700799
800 } else if (type.equals(Type.DEVICE_REMOVED)) {
Andrea Campanella14e196d2017-07-24 18:11:36 +0200801
Andrea Campanellac1ecdd02018-01-12 12:48:24 +0100802 //Passing the whole device object to get driver information
Andrea Campanella241896c2017-05-10 13:11:04 -0700803 connectionExecutor.submit(exceptionSafe(() ->
Andrea Campanellac1ecdd02018-01-12 12:48:24 +0100804 disconnectDevice(event.subject())));
Andrea Campanella241896c2017-05-10 13:11:04 -0700805 }
806 }
807
808 @Override
809 public boolean isRelevant(DeviceEvent event) {
Andrea Campanella19090322017-08-22 10:31:37 +0200810 return event.subject().id().toString().startsWith(URI_SCHEME.toLowerCase());
Andrea Campanella241896c2017-05-10 13:11:04 -0700811 }
812 }
813}