blob: ad0d3886f2d42eb28c25f802a4d624b229e08f73 [file] [log] [blame]
Andrea Campanella241896c2017-05-10 13:11:04 -07001/*
Brian O'Connora09fe5b2017-08-03 21:12:30 -07002 * Copyright 2017-present Open Networking Foundation
Andrea Campanella241896c2017-05-10 13:11:04 -07003 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
16
17package org.onosproject.provider.general.device.impl;
18
19import com.google.common.annotations.Beta;
Andrea Campanellabc112a92017-06-26 19:06:43 +020020import com.google.common.collect.ImmutableSet;
21import com.google.common.collect.Maps;
Andrea Campanella241896c2017-05-10 13:11:04 -070022import org.apache.felix.scr.annotations.Activate;
23import org.apache.felix.scr.annotations.Component;
24import org.apache.felix.scr.annotations.Deactivate;
Andrea Campanella19090322017-08-22 10:31:37 +020025import org.apache.felix.scr.annotations.Modified;
26import org.apache.felix.scr.annotations.Property;
Andrea Campanella241896c2017-05-10 13:11:04 -070027import org.apache.felix.scr.annotations.Reference;
28import org.apache.felix.scr.annotations.ReferenceCardinality;
29import org.onlab.packet.ChassisId;
30import org.onlab.util.ItemNotFoundException;
Andrea Campanella19090322017-08-22 10:31:37 +020031import org.onlab.util.Tools;
Andrea Campanella4929a812017-10-09 18:38:23 +020032import org.onosproject.cfg.ComponentConfigService;
Andrea Campanella14e196d2017-07-24 18:11:36 +020033import org.onosproject.cluster.ClusterService;
34import org.onosproject.cluster.LeadershipService;
35import org.onosproject.cluster.NodeId;
Andrea Campanella241896c2017-05-10 13:11:04 -070036import org.onosproject.core.CoreService;
Andrea Campanella14e196d2017-07-24 18:11:36 +020037import org.onosproject.mastership.MastershipService;
Andrea Campanella241896c2017-05-10 13:11:04 -070038import org.onosproject.net.AnnotationKeys;
39import org.onosproject.net.DefaultAnnotations;
40import org.onosproject.net.Device;
41import org.onosproject.net.DeviceId;
42import org.onosproject.net.MastershipRole;
43import org.onosproject.net.PortNumber;
Carmelo Cascone87892e22017-11-13 16:01:29 -080044import org.onosproject.net.behaviour.PiPipelineProgrammable;
Andrea Campanella241896c2017-05-10 13:11:04 -070045import org.onosproject.net.behaviour.PortAdmin;
46import org.onosproject.net.config.ConfigFactory;
47import org.onosproject.net.config.NetworkConfigEvent;
48import org.onosproject.net.config.NetworkConfigListener;
49import org.onosproject.net.config.NetworkConfigRegistry;
50import org.onosproject.net.config.basics.BasicDeviceConfig;
51import org.onosproject.net.config.basics.SubjectFactories;
52import org.onosproject.net.device.DefaultDeviceDescription;
Carmelo Casconee5b28722018-06-22 17:28:28 +020053import org.onosproject.net.device.DeviceAgentEvent;
54import org.onosproject.net.device.DeviceAgentListener;
Andrea Campanella241896c2017-05-10 13:11:04 -070055import org.onosproject.net.device.DeviceDescription;
56import org.onosproject.net.device.DeviceDescriptionDiscovery;
57import org.onosproject.net.device.DeviceEvent;
58import org.onosproject.net.device.DeviceHandshaker;
59import org.onosproject.net.device.DeviceListener;
60import org.onosproject.net.device.DeviceProvider;
61import org.onosproject.net.device.DeviceProviderRegistry;
62import org.onosproject.net.device.DeviceProviderService;
63import org.onosproject.net.device.DeviceService;
64import org.onosproject.net.device.PortDescription;
65import org.onosproject.net.device.PortStatistics;
66import org.onosproject.net.device.PortStatisticsDiscovery;
67import org.onosproject.net.driver.Behaviour;
68import org.onosproject.net.driver.DefaultDriverData;
69import org.onosproject.net.driver.DefaultDriverHandler;
70import org.onosproject.net.driver.Driver;
71import org.onosproject.net.driver.DriverData;
72import org.onosproject.net.driver.DriverService;
Carmelo Cascone59f57de2017-07-11 19:55:09 -040073import org.onosproject.net.pi.model.PiPipeconf;
Andrea Campanellabc112a92017-06-26 19:06:43 +020074import org.onosproject.net.pi.model.PiPipeconfId;
Carmelo Cascone39c28ca2017-11-15 13:03:57 -080075import org.onosproject.net.pi.service.PiPipeconfConfig;
76import org.onosproject.net.pi.service.PiPipeconfService;
Andrea Campanella241896c2017-05-10 13:11:04 -070077import org.onosproject.net.provider.AbstractProvider;
78import org.onosproject.net.provider.ProviderId;
79import org.onosproject.provider.general.device.api.GeneralProviderDeviceConfig;
Andrea Campanella19090322017-08-22 10:31:37 +020080import org.osgi.service.component.ComponentContext;
Andrea Campanella241896c2017-05-10 13:11:04 -070081import org.slf4j.Logger;
82
Ray Milkeyb68bbbc2017-12-18 10:05:49 -080083import java.security.SecureRandom;
Andrea Campanella241896c2017-05-10 13:11:04 -070084import java.util.ArrayList;
85import java.util.Collection;
Andrea Campanellabc112a92017-06-26 19:06:43 +020086import java.util.Collections;
Andrea Campanella19090322017-08-22 10:31:37 +020087import java.util.Dictionary;
Andrea Campanella241896c2017-05-10 13:11:04 -070088import java.util.List;
Thomas Vachuska5b38dc02018-05-10 15:24:40 -070089import java.util.Map;
Andrea Campanellace111932017-09-18 16:59:56 +090090import java.util.Objects;
Andrea Campanellabc112a92017-06-26 19:06:43 +020091import java.util.Set;
Andrea Campanella241896c2017-05-10 13:11:04 -070092import java.util.concurrent.CompletableFuture;
Andrea Campanella19090322017-08-22 10:31:37 +020093import java.util.concurrent.ConcurrentHashMap;
Andrea Campanellabc112a92017-06-26 19:06:43 +020094import java.util.concurrent.ConcurrentMap;
95import java.util.concurrent.CopyOnWriteArraySet;
Andrea Campanella241896c2017-05-10 13:11:04 -070096import java.util.concurrent.ExecutionException;
Carmelo Casconee5b28722018-06-22 17:28:28 +020097import java.util.concurrent.ExecutorService;
Andrea Campanella241896c2017-05-10 13:11:04 -070098import java.util.concurrent.ScheduledExecutorService;
Andrea Campanella19090322017-08-22 10:31:37 +020099import java.util.concurrent.ScheduledFuture;
Andrea Campanella241896c2017-05-10 13:11:04 -0700100import java.util.concurrent.TimeUnit;
101import java.util.concurrent.TimeoutException;
Andrea Campanellabc112a92017-06-26 19:06:43 +0200102import java.util.concurrent.locks.Lock;
103import java.util.concurrent.locks.ReentrantLock;
Andrea Campanella241896c2017-05-10 13:11:04 -0700104
Carmelo Casconee5b28722018-06-22 17:28:28 +0200105import static java.util.concurrent.Executors.newFixedThreadPool;
Andrea Campanella241896c2017-05-10 13:11:04 -0700106import static java.util.concurrent.Executors.newScheduledThreadPool;
107import static org.onlab.util.Tools.groupedThreads;
108import static org.onosproject.net.device.DeviceEvent.Type;
109import static org.slf4j.LoggerFactory.getLogger;
110
111/**
Carmelo Cascone96beb6f2018-06-27 18:07:12 +0200112 * Provider which uses drivers to detect device and do initial handshake and
113 * channel establishment with devices. Any other provider specific operation is
114 * also delegated to the DeviceHandshaker driver.
Andrea Campanella241896c2017-05-10 13:11:04 -0700115 */
116@Beta
117@Component(immediate = true)
118public class GeneralDeviceProvider extends AbstractProvider
119 implements DeviceProvider {
Carmelo Casconee5b28722018-06-22 17:28:28 +0200120
121 // Timeout in seconds for operations on devices.
122 private static final int DEVICE_OP_TIMEOUT = 10;
123
124 private static final String DRIVER = "driver";
Carmelo Cascone96beb6f2018-06-27 18:07:12 +0200125 public static final String FIRST_CONNECTION_TOPIC = "first-connection-";
126 private static final String CHECK_CONNECTION_TOPIC = "check-connection-";
Andrea Campanella1e573442018-05-17 17:07:13 +0200127 private static final String POLL_FREQUENCY = "pollFrequency";
Andrea Campanella14e196d2017-07-24 18:11:36 +0200128
Andrea Campanella241896c2017-05-10 13:11:04 -0700129 private final Logger log = getLogger(getClass());
130
131 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
Carmelo Casconee5b28722018-06-22 17:28:28 +0200132 private DeviceProviderRegistry providerRegistry;
Andrea Campanella241896c2017-05-10 13:11:04 -0700133
134 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
Carmelo Casconee5b28722018-06-22 17:28:28 +0200135 private ComponentConfigService componentConfigService;
Andrea Campanella4929a812017-10-09 18:38:23 +0200136
137 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
Carmelo Casconee5b28722018-06-22 17:28:28 +0200138 private NetworkConfigRegistry cfgService;
Andrea Campanella241896c2017-05-10 13:11:04 -0700139
140 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
Carmelo Casconee5b28722018-06-22 17:28:28 +0200141 private CoreService coreService;
Andrea Campanella241896c2017-05-10 13:11:04 -0700142
143 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
Carmelo Casconee5b28722018-06-22 17:28:28 +0200144 private DeviceService deviceService;
Andrea Campanella241896c2017-05-10 13:11:04 -0700145
146 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
Carmelo Casconee5b28722018-06-22 17:28:28 +0200147 private DriverService driverService;
Andrea Campanella241896c2017-05-10 13:11:04 -0700148
149 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
Carmelo Casconee5b28722018-06-22 17:28:28 +0200150 private MastershipService mastershipService;
Andrea Campanella14e196d2017-07-24 18:11:36 +0200151
152 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
Carmelo Casconee5b28722018-06-22 17:28:28 +0200153 private PiPipeconfService piPipeconfService;
Andrea Campanella241896c2017-05-10 13:11:04 -0700154
Andrea Campanella14e196d2017-07-24 18:11:36 +0200155 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
Carmelo Casconee5b28722018-06-22 17:28:28 +0200156 private ClusterService clusterService;
Andrea Campanella14e196d2017-07-24 18:11:36 +0200157
158 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
Carmelo Casconee5b28722018-06-22 17:28:28 +0200159 private LeadershipService leadershipService;
Andrea Campanella14e196d2017-07-24 18:11:36 +0200160
Andrea Campanella19090322017-08-22 10:31:37 +0200161 private static final int DEFAULT_POLL_FREQUENCY_SECONDS = 10;
Andrea Campanella1e573442018-05-17 17:07:13 +0200162 @Property(name = POLL_FREQUENCY, intValue = DEFAULT_POLL_FREQUENCY_SECONDS,
Andrea Campanella19090322017-08-22 10:31:37 +0200163 label = "Configure poll frequency for port status and statistics; " +
164 "default is 10 sec")
165 private int pollFrequency = DEFAULT_POLL_FREQUENCY_SECONDS;
166
Andrea Campanella1e573442018-05-17 17:07:13 +0200167 private static final int DEVICE_AVAILABILITY_POLL_FREQUENCY_SECONDS = 10;
168 @Property(name = "deviceAvailabilityPollFrequency", intValue = DEVICE_AVAILABILITY_POLL_FREQUENCY_SECONDS,
169 label = "Configure poll frequency for checking device availability; " +
170 "default is 10 sec")
171 private int deviceAvailabilityPollFrequency = DEVICE_AVAILABILITY_POLL_FREQUENCY_SECONDS;
172
Carmelo Casconee5b28722018-06-22 17:28:28 +0200173 private static final String APP_NAME = "org.onosproject.generaldeviceprovider";
174 private static final String URI_SCHEME = "device";
175 private static final String CFG_SCHEME = "generalprovider";
Andrea Campanella241896c2017-05-10 13:11:04 -0700176 private static final String DEVICE_PROVIDER_PACKAGE = "org.onosproject.general.provider.device";
177 private static final int CORE_POOL_SIZE = 10;
178 private static final String UNKNOWN = "unknown";
Andrea Campanella19090322017-08-22 10:31:37 +0200179
Andrea Campanellabc112a92017-06-26 19:06:43 +0200180 //FIXME this will be removed when the configuration is synced at the source.
181 private static final Set<String> PIPELINE_CONFIGURABLE_PROTOCOLS = ImmutableSet.of("p4runtime");
Andrea Campanellabc112a92017-06-26 19:06:43 +0200182
Carmelo Cascone96beb6f2018-06-27 18:07:12 +0200183 private static final ConcurrentMap<DeviceId, Lock> DEVICE_LOCKS = Maps.newConcurrentMap();
Andrea Campanellabc112a92017-06-26 19:06:43 +0200184 //FIXME to be removed when netcfg will issue device events in a bundle or
185 //ensures all configuration needed is present
186 private Set<DeviceId> deviceConfigured = new CopyOnWriteArraySet<>();
187 private Set<DeviceId> driverConfigured = new CopyOnWriteArraySet<>();
188 private Set<DeviceId> pipelineConfigured = new CopyOnWriteArraySet<>();
Andrea Campanella241896c2017-05-10 13:11:04 -0700189
Thomas Vachuska5b38dc02018-05-10 15:24:40 -0700190 private final Map<DeviceId, DeviceHandshaker> handshakers = Maps.newConcurrentMap();
191
Carmelo Casconee5b28722018-06-22 17:28:28 +0200192 private final Map<DeviceId, MastershipRole> requestedRoles = Maps.newConcurrentMap();
Andrea Campanella241896c2017-05-10 13:11:04 -0700193
Andrea Campanella241896c2017-05-10 13:11:04 -0700194
Carmelo Casconee5b28722018-06-22 17:28:28 +0200195 private ExecutorService connectionExecutor
196 = newFixedThreadPool(CORE_POOL_SIZE, groupedThreads(
Carmelo Cascone96beb6f2018-06-27 18:07:12 +0200197 "onos/generaldeviceprovider-device-connect", "%d", log));
Carmelo Casconee5b28722018-06-22 17:28:28 +0200198 private ScheduledExecutorService portStatsExecutor
199 = newScheduledThreadPool(CORE_POOL_SIZE, groupedThreads(
Carmelo Cascone96beb6f2018-06-27 18:07:12 +0200200 "onos/generaldeviceprovider-port-stats", "%d", log));
Carmelo Casconee5b28722018-06-22 17:28:28 +0200201 private ScheduledExecutorService availabilityCheckExecutor
202 = newScheduledThreadPool(CORE_POOL_SIZE, groupedThreads(
Carmelo Cascone96beb6f2018-06-27 18:07:12 +0200203 "onos/generaldeviceprovider-availability-check", "%d", log));
Carmelo Casconee5b28722018-06-22 17:28:28 +0200204 private ConcurrentMap<DeviceId, ScheduledFuture<?>> scheduledTasks = new ConcurrentHashMap<>();
205
206 private DeviceProviderService providerService;
Andrea Campanella241896c2017-05-10 13:11:04 -0700207 private InternalDeviceListener deviceListener = new InternalDeviceListener();
208
Carmelo Casconee5b28722018-06-22 17:28:28 +0200209 private final ConfigFactory factory =
Andrea Campanella241896c2017-05-10 13:11:04 -0700210 new ConfigFactory<DeviceId, GeneralProviderDeviceConfig>(
211 SubjectFactories.DEVICE_SUBJECT_FACTORY,
212 GeneralProviderDeviceConfig.class, CFG_SCHEME) {
213 @Override
214 public GeneralProviderDeviceConfig createConfig() {
215 return new GeneralProviderDeviceConfig();
216 }
217 };
218
Carmelo Casconee5b28722018-06-22 17:28:28 +0200219 private final NetworkConfigListener cfgListener = new InternalNetworkConfigListener();
220 private final DeviceAgentListener deviceAgentListener = new InternalDeviceAgentListener();
Andrea Campanella241896c2017-05-10 13:11:04 -0700221
222
223 @Activate
Andrea Campanella1e573442018-05-17 17:07:13 +0200224 public void activate(ComponentContext context) {
Andrea Campanella241896c2017-05-10 13:11:04 -0700225 providerService = providerRegistry.register(this);
Andrea Campanella4929a812017-10-09 18:38:23 +0200226 componentConfigService.registerProperties(getClass());
Andrea Campanella241896c2017-05-10 13:11:04 -0700227 coreService.registerApplication(APP_NAME);
228 cfgService.registerConfigFactory(factory);
229 cfgService.addListener(cfgListener);
230 deviceService.addListener(deviceListener);
Thomas Vachuska5b38dc02018-05-10 15:24:40 -0700231 handshakers.clear();
Andrea Campanella241896c2017-05-10 13:11:04 -0700232 //This will fail if ONOS has CFG and drivers which depend on this provider
233 // are activated, failing due to not finding the driver.
234 cfgService.getSubjects(DeviceId.class, GeneralProviderDeviceConfig.class)
Carmelo Cascone96beb6f2018-06-27 18:07:12 +0200235 .forEach(did -> triggerConnectWithLeadership(
236 did, FIRST_CONNECTION_TOPIC + did.toString()));
Andrea Campanella1e573442018-05-17 17:07:13 +0200237 //Initiating a periodic check to see if any device is available again and reconnect it.
Carmelo Casconee5b28722018-06-22 17:28:28 +0200238 availabilityCheckExecutor.scheduleAtFixedRate(
239 this::scheduleDevicePolling, deviceAvailabilityPollFrequency,
Andrea Campanella1e573442018-05-17 17:07:13 +0200240 deviceAvailabilityPollFrequency, TimeUnit.SECONDS);
241 modified(context);
Andrea Campanella241896c2017-05-10 13:11:04 -0700242 log.info("Started");
243 }
244
Andrea Campanella19090322017-08-22 10:31:37 +0200245 @Modified
246 public void modified(ComponentContext context) {
247 if (context != null) {
248 Dictionary<?, ?> properties = context.getProperties();
Andrea Campanella1e573442018-05-17 17:07:13 +0200249 pollFrequency = Tools.getIntegerProperty(properties, POLL_FREQUENCY,
Carmelo Cascone96beb6f2018-06-27 18:07:12 +0200250 DEFAULT_POLL_FREQUENCY_SECONDS);
Andrea Campanella19090322017-08-22 10:31:37 +0200251 log.info("Configured. Poll frequency is configured to {} seconds", pollFrequency);
252 }
253
254 if (!scheduledTasks.isEmpty()) {
255 //cancel all previous tasks
256 scheduledTasks.values().forEach(task -> task.cancel(false));
257 //resubmit task with new timeout.
258 Set<DeviceId> deviceSubjects =
259 cfgService.getSubjects(DeviceId.class, GeneralProviderDeviceConfig.class);
260 deviceSubjects.forEach(deviceId -> {
Carmelo Casconee5b28722018-06-22 17:28:28 +0200261 if (notMyScheme(deviceId)) {
Andrea Campanella19090322017-08-22 10:31:37 +0200262 // not under my scheme, skipping
263 log.debug("{} is not my scheme, skipping", deviceId);
264 return;
265 }
Carmelo Casconee5b28722018-06-22 17:28:28 +0200266 scheduledTasks.put(deviceId, scheduleStatsPolling(deviceId, true));
Andrea Campanella19090322017-08-22 10:31:37 +0200267 });
268 }
Andrea Campanella19090322017-08-22 10:31:37 +0200269 }
270
Andrea Campanella241896c2017-05-10 13:11:04 -0700271 @Deactivate
272 public void deactivate() {
273 portStatsExecutor.shutdown();
Andrea Campanella1e573442018-05-17 17:07:13 +0200274 availabilityCheckExecutor.shutdown();
Andrea Campanella4929a812017-10-09 18:38:23 +0200275 componentConfigService.unregisterProperties(getClass(), false);
Andrea Campanella241896c2017-05-10 13:11:04 -0700276 cfgService.removeListener(cfgListener);
277 //Not Removing the device so they can still be used from other driver providers
278 //cfgService.getSubjects(DeviceId.class, GeneralProviderDeviceConfig.class)
279 // .forEach(did -> connectionExecutor.execute(() -> disconnectDevice(did)));
280 connectionExecutor.shutdown();
281 deviceService.removeListener(deviceListener);
282 providerRegistry.unregister(this);
Thomas Vachuska5b38dc02018-05-10 15:24:40 -0700283 handshakers.clear();
Andrea Campanella241896c2017-05-10 13:11:04 -0700284 providerService = null;
285 cfgService.unregisterConfigFactory(factory);
286 log.info("Stopped");
287 }
288
289 public GeneralDeviceProvider() {
290 super(new ProviderId(URI_SCHEME, DEVICE_PROVIDER_PACKAGE));
291 }
292
293
294 @Override
295 public void triggerProbe(DeviceId deviceId) {
Thomas Vachuska5b38dc02018-05-10 15:24:40 -0700296 // TODO Really don't see the point of this in non OF Context,
Andrea Campanella241896c2017-05-10 13:11:04 -0700297 // for now testing reachability, can be moved to no-op
298 log.debug("Triggering probe equals testing reachability on device {}", deviceId);
299 isReachable(deviceId);
300 }
301
302 @Override
303 public void roleChanged(DeviceId deviceId, MastershipRole newRole) {
Andrea Campanella14e196d2017-07-24 18:11:36 +0200304 log.info("Received role {} for device {}", newRole, deviceId);
Carmelo Casconee5b28722018-06-22 17:28:28 +0200305 requestedRoles.put(deviceId, newRole);
Carmelo Cascone96beb6f2018-06-27 18:07:12 +0200306 connectionExecutor.submit(exceptionSafe(
307 () -> doRoleChanged(deviceId, newRole)));
Carmelo Casconee5b28722018-06-22 17:28:28 +0200308 }
309
310 private void doRoleChanged(DeviceId deviceId, MastershipRole newRole) {
Carmelo Cascone96beb6f2018-06-27 18:07:12 +0200311 final DeviceHandshaker handshaker = getHandshaker(deviceId);
Carmelo Casconee5b28722018-06-22 17:28:28 +0200312 if (handshaker == null) {
Carmelo Cascone96beb6f2018-06-27 18:07:12 +0200313 log.error("Null handshaker. Unable to notify new role {} to {}",
314 newRole, deviceId);
Carmelo Casconee5b28722018-06-22 17:28:28 +0200315 return;
316 }
317 handshaker.roleChanged(newRole);
Andrea Campanella241896c2017-05-10 13:11:04 -0700318 }
319
320 @Override
321 public boolean isReachable(DeviceId deviceId) {
Andrea Campanella14e196d2017-07-24 18:11:36 +0200322 log.debug("Testing reachability for device {}", deviceId);
Carmelo Cascone96beb6f2018-06-27 18:07:12 +0200323 final DeviceHandshaker handshaker = getHandshaker(deviceId);
Andrea Campanellac1ecdd02018-01-12 12:48:24 +0100324 if (handshaker == null) {
325 return false;
326 }
Carmelo Cascone96beb6f2018-06-27 18:07:12 +0200327 return getFutureWithDeadline(
328 handshaker.isReachable(), "checking reachability",
329 deviceId, false);
Andrea Campanella241896c2017-05-10 13:11:04 -0700330 }
331
332 @Override
333 public void changePortState(DeviceId deviceId, PortNumber portNumber,
334 boolean enable) {
Carmelo Cascone96beb6f2018-06-27 18:07:12 +0200335 if (!deviceService.getDevice(deviceId).is(PortAdmin.class)) {
336 log.warn("Missing PortAdmin behaviour on {}, aborting port state change",
337 deviceId);
338 return;
Andrea Campanella241896c2017-05-10 13:11:04 -0700339 }
Carmelo Cascone96beb6f2018-06-27 18:07:12 +0200340 final PortAdmin portAdmin = getPortAdmin(deviceId);
341 final CompletableFuture<Boolean> modified = enable
342 ? portAdmin.enable(portNumber)
343 : portAdmin.disable(portNumber);
344 modified.thenAcceptAsync(result -> {
345 if (!result) {
346 log.warn("Port {} status cannot be changed on {} (enable={})",
347 portNumber, deviceId, enable);
348 }
349 });
Andrea Campanella241896c2017-05-10 13:11:04 -0700350 }
351
Thomas Vachuska5b38dc02018-05-10 15:24:40 -0700352 @Override
353 public void triggerDisconnect(DeviceId deviceId) {
Andrea Campanella1e573442018-05-17 17:07:13 +0200354 log.debug("Triggering disconnection of device {}", deviceId);
Carmelo Cascone96beb6f2018-06-27 18:07:12 +0200355 connectionExecutor.execute(
356 () -> disconnectDevice(deviceId)
357 .thenRunAsync(() -> checkAndConnect(deviceId)));
Thomas Vachuska5b38dc02018-05-10 15:24:40 -0700358 }
359
Andrea Campanella241896c2017-05-10 13:11:04 -0700360 private DeviceHandshaker getHandshaker(DeviceId deviceId) {
Thomas Vachuska5b38dc02018-05-10 15:24:40 -0700361 return handshakers.computeIfAbsent(deviceId, id -> {
362 Driver driver = getDriver(deviceId);
363 return driver == null ? null :
364 getBehaviour(driver, DeviceHandshaker.class,
365 new DefaultDriverData(driver, deviceId));
366 });
Andrea Campanella241896c2017-05-10 13:11:04 -0700367 }
368
369 private PortAdmin getPortAdmin(DeviceId deviceId) {
370 Driver driver = getDriver(deviceId);
371 return getBehaviour(driver, PortAdmin.class,
Carmelo Cascone96beb6f2018-06-27 18:07:12 +0200372 new DefaultDriverData(driver, deviceId));
Andrea Campanella241896c2017-05-10 13:11:04 -0700373
374 }
375
376 private Driver getDriver(DeviceId deviceId) {
Andrea Campanellac1ecdd02018-01-12 12:48:24 +0100377 Driver driver = null;
Andrea Campanella241896c2017-05-10 13:11:04 -0700378 try {
379 driver = driverService.getDriver(deviceId);
380 } catch (ItemNotFoundException e) {
381 log.debug("Falling back to configuration to fetch driver " +
Carmelo Cascone96beb6f2018-06-27 18:07:12 +0200382 "for device {}", deviceId);
Andrea Campanellac1ecdd02018-01-12 12:48:24 +0100383 BasicDeviceConfig cfg = cfgService.getConfig(deviceId, BasicDeviceConfig.class);
384 if (cfg != null) {
385 driver = driverService.getDriver(cfg.driver());
386 }
Andrea Campanella241896c2017-05-10 13:11:04 -0700387 }
388 return driver;
389 }
390
391 //needed since the device manager will not return the driver through implementation()
Carmelo Cascone96beb6f2018-06-27 18:07:12 +0200392 // method since the device is not pushed to the core so for the connectDeviceAsMaster
Andrea Campanella241896c2017-05-10 13:11:04 -0700393 // we need to work around that in order to test before calling
394 // store.createOrUpdateDevice
395 private <T extends Behaviour> T getBehaviour(Driver driver, Class<T> type,
396 DriverData data) {
Andrea Campanellac1ecdd02018-01-12 12:48:24 +0100397 if (driver != null && driver.hasBehaviour(type)) {
Andrea Campanella241896c2017-05-10 13:11:04 -0700398 DefaultDriverHandler handler = new DefaultDriverHandler(data);
399 return driver.createBehaviour(handler, type);
400 } else {
401 return null;
402 }
403 }
404
Carmelo Cascone96beb6f2018-06-27 18:07:12 +0200405 private void doConnectDevice(DeviceId deviceId, boolean asMaster) {
406 if (deviceService.getDevice(deviceId) != null
407 && deviceService.isAvailable(deviceId)) {
408 log.info("Device {} is already connected to ONOS and is available",
409 deviceId);
410 return;
411 }
412 // Retrieve config
413 final GeneralProviderDeviceConfig providerConfig = cfgService.getConfig(
414 deviceId, GeneralProviderDeviceConfig.class);
415 final BasicDeviceConfig basicDeviceConfig = cfgService.getConfig(
416 deviceId, BasicDeviceConfig.class);
Andrea Campanella241896c2017-05-10 13:11:04 -0700417 if (providerConfig == null || basicDeviceConfig == null) {
Carmelo Cascone96beb6f2018-06-27 18:07:12 +0200418 log.error("Configuration missing, cannot connect to {}. " +
419 "basicDeviceConfig={}, generalProvider={}",
420 deviceId, basicDeviceConfig, providerConfig);
421 return;
422 }
423 log.info("Initiating connection to device {} with driver {} ... asMaster={}",
424 deviceId, basicDeviceConfig.driver(), asMaster);
425 // Get handshaker, driver and driver data.
426 final DeviceHandshaker handshaker = getHandshaker(deviceId);
427 if (handshaker == null) {
428 log.error("Missing DeviceHandshaker behavior for {}, aborting connection",
429 deviceId);
430 return;
431 }
432 final Driver driver = handshaker.handler().driver();
433 // Enhance driver data with info in GDP config.
434 augmentConfigData(providerConfig, handshaker.data());
435 final DriverData driverData = handshaker.data();
436 // Start connection via handshaker.
437 final Boolean connectSuccess = getFutureWithDeadline(
438 handshaker.connect(), "initiating connection",
439 deviceId, null);
440 if (connectSuccess == null) {
441 // Error logged by getFutureWithDeadline().
442 return;
443 } else if (!connectSuccess) {
444 log.warn("Unable to connect to {}", deviceId);
445 return;
446 }
447 // Handle pipeconf (if device is capable)
448 if (!handlePipeconf(deviceId, driver, driverData, asMaster)) {
449 // We already logged the error.
450 handshaker.disconnect();
451 return;
452 }
453 // Add device agent listener.
454 handshaker.addDeviceAgentListener(deviceAgentListener);
455 // All good. Notify core (if master).
456 if (asMaster) {
457 advertiseDevice(deviceId, driver, providerConfig, driverData);
Andrea Campanella241896c2017-05-10 13:11:04 -0700458 }
459 }
460
Andrea Campanella14e196d2017-07-24 18:11:36 +0200461
Carmelo Cascone96beb6f2018-06-27 18:07:12 +0200462 private void advertiseDevice(DeviceId deviceId, Driver driver,
463 GeneralProviderDeviceConfig providerConfig,
464 DriverData driverData) {
465 // Obtain device and port description and advertise device to core.
466 DeviceDescription description = null;
467 final List<PortDescription> ports;
468
469 final DeviceDescriptionDiscovery deviceDiscovery = getBehaviour(
470 driver, DeviceDescriptionDiscovery.class, driverData);
471
472 if (deviceDiscovery != null) {
473 description = deviceDiscovery.discoverDeviceDetails();
474 ports = deviceDiscovery.discoverPortDetails();
475 } else {
476 log.warn("Missing DeviceDescriptionDiscovery behavior for {}, " +
477 "no update for description or ports.", deviceId);
478 ports = new ArrayList<>();
Andrea Campanella14e196d2017-07-24 18:11:36 +0200479 }
Andrea Campanella14e196d2017-07-24 18:11:36 +0200480
Carmelo Cascone96beb6f2018-06-27 18:07:12 +0200481 if (description == null) {
482 // Generate one here.
483 // FIXME: a behavior impl should not return a null description
484 // (e.g. as GnmiDeviceDescriptionDiscovery). This case should apply
485 // only if a the behavior is not available.
486 description = new DefaultDeviceDescription(
487 deviceId.uri(), Device.Type.SWITCH,
488 driver.manufacturer(), driver.hwVersion(),
489 driver.swVersion(), UNKNOWN,
490 new ChassisId(), true,
491 DefaultAnnotations.builder()
492 .set(AnnotationKeys.PROTOCOL,
493 providerConfig.protocolsInfo().keySet().toString())
494 .build());
495 }
496
497 providerService.deviceConnected(deviceId, description);
498 providerService.updatePorts(deviceId, ports);
Andrea Campanella14e196d2017-07-24 18:11:36 +0200499 }
500
Carmelo Cascone59f57de2017-07-11 19:55:09 -0400501 /**
Carmelo Cascone96beb6f2018-06-27 18:07:12 +0200502 * Handles the case of a device that is pipeline programmable. Returns true
503 * if the operation wa successful and the device can be registered to the
504 * core, false otherwise.
Carmelo Cascone59f57de2017-07-11 19:55:09 -0400505 */
Carmelo Cascone96beb6f2018-06-27 18:07:12 +0200506 private boolean handlePipeconf(DeviceId deviceId, Driver driver,
507 DriverData driverData, boolean deployPipeconf) {
508 final PiPipelineProgrammable pipelineProg = getBehaviour(
509 driver, PiPipelineProgrammable.class, driverData);
Carmelo Cascone59f57de2017-07-11 19:55:09 -0400510 if (pipelineProg == null) {
511 // Device is not pipeline programmable.
512 return true;
513 }
514
Carmelo Cascone96beb6f2018-06-27 18:07:12 +0200515 final PiPipeconf pipeconf = getPipeconf(deviceId, pipelineProg);
516 if (pipeconf == null) {
517 return false;
518 }
519 final PiPipeconfId pipeconfId = pipeconf.id();
Andrea Campanella14e196d2017-07-24 18:11:36 +0200520
Carmelo Cascone96beb6f2018-06-27 18:07:12 +0200521 if (deployPipeconf) {
522 final Boolean deploySuccess = getFutureWithDeadline(
523 pipelineProg.deployPipeconf(pipeconf),
524 "deploying pipeconf", deviceId, null);
525 if (deploySuccess == null) {
526 // Error logged by getFutureWithDeadline().
527 return false;
528 } else if (!deploySuccess) {
529 log.error("Unable to deploy pipeconf {} to {}, aborting device discovery",
530 pipeconfId, deviceId);
Andrea Campanella14e196d2017-07-24 18:11:36 +0200531 return false;
532 }
Carmelo Cascone96beb6f2018-06-27 18:07:12 +0200533 }
534
535 final Boolean mergeSuccess = getFutureWithDeadline(
536 piPipeconfService.bindToDevice(pipeconfId, deviceId),
537 "merging driver", deviceId, null);
538 if (mergeSuccess == null) {
539 // Error logged by getFutureWithDeadline().
540 return false;
541 } else if (!mergeSuccess) {
542 log.error("Unable to merge pipeconf driver for {}, aborting device discovery",
543 pipeconfId, deviceId);
Andrea Campanella14e196d2017-07-24 18:11:36 +0200544 return false;
545 }
546
547 return true;
548 }
549
550 private PiPipeconf getPipeconf(DeviceId deviceId, PiPipelineProgrammable pipelineProg) {
Carmelo Cascone92044522018-06-29 19:00:59 +0200551 PiPipeconfId pipeconfId = getPipeconfFromCfg(deviceId);
552 if (pipeconfId == null || pipeconfId.id().isEmpty()) {
553 // No pipeconf has been provided in the cfg.
Carmelo Cascone59f57de2017-07-11 19:55:09 -0400554 // Check if device driver provides a default one.
555 if (pipelineProg.getDefaultPipeconf().isPresent()) {
Carmelo Cascone92044522018-06-29 19:00:59 +0200556 final PiPipeconf defaultPipeconf = pipelineProg.getDefaultPipeconf().get();
Carmelo Cascone59f57de2017-07-11 19:55:09 -0400557 log.info("Using default pipeconf {} for {}", defaultPipeconf.id(), deviceId);
Carmelo Cascone92044522018-06-29 19:00:59 +0200558 pipeconfId = defaultPipeconf.id();
Carmelo Cascone59f57de2017-07-11 19:55:09 -0400559 } else {
Carmelo Cascone92044522018-06-29 19:00:59 +0200560 log.warn("Device {} is pipeline programmable but no pipeconf can be associated to it", deviceId);
Carmelo Cascone59f57de2017-07-11 19:55:09 -0400561 return null;
562 }
Carmelo Cascone1fb27d32017-08-25 20:40:20 +0200563 }
Carmelo Cascone92044522018-06-29 19:00:59 +0200564 // Check if registered
Carmelo Cascone1fb27d32017-08-25 20:40:20 +0200565 if (!piPipeconfService.getPipeconf(pipeconfId).isPresent()) {
566 log.warn("Pipeconf {} is not registered", pipeconfId);
Andrea Campanella14e196d2017-07-24 18:11:36 +0200567 return null;
Carmelo Cascone59f57de2017-07-11 19:55:09 -0400568 }
Andrea Campanella14e196d2017-07-24 18:11:36 +0200569 return piPipeconfService.getPipeconf(pipeconfId).get();
Carmelo Cascone59f57de2017-07-11 19:55:09 -0400570 }
571
Carmelo Cascone96beb6f2018-06-27 18:07:12 +0200572 private CompletableFuture<?> disconnectDevice(DeviceId deviceId) {
Andrea Campanella241896c2017-05-10 13:11:04 -0700573 log.info("Disconnecting for device {}", deviceId);
Carmelo Cascone96beb6f2018-06-27 18:07:12 +0200574 // Remove from core (if master)
575 if (mastershipService.isLocalMaster(deviceId)) {
576 providerService.deviceDisconnected(deviceId);
577 }
578 // Cancel tasks
Carmelo Casconee5b28722018-06-22 17:28:28 +0200579 if (scheduledTasks.containsKey(deviceId)) {
580 scheduledTasks.remove(deviceId).cancel(true);
581 }
Carmelo Cascone96beb6f2018-06-27 18:07:12 +0200582 // Perform disconnection with device.
583 final DeviceHandshaker handshaker = handshakers.remove(deviceId);
Carmelo Casconee5b28722018-06-22 17:28:28 +0200584 if (handshaker == null) {
Carmelo Cascone96beb6f2018-06-27 18:07:12 +0200585 // Gracefully ignore
586 log.warn("Missing DeviceHandshaker behavior for {}, " +
587 "no guarantees of complete disconnection",
588 deviceId);
Carmelo Casconee5b28722018-06-22 17:28:28 +0200589 return CompletableFuture.completedFuture(false);
Andrea Campanella241896c2017-05-10 13:11:04 -0700590 }
Carmelo Cascone96beb6f2018-06-27 18:07:12 +0200591 handshaker.removeDeviceAgentListener(deviceAgentListener);
Carmelo Casconee5b28722018-06-22 17:28:28 +0200592 return handshaker.disconnect()
593 .thenApplyAsync(result -> {
594 if (result) {
595 log.info("Disconnected device {}", deviceId);
Carmelo Casconee5b28722018-06-22 17:28:28 +0200596 } else {
597 log.warn("Device {} was unable to disconnect", deviceId);
598 }
599 return result;
600 });
Andrea Campanella241896c2017-05-10 13:11:04 -0700601 }
602
Carmelo Cascone96beb6f2018-06-27 18:07:12 +0200603 // Needed to catch the exception in the executors since are not rethrown otherwise.
Andrea Campanella241896c2017-05-10 13:11:04 -0700604 private Runnable exceptionSafe(Runnable runnable) {
605 return () -> {
606 try {
607 runnable.run();
608 } catch (Exception e) {
609 log.error("Unhandled Exception", e);
610 }
611 };
612 }
613
Carmelo Cascone96beb6f2018-06-27 18:07:12 +0200614 private Runnable withDeviceLock(Runnable runnable, DeviceId deviceId) {
615 return () -> {
616 Lock lock = DEVICE_LOCKS.computeIfAbsent(deviceId, key -> new ReentrantLock());
617 lock.lock();
618 try {
619 runnable.run();
620 } finally {
621 lock.unlock();
622 }
623 };
624 }
625
Andrea Campanella241896c2017-05-10 13:11:04 -0700626 private void updatePortStatistics(DeviceId deviceId) {
Andrea Campanellace111932017-09-18 16:59:56 +0900627 Device device = deviceService.getDevice(deviceId);
628 if (!Objects.isNull(device) && deviceService.isAvailable(deviceId) &&
629 device.is(PortStatisticsDiscovery.class)) {
630 Collection<PortStatistics> statistics = device.as(PortStatisticsDiscovery.class)
631 .discoverPortStatistics();
632 //updating statistcs only if not empty
633 if (!statistics.isEmpty()) {
634 providerService.updatePortStatistics(deviceId, statistics);
635 }
636 } else {
637 log.debug("Can't update port statistics for device {}", deviceId);
Andrea Campanella19090322017-08-22 10:31:37 +0200638 }
639 }
640
Carmelo Casconee5b28722018-06-22 17:28:28 +0200641 private boolean notMyScheme(DeviceId deviceId) {
642 return !deviceId.uri().getScheme().equals(URI_SCHEME);
Andrea Campanella241896c2017-05-10 13:11:04 -0700643 }
644
Carmelo Cascone96beb6f2018-06-27 18:07:12 +0200645 private void triggerConnectWithLeadership(DeviceId deviceId,
646 String leadershipTopic) {
647 final NodeId leaderNodeId = leadershipService.runForLeadership(
648 leadershipTopic).leader().nodeId();
649 final boolean thisNodeMaster = clusterService
650 .getLocalNode().id().equals(leaderNodeId);
651 connectionExecutor.submit(withDeviceLock(exceptionSafe(
652 () -> doConnectDevice(deviceId, thisNodeMaster)), deviceId));
653 }
654
Andrea Campanella241896c2017-05-10 13:11:04 -0700655 /**
656 * Listener for configuration events.
657 */
658 private class InternalNetworkConfigListener implements NetworkConfigListener {
659
Andrea Campanella241896c2017-05-10 13:11:04 -0700660 @Override
661 public void event(NetworkConfigEvent event) {
662 DeviceId deviceId = (DeviceId) event.subject();
663 //Assuming that the deviceId comes with uri 'device:'
Carmelo Casconee5b28722018-06-22 17:28:28 +0200664 if (notMyScheme(deviceId)) {
Andrea Campanella241896c2017-05-10 13:11:04 -0700665 // not under my scheme, skipping
666 log.debug("{} is not my scheme, skipping", deviceId);
667 return;
668 }
Carmelo Cascone96beb6f2018-06-27 18:07:12 +0200669 if (!isDeviceConfigComplete(event, deviceId)) {
670 // Still waiting for some configuration.
Andrea Campanellabc112a92017-06-26 19:06:43 +0200671 return;
672 }
Carmelo Cascone96beb6f2018-06-27 18:07:12 +0200673 // Good to go.
674 triggerConnectWithLeadership(
675 deviceId, FIRST_CONNECTION_TOPIC + deviceId.toString());
676 cleanUpConfigInfo(deviceId);
Andrea Campanella14e196d2017-07-24 18:11:36 +0200677 }
678
Carmelo Cascone96beb6f2018-06-27 18:07:12 +0200679 private boolean isDeviceConfigComplete(NetworkConfigEvent event, DeviceId deviceId) {
680 // FIXME to be removed when netcfg will issue device events in a bundle or
Andrea Campanellabc112a92017-06-26 19:06:43 +0200681 // ensure all configuration needed is present
Carmelo Cascone96beb6f2018-06-27 18:07:12 +0200682 Lock lock = DEVICE_LOCKS.computeIfAbsent(deviceId, key -> new ReentrantLock());
Andrea Campanellabc112a92017-06-26 19:06:43 +0200683 lock.lock();
684 try {
685 if (event.configClass().equals(GeneralProviderDeviceConfig.class)) {
686 //FIXME we currently assume that p4runtime devices are pipeline configurable.
687 //If we want to connect a p4runtime device with no pipeline
Carmelo Cascone96beb6f2018-06-27 18:07:12 +0200688 if (event.config().isPresent()) {
689 deviceConfigured.add(deviceId);
690 final boolean isNotPipelineConfigurable = Collections.disjoint(
691 ImmutableSet.copyOf(event.config().get().node().fieldNames()),
692 PIPELINE_CONFIGURABLE_PROTOCOLS);
693 if (isNotPipelineConfigurable) {
694 // Skip waiting for a pipeline if we can't support it.
695 pipelineConfigured.add(deviceId);
696 }
Andrea Campanellabc112a92017-06-26 19:06:43 +0200697 }
Andrea Campanellabc112a92017-06-26 19:06:43 +0200698 } else if (event.configClass().equals(BasicDeviceConfig.class)) {
699 if (event.config().isPresent() && event.config().get().node().has(DRIVER)) {
Andrea Campanellabc112a92017-06-26 19:06:43 +0200700 driverConfigured.add(deviceId);
701 }
702 } else if (event.configClass().equals(PiPipeconfConfig.class)) {
703 if (event.config().isPresent()
704 && event.config().get().node().has(PiPipeconfConfig.PIPIPECONFID)) {
705 pipelineConfigured.add(deviceId);
706 }
707 }
Carmelo Cascone96beb6f2018-06-27 18:07:12 +0200708
709 if (deviceConfigured.contains(deviceId)
710 && driverConfigured.contains(deviceId)
Andrea Campanellabc112a92017-06-26 19:06:43 +0200711 && pipelineConfigured.contains(deviceId)) {
Andrea Campanella14e196d2017-07-24 18:11:36 +0200712 return true;
Andrea Campanellabc112a92017-06-26 19:06:43 +0200713 } else {
714 if (deviceConfigured.contains(deviceId) && driverConfigured.contains(deviceId)) {
715 log.debug("Waiting for pipeline configuration for device {}", deviceId);
716 } else if (pipelineConfigured.contains(deviceId) && driverConfigured.contains(deviceId)) {
717 log.debug("Waiting for device configuration for device {}", deviceId);
718 } else if (pipelineConfigured.contains(deviceId) && deviceConfigured.contains(deviceId)) {
719 log.debug("Waiting for driver configuration for device {}", deviceId);
720 } else if (driverConfigured.contains(deviceId)) {
721 log.debug("Only driver configuration for device {}", deviceId);
722 } else if (deviceConfigured.contains(deviceId)) {
723 log.debug("Only device configuration for device {}", deviceId);
724 }
725 }
Andrea Campanella14e196d2017-07-24 18:11:36 +0200726 return false;
Andrea Campanellabc112a92017-06-26 19:06:43 +0200727 } finally {
728 lock.unlock();
Andrea Campanella241896c2017-05-10 13:11:04 -0700729 }
730 }
731
732 @Override
733 public boolean isRelevant(NetworkConfigEvent event) {
Andrea Campanellabc112a92017-06-26 19:06:43 +0200734 return (event.configClass().equals(GeneralProviderDeviceConfig.class) ||
735 event.configClass().equals(BasicDeviceConfig.class) ||
736 event.configClass().equals(PiPipeconfConfig.class)) &&
Andrea Campanella241896c2017-05-10 13:11:04 -0700737 (event.type() == NetworkConfigEvent.Type.CONFIG_ADDED ||
738 event.type() == NetworkConfigEvent.Type.CONFIG_UPDATED);
739 }
740 }
741
Carmelo Cascone96beb6f2018-06-27 18:07:12 +0200742 private void augmentConfigData(GeneralProviderDeviceConfig providerConfig, DriverData driverData) {
Andrea Campanella14e196d2017-07-24 18:11:36 +0200743 //Storing deviceKeyId and all other config values
744 // as data in the driver with protocol_<info>
745 // name as the key. e.g protocol_ip
746 providerConfig.protocolsInfo()
747 .forEach((protocol, deviceInfoConfig) -> {
748 deviceInfoConfig.configValues()
749 .forEach((k, v) -> driverData.set(protocol + "_" + k, v));
750 driverData.set(protocol + "_key", deviceInfoConfig.deviceKeyId());
751 });
752 }
753
754 private void cleanUpConfigInfo(DeviceId deviceId) {
Andrea Campanellabc112a92017-06-26 19:06:43 +0200755 deviceConfigured.remove(deviceId);
756 driverConfigured.remove(deviceId);
757 pipelineConfigured.remove(deviceId);
Andrea Campanellabc112a92017-06-26 19:06:43 +0200758 }
759
Carmelo Casconee5b28722018-06-22 17:28:28 +0200760 private ScheduledFuture<?> scheduleStatsPolling(DeviceId deviceId, boolean randomize) {
Andrea Campanella19090322017-08-22 10:31:37 +0200761 int delay = 0;
762 if (randomize) {
Ray Milkeyb68bbbc2017-12-18 10:05:49 -0800763 delay = new SecureRandom().nextInt(10);
Andrea Campanella19090322017-08-22 10:31:37 +0200764 }
765 return portStatsExecutor.scheduleAtFixedRate(
766 exceptionSafe(() -> updatePortStatistics(deviceId)),
767 delay, pollFrequency, TimeUnit.SECONDS);
768 }
769
Andrea Campanella1e573442018-05-17 17:07:13 +0200770 private void scheduleDevicePolling() {
771 cfgService.getSubjects(DeviceId.class, GeneralProviderDeviceConfig.class).forEach(this::checkAndConnect);
772 }
773
Carmelo Cascone92044522018-06-29 19:00:59 +0200774 private PiPipeconfId getPipeconfFromCfg(DeviceId deviceId) {
775 PiPipeconfConfig config = cfgService.getConfig(
776 deviceId, PiPipeconfConfig.class);
777 if (config == null) {
778 return null;
779 }
780 return config.piPipeconfId();
781 }
782
Andrea Campanella1e573442018-05-17 17:07:13 +0200783 private void checkAndConnect(DeviceId deviceId) {
Carmelo Cascone96beb6f2018-06-27 18:07:12 +0200784 // Let's try and reconnect to a device which is stored in cfg.
Andrea Campanella1e573442018-05-17 17:07:13 +0200785 // One of the following conditions must be satisfied:
Carmelo Cascone96beb6f2018-06-27 18:07:12 +0200786 // 1) device is null in the store meaning that is was never connected or
787 // it was administratively removed
788 // 2) the device is not available and there is no MASTER instance,
789 // meaning the device lost it's connection to ONOS at some point in the
790 // past.
791 // We also check that the general device provider config and the driver
792 // config are present. We do not check for reachability using
793 // isReachable(deviceId) since the behaviour of this method can vary
794 // depending on protocol nuances. We leave this check to the device
795 // handshaker at later stages of the connection process. IF the
796 // conditions are not met but instead the device is present in the
797 // store, available and this instance is MASTER but is not reachable we
798 // remove it from the store.
Andrea Campanella1e573442018-05-17 17:07:13 +0200799
Carmelo Cascone96beb6f2018-06-27 18:07:12 +0200800 if ((deviceService.getDevice(deviceId) == null
801 || (!deviceService.isAvailable(deviceId)
802 && mastershipService.getMasterFor(deviceId) == null))
803 && configIsPresent(deviceId)) {
Andrea Campanella1e573442018-05-17 17:07:13 +0200804 log.debug("Trying to re-connect to device {}", deviceId);
Carmelo Cascone96beb6f2018-06-27 18:07:12 +0200805 triggerConnectWithLeadership(
806 deviceId, CHECK_CONNECTION_TOPIC + deviceId.toString());
807 cleanUpConfigInfo(deviceId);
808 } else if (deviceService.getDevice(deviceId) != null
809 && deviceService.isAvailable(deviceId)
810 && mastershipService.isLocalMaster(deviceId)
811 && !isReachable(deviceId)
812 && configIsPresent(deviceId)) {
Andrea Campanella1e573442018-05-17 17:07:13 +0200813 log.info("Removing available but unreachable device {}", deviceId);
814 disconnectDevice(deviceId);
Andrea Campanella1e573442018-05-17 17:07:13 +0200815 }
816 }
817
818 private boolean configIsPresent(DeviceId deviceId) {
Carmelo Cascone96beb6f2018-06-27 18:07:12 +0200819 final boolean present =
820 cfgService.getConfig(deviceId, GeneralProviderDeviceConfig.class) != null
821 && cfgService.getConfig(deviceId, BasicDeviceConfig.class) != null;
Andrea Campanella1e573442018-05-17 17:07:13 +0200822 if (!present) {
823 log.warn("Configuration for device {} is not complete", deviceId);
824 }
825 return present;
826 }
827
Carmelo Casconee5b28722018-06-22 17:28:28 +0200828 private void handleChannelClosed(DeviceId deviceId) {
Carmelo Cascone96beb6f2018-06-27 18:07:12 +0200829 log.info("Disconnecting device {}, due to channel closed event",
830 deviceId);
831 disconnectDevice(deviceId);
Carmelo Casconee5b28722018-06-22 17:28:28 +0200832 }
833
834 private void handleMastershipResponse(DeviceId deviceId, MastershipRole response) {
835 //Notify core about response.
836 if (!requestedRoles.containsKey(deviceId)) {
837 return;
838 }
839 providerService.receivedRoleReply(deviceId, requestedRoles.get(deviceId), response);
840 // If not master, cancel polling tasks, otherwise start them.
841 if (!response.equals(MastershipRole.MASTER)
842 && scheduledTasks.get(deviceId) != null) {
843 scheduledTasks.remove(deviceId).cancel(false);
844 } else if (response.equals(MastershipRole.MASTER)
845 && scheduledTasks.get(deviceId) == null) {
846 scheduledTasks.put(deviceId, scheduleStatsPolling(deviceId, false));
847 updatePortStatistics(deviceId);
848 }
849 }
850
Carmelo Cascone96beb6f2018-06-27 18:07:12 +0200851 private <U> U getFutureWithDeadline(CompletableFuture<U> future, String opDescription,
852 DeviceId deviceId, U defaultValue) {
853 try {
854 return future.get(DEVICE_OP_TIMEOUT, TimeUnit.SECONDS);
855 } catch (InterruptedException e) {
856 log.error("Thread interrupted while {} on {}", opDescription, deviceId);
857 Thread.currentThread().interrupt();
858 } catch (ExecutionException e) {
859 log.error("Exception while {} on {}", opDescription, deviceId, e.getCause());
860 } catch (TimeoutException e) {
861 log.error("Operation TIMEOUT while {} on {}", opDescription, deviceId);
862 }
863 return defaultValue;
864 }
865
Andrea Campanella241896c2017-05-10 13:11:04 -0700866 /**
867 * Listener for core device events.
868 */
869 private class InternalDeviceListener implements DeviceListener {
870 @Override
871 public void event(DeviceEvent event) {
Andrea Campanellace111932017-09-18 16:59:56 +0900872 DeviceId deviceId = event.subject().id();
Thomas Vachuska5b38dc02018-05-10 15:24:40 -0700873 // For now this is scheduled periodically, when streaming API will
874 // be available we check and base it on the streaming API (e.g. gNMI)
875 if (mastershipService.isLocalMaster(deviceId)) {
Carmelo Casconee5b28722018-06-22 17:28:28 +0200876 scheduledTasks.put(deviceId, scheduleStatsPolling(deviceId, false));
Andrea Campanella241896c2017-05-10 13:11:04 -0700877 }
878 }
879
880 @Override
881 public boolean isRelevant(DeviceEvent event) {
Thomas Vachuska5b38dc02018-05-10 15:24:40 -0700882 return event.type() == Type.DEVICE_ADDED &&
883 event.subject().id().toString().startsWith(URI_SCHEME.toLowerCase());
Andrea Campanella241896c2017-05-10 13:11:04 -0700884 }
885 }
Andrea Campanella1e573442018-05-17 17:07:13 +0200886
887 /**
Carmelo Casconee5b28722018-06-22 17:28:28 +0200888 * Listener for device agent events.
Andrea Campanella1e573442018-05-17 17:07:13 +0200889 */
Carmelo Casconee5b28722018-06-22 17:28:28 +0200890 private class InternalDeviceAgentListener implements DeviceAgentListener {
Andrea Campanella1e573442018-05-17 17:07:13 +0200891
892 @Override
Carmelo Casconee5b28722018-06-22 17:28:28 +0200893 public void event(DeviceAgentEvent event) {
Andrea Campanella1e573442018-05-17 17:07:13 +0200894 DeviceId deviceId = event.subject();
Carmelo Casconee5b28722018-06-22 17:28:28 +0200895 switch (event.type()) {
896 case CHANNEL_OPEN:
897 // Ignore.
898 break;
899 case CHANNEL_CLOSED:
900 handleChannelClosed(deviceId);
901 break;
902 case CHANNEL_ERROR:
903 // TODO evaluate other reaction to channel error.
904 log.warn("Received CHANNEL_ERROR from {}. Is the channel still open?",
905 deviceId);
906 break;
907 case ROLE_MASTER:
908 handleMastershipResponse(deviceId, MastershipRole.MASTER);
909 break;
910 case ROLE_STANDBY:
911 handleMastershipResponse(deviceId, MastershipRole.STANDBY);
912 break;
913 case ROLE_NONE:
914 handleMastershipResponse(deviceId, MastershipRole.NONE);
915 break;
916 default:
917 log.warn("Unrecognized device agent event {}", event.type());
Andrea Campanella1e573442018-05-17 17:07:13 +0200918 }
Andrea Campanella1e573442018-05-17 17:07:13 +0200919 }
920
921 }
Andrea Campanella241896c2017-05-10 13:11:04 -0700922}