blob: 2b31b91690c478321c30552747592cd129ce915e [file] [log] [blame]
Andrea Campanella241896c2017-05-10 13:11:04 -07001/*
Brian O'Connora09fe5b2017-08-03 21:12:30 -07002 * Copyright 2017-present Open Networking Foundation
Andrea Campanella241896c2017-05-10 13:11:04 -07003 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
16
17package org.onosproject.provider.general.device.impl;
18
19import com.google.common.annotations.Beta;
Andrea Campanellabc112a92017-06-26 19:06:43 +020020import com.google.common.collect.ImmutableSet;
21import com.google.common.collect.Maps;
Andrea Campanella241896c2017-05-10 13:11:04 -070022import org.apache.felix.scr.annotations.Activate;
23import org.apache.felix.scr.annotations.Component;
24import org.apache.felix.scr.annotations.Deactivate;
Andrea Campanella19090322017-08-22 10:31:37 +020025import org.apache.felix.scr.annotations.Modified;
26import org.apache.felix.scr.annotations.Property;
Andrea Campanella241896c2017-05-10 13:11:04 -070027import org.apache.felix.scr.annotations.Reference;
28import org.apache.felix.scr.annotations.ReferenceCardinality;
29import org.onlab.packet.ChassisId;
30import org.onlab.util.ItemNotFoundException;
Andrea Campanella19090322017-08-22 10:31:37 +020031import org.onlab.util.Tools;
Andrea Campanella4929a812017-10-09 18:38:23 +020032import org.onosproject.cfg.ComponentConfigService;
Andrea Campanella14e196d2017-07-24 18:11:36 +020033import org.onosproject.cluster.ClusterService;
34import org.onosproject.cluster.LeadershipService;
35import org.onosproject.cluster.NodeId;
Andrea Campanella241896c2017-05-10 13:11:04 -070036import org.onosproject.core.CoreService;
Andrea Campanella14e196d2017-07-24 18:11:36 +020037import org.onosproject.mastership.MastershipService;
Andrea Campanella241896c2017-05-10 13:11:04 -070038import org.onosproject.net.AnnotationKeys;
39import org.onosproject.net.DefaultAnnotations;
40import org.onosproject.net.Device;
41import org.onosproject.net.DeviceId;
42import org.onosproject.net.MastershipRole;
43import org.onosproject.net.PortNumber;
44import org.onosproject.net.SparseAnnotations;
Carmelo Cascone87892e22017-11-13 16:01:29 -080045import org.onosproject.net.behaviour.PiPipelineProgrammable;
Andrea Campanella241896c2017-05-10 13:11:04 -070046import org.onosproject.net.behaviour.PortAdmin;
47import org.onosproject.net.config.ConfigFactory;
48import org.onosproject.net.config.NetworkConfigEvent;
49import org.onosproject.net.config.NetworkConfigListener;
50import org.onosproject.net.config.NetworkConfigRegistry;
51import org.onosproject.net.config.basics.BasicDeviceConfig;
52import org.onosproject.net.config.basics.SubjectFactories;
53import org.onosproject.net.device.DefaultDeviceDescription;
Carmelo Casconee5b28722018-06-22 17:28:28 +020054import org.onosproject.net.device.DeviceAgentEvent;
55import org.onosproject.net.device.DeviceAgentListener;
Andrea Campanella241896c2017-05-10 13:11:04 -070056import org.onosproject.net.device.DeviceDescription;
57import org.onosproject.net.device.DeviceDescriptionDiscovery;
58import org.onosproject.net.device.DeviceEvent;
59import org.onosproject.net.device.DeviceHandshaker;
60import org.onosproject.net.device.DeviceListener;
61import org.onosproject.net.device.DeviceProvider;
62import org.onosproject.net.device.DeviceProviderRegistry;
63import org.onosproject.net.device.DeviceProviderService;
64import org.onosproject.net.device.DeviceService;
65import org.onosproject.net.device.PortDescription;
66import org.onosproject.net.device.PortStatistics;
67import org.onosproject.net.device.PortStatisticsDiscovery;
68import org.onosproject.net.driver.Behaviour;
69import org.onosproject.net.driver.DefaultDriverData;
70import org.onosproject.net.driver.DefaultDriverHandler;
71import org.onosproject.net.driver.Driver;
72import org.onosproject.net.driver.DriverData;
73import org.onosproject.net.driver.DriverService;
Carmelo Cascone59f57de2017-07-11 19:55:09 -040074import org.onosproject.net.pi.model.PiPipeconf;
Andrea Campanellabc112a92017-06-26 19:06:43 +020075import org.onosproject.net.pi.model.PiPipeconfId;
Carmelo Cascone39c28ca2017-11-15 13:03:57 -080076import org.onosproject.net.pi.service.PiPipeconfConfig;
77import org.onosproject.net.pi.service.PiPipeconfService;
Andrea Campanella241896c2017-05-10 13:11:04 -070078import org.onosproject.net.provider.AbstractProvider;
79import org.onosproject.net.provider.ProviderId;
80import org.onosproject.provider.general.device.api.GeneralProviderDeviceConfig;
Andrea Campanella19090322017-08-22 10:31:37 +020081import org.osgi.service.component.ComponentContext;
Andrea Campanella241896c2017-05-10 13:11:04 -070082import org.slf4j.Logger;
83
Ray Milkeyb68bbbc2017-12-18 10:05:49 -080084import java.security.SecureRandom;
Andrea Campanella241896c2017-05-10 13:11:04 -070085import java.util.ArrayList;
86import java.util.Collection;
Andrea Campanellabc112a92017-06-26 19:06:43 +020087import java.util.Collections;
Andrea Campanella19090322017-08-22 10:31:37 +020088import java.util.Dictionary;
Andrea Campanella241896c2017-05-10 13:11:04 -070089import java.util.List;
Thomas Vachuska5b38dc02018-05-10 15:24:40 -070090import java.util.Map;
Andrea Campanellace111932017-09-18 16:59:56 +090091import java.util.Objects;
Andrea Campanellabc112a92017-06-26 19:06:43 +020092import java.util.Set;
Andrea Campanella241896c2017-05-10 13:11:04 -070093import java.util.concurrent.CompletableFuture;
Andrea Campanella19090322017-08-22 10:31:37 +020094import java.util.concurrent.ConcurrentHashMap;
Andrea Campanellabc112a92017-06-26 19:06:43 +020095import java.util.concurrent.ConcurrentMap;
96import java.util.concurrent.CopyOnWriteArraySet;
Andrea Campanella241896c2017-05-10 13:11:04 -070097import java.util.concurrent.ExecutionException;
Carmelo Casconee5b28722018-06-22 17:28:28 +020098import java.util.concurrent.ExecutorService;
Andrea Campanella241896c2017-05-10 13:11:04 -070099import java.util.concurrent.ScheduledExecutorService;
Andrea Campanella19090322017-08-22 10:31:37 +0200100import java.util.concurrent.ScheduledFuture;
Andrea Campanella241896c2017-05-10 13:11:04 -0700101import java.util.concurrent.TimeUnit;
102import java.util.concurrent.TimeoutException;
Andrea Campanellabc112a92017-06-26 19:06:43 +0200103import java.util.concurrent.locks.Lock;
104import java.util.concurrent.locks.ReentrantLock;
Andrea Campanella241896c2017-05-10 13:11:04 -0700105
Carmelo Casconee5b28722018-06-22 17:28:28 +0200106import static java.util.concurrent.Executors.newFixedThreadPool;
Andrea Campanella241896c2017-05-10 13:11:04 -0700107import static java.util.concurrent.Executors.newScheduledThreadPool;
108import static org.onlab.util.Tools.groupedThreads;
109import static org.onosproject.net.device.DeviceEvent.Type;
110import static org.slf4j.LoggerFactory.getLogger;
111
112/**
113 * Provider which uses drivers to detect device and do initial handshake
114 * and channel establishment with devices. Any other provider specific operation
115 * is also delegated to the DeviceHandshaker driver.
116 */
117@Beta
118@Component(immediate = true)
119public class GeneralDeviceProvider extends AbstractProvider
120 implements DeviceProvider {
Carmelo Casconee5b28722018-06-22 17:28:28 +0200121
122 // Timeout in seconds for operations on devices.
123 private static final int DEVICE_OP_TIMEOUT = 10;
124
125 private static final String DRIVER = "driver";
126 private static final String DEPLOY = "deploy-";
127 private static final String PIPECONF_TOPIC = "-pipeconf";
128 private static final String CHECK = "check-";
129 private static final String CONNECTION = "-connection";
Andrea Campanella1e573442018-05-17 17:07:13 +0200130 private static final String POLL_FREQUENCY = "pollFrequency";
Andrea Campanella14e196d2017-07-24 18:11:36 +0200131
Andrea Campanella241896c2017-05-10 13:11:04 -0700132 private final Logger log = getLogger(getClass());
133
134 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
Carmelo Casconee5b28722018-06-22 17:28:28 +0200135 private DeviceProviderRegistry providerRegistry;
Andrea Campanella241896c2017-05-10 13:11:04 -0700136
137 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
Carmelo Casconee5b28722018-06-22 17:28:28 +0200138 private ComponentConfigService componentConfigService;
Andrea Campanella4929a812017-10-09 18:38:23 +0200139
140 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
Carmelo Casconee5b28722018-06-22 17:28:28 +0200141 private NetworkConfigRegistry cfgService;
Andrea Campanella241896c2017-05-10 13:11:04 -0700142
143 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
Carmelo Casconee5b28722018-06-22 17:28:28 +0200144 private CoreService coreService;
Andrea Campanella241896c2017-05-10 13:11:04 -0700145
146 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
Carmelo Casconee5b28722018-06-22 17:28:28 +0200147 private DeviceService deviceService;
Andrea Campanella241896c2017-05-10 13:11:04 -0700148
149 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
Carmelo Casconee5b28722018-06-22 17:28:28 +0200150 private DriverService driverService;
Andrea Campanella241896c2017-05-10 13:11:04 -0700151
152 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
Carmelo Casconee5b28722018-06-22 17:28:28 +0200153 private MastershipService mastershipService;
Andrea Campanella14e196d2017-07-24 18:11:36 +0200154
155 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
Carmelo Casconee5b28722018-06-22 17:28:28 +0200156 private PiPipeconfService piPipeconfService;
Andrea Campanella241896c2017-05-10 13:11:04 -0700157
Andrea Campanella14e196d2017-07-24 18:11:36 +0200158 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
Carmelo Casconee5b28722018-06-22 17:28:28 +0200159 private ClusterService clusterService;
Andrea Campanella14e196d2017-07-24 18:11:36 +0200160
161 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
Carmelo Casconee5b28722018-06-22 17:28:28 +0200162 private LeadershipService leadershipService;
Andrea Campanella14e196d2017-07-24 18:11:36 +0200163
Andrea Campanella19090322017-08-22 10:31:37 +0200164 private static final int DEFAULT_POLL_FREQUENCY_SECONDS = 10;
Andrea Campanella1e573442018-05-17 17:07:13 +0200165 @Property(name = POLL_FREQUENCY, intValue = DEFAULT_POLL_FREQUENCY_SECONDS,
Andrea Campanella19090322017-08-22 10:31:37 +0200166 label = "Configure poll frequency for port status and statistics; " +
167 "default is 10 sec")
168 private int pollFrequency = DEFAULT_POLL_FREQUENCY_SECONDS;
169
Andrea Campanella1e573442018-05-17 17:07:13 +0200170 private static final int DEVICE_AVAILABILITY_POLL_FREQUENCY_SECONDS = 10;
171 @Property(name = "deviceAvailabilityPollFrequency", intValue = DEVICE_AVAILABILITY_POLL_FREQUENCY_SECONDS,
172 label = "Configure poll frequency for checking device availability; " +
173 "default is 10 sec")
174 private int deviceAvailabilityPollFrequency = DEVICE_AVAILABILITY_POLL_FREQUENCY_SECONDS;
175
Carmelo Casconee5b28722018-06-22 17:28:28 +0200176 private static final String APP_NAME = "org.onosproject.generaldeviceprovider";
177 private static final String URI_SCHEME = "device";
178 private static final String CFG_SCHEME = "generalprovider";
Andrea Campanella241896c2017-05-10 13:11:04 -0700179 private static final String DEVICE_PROVIDER_PACKAGE = "org.onosproject.general.provider.device";
180 private static final int CORE_POOL_SIZE = 10;
181 private static final String UNKNOWN = "unknown";
Andrea Campanella19090322017-08-22 10:31:37 +0200182
Andrea Campanellabc112a92017-06-26 19:06:43 +0200183 //FIXME this will be removed when the configuration is synced at the source.
184 private static final Set<String> PIPELINE_CONFIGURABLE_PROTOCOLS = ImmutableSet.of("p4runtime");
Andrea Campanellabc112a92017-06-26 19:06:43 +0200185
Andrea Campanella19090322017-08-22 10:31:37 +0200186 private static final ConcurrentMap<DeviceId, Lock> ENTRY_LOCKS = Maps.newConcurrentMap();
Andrea Campanellabc112a92017-06-26 19:06:43 +0200187 //FIXME to be removed when netcfg will issue device events in a bundle or
188 //ensures all configuration needed is present
189 private Set<DeviceId> deviceConfigured = new CopyOnWriteArraySet<>();
190 private Set<DeviceId> driverConfigured = new CopyOnWriteArraySet<>();
191 private Set<DeviceId> pipelineConfigured = new CopyOnWriteArraySet<>();
Andrea Campanella241896c2017-05-10 13:11:04 -0700192
Thomas Vachuska5b38dc02018-05-10 15:24:40 -0700193 private final Map<DeviceId, DeviceHandshaker> handshakers = Maps.newConcurrentMap();
194
Carmelo Casconee5b28722018-06-22 17:28:28 +0200195 private final Map<DeviceId, MastershipRole> requestedRoles = Maps.newConcurrentMap();
Andrea Campanella241896c2017-05-10 13:11:04 -0700196
Andrea Campanella241896c2017-05-10 13:11:04 -0700197
Carmelo Casconee5b28722018-06-22 17:28:28 +0200198 private ExecutorService connectionExecutor
199 = newFixedThreadPool(CORE_POOL_SIZE, groupedThreads(
200 "onos/generaldeviceprovider-device-connect", "%d", log));
201 private ScheduledExecutorService portStatsExecutor
202 = newScheduledThreadPool(CORE_POOL_SIZE, groupedThreads(
203 "onos/generaldeviceprovider-port-stats", "%d", log));
204 private ScheduledExecutorService availabilityCheckExecutor
205 = newScheduledThreadPool(CORE_POOL_SIZE, groupedThreads(
206 "onos/generaldeviceprovider-availability-check", "%d", log));
207 private ConcurrentMap<DeviceId, ScheduledFuture<?>> scheduledTasks = new ConcurrentHashMap<>();
208
209 private DeviceProviderService providerService;
Andrea Campanella241896c2017-05-10 13:11:04 -0700210 private InternalDeviceListener deviceListener = new InternalDeviceListener();
211
Carmelo Casconee5b28722018-06-22 17:28:28 +0200212 private final ConfigFactory factory =
Andrea Campanella241896c2017-05-10 13:11:04 -0700213 new ConfigFactory<DeviceId, GeneralProviderDeviceConfig>(
214 SubjectFactories.DEVICE_SUBJECT_FACTORY,
215 GeneralProviderDeviceConfig.class, CFG_SCHEME) {
216 @Override
217 public GeneralProviderDeviceConfig createConfig() {
218 return new GeneralProviderDeviceConfig();
219 }
220 };
221
Carmelo Casconee5b28722018-06-22 17:28:28 +0200222 private final NetworkConfigListener cfgListener = new InternalNetworkConfigListener();
223 private final DeviceAgentListener deviceAgentListener = new InternalDeviceAgentListener();
Andrea Campanella241896c2017-05-10 13:11:04 -0700224
225
226 @Activate
Andrea Campanella1e573442018-05-17 17:07:13 +0200227 public void activate(ComponentContext context) {
Andrea Campanella241896c2017-05-10 13:11:04 -0700228 providerService = providerRegistry.register(this);
Andrea Campanella4929a812017-10-09 18:38:23 +0200229 componentConfigService.registerProperties(getClass());
Andrea Campanella241896c2017-05-10 13:11:04 -0700230 coreService.registerApplication(APP_NAME);
231 cfgService.registerConfigFactory(factory);
232 cfgService.addListener(cfgListener);
233 deviceService.addListener(deviceListener);
Thomas Vachuska5b38dc02018-05-10 15:24:40 -0700234 handshakers.clear();
Andrea Campanella241896c2017-05-10 13:11:04 -0700235 //This will fail if ONOS has CFG and drivers which depend on this provider
236 // are activated, failing due to not finding the driver.
237 cfgService.getSubjects(DeviceId.class, GeneralProviderDeviceConfig.class)
238 .forEach(did -> connectionExecutor.execute(() -> connectDevice(did)));
Andrea Campanella1e573442018-05-17 17:07:13 +0200239 //Initiating a periodic check to see if any device is available again and reconnect it.
Carmelo Casconee5b28722018-06-22 17:28:28 +0200240 availabilityCheckExecutor.scheduleAtFixedRate(
241 this::scheduleDevicePolling, deviceAvailabilityPollFrequency,
Andrea Campanella1e573442018-05-17 17:07:13 +0200242 deviceAvailabilityPollFrequency, TimeUnit.SECONDS);
243 modified(context);
Andrea Campanella241896c2017-05-10 13:11:04 -0700244 log.info("Started");
245 }
246
Andrea Campanella19090322017-08-22 10:31:37 +0200247 @Modified
248 public void modified(ComponentContext context) {
249 if (context != null) {
250 Dictionary<?, ?> properties = context.getProperties();
Andrea Campanella1e573442018-05-17 17:07:13 +0200251 pollFrequency = Tools.getIntegerProperty(properties, POLL_FREQUENCY,
Andrea Campanella19090322017-08-22 10:31:37 +0200252 DEFAULT_POLL_FREQUENCY_SECONDS);
253 log.info("Configured. Poll frequency is configured to {} seconds", pollFrequency);
254 }
255
256 if (!scheduledTasks.isEmpty()) {
257 //cancel all previous tasks
258 scheduledTasks.values().forEach(task -> task.cancel(false));
259 //resubmit task with new timeout.
260 Set<DeviceId> deviceSubjects =
261 cfgService.getSubjects(DeviceId.class, GeneralProviderDeviceConfig.class);
262 deviceSubjects.forEach(deviceId -> {
Carmelo Casconee5b28722018-06-22 17:28:28 +0200263 if (notMyScheme(deviceId)) {
Andrea Campanella19090322017-08-22 10:31:37 +0200264 // not under my scheme, skipping
265 log.debug("{} is not my scheme, skipping", deviceId);
266 return;
267 }
Carmelo Casconee5b28722018-06-22 17:28:28 +0200268 scheduledTasks.put(deviceId, scheduleStatsPolling(deviceId, true));
Andrea Campanella19090322017-08-22 10:31:37 +0200269 });
270 }
Andrea Campanella19090322017-08-22 10:31:37 +0200271 }
272
Andrea Campanella241896c2017-05-10 13:11:04 -0700273 @Deactivate
274 public void deactivate() {
275 portStatsExecutor.shutdown();
Andrea Campanella1e573442018-05-17 17:07:13 +0200276 availabilityCheckExecutor.shutdown();
Andrea Campanella4929a812017-10-09 18:38:23 +0200277 componentConfigService.unregisterProperties(getClass(), false);
Andrea Campanella241896c2017-05-10 13:11:04 -0700278 cfgService.removeListener(cfgListener);
279 //Not Removing the device so they can still be used from other driver providers
280 //cfgService.getSubjects(DeviceId.class, GeneralProviderDeviceConfig.class)
281 // .forEach(did -> connectionExecutor.execute(() -> disconnectDevice(did)));
282 connectionExecutor.shutdown();
283 deviceService.removeListener(deviceListener);
284 providerRegistry.unregister(this);
Thomas Vachuska5b38dc02018-05-10 15:24:40 -0700285 handshakers.clear();
Andrea Campanella241896c2017-05-10 13:11:04 -0700286 providerService = null;
287 cfgService.unregisterConfigFactory(factory);
288 log.info("Stopped");
289 }
290
/**
 * Creates the provider with an identifier built from the
 * {@code URI_SCHEME} scheme and the {@code DEVICE_PROVIDER_PACKAGE} id.
 */
public GeneralDeviceProvider() {
    super(new ProviderId(URI_SCHEME, DEVICE_PROVIDER_PACKAGE));
}
294
295
296 @Override
297 public void triggerProbe(DeviceId deviceId) {
Thomas Vachuska5b38dc02018-05-10 15:24:40 -0700298 // TODO Really don't see the point of this in non OF Context,
Andrea Campanella241896c2017-05-10 13:11:04 -0700299 // for now testing reachability, can be moved to no-op
300 log.debug("Triggering probe equals testing reachability on device {}", deviceId);
301 isReachable(deviceId);
302 }
303
304 @Override
305 public void roleChanged(DeviceId deviceId, MastershipRole newRole) {
Andrea Campanella14e196d2017-07-24 18:11:36 +0200306 log.info("Received role {} for device {}", newRole, deviceId);
Carmelo Casconee5b28722018-06-22 17:28:28 +0200307 requestedRoles.put(deviceId, newRole);
308 connectionExecutor.submit(() -> doRoleChanged(deviceId, newRole));
309 }
310
311 private void doRoleChanged(DeviceId deviceId, MastershipRole newRole) {
312 DeviceHandshaker handshaker = getHandshaker(deviceId);
313 if (handshaker == null) {
314 log.warn("Null handshaker. Unable to notify new role {} to {}",
315 newRole, deviceId);
316 return;
317 }
318 handshaker.roleChanged(newRole);
Andrea Campanella241896c2017-05-10 13:11:04 -0700319 }
320
321 @Override
322 public boolean isReachable(DeviceId deviceId) {
Andrea Campanella14e196d2017-07-24 18:11:36 +0200323 log.debug("Testing reachability for device {}", deviceId);
Andrea Campanellac1ecdd02018-01-12 12:48:24 +0100324
325 DeviceHandshaker handshaker = getHandshaker(deviceId);
326 if (handshaker == null) {
327 return false;
328 }
329
Andrea Campanella241896c2017-05-10 13:11:04 -0700330 try {
Carmelo Casconee5b28722018-06-22 17:28:28 +0200331 return handshaker.isReachable()
332 .get(DEVICE_OP_TIMEOUT, TimeUnit.SECONDS);
Andrea Campanella241896c2017-05-10 13:11:04 -0700333 } catch (InterruptedException | ExecutionException | TimeoutException e) {
Andrea Campanella1e573442018-05-17 17:07:13 +0200334 log.warn("Device {} is not reachable {}", deviceId, e.getMessage());
335 log.debug("Exception", e);
Andrea Campanella241896c2017-05-10 13:11:04 -0700336 return false;
337 }
338 }
339
340 @Override
341 public void changePortState(DeviceId deviceId, PortNumber portNumber,
342 boolean enable) {
343 if (deviceService.getDevice(deviceId).is(PortAdmin.class)) {
344
345 PortAdmin portAdmin = getPortAdmin(deviceId);
346 CompletableFuture<Boolean> modified;
347 if (enable) {
348 modified = portAdmin.enable(portNumber);
349 } else {
350 modified = portAdmin.disable(portNumber);
351 }
352 modified.thenAcceptAsync(result -> {
353 if (!result) {
354 log.warn("Your device {} port {} status can't be changed to {}",
Andrea Campanella19090322017-08-22 10:31:37 +0200355 deviceId, portNumber, enable);
Andrea Campanella241896c2017-05-10 13:11:04 -0700356 }
357 });
358
359 } else {
360 log.warn("Device {} does not support PortAdmin behaviour", deviceId);
361 }
362 }
363
Thomas Vachuska5b38dc02018-05-10 15:24:40 -0700364 @Override
365 public void triggerDisconnect(DeviceId deviceId) {
Andrea Campanella1e573442018-05-17 17:07:13 +0200366 log.debug("Triggering disconnection of device {}", deviceId);
Carmelo Casconee5b28722018-06-22 17:28:28 +0200367 connectionExecutor.execute(() -> disconnectDevice(deviceId)
368 .thenRunAsync(() -> checkAndConnect(deviceId)));
Thomas Vachuska5b38dc02018-05-10 15:24:40 -0700369 }
370
Andrea Campanella241896c2017-05-10 13:11:04 -0700371 private DeviceHandshaker getHandshaker(DeviceId deviceId) {
Thomas Vachuska5b38dc02018-05-10 15:24:40 -0700372 return handshakers.computeIfAbsent(deviceId, id -> {
373 Driver driver = getDriver(deviceId);
374 return driver == null ? null :
375 getBehaviour(driver, DeviceHandshaker.class,
376 new DefaultDriverData(driver, deviceId));
377 });
Andrea Campanella241896c2017-05-10 13:11:04 -0700378 }
379
380 private PortAdmin getPortAdmin(DeviceId deviceId) {
381 Driver driver = getDriver(deviceId);
382 return getBehaviour(driver, PortAdmin.class,
Andrea Campanella19090322017-08-22 10:31:37 +0200383 new DefaultDriverData(driver, deviceId));
Andrea Campanella241896c2017-05-10 13:11:04 -0700384
385 }
386
387 private Driver getDriver(DeviceId deviceId) {
Andrea Campanellac1ecdd02018-01-12 12:48:24 +0100388 Driver driver = null;
Andrea Campanella241896c2017-05-10 13:11:04 -0700389 try {
390 driver = driverService.getDriver(deviceId);
391 } catch (ItemNotFoundException e) {
392 log.debug("Falling back to configuration to fetch driver " +
Andrea Campanella19090322017-08-22 10:31:37 +0200393 "for device {}", deviceId);
Andrea Campanellac1ecdd02018-01-12 12:48:24 +0100394 BasicDeviceConfig cfg = cfgService.getConfig(deviceId, BasicDeviceConfig.class);
395 if (cfg != null) {
396 driver = driverService.getDriver(cfg.driver());
397 }
Andrea Campanella241896c2017-05-10 13:11:04 -0700398 }
399 return driver;
400 }
401
402 //needed since the device manager will not return the driver through implementation()
403 // method since the device is not pushed to the core so for the connectDevice
404 // we need to work around that in order to test before calling
405 // store.createOrUpdateDevice
406 private <T extends Behaviour> T getBehaviour(Driver driver, Class<T> type,
407 DriverData data) {
Andrea Campanellac1ecdd02018-01-12 12:48:24 +0100408 if (driver != null && driver.hasBehaviour(type)) {
Andrea Campanella241896c2017-05-10 13:11:04 -0700409 DefaultDriverHandler handler = new DefaultDriverHandler(data);
410 return driver.createBehaviour(handler, type);
411 } else {
412 return null;
413 }
414 }
415
416 //Connects a general device
417 private void connectDevice(DeviceId deviceId) {
418 //retrieve the configuration
419 GeneralProviderDeviceConfig providerConfig =
420 cfgService.getConfig(deviceId, GeneralProviderDeviceConfig.class);
421 BasicDeviceConfig basicDeviceConfig =
422 cfgService.getConfig(deviceId, BasicDeviceConfig.class);
423
424 if (providerConfig == null || basicDeviceConfig == null) {
425 log.error("Configuration is NULL: basic config {}, general provider " +
Andrea Campanella19090322017-08-22 10:31:37 +0200426 "config {}", basicDeviceConfig, providerConfig);
Andrea Campanella241896c2017-05-10 13:11:04 -0700427 } else {
Andrea Campanellabc112a92017-06-26 19:06:43 +0200428 log.info("Connecting to device {} with driver {}", deviceId, basicDeviceConfig.driver());
Andrea Campanella241896c2017-05-10 13:11:04 -0700429
Thomas Vachuska5b38dc02018-05-10 15:24:40 -0700430 DeviceHandshaker handshaker = getHandshaker(deviceId);
Andrea Campanellabc112a92017-06-26 19:06:43 +0200431 if (handshaker == null) {
Thomas Vachuska5b38dc02018-05-10 15:24:40 -0700432 log.error("Device {} does not support DeviceHandshaker behaviour", deviceId);
Andrea Campanellabc112a92017-06-26 19:06:43 +0200433 return;
Andrea Campanella241896c2017-05-10 13:11:04 -0700434 }
Thomas Vachuska5b38dc02018-05-10 15:24:40 -0700435 Driver driver = handshaker.handler().driver();
Frank Wang554ce972017-09-06 09:56:43 +0800436
Thomas Vachuska5b38dc02018-05-10 15:24:40 -0700437 addConfigData(providerConfig, handshaker.data());
Andrea Campanellabc112a92017-06-26 19:06:43 +0200438
439 //Connecting to the device
440 CompletableFuture<Boolean> connected = handshaker.connect();
441
442 connected.thenAcceptAsync(result -> {
443 if (result) {
Carmelo Casconee5b28722018-06-22 17:28:28 +0200444 handshaker.addDeviceAgentListener(deviceAgentListener);
Andrea Campanellabc112a92017-06-26 19:06:43 +0200445 //Populated with the default values obtained by the driver
446 ChassisId cid = new ChassisId();
447 SparseAnnotations annotations = DefaultAnnotations.builder()
448 .set(AnnotationKeys.PROTOCOL,
Andrea Campanella19090322017-08-22 10:31:37 +0200449 providerConfig.protocolsInfo().keySet().toString())
Andrea Campanellabc112a92017-06-26 19:06:43 +0200450 .build();
451 DeviceDescription description =
452 new DefaultDeviceDescription(deviceId.uri(), Device.Type.SWITCH,
Andrea Campanella19090322017-08-22 10:31:37 +0200453 driver.manufacturer(), driver.hwVersion(),
454 driver.swVersion(), UNKNOWN,
Yi Tseng92494fb2017-12-05 15:14:53 -0800455 cid, true, annotations);
Andrea Campanellabc112a92017-06-26 19:06:43 +0200456 //Empty list of ports
457 List<PortDescription> ports = new ArrayList<>();
458
Andrea Campanellabf9e5ce2017-12-06 14:26:36 +0100459 DeviceDescriptionDiscovery deviceDiscovery = getBehaviour(driver,
Thomas Vachuska5b38dc02018-05-10 15:24:40 -0700460 DeviceDescriptionDiscovery.class, handshaker.data());
Andrea Campanellabf9e5ce2017-12-06 14:26:36 +0100461 if (deviceDiscovery != null) {
Andrea Campanellabc112a92017-06-26 19:06:43 +0200462 DeviceDescription newdescription = deviceDiscovery.discoverDeviceDetails();
463 if (newdescription != null) {
464 description = newdescription;
465 }
466 ports = deviceDiscovery.discoverPortDetails();
Andrea Campanellabf9e5ce2017-12-06 14:26:36 +0100467 } else {
468 log.info("No Device Description Discovery for device {}, no update for " +
469 "description or ports.", deviceId);
Andrea Campanellabc112a92017-06-26 19:06:43 +0200470 }
471
Thomas Vachuska5b38dc02018-05-10 15:24:40 -0700472 if (!handlePipeconf(deviceId, driver, handshaker.data(), true)) {
Carmelo Cascone59f57de2017-07-11 19:55:09 -0400473 // Something went wrong during handling of pipeconf.
474 // We already logged the error.
475 handshaker.disconnect();
476 return;
Andrea Campanellabc112a92017-06-26 19:06:43 +0200477 }
Carmelo Cascone59f57de2017-07-11 19:55:09 -0400478 advertiseDevice(deviceId, description, ports);
479
Andrea Campanellabc112a92017-06-26 19:06:43 +0200480 } else {
481 log.warn("Can't connect to device {}", deviceId);
482 }
483 });
Andrea Campanella241896c2017-05-10 13:11:04 -0700484 }
485 }
486
Andrea Campanella14e196d2017-07-24 18:11:36 +0200487 private void connectStandbyDevice(DeviceId deviceId) {
Thomas Vachuska5b38dc02018-05-10 15:24:40 -0700488 // if device is pipeline programmable we merge pipeconf + base driver for every other role
Andrea Campanella14e196d2017-07-24 18:11:36 +0200489 GeneralProviderDeviceConfig providerConfig =
490 cfgService.getConfig(deviceId, GeneralProviderDeviceConfig.class);
491
Thomas Vachuska5b38dc02018-05-10 15:24:40 -0700492 DeviceHandshaker handshaker = getHandshaker(deviceId);
Andrea Campanella14e196d2017-07-24 18:11:36 +0200493 if (handshaker == null) {
Thomas Vachuska5b38dc02018-05-10 15:24:40 -0700494 log.error("Device {} does not support DeviceHandshaker behaviour", deviceId);
Andrea Campanella14e196d2017-07-24 18:11:36 +0200495 return;
496 }
Thomas Vachuska5b38dc02018-05-10 15:24:40 -0700497 addConfigData(providerConfig, handshaker.data());
Andrea Campanella14e196d2017-07-24 18:11:36 +0200498
499 //Connecting to the device
500 handshaker.connect().thenAcceptAsync(result -> {
501 if (result) {
Carmelo Casconee5b28722018-06-22 17:28:28 +0200502 handshaker.addDeviceAgentListener(deviceAgentListener);
Thomas Vachuska5b38dc02018-05-10 15:24:40 -0700503 handlePipeconf(deviceId, handshaker.handler().driver(), handshaker.data(), false);
Andrea Campanella14e196d2017-07-24 18:11:36 +0200504 }
505 });
506 }
507
Carmelo Cascone59f57de2017-07-11 19:55:09 -0400508 /**
509 * Handles the case of a device that is pipeline programmable. Returns true if the operation wa successful and the
510 * device can be registered to the core, false otherwise.
511 */
Andrea Campanella14e196d2017-07-24 18:11:36 +0200512 private boolean handlePipeconf(DeviceId deviceId, Driver driver, DriverData driverData, boolean deployPipeconf) {
Thomas Vachuska5b38dc02018-05-10 15:24:40 -0700513 PiPipelineProgrammable pipelineProg =
514 getBehaviour(driver, PiPipelineProgrammable.class, driverData);
Carmelo Cascone59f57de2017-07-11 19:55:09 -0400515
516 if (pipelineProg == null) {
517 // Device is not pipeline programmable.
518 return true;
519 }
520
Andrea Campanella14e196d2017-07-24 18:11:36 +0200521 PiPipeconf pipeconf = getPipeconf(deviceId, pipelineProg);
522
523 if (pipeconf != null) {
Andrea Campanella14e196d2017-07-24 18:11:36 +0200524 PiPipeconfId pipeconfId = pipeconf.id();
525
526 try {
527 if (deployPipeconf) {
528 if (!pipelineProg.deployPipeconf(pipeconf).get()) {
529 log.error("Unable to deploy pipeconf {} to {}, aborting device discovery",
Thomas Vachuska5b38dc02018-05-10 15:24:40 -0700530 pipeconfId, deviceId);
Andrea Campanella14e196d2017-07-24 18:11:36 +0200531 return false;
532 }
533 }
534 } catch (InterruptedException | ExecutionException e) {
535 log.warn("Exception occurred while deploying pipeconf {} to device {}", pipeconf.id(), deviceId, e);
536 return false;
537 }
538 try {
539 if (!piPipeconfService.bindToDevice(pipeconfId, deviceId).get()) {
540 log.error("Unable to merge driver {} for device {} with pipeconf {}, aborting device discovery",
Thomas Vachuska5b38dc02018-05-10 15:24:40 -0700541 driver.name(), deviceId, pipeconfId);
Andrea Campanella14e196d2017-07-24 18:11:36 +0200542 return false;
543 }
544 } catch (InterruptedException | ExecutionException e) {
545 log.warn("Exception occurred while binding pipeconf {} to device {}", pipeconf.id(), deviceId, e);
546 return false;
547 }
548 } else {
549 return false;
550 }
551
552 return true;
553 }
554
555 private PiPipeconf getPipeconf(DeviceId deviceId, PiPipelineProgrammable pipelineProg) {
Carmelo Cascone59f57de2017-07-11 19:55:09 -0400556 PiPipeconfId pipeconfId = piPipeconfService.ofDevice(deviceId).orElseGet(() -> {
557 // No pipeconf has been associated with this device.
558 // Check if device driver provides a default one.
559 if (pipelineProg.getDefaultPipeconf().isPresent()) {
560 PiPipeconf defaultPipeconf = pipelineProg.getDefaultPipeconf().get();
561 log.info("Using default pipeconf {} for {}", defaultPipeconf.id(), deviceId);
Carmelo Cascone59f57de2017-07-11 19:55:09 -0400562 return defaultPipeconf.id();
563 } else {
564 return null;
565 }
566 });
567
568 if (pipeconfId == null) {
569 log.warn("Device {} is pipeline programmable but no pipeconf can be associated to it.", deviceId);
Andrea Campanella14e196d2017-07-24 18:11:36 +0200570 return null;
Carmelo Cascone1fb27d32017-08-25 20:40:20 +0200571 }
572
573 if (!piPipeconfService.getPipeconf(pipeconfId).isPresent()) {
574 log.warn("Pipeconf {} is not registered", pipeconfId);
Andrea Campanella14e196d2017-07-24 18:11:36 +0200575 return null;
Carmelo Cascone59f57de2017-07-11 19:55:09 -0400576 }
577
Andrea Campanella14e196d2017-07-24 18:11:36 +0200578 return piPipeconfService.getPipeconf(pipeconfId).get();
Carmelo Cascone59f57de2017-07-11 19:55:09 -0400579 }
580
Andrea Campanellabc112a92017-06-26 19:06:43 +0200581 private void advertiseDevice(DeviceId deviceId, DeviceDescription description, List<PortDescription> ports) {
582 providerService.deviceConnected(deviceId, description);
583 providerService.updatePorts(deviceId, ports);
584 }
585
Andrea Campanella1e573442018-05-17 17:07:13 +0200586 private CompletableFuture<Boolean> disconnectDevice(DeviceId deviceId) {
Andrea Campanella241896c2017-05-10 13:11:04 -0700587 log.info("Disconnecting for device {}", deviceId);
Andrea Campanellac1ecdd02018-01-12 12:48:24 +0100588
Carmelo Casconee5b28722018-06-22 17:28:28 +0200589 if (scheduledTasks.containsKey(deviceId)) {
590 scheduledTasks.remove(deviceId).cancel(true);
591 }
Andrea Campanella1e573442018-05-17 17:07:13 +0200592
Thomas Vachuska5b38dc02018-05-10 15:24:40 -0700593 DeviceHandshaker handshaker = handshakers.remove(deviceId);
Carmelo Casconee5b28722018-06-22 17:28:28 +0200594
595 if (handshaker == null) {
596 // gracefully ignoring.
Thomas Vachuska5b38dc02018-05-10 15:24:40 -0700597 log.warn("No DeviceHandshaker for device {}, no guarantees of complete " +
Carmelo Casconee5b28722018-06-22 17:28:28 +0200598 "shutdown of communication", deviceId);
599 return CompletableFuture.completedFuture(false);
Andrea Campanella241896c2017-05-10 13:11:04 -0700600 }
Carmelo Casconee5b28722018-06-22 17:28:28 +0200601
602 return handshaker.disconnect()
603 .thenApplyAsync(result -> {
604 if (result) {
605 log.info("Disconnected device {}", deviceId);
606 providerService.deviceDisconnected(deviceId);
607 } else {
608 log.warn("Device {} was unable to disconnect", deviceId);
609 }
610 return result;
611 });
Andrea Campanella241896c2017-05-10 13:11:04 -0700612 }
613
614 //Needed to catch the exception in the executors since are not rethrown otherwise.
615 private Runnable exceptionSafe(Runnable runnable) {
616 return () -> {
617 try {
618 runnable.run();
619 } catch (Exception e) {
620 log.error("Unhandled Exception", e);
621 }
622 };
623 }
624
625 private void updatePortStatistics(DeviceId deviceId) {
Andrea Campanellace111932017-09-18 16:59:56 +0900626 Device device = deviceService.getDevice(deviceId);
627 if (!Objects.isNull(device) && deviceService.isAvailable(deviceId) &&
628 device.is(PortStatisticsDiscovery.class)) {
629 Collection<PortStatistics> statistics = device.as(PortStatisticsDiscovery.class)
630 .discoverPortStatistics();
631 //updating statistcs only if not empty
632 if (!statistics.isEmpty()) {
633 providerService.updatePortStatistics(deviceId, statistics);
634 }
635 } else {
636 log.debug("Can't update port statistics for device {}", deviceId);
Andrea Campanella19090322017-08-22 10:31:37 +0200637 }
638 }
639
Carmelo Casconee5b28722018-06-22 17:28:28 +0200640 private boolean notMyScheme(DeviceId deviceId) {
641 return !deviceId.uri().getScheme().equals(URI_SCHEME);
Andrea Campanella241896c2017-05-10 13:11:04 -0700642 }
643
644 /**
645 * Listener for configuration events.
646 */
647 private class InternalNetworkConfigListener implements NetworkConfigListener {
648
Andrea Campanella241896c2017-05-10 13:11:04 -0700649 @Override
650 public void event(NetworkConfigEvent event) {
651 DeviceId deviceId = (DeviceId) event.subject();
652 //Assuming that the deviceId comes with uri 'device:'
Carmelo Casconee5b28722018-06-22 17:28:28 +0200653 if (notMyScheme(deviceId)) {
Andrea Campanella241896c2017-05-10 13:11:04 -0700654 // not under my scheme, skipping
655 log.debug("{} is not my scheme, skipping", deviceId);
656 return;
657 }
Andrea Campanellace111932017-09-18 16:59:56 +0900658 if (deviceService.getDevice(deviceId) != null && deviceService.isAvailable(deviceId)) {
Andrea Campanella241896c2017-05-10 13:11:04 -0700659 log.info("Device {} is already connected to ONOS and is available", deviceId);
Andrea Campanellabc112a92017-06-26 19:06:43 +0200660 return;
661 }
Andrea Campanella14e196d2017-07-24 18:11:36 +0200662 NodeId leaderNodeId = leadershipService.runForLeadership(DEPLOY + deviceId.toString() + PIPECONF_TOPIC)
663 .leader().nodeId();
664 NodeId localNodeId = clusterService.getLocalNode().id();
665 if (localNodeId.equals(leaderNodeId)) {
666 if (processEvent(event, deviceId)) {
667 log.debug("{} is leader for {}, initiating the connection and deploying pipeline", leaderNodeId,
668 deviceId);
669 checkAndSubmitDeviceTask(deviceId);
670 }
671 } else {
672 if (processEvent(event, deviceId)) {
673 log.debug("{} is not leader for {}, initiating connection but not deploying pipeline, {} is LEADER",
674 localNodeId, deviceId, leaderNodeId);
675 connectionExecutor.submit(exceptionSafe(() -> connectStandbyDevice(deviceId)));
676 //FIXME this will be removed when config is synced
677 cleanUpConfigInfo(deviceId);
678 }
679 }
Andrea Campanella14e196d2017-07-24 18:11:36 +0200680 }
681
682 private boolean processEvent(NetworkConfigEvent event, DeviceId deviceId) {
Andrea Campanellabc112a92017-06-26 19:06:43 +0200683 //FIXME to be removed when netcfg will issue device events in a bundle or
684 // ensure all configuration needed is present
685 Lock lock = ENTRY_LOCKS.computeIfAbsent(deviceId, key -> new ReentrantLock());
686 lock.lock();
687 try {
688 if (event.configClass().equals(GeneralProviderDeviceConfig.class)) {
689 //FIXME we currently assume that p4runtime devices are pipeline configurable.
690 //If we want to connect a p4runtime device with no pipeline
691 if (event.config().isPresent() &&
692 Collections.disjoint(ImmutableSet.copyOf(event.config().get().node().fieldNames()),
Andrea Campanella19090322017-08-22 10:31:37 +0200693 PIPELINE_CONFIGURABLE_PROTOCOLS)) {
Andrea Campanellabc112a92017-06-26 19:06:43 +0200694 pipelineConfigured.add(deviceId);
695 }
696 deviceConfigured.add(deviceId);
697 } else if (event.configClass().equals(BasicDeviceConfig.class)) {
698 if (event.config().isPresent() && event.config().get().node().has(DRIVER)) {
699 //TODO add check for pipeline and add it to the pipeline list if no
700 // p4runtime is present.
701 driverConfigured.add(deviceId);
702 }
703 } else if (event.configClass().equals(PiPipeconfConfig.class)) {
704 if (event.config().isPresent()
705 && event.config().get().node().has(PiPipeconfConfig.PIPIPECONFID)) {
706 pipelineConfigured.add(deviceId);
707 }
708 }
709 //if the device has no "pipeline configurable protocol it will be present
710 // in the pipelineConfigured
711 if (deviceConfigured.contains(deviceId) && driverConfigured.contains(deviceId)
712 && pipelineConfigured.contains(deviceId)) {
Andrea Campanella14e196d2017-07-24 18:11:36 +0200713 return true;
Andrea Campanellabc112a92017-06-26 19:06:43 +0200714 } else {
715 if (deviceConfigured.contains(deviceId) && driverConfigured.contains(deviceId)) {
716 log.debug("Waiting for pipeline configuration for device {}", deviceId);
717 } else if (pipelineConfigured.contains(deviceId) && driverConfigured.contains(deviceId)) {
718 log.debug("Waiting for device configuration for device {}", deviceId);
719 } else if (pipelineConfigured.contains(deviceId) && deviceConfigured.contains(deviceId)) {
720 log.debug("Waiting for driver configuration for device {}", deviceId);
721 } else if (driverConfigured.contains(deviceId)) {
722 log.debug("Only driver configuration for device {}", deviceId);
723 } else if (deviceConfigured.contains(deviceId)) {
724 log.debug("Only device configuration for device {}", deviceId);
725 }
726 }
Andrea Campanella14e196d2017-07-24 18:11:36 +0200727 return false;
Andrea Campanellabc112a92017-06-26 19:06:43 +0200728 } finally {
729 lock.unlock();
Andrea Campanella241896c2017-05-10 13:11:04 -0700730 }
731 }
732
733 @Override
734 public boolean isRelevant(NetworkConfigEvent event) {
Andrea Campanellabc112a92017-06-26 19:06:43 +0200735 return (event.configClass().equals(GeneralProviderDeviceConfig.class) ||
736 event.configClass().equals(BasicDeviceConfig.class) ||
737 event.configClass().equals(PiPipeconfConfig.class)) &&
Andrea Campanella241896c2017-05-10 13:11:04 -0700738 (event.type() == NetworkConfigEvent.Type.CONFIG_ADDED ||
739 event.type() == NetworkConfigEvent.Type.CONFIG_UPDATED);
740 }
741 }
742
Andrea Campanellabc112a92017-06-26 19:06:43 +0200743 private void checkAndSubmitDeviceTask(DeviceId deviceId) {
744 connectionExecutor.submit(exceptionSafe(() -> connectDevice(deviceId)));
745 //FIXME this will be removed when configuration is synced.
Andrea Campanella14e196d2017-07-24 18:11:36 +0200746 cleanUpConfigInfo(deviceId);
747
748 }
749
750 private void addConfigData(GeneralProviderDeviceConfig providerConfig, DriverData driverData) {
751 //Storing deviceKeyId and all other config values
752 // as data in the driver with protocol_<info>
753 // name as the key. e.g protocol_ip
754 providerConfig.protocolsInfo()
755 .forEach((protocol, deviceInfoConfig) -> {
756 deviceInfoConfig.configValues()
757 .forEach((k, v) -> driverData.set(protocol + "_" + k, v));
758 driverData.set(protocol + "_key", deviceInfoConfig.deviceKeyId());
759 });
760 }
761
762 private void cleanUpConfigInfo(DeviceId deviceId) {
Andrea Campanellabc112a92017-06-26 19:06:43 +0200763 deviceConfigured.remove(deviceId);
764 driverConfigured.remove(deviceId);
765 pipelineConfigured.remove(deviceId);
Andrea Campanellabc112a92017-06-26 19:06:43 +0200766 }
767
Carmelo Casconee5b28722018-06-22 17:28:28 +0200768 private ScheduledFuture<?> scheduleStatsPolling(DeviceId deviceId, boolean randomize) {
Andrea Campanella19090322017-08-22 10:31:37 +0200769 int delay = 0;
770 if (randomize) {
Ray Milkeyb68bbbc2017-12-18 10:05:49 -0800771 delay = new SecureRandom().nextInt(10);
Andrea Campanella19090322017-08-22 10:31:37 +0200772 }
773 return portStatsExecutor.scheduleAtFixedRate(
774 exceptionSafe(() -> updatePortStatistics(deviceId)),
775 delay, pollFrequency, TimeUnit.SECONDS);
776 }
777
Andrea Campanella1e573442018-05-17 17:07:13 +0200778 private void scheduleDevicePolling() {
779 cfgService.getSubjects(DeviceId.class, GeneralProviderDeviceConfig.class).forEach(this::checkAndConnect);
780 }
781
782 private void checkAndConnect(DeviceId deviceId) {
783 // Let's try and reconnect to a device which is stored in the net-cfg.
784 // One of the following conditions must be satisfied:
785 // 1) device is null in the store meaning that is was never connected or it was administratively removed
786 // 2) the device is not available and there is no MASTER instance, meaning the device lost
787 // it's connection to ONOS at some point in the past.
788 // We also check that the general device provider config and the driver config are present.
789 // We do not check for reachability using isReachable(deviceId) since the behaviour of this method
790 // can vary depending on protocol nuances. We leave this check to the device handshaker
791 // at later stages of the connection process.
792 // IF the conditions are not met but instead the device is present in the store, available and this instance is
793 // MASTER but is not reachable we remove it from the store.
794
795 if ((deviceService.getDevice(deviceId) == null || (!deviceService.isAvailable(deviceId) &&
796 mastershipService.getMasterFor(deviceId) == null)) && configIsPresent(deviceId)) {
797 log.debug("Trying to re-connect to device {}", deviceId);
798 NodeId leaderNodeId = leadershipService.runForLeadership(CHECK + deviceId.toString() + CONNECTION)
799 .leader().nodeId();
800 NodeId localNodeId = clusterService.getLocalNode().id();
801 if (localNodeId.equals(leaderNodeId)) {
802 log.debug("{} is leader for {}, initiating the connection", leaderNodeId,
803 deviceId);
804 checkAndSubmitDeviceTask(deviceId);
805 } else {
806 log.debug("{} is not leader for {}, initiating connection, {} is LEADER",
807 localNodeId, deviceId, leaderNodeId);
808 connectionExecutor.submit(exceptionSafe(() -> connectStandbyDevice(deviceId)));
809 //FIXME this will be removed when config is synced
810 cleanUpConfigInfo(deviceId);
811 }
812 } else if ((deviceService.getDevice(deviceId) != null && deviceService.isAvailable(deviceId)
813 && mastershipService.isLocalMaster(deviceId) && !isReachable(deviceId) && configIsPresent(deviceId))) {
814 log.info("Removing available but unreachable device {}", deviceId);
815 disconnectDevice(deviceId);
816 providerService.deviceDisconnected(deviceId);
817 }
818 }
819
820 private boolean configIsPresent(DeviceId deviceId) {
821 boolean present = cfgService.getConfig(deviceId, GeneralProviderDeviceConfig.class) != null
822 && cfgService.getConfig(deviceId, BasicDeviceConfig.class) != null;
823 if (!present) {
824 log.warn("Configuration for device {} is not complete", deviceId);
825 }
826 return present;
827 }
828
Carmelo Casconee5b28722018-06-22 17:28:28 +0200829 private void handleChannelClosed(DeviceId deviceId) {
830 disconnectDevice(deviceId).thenRunAsync(() -> {
831 // If master, notifies disconnection to the core.
832 if (mastershipService.isLocalMaster(deviceId)) {
833 log.info("Disconnecting device {}, due to channel closed event",
834 deviceId);
835 providerService.deviceDisconnected(deviceId);
836 }
837 });
838 }
839
840 private void handleMastershipResponse(DeviceId deviceId, MastershipRole response) {
841 //Notify core about response.
842 if (!requestedRoles.containsKey(deviceId)) {
843 return;
844 }
845 providerService.receivedRoleReply(deviceId, requestedRoles.get(deviceId), response);
846 // If not master, cancel polling tasks, otherwise start them.
847 if (!response.equals(MastershipRole.MASTER)
848 && scheduledTasks.get(deviceId) != null) {
849 scheduledTasks.remove(deviceId).cancel(false);
850 } else if (response.equals(MastershipRole.MASTER)
851 && scheduledTasks.get(deviceId) == null) {
852 scheduledTasks.put(deviceId, scheduleStatsPolling(deviceId, false));
853 updatePortStatistics(deviceId);
854 }
855 }
856
Andrea Campanella241896c2017-05-10 13:11:04 -0700857 /**
858 * Listener for core device events.
859 */
860 private class InternalDeviceListener implements DeviceListener {
861 @Override
862 public void event(DeviceEvent event) {
Andrea Campanellace111932017-09-18 16:59:56 +0900863 DeviceId deviceId = event.subject().id();
Thomas Vachuska5b38dc02018-05-10 15:24:40 -0700864 // FIXME handling for mastership change scenario missing?
Thomas Vachuska5b38dc02018-05-10 15:24:40 -0700865 // For now this is scheduled periodically, when streaming API will
866 // be available we check and base it on the streaming API (e.g. gNMI)
867 if (mastershipService.isLocalMaster(deviceId)) {
Carmelo Casconee5b28722018-06-22 17:28:28 +0200868 scheduledTasks.put(deviceId, scheduleStatsPolling(deviceId, false));
Andrea Campanella241896c2017-05-10 13:11:04 -0700869 }
870 }
871
872 @Override
873 public boolean isRelevant(DeviceEvent event) {
Thomas Vachuska5b38dc02018-05-10 15:24:40 -0700874 return event.type() == Type.DEVICE_ADDED &&
875 event.subject().id().toString().startsWith(URI_SCHEME.toLowerCase());
Andrea Campanella241896c2017-05-10 13:11:04 -0700876 }
877 }
Andrea Campanella1e573442018-05-17 17:07:13 +0200878
879 /**
Carmelo Casconee5b28722018-06-22 17:28:28 +0200880 * Listener for device agent events.
Andrea Campanella1e573442018-05-17 17:07:13 +0200881 */
Carmelo Casconee5b28722018-06-22 17:28:28 +0200882 private class InternalDeviceAgentListener implements DeviceAgentListener {
Andrea Campanella1e573442018-05-17 17:07:13 +0200883
884 @Override
Carmelo Casconee5b28722018-06-22 17:28:28 +0200885 public void event(DeviceAgentEvent event) {
Andrea Campanella1e573442018-05-17 17:07:13 +0200886 DeviceId deviceId = event.subject();
Carmelo Casconee5b28722018-06-22 17:28:28 +0200887 switch (event.type()) {
888 case CHANNEL_OPEN:
889 // Ignore.
890 break;
891 case CHANNEL_CLOSED:
892 handleChannelClosed(deviceId);
893 break;
894 case CHANNEL_ERROR:
895 // TODO evaluate other reaction to channel error.
896 log.warn("Received CHANNEL_ERROR from {}. Is the channel still open?",
897 deviceId);
898 break;
899 case ROLE_MASTER:
900 handleMastershipResponse(deviceId, MastershipRole.MASTER);
901 break;
902 case ROLE_STANDBY:
903 handleMastershipResponse(deviceId, MastershipRole.STANDBY);
904 break;
905 case ROLE_NONE:
906 handleMastershipResponse(deviceId, MastershipRole.NONE);
907 break;
908 default:
909 log.warn("Unrecognized device agent event {}", event.type());
Andrea Campanella1e573442018-05-17 17:07:13 +0200910 }
Andrea Campanella1e573442018-05-17 17:07:13 +0200911 }
912
913 }
Andrea Campanella241896c2017-05-10 13:11:04 -0700914}