blob: 48d8007fac71723a7cd8df6f8d86abb8299370c2 [file] [log] [blame]
Andrea Campanella241896c2017-05-10 13:11:04 -07001/*
Brian O'Connora09fe5b2017-08-03 21:12:30 -07002 * Copyright 2017-present Open Networking Foundation
Andrea Campanella241896c2017-05-10 13:11:04 -07003 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
16
17package org.onosproject.provider.general.device.impl;
18
19import com.google.common.annotations.Beta;
Andrea Campanellabc112a92017-06-26 19:06:43 +020020import com.google.common.collect.ImmutableSet;
21import com.google.common.collect.Maps;
Andrea Campanella241896c2017-05-10 13:11:04 -070022import org.apache.felix.scr.annotations.Activate;
23import org.apache.felix.scr.annotations.Component;
24import org.apache.felix.scr.annotations.Deactivate;
Andrea Campanella19090322017-08-22 10:31:37 +020025import org.apache.felix.scr.annotations.Modified;
26import org.apache.felix.scr.annotations.Property;
Andrea Campanella241896c2017-05-10 13:11:04 -070027import org.apache.felix.scr.annotations.Reference;
28import org.apache.felix.scr.annotations.ReferenceCardinality;
29import org.onlab.packet.ChassisId;
30import org.onlab.util.ItemNotFoundException;
Andrea Campanella19090322017-08-22 10:31:37 +020031import org.onlab.util.Tools;
Andrea Campanella4929a812017-10-09 18:38:23 +020032import org.onosproject.cfg.ComponentConfigService;
Andrea Campanella14e196d2017-07-24 18:11:36 +020033import org.onosproject.cluster.ClusterService;
34import org.onosproject.cluster.LeadershipService;
35import org.onosproject.cluster.NodeId;
Andrea Campanella241896c2017-05-10 13:11:04 -070036import org.onosproject.core.CoreService;
Andrea Campanella14e196d2017-07-24 18:11:36 +020037import org.onosproject.mastership.MastershipService;
Andrea Campanella241896c2017-05-10 13:11:04 -070038import org.onosproject.net.AnnotationKeys;
39import org.onosproject.net.DefaultAnnotations;
40import org.onosproject.net.Device;
41import org.onosproject.net.DeviceId;
42import org.onosproject.net.MastershipRole;
43import org.onosproject.net.PortNumber;
44import org.onosproject.net.SparseAnnotations;
Carmelo Cascone87892e22017-11-13 16:01:29 -080045import org.onosproject.net.behaviour.PiPipelineProgrammable;
Andrea Campanella241896c2017-05-10 13:11:04 -070046import org.onosproject.net.behaviour.PortAdmin;
47import org.onosproject.net.config.ConfigFactory;
48import org.onosproject.net.config.NetworkConfigEvent;
49import org.onosproject.net.config.NetworkConfigListener;
50import org.onosproject.net.config.NetworkConfigRegistry;
51import org.onosproject.net.config.basics.BasicDeviceConfig;
52import org.onosproject.net.config.basics.SubjectFactories;
53import org.onosproject.net.device.DefaultDeviceDescription;
54import org.onosproject.net.device.DeviceDescription;
55import org.onosproject.net.device.DeviceDescriptionDiscovery;
56import org.onosproject.net.device.DeviceEvent;
57import org.onosproject.net.device.DeviceHandshaker;
58import org.onosproject.net.device.DeviceListener;
59import org.onosproject.net.device.DeviceProvider;
60import org.onosproject.net.device.DeviceProviderRegistry;
61import org.onosproject.net.device.DeviceProviderService;
62import org.onosproject.net.device.DeviceService;
63import org.onosproject.net.device.PortDescription;
64import org.onosproject.net.device.PortStatistics;
65import org.onosproject.net.device.PortStatisticsDiscovery;
66import org.onosproject.net.driver.Behaviour;
67import org.onosproject.net.driver.DefaultDriverData;
68import org.onosproject.net.driver.DefaultDriverHandler;
69import org.onosproject.net.driver.Driver;
70import org.onosproject.net.driver.DriverData;
71import org.onosproject.net.driver.DriverService;
Carmelo Cascone59f57de2017-07-11 19:55:09 -040072import org.onosproject.net.pi.model.PiPipeconf;
Andrea Campanellabc112a92017-06-26 19:06:43 +020073import org.onosproject.net.pi.model.PiPipeconfId;
Carmelo Cascone39c28ca2017-11-15 13:03:57 -080074import org.onosproject.net.pi.service.PiPipeconfConfig;
75import org.onosproject.net.pi.service.PiPipeconfService;
Andrea Campanella241896c2017-05-10 13:11:04 -070076import org.onosproject.net.provider.AbstractProvider;
77import org.onosproject.net.provider.ProviderId;
78import org.onosproject.provider.general.device.api.GeneralProviderDeviceConfig;
Andrea Campanella19090322017-08-22 10:31:37 +020079import org.osgi.service.component.ComponentContext;
Andrea Campanella241896c2017-05-10 13:11:04 -070080import org.slf4j.Logger;
81
Ray Milkeyb68bbbc2017-12-18 10:05:49 -080082import java.security.SecureRandom;
Andrea Campanella241896c2017-05-10 13:11:04 -070083import java.util.ArrayList;
84import java.util.Collection;
Andrea Campanellabc112a92017-06-26 19:06:43 +020085import java.util.Collections;
Andrea Campanella19090322017-08-22 10:31:37 +020086import java.util.Dictionary;
Andrea Campanella241896c2017-05-10 13:11:04 -070087import java.util.List;
Thomas Vachuska5b38dc02018-05-10 15:24:40 -070088import java.util.Map;
Andrea Campanellace111932017-09-18 16:59:56 +090089import java.util.Objects;
Andrea Campanellabc112a92017-06-26 19:06:43 +020090import java.util.Set;
Andrea Campanella241896c2017-05-10 13:11:04 -070091import java.util.concurrent.CompletableFuture;
Andrea Campanella19090322017-08-22 10:31:37 +020092import java.util.concurrent.ConcurrentHashMap;
Andrea Campanellabc112a92017-06-26 19:06:43 +020093import java.util.concurrent.ConcurrentMap;
94import java.util.concurrent.CopyOnWriteArraySet;
Andrea Campanella241896c2017-05-10 13:11:04 -070095import java.util.concurrent.ExecutionException;
96import java.util.concurrent.ScheduledExecutorService;
Andrea Campanella19090322017-08-22 10:31:37 +020097import java.util.concurrent.ScheduledFuture;
Andrea Campanella241896c2017-05-10 13:11:04 -070098import java.util.concurrent.TimeUnit;
99import java.util.concurrent.TimeoutException;
Andrea Campanellabc112a92017-06-26 19:06:43 +0200100import java.util.concurrent.locks.Lock;
101import java.util.concurrent.locks.ReentrantLock;
Andrea Campanella241896c2017-05-10 13:11:04 -0700102
103import static java.util.concurrent.Executors.newScheduledThreadPool;
104import static org.onlab.util.Tools.groupedThreads;
105import static org.onosproject.net.device.DeviceEvent.Type;
106import static org.slf4j.LoggerFactory.getLogger;
107
/**
 * Provider which uses drivers to detect devices and to perform the initial
 * handshake and channel establishment with them. Any other provider-specific
 * operation is also delegated to the DeviceHandshaker driver.
 */
113@Beta
114@Component(immediate = true)
115public class GeneralDeviceProvider extends AbstractProvider
116 implements DeviceProvider {
Andrea Campanellabc112a92017-06-26 19:06:43 +0200117 public static final String DRIVER = "driver";
Andrea Campanella14e196d2017-07-24 18:11:36 +0200118 public static final int REACHABILITY_TIMEOUT = 10;
119 public static final String DEPLOY = "deploy-";
120 public static final String PIPECONF_TOPIC = "-pipeconf";
121
Andrea Campanella241896c2017-05-10 13:11:04 -0700122 private final Logger log = getLogger(getClass());
123
124 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
125 protected DeviceProviderRegistry providerRegistry;
126
127 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
Andrea Campanella4929a812017-10-09 18:38:23 +0200128 protected ComponentConfigService componentConfigService;
129
130 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
Andrea Campanella241896c2017-05-10 13:11:04 -0700131 protected NetworkConfigRegistry cfgService;
132
133 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
134 protected CoreService coreService;
135
136 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
137 protected DeviceService deviceService;
138
139 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
140 protected DriverService driverService;
141
142 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
Andrea Campanella14e196d2017-07-24 18:11:36 +0200143 protected MastershipService mastershipService;
144
145 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
Andrea Campanellabc112a92017-06-26 19:06:43 +0200146 protected PiPipeconfService piPipeconfService;
Andrea Campanella241896c2017-05-10 13:11:04 -0700147
Andrea Campanella14e196d2017-07-24 18:11:36 +0200148 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
149 protected ClusterService clusterService;
150
151 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
152 protected LeadershipService leadershipService;
153
Andrea Campanella19090322017-08-22 10:31:37 +0200154 private static final int DEFAULT_POLL_FREQUENCY_SECONDS = 10;
155 @Property(name = "pollFrequency", intValue = DEFAULT_POLL_FREQUENCY_SECONDS,
156 label = "Configure poll frequency for port status and statistics; " +
157 "default is 10 sec")
158 private int pollFrequency = DEFAULT_POLL_FREQUENCY_SECONDS;
159
Andrea Campanella241896c2017-05-10 13:11:04 -0700160 protected static final String APP_NAME = "org.onosproject.generaldeviceprovider";
161 protected static final String URI_SCHEME = "device";
162 protected static final String CFG_SCHEME = "generalprovider";
163 private static final String DEVICE_PROVIDER_PACKAGE = "org.onosproject.general.provider.device";
164 private static final int CORE_POOL_SIZE = 10;
165 private static final String UNKNOWN = "unknown";
Andrea Campanella19090322017-08-22 10:31:37 +0200166
Andrea Campanellabc112a92017-06-26 19:06:43 +0200167 //FIXME this will be removed when the configuration is synced at the source.
168 private static final Set<String> PIPELINE_CONFIGURABLE_PROTOCOLS = ImmutableSet.of("p4runtime");
Andrea Campanellabc112a92017-06-26 19:06:43 +0200169
Andrea Campanella19090322017-08-22 10:31:37 +0200170 private static final ConcurrentMap<DeviceId, Lock> ENTRY_LOCKS = Maps.newConcurrentMap();
Andrea Campanellabc112a92017-06-26 19:06:43 +0200171 //FIXME to be removed when netcfg will issue device events in a bundle or
172 //ensures all configuration needed is present
173 private Set<DeviceId> deviceConfigured = new CopyOnWriteArraySet<>();
174 private Set<DeviceId> driverConfigured = new CopyOnWriteArraySet<>();
175 private Set<DeviceId> pipelineConfigured = new CopyOnWriteArraySet<>();
Andrea Campanella241896c2017-05-10 13:11:04 -0700176
Thomas Vachuska5b38dc02018-05-10 15:24:40 -0700177 private final Map<DeviceId, DeviceHandshaker> handshakers = Maps.newConcurrentMap();
178
Andrea Campanella241896c2017-05-10 13:11:04 -0700179
180 protected ScheduledExecutorService connectionExecutor
181 = newScheduledThreadPool(CORE_POOL_SIZE,
Andrea Campanella19090322017-08-22 10:31:37 +0200182 groupedThreads("onos/generaldeviceprovider-device",
183 "connection-executor-%d", log));
Andrea Campanella241896c2017-05-10 13:11:04 -0700184 protected ScheduledExecutorService portStatsExecutor
185 = newScheduledThreadPool(CORE_POOL_SIZE,
Andrea Campanella19090322017-08-22 10:31:37 +0200186 groupedThreads("onos/generaldeviceprovider-port-stats",
187 "port-stats-executor-%d", log));
188 protected ConcurrentMap<DeviceId, ScheduledFuture<?>> scheduledTasks = new ConcurrentHashMap<>();
Andrea Campanella241896c2017-05-10 13:11:04 -0700189
190 protected DeviceProviderService providerService;
191 private InternalDeviceListener deviceListener = new InternalDeviceListener();
192
193 protected final ConfigFactory factory =
194 new ConfigFactory<DeviceId, GeneralProviderDeviceConfig>(
195 SubjectFactories.DEVICE_SUBJECT_FACTORY,
196 GeneralProviderDeviceConfig.class, CFG_SCHEME) {
197 @Override
198 public GeneralProviderDeviceConfig createConfig() {
199 return new GeneralProviderDeviceConfig();
200 }
201 };
202
203 protected final NetworkConfigListener cfgListener = new InternalNetworkConfigListener();
204
205
206 @Activate
207 public void activate() {
208 providerService = providerRegistry.register(this);
Andrea Campanella4929a812017-10-09 18:38:23 +0200209 componentConfigService.registerProperties(getClass());
Andrea Campanella241896c2017-05-10 13:11:04 -0700210 coreService.registerApplication(APP_NAME);
211 cfgService.registerConfigFactory(factory);
212 cfgService.addListener(cfgListener);
213 deviceService.addListener(deviceListener);
Thomas Vachuska5b38dc02018-05-10 15:24:40 -0700214 handshakers.clear();
Andrea Campanella241896c2017-05-10 13:11:04 -0700215 //This will fail if ONOS has CFG and drivers which depend on this provider
216 // are activated, failing due to not finding the driver.
217 cfgService.getSubjects(DeviceId.class, GeneralProviderDeviceConfig.class)
218 .forEach(did -> connectionExecutor.execute(() -> connectDevice(did)));
219 log.info("Started");
220 }
221
Andrea Campanella19090322017-08-22 10:31:37 +0200222 @Modified
223 public void modified(ComponentContext context) {
224 if (context != null) {
225 Dictionary<?, ?> properties = context.getProperties();
226 pollFrequency = Tools.getIntegerProperty(properties, "pollFrequency",
227 DEFAULT_POLL_FREQUENCY_SECONDS);
228 log.info("Configured. Poll frequency is configured to {} seconds", pollFrequency);
229 }
230
231 if (!scheduledTasks.isEmpty()) {
232 //cancel all previous tasks
233 scheduledTasks.values().forEach(task -> task.cancel(false));
234 //resubmit task with new timeout.
235 Set<DeviceId> deviceSubjects =
236 cfgService.getSubjects(DeviceId.class, GeneralProviderDeviceConfig.class);
237 deviceSubjects.forEach(deviceId -> {
238 if (!compareScheme(deviceId)) {
239 // not under my scheme, skipping
240 log.debug("{} is not my scheme, skipping", deviceId);
241 return;
242 }
243 scheduledTasks.put(deviceId, schedulePolling(deviceId, true));
244 });
245 }
Andrea Campanella19090322017-08-22 10:31:37 +0200246 }
247
Andrea Campanella241896c2017-05-10 13:11:04 -0700248 @Deactivate
249 public void deactivate() {
250 portStatsExecutor.shutdown();
Andrea Campanella4929a812017-10-09 18:38:23 +0200251 componentConfigService.unregisterProperties(getClass(), false);
Andrea Campanella241896c2017-05-10 13:11:04 -0700252 cfgService.removeListener(cfgListener);
253 //Not Removing the device so they can still be used from other driver providers
254 //cfgService.getSubjects(DeviceId.class, GeneralProviderDeviceConfig.class)
255 // .forEach(did -> connectionExecutor.execute(() -> disconnectDevice(did)));
256 connectionExecutor.shutdown();
257 deviceService.removeListener(deviceListener);
258 providerRegistry.unregister(this);
Thomas Vachuska5b38dc02018-05-10 15:24:40 -0700259 handshakers.clear();
Andrea Campanella241896c2017-05-10 13:11:04 -0700260 providerService = null;
261 cfgService.unregisterConfigFactory(factory);
262 log.info("Stopped");
263 }
264
/**
 * Creates the provider, identified by the {@code device} URI scheme and the
 * general device provider package name.
 */
public GeneralDeviceProvider() {
    super(new ProviderId(URI_SCHEME, DEVICE_PROVIDER_PACKAGE));
}
268
269
270 @Override
271 public void triggerProbe(DeviceId deviceId) {
Thomas Vachuska5b38dc02018-05-10 15:24:40 -0700272 // TODO Really don't see the point of this in non OF Context,
Andrea Campanella241896c2017-05-10 13:11:04 -0700273 // for now testing reachability, can be moved to no-op
274 log.debug("Triggering probe equals testing reachability on device {}", deviceId);
275 isReachable(deviceId);
276 }
277
278 @Override
279 public void roleChanged(DeviceId deviceId, MastershipRole newRole) {
Andrea Campanella14e196d2017-07-24 18:11:36 +0200280 log.info("Received role {} for device {}", newRole, deviceId);
Andrea Campanella241896c2017-05-10 13:11:04 -0700281 CompletableFuture<MastershipRole> roleReply = getHandshaker(deviceId).roleChanged(newRole);
Andrea Campanella14e196d2017-07-24 18:11:36 +0200282 roleReply.thenAcceptAsync(mastership -> {
283 providerService.receivedRoleReply(deviceId, newRole, mastership);
284 if (!mastership.equals(MastershipRole.MASTER) && scheduledTasks.get(deviceId) != null) {
285 scheduledTasks.get(deviceId).cancel(false);
286 scheduledTasks.remove(deviceId);
287 } else if (mastership.equals(MastershipRole.MASTER) && scheduledTasks.get(deviceId) == null) {
288 scheduledTasks.put(deviceId, schedulePolling(deviceId, false));
289 updatePortStatistics(deviceId);
290 }
291 });
Andrea Campanella241896c2017-05-10 13:11:04 -0700292 }
293
294 @Override
295 public boolean isReachable(DeviceId deviceId) {
Andrea Campanella14e196d2017-07-24 18:11:36 +0200296 log.debug("Testing reachability for device {}", deviceId);
Andrea Campanellac1ecdd02018-01-12 12:48:24 +0100297
298 DeviceHandshaker handshaker = getHandshaker(deviceId);
299 if (handshaker == null) {
300 return false;
301 }
302
303 CompletableFuture<Boolean> reachable = handshaker.isReachable();
Andrea Campanella241896c2017-05-10 13:11:04 -0700304 try {
Andrea Campanella14e196d2017-07-24 18:11:36 +0200305 return reachable.get(REACHABILITY_TIMEOUT, TimeUnit.SECONDS);
Andrea Campanella241896c2017-05-10 13:11:04 -0700306 } catch (InterruptedException | ExecutionException | TimeoutException e) {
307 log.error("Device {} is not reachable", deviceId, e);
308 return false;
309 }
310 }
311
312 @Override
313 public void changePortState(DeviceId deviceId, PortNumber portNumber,
314 boolean enable) {
315 if (deviceService.getDevice(deviceId).is(PortAdmin.class)) {
316
317 PortAdmin portAdmin = getPortAdmin(deviceId);
318 CompletableFuture<Boolean> modified;
319 if (enable) {
320 modified = portAdmin.enable(portNumber);
321 } else {
322 modified = portAdmin.disable(portNumber);
323 }
324 modified.thenAcceptAsync(result -> {
325 if (!result) {
326 log.warn("Your device {} port {} status can't be changed to {}",
Andrea Campanella19090322017-08-22 10:31:37 +0200327 deviceId, portNumber, enable);
Andrea Campanella241896c2017-05-10 13:11:04 -0700328 }
329 });
330
331 } else {
332 log.warn("Device {} does not support PortAdmin behaviour", deviceId);
333 }
334 }
335
Thomas Vachuska5b38dc02018-05-10 15:24:40 -0700336 @Override
337 public void triggerDisconnect(DeviceId deviceId) {
338 connectionExecutor.execute(() -> disconnectDevice(deviceId));
339 }
340
Andrea Campanella241896c2017-05-10 13:11:04 -0700341 private DeviceHandshaker getHandshaker(DeviceId deviceId) {
Thomas Vachuska5b38dc02018-05-10 15:24:40 -0700342 return handshakers.computeIfAbsent(deviceId, id -> {
343 Driver driver = getDriver(deviceId);
344 return driver == null ? null :
345 getBehaviour(driver, DeviceHandshaker.class,
346 new DefaultDriverData(driver, deviceId));
347 });
Andrea Campanella241896c2017-05-10 13:11:04 -0700348 }
349
350 private PortAdmin getPortAdmin(DeviceId deviceId) {
351 Driver driver = getDriver(deviceId);
352 return getBehaviour(driver, PortAdmin.class,
Andrea Campanella19090322017-08-22 10:31:37 +0200353 new DefaultDriverData(driver, deviceId));
Andrea Campanella241896c2017-05-10 13:11:04 -0700354
355 }
356
357 private Driver getDriver(DeviceId deviceId) {
Andrea Campanellac1ecdd02018-01-12 12:48:24 +0100358 Driver driver = null;
Andrea Campanella241896c2017-05-10 13:11:04 -0700359 try {
360 driver = driverService.getDriver(deviceId);
361 } catch (ItemNotFoundException e) {
362 log.debug("Falling back to configuration to fetch driver " +
Andrea Campanella19090322017-08-22 10:31:37 +0200363 "for device {}", deviceId);
Andrea Campanellac1ecdd02018-01-12 12:48:24 +0100364 BasicDeviceConfig cfg = cfgService.getConfig(deviceId, BasicDeviceConfig.class);
365 if (cfg != null) {
366 driver = driverService.getDriver(cfg.driver());
367 }
Andrea Campanella241896c2017-05-10 13:11:04 -0700368 }
369 return driver;
370 }
371
372 //needed since the device manager will not return the driver through implementation()
373 // method since the device is not pushed to the core so for the connectDevice
374 // we need to work around that in order to test before calling
375 // store.createOrUpdateDevice
376 private <T extends Behaviour> T getBehaviour(Driver driver, Class<T> type,
377 DriverData data) {
Andrea Campanellac1ecdd02018-01-12 12:48:24 +0100378 if (driver != null && driver.hasBehaviour(type)) {
Andrea Campanella241896c2017-05-10 13:11:04 -0700379 DefaultDriverHandler handler = new DefaultDriverHandler(data);
380 return driver.createBehaviour(handler, type);
381 } else {
382 return null;
383 }
384 }
385
386 //Connects a general device
387 private void connectDevice(DeviceId deviceId) {
388 //retrieve the configuration
389 GeneralProviderDeviceConfig providerConfig =
390 cfgService.getConfig(deviceId, GeneralProviderDeviceConfig.class);
391 BasicDeviceConfig basicDeviceConfig =
392 cfgService.getConfig(deviceId, BasicDeviceConfig.class);
393
394 if (providerConfig == null || basicDeviceConfig == null) {
395 log.error("Configuration is NULL: basic config {}, general provider " +
Andrea Campanella19090322017-08-22 10:31:37 +0200396 "config {}", basicDeviceConfig, providerConfig);
Andrea Campanella241896c2017-05-10 13:11:04 -0700397 } else {
Andrea Campanellabc112a92017-06-26 19:06:43 +0200398 log.info("Connecting to device {} with driver {}", deviceId, basicDeviceConfig.driver());
Andrea Campanella241896c2017-05-10 13:11:04 -0700399
Thomas Vachuska5b38dc02018-05-10 15:24:40 -0700400 DeviceHandshaker handshaker = getHandshaker(deviceId);
Andrea Campanellabc112a92017-06-26 19:06:43 +0200401 if (handshaker == null) {
Thomas Vachuska5b38dc02018-05-10 15:24:40 -0700402 log.error("Device {} does not support DeviceHandshaker behaviour", deviceId);
Andrea Campanellabc112a92017-06-26 19:06:43 +0200403 return;
Andrea Campanella241896c2017-05-10 13:11:04 -0700404 }
Thomas Vachuska5b38dc02018-05-10 15:24:40 -0700405 Driver driver = handshaker.handler().driver();
Frank Wang554ce972017-09-06 09:56:43 +0800406
Thomas Vachuska5b38dc02018-05-10 15:24:40 -0700407 addConfigData(providerConfig, handshaker.data());
Andrea Campanellabc112a92017-06-26 19:06:43 +0200408
409 //Connecting to the device
410 CompletableFuture<Boolean> connected = handshaker.connect();
411
412 connected.thenAcceptAsync(result -> {
413 if (result) {
414
415 //Populated with the default values obtained by the driver
416 ChassisId cid = new ChassisId();
417 SparseAnnotations annotations = DefaultAnnotations.builder()
418 .set(AnnotationKeys.PROTOCOL,
Andrea Campanella19090322017-08-22 10:31:37 +0200419 providerConfig.protocolsInfo().keySet().toString())
Andrea Campanellabc112a92017-06-26 19:06:43 +0200420 .build();
421 DeviceDescription description =
422 new DefaultDeviceDescription(deviceId.uri(), Device.Type.SWITCH,
Andrea Campanella19090322017-08-22 10:31:37 +0200423 driver.manufacturer(), driver.hwVersion(),
424 driver.swVersion(), UNKNOWN,
Yi Tseng92494fb2017-12-05 15:14:53 -0800425 cid, true, annotations);
Andrea Campanellabc112a92017-06-26 19:06:43 +0200426 //Empty list of ports
427 List<PortDescription> ports = new ArrayList<>();
428
Andrea Campanellabf9e5ce2017-12-06 14:26:36 +0100429 DeviceDescriptionDiscovery deviceDiscovery = getBehaviour(driver,
Thomas Vachuska5b38dc02018-05-10 15:24:40 -0700430 DeviceDescriptionDiscovery.class, handshaker.data());
Andrea Campanellabf9e5ce2017-12-06 14:26:36 +0100431 if (deviceDiscovery != null) {
Andrea Campanellabc112a92017-06-26 19:06:43 +0200432 DeviceDescription newdescription = deviceDiscovery.discoverDeviceDetails();
433 if (newdescription != null) {
434 description = newdescription;
435 }
436 ports = deviceDiscovery.discoverPortDetails();
Andrea Campanellabf9e5ce2017-12-06 14:26:36 +0100437 } else {
438 log.info("No Device Description Discovery for device {}, no update for " +
439 "description or ports.", deviceId);
Andrea Campanellabc112a92017-06-26 19:06:43 +0200440 }
441
Thomas Vachuska5b38dc02018-05-10 15:24:40 -0700442 if (!handlePipeconf(deviceId, driver, handshaker.data(), true)) {
Carmelo Cascone59f57de2017-07-11 19:55:09 -0400443 // Something went wrong during handling of pipeconf.
444 // We already logged the error.
445 handshaker.disconnect();
446 return;
Andrea Campanellabc112a92017-06-26 19:06:43 +0200447 }
448
Carmelo Cascone59f57de2017-07-11 19:55:09 -0400449 advertiseDevice(deviceId, description, ports);
450
Andrea Campanellabc112a92017-06-26 19:06:43 +0200451 } else {
452 log.warn("Can't connect to device {}", deviceId);
453 }
454 });
Andrea Campanella241896c2017-05-10 13:11:04 -0700455 }
456 }
457
Andrea Campanella14e196d2017-07-24 18:11:36 +0200458 private void connectStandbyDevice(DeviceId deviceId) {
Thomas Vachuska5b38dc02018-05-10 15:24:40 -0700459 // if device is pipeline programmable we merge pipeconf + base driver for every other role
Andrea Campanella14e196d2017-07-24 18:11:36 +0200460 GeneralProviderDeviceConfig providerConfig =
461 cfgService.getConfig(deviceId, GeneralProviderDeviceConfig.class);
462
Thomas Vachuska5b38dc02018-05-10 15:24:40 -0700463 DeviceHandshaker handshaker = getHandshaker(deviceId);
Andrea Campanella14e196d2017-07-24 18:11:36 +0200464 if (handshaker == null) {
Thomas Vachuska5b38dc02018-05-10 15:24:40 -0700465 log.error("Device {} does not support DeviceHandshaker behaviour", deviceId);
Andrea Campanella14e196d2017-07-24 18:11:36 +0200466 return;
467 }
Thomas Vachuska5b38dc02018-05-10 15:24:40 -0700468 addConfigData(providerConfig, handshaker.data());
Andrea Campanella14e196d2017-07-24 18:11:36 +0200469
470 //Connecting to the device
471 handshaker.connect().thenAcceptAsync(result -> {
472 if (result) {
Thomas Vachuska5b38dc02018-05-10 15:24:40 -0700473 handlePipeconf(deviceId, handshaker.handler().driver(), handshaker.data(), false);
Andrea Campanella14e196d2017-07-24 18:11:36 +0200474 }
475 });
476 }
477
Carmelo Cascone59f57de2017-07-11 19:55:09 -0400478 /**
479 * Handles the case of a device that is pipeline programmable. Returns true if the operation wa successful and the
480 * device can be registered to the core, false otherwise.
481 */
Andrea Campanella14e196d2017-07-24 18:11:36 +0200482 private boolean handlePipeconf(DeviceId deviceId, Driver driver, DriverData driverData, boolean deployPipeconf) {
Thomas Vachuska5b38dc02018-05-10 15:24:40 -0700483 PiPipelineProgrammable pipelineProg =
484 getBehaviour(driver, PiPipelineProgrammable.class, driverData);
Carmelo Cascone59f57de2017-07-11 19:55:09 -0400485
486 if (pipelineProg == null) {
487 // Device is not pipeline programmable.
488 return true;
489 }
490
Andrea Campanella14e196d2017-07-24 18:11:36 +0200491 PiPipeconf pipeconf = getPipeconf(deviceId, pipelineProg);
492
493 if (pipeconf != null) {
Andrea Campanella14e196d2017-07-24 18:11:36 +0200494 PiPipeconfId pipeconfId = pipeconf.id();
495
496 try {
497 if (deployPipeconf) {
498 if (!pipelineProg.deployPipeconf(pipeconf).get()) {
499 log.error("Unable to deploy pipeconf {} to {}, aborting device discovery",
Thomas Vachuska5b38dc02018-05-10 15:24:40 -0700500 pipeconfId, deviceId);
Andrea Campanella14e196d2017-07-24 18:11:36 +0200501 return false;
502 }
503 }
504 } catch (InterruptedException | ExecutionException e) {
505 log.warn("Exception occurred while deploying pipeconf {} to device {}", pipeconf.id(), deviceId, e);
506 return false;
507 }
508 try {
509 if (!piPipeconfService.bindToDevice(pipeconfId, deviceId).get()) {
510 log.error("Unable to merge driver {} for device {} with pipeconf {}, aborting device discovery",
Thomas Vachuska5b38dc02018-05-10 15:24:40 -0700511 driver.name(), deviceId, pipeconfId);
Andrea Campanella14e196d2017-07-24 18:11:36 +0200512 return false;
513 }
514 } catch (InterruptedException | ExecutionException e) {
515 log.warn("Exception occurred while binding pipeconf {} to device {}", pipeconf.id(), deviceId, e);
516 return false;
517 }
518 } else {
519 return false;
520 }
521
522 return true;
523 }
524
525 private PiPipeconf getPipeconf(DeviceId deviceId, PiPipelineProgrammable pipelineProg) {
Carmelo Cascone59f57de2017-07-11 19:55:09 -0400526 PiPipeconfId pipeconfId = piPipeconfService.ofDevice(deviceId).orElseGet(() -> {
527 // No pipeconf has been associated with this device.
528 // Check if device driver provides a default one.
529 if (pipelineProg.getDefaultPipeconf().isPresent()) {
530 PiPipeconf defaultPipeconf = pipelineProg.getDefaultPipeconf().get();
531 log.info("Using default pipeconf {} for {}", defaultPipeconf.id(), deviceId);
Carmelo Cascone59f57de2017-07-11 19:55:09 -0400532 return defaultPipeconf.id();
533 } else {
534 return null;
535 }
536 });
537
538 if (pipeconfId == null) {
539 log.warn("Device {} is pipeline programmable but no pipeconf can be associated to it.", deviceId);
Andrea Campanella14e196d2017-07-24 18:11:36 +0200540 return null;
Carmelo Cascone1fb27d32017-08-25 20:40:20 +0200541 }
542
543 if (!piPipeconfService.getPipeconf(pipeconfId).isPresent()) {
544 log.warn("Pipeconf {} is not registered", pipeconfId);
Andrea Campanella14e196d2017-07-24 18:11:36 +0200545 return null;
Carmelo Cascone59f57de2017-07-11 19:55:09 -0400546 }
547
Andrea Campanella14e196d2017-07-24 18:11:36 +0200548 return piPipeconfService.getPipeconf(pipeconfId).get();
Carmelo Cascone59f57de2017-07-11 19:55:09 -0400549 }
550
Andrea Campanellabc112a92017-06-26 19:06:43 +0200551 private void advertiseDevice(DeviceId deviceId, DeviceDescription description, List<PortDescription> ports) {
552 providerService.deviceConnected(deviceId, description);
553 providerService.updatePorts(deviceId, ports);
554 }
555
Thomas Vachuska5b38dc02018-05-10 15:24:40 -0700556 private void disconnectDevice(DeviceId deviceId) {
Andrea Campanella241896c2017-05-10 13:11:04 -0700557 log.info("Disconnecting for device {}", deviceId);
Andrea Campanellac1ecdd02018-01-12 12:48:24 +0100558
Thomas Vachuska5b38dc02018-05-10 15:24:40 -0700559 DeviceHandshaker handshaker = handshakers.remove(deviceId);
560 if (handshaker != null) {
561 CompletableFuture<Boolean> disconnect = handshaker.disconnect();
562 disconnect.thenAcceptAsync(result -> {
563 if (result) {
564 log.info("Disconnected device {}", deviceId);
565 providerService.deviceDisconnected(deviceId);
566 } else {
567 log.warn("Device {} was unable to disconnect", deviceId);
568 }
569 });
Andrea Campanella241896c2017-05-10 13:11:04 -0700570 } else {
571 //gracefully ignoring.
Thomas Vachuska5b38dc02018-05-10 15:24:40 -0700572 log.warn("No DeviceHandshaker for device {}, no guarantees of complete " +
573 "shutdown of communication", deviceId);
Andrea Campanella241896c2017-05-10 13:11:04 -0700574 }
Andrea Campanella19090322017-08-22 10:31:37 +0200575 ScheduledFuture<?> pollingStatisticsTask = scheduledTasks.get(deviceId);
576 if (pollingStatisticsTask != null) {
577 pollingStatisticsTask.cancel(true);
578 }
Andrea Campanella241896c2017-05-10 13:11:04 -0700579 }
580
581 //Needed to catch the exception in the executors since are not rethrown otherwise.
582 private Runnable exceptionSafe(Runnable runnable) {
583 return () -> {
584 try {
585 runnable.run();
586 } catch (Exception e) {
587 log.error("Unhandled Exception", e);
588 }
589 };
590 }
591
592 private void updatePortStatistics(DeviceId deviceId) {
Andrea Campanellace111932017-09-18 16:59:56 +0900593 Device device = deviceService.getDevice(deviceId);
594 if (!Objects.isNull(device) && deviceService.isAvailable(deviceId) &&
595 device.is(PortStatisticsDiscovery.class)) {
596 Collection<PortStatistics> statistics = device.as(PortStatisticsDiscovery.class)
597 .discoverPortStatistics();
598 //updating statistcs only if not empty
599 if (!statistics.isEmpty()) {
600 providerService.updatePortStatistics(deviceId, statistics);
601 }
602 } else {
603 log.debug("Can't update port statistics for device {}", deviceId);
Andrea Campanella19090322017-08-22 10:31:37 +0200604 }
605 }
606
607 private boolean compareScheme(DeviceId deviceId) {
608 return deviceId.uri().getScheme().equals(URI_SCHEME);
Andrea Campanella241896c2017-05-10 13:11:04 -0700609 }
610
611 /**
612 * Listener for configuration events.
613 */
614 private class InternalNetworkConfigListener implements NetworkConfigListener {
615
Andrea Campanella241896c2017-05-10 13:11:04 -0700616 @Override
617 public void event(NetworkConfigEvent event) {
618 DeviceId deviceId = (DeviceId) event.subject();
619 //Assuming that the deviceId comes with uri 'device:'
Andrea Campanella19090322017-08-22 10:31:37 +0200620 if (!compareScheme(deviceId)) {
Andrea Campanella241896c2017-05-10 13:11:04 -0700621 // not under my scheme, skipping
622 log.debug("{} is not my scheme, skipping", deviceId);
623 return;
624 }
Andrea Campanellace111932017-09-18 16:59:56 +0900625 if (deviceService.getDevice(deviceId) != null && deviceService.isAvailable(deviceId)) {
Andrea Campanella241896c2017-05-10 13:11:04 -0700626 log.info("Device {} is already connected to ONOS and is available", deviceId);
Andrea Campanellabc112a92017-06-26 19:06:43 +0200627 return;
628 }
Andrea Campanella14e196d2017-07-24 18:11:36 +0200629 NodeId leaderNodeId = leadershipService.runForLeadership(DEPLOY + deviceId.toString() + PIPECONF_TOPIC)
630 .leader().nodeId();
631 NodeId localNodeId = clusterService.getLocalNode().id();
632 if (localNodeId.equals(leaderNodeId)) {
633 if (processEvent(event, deviceId)) {
634 log.debug("{} is leader for {}, initiating the connection and deploying pipeline", leaderNodeId,
635 deviceId);
636 checkAndSubmitDeviceTask(deviceId);
637 }
638 } else {
639 if (processEvent(event, deviceId)) {
640 log.debug("{} is not leader for {}, initiating connection but not deploying pipeline, {} is LEADER",
641 localNodeId, deviceId, leaderNodeId);
642 connectionExecutor.submit(exceptionSafe(() -> connectStandbyDevice(deviceId)));
643 //FIXME this will be removed when config is synced
644 cleanUpConfigInfo(deviceId);
645 }
646 }
Andrea Campanella14e196d2017-07-24 18:11:36 +0200647 }
648
649 private boolean processEvent(NetworkConfigEvent event, DeviceId deviceId) {
Andrea Campanellabc112a92017-06-26 19:06:43 +0200650 //FIXME to be removed when netcfg will issue device events in a bundle or
651 // ensure all configuration needed is present
652 Lock lock = ENTRY_LOCKS.computeIfAbsent(deviceId, key -> new ReentrantLock());
653 lock.lock();
654 try {
655 if (event.configClass().equals(GeneralProviderDeviceConfig.class)) {
656 //FIXME we currently assume that p4runtime devices are pipeline configurable.
657 //If we want to connect a p4runtime device with no pipeline
658 if (event.config().isPresent() &&
659 Collections.disjoint(ImmutableSet.copyOf(event.config().get().node().fieldNames()),
Andrea Campanella19090322017-08-22 10:31:37 +0200660 PIPELINE_CONFIGURABLE_PROTOCOLS)) {
Andrea Campanellabc112a92017-06-26 19:06:43 +0200661 pipelineConfigured.add(deviceId);
662 }
663 deviceConfigured.add(deviceId);
664 } else if (event.configClass().equals(BasicDeviceConfig.class)) {
665 if (event.config().isPresent() && event.config().get().node().has(DRIVER)) {
666 //TODO add check for pipeline and add it to the pipeline list if no
667 // p4runtime is present.
668 driverConfigured.add(deviceId);
669 }
670 } else if (event.configClass().equals(PiPipeconfConfig.class)) {
671 if (event.config().isPresent()
672 && event.config().get().node().has(PiPipeconfConfig.PIPIPECONFID)) {
673 pipelineConfigured.add(deviceId);
674 }
675 }
676 //if the device has no "pipeline configurable protocol it will be present
677 // in the pipelineConfigured
678 if (deviceConfigured.contains(deviceId) && driverConfigured.contains(deviceId)
679 && pipelineConfigured.contains(deviceId)) {
Andrea Campanella14e196d2017-07-24 18:11:36 +0200680 return true;
Andrea Campanellabc112a92017-06-26 19:06:43 +0200681 } else {
682 if (deviceConfigured.contains(deviceId) && driverConfigured.contains(deviceId)) {
683 log.debug("Waiting for pipeline configuration for device {}", deviceId);
684 } else if (pipelineConfigured.contains(deviceId) && driverConfigured.contains(deviceId)) {
685 log.debug("Waiting for device configuration for device {}", deviceId);
686 } else if (pipelineConfigured.contains(deviceId) && deviceConfigured.contains(deviceId)) {
687 log.debug("Waiting for driver configuration for device {}", deviceId);
688 } else if (driverConfigured.contains(deviceId)) {
689 log.debug("Only driver configuration for device {}", deviceId);
690 } else if (deviceConfigured.contains(deviceId)) {
691 log.debug("Only device configuration for device {}", deviceId);
692 }
693 }
Andrea Campanella14e196d2017-07-24 18:11:36 +0200694 return false;
Andrea Campanellabc112a92017-06-26 19:06:43 +0200695 } finally {
696 lock.unlock();
Andrea Campanella241896c2017-05-10 13:11:04 -0700697 }
698 }
699
700 @Override
701 public boolean isRelevant(NetworkConfigEvent event) {
Andrea Campanellabc112a92017-06-26 19:06:43 +0200702 return (event.configClass().equals(GeneralProviderDeviceConfig.class) ||
703 event.configClass().equals(BasicDeviceConfig.class) ||
704 event.configClass().equals(PiPipeconfConfig.class)) &&
Andrea Campanella241896c2017-05-10 13:11:04 -0700705 (event.type() == NetworkConfigEvent.Type.CONFIG_ADDED ||
706 event.type() == NetworkConfigEvent.Type.CONFIG_UPDATED);
707 }
708 }
709
Andrea Campanellabc112a92017-06-26 19:06:43 +0200710 private void checkAndSubmitDeviceTask(DeviceId deviceId) {
711 connectionExecutor.submit(exceptionSafe(() -> connectDevice(deviceId)));
712 //FIXME this will be removed when configuration is synced.
Andrea Campanella14e196d2017-07-24 18:11:36 +0200713 cleanUpConfigInfo(deviceId);
714
715 }
716
717 private void addConfigData(GeneralProviderDeviceConfig providerConfig, DriverData driverData) {
718 //Storing deviceKeyId and all other config values
719 // as data in the driver with protocol_<info>
720 // name as the key. e.g protocol_ip
721 providerConfig.protocolsInfo()
722 .forEach((protocol, deviceInfoConfig) -> {
723 deviceInfoConfig.configValues()
724 .forEach((k, v) -> driverData.set(protocol + "_" + k, v));
725 driverData.set(protocol + "_key", deviceInfoConfig.deviceKeyId());
726 });
727 }
728
729 private void cleanUpConfigInfo(DeviceId deviceId) {
Andrea Campanellabc112a92017-06-26 19:06:43 +0200730 deviceConfigured.remove(deviceId);
731 driverConfigured.remove(deviceId);
732 pipelineConfigured.remove(deviceId);
Andrea Campanellabc112a92017-06-26 19:06:43 +0200733 }
734
Andrea Campanella19090322017-08-22 10:31:37 +0200735 private ScheduledFuture<?> schedulePolling(DeviceId deviceId, boolean randomize) {
736 int delay = 0;
737 if (randomize) {
Ray Milkeyb68bbbc2017-12-18 10:05:49 -0800738 delay = new SecureRandom().nextInt(10);
Andrea Campanella19090322017-08-22 10:31:37 +0200739 }
740 return portStatsExecutor.scheduleAtFixedRate(
741 exceptionSafe(() -> updatePortStatistics(deviceId)),
742 delay, pollFrequency, TimeUnit.SECONDS);
743 }
744
Andrea Campanella241896c2017-05-10 13:11:04 -0700745 /**
746 * Listener for core device events.
747 */
748 private class InternalDeviceListener implements DeviceListener {
749 @Override
750 public void event(DeviceEvent event) {
Andrea Campanellace111932017-09-18 16:59:56 +0900751 DeviceId deviceId = event.subject().id();
Thomas Vachuska5b38dc02018-05-10 15:24:40 -0700752 // FIXME handling for mastership change scenario missing?
Andrea Campanella241896c2017-05-10 13:11:04 -0700753
Thomas Vachuska5b38dc02018-05-10 15:24:40 -0700754 // For now this is scheduled periodically, when streaming API will
755 // be available we check and base it on the streaming API (e.g. gNMI)
756 if (mastershipService.isLocalMaster(deviceId)) {
757 scheduledTasks.put(deviceId, schedulePolling(deviceId, false));
Andrea Campanella241896c2017-05-10 13:11:04 -0700758 }
759 }
760
761 @Override
762 public boolean isRelevant(DeviceEvent event) {
Thomas Vachuska5b38dc02018-05-10 15:24:40 -0700763 return event.type() == Type.DEVICE_ADDED &&
764 event.subject().id().toString().startsWith(URI_SCHEME.toLowerCase());
Andrea Campanella241896c2017-05-10 13:11:04 -0700765 }
766 }
767}