blob: b3411496c17ae4b9f0f4f887600bd618ec9ab913 [file] [log] [blame]
Andrea Campanella241896c2017-05-10 13:11:04 -07001/*
Brian O'Connora09fe5b2017-08-03 21:12:30 -07002 * Copyright 2017-present Open Networking Foundation
Andrea Campanella241896c2017-05-10 13:11:04 -07003 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
16
17package org.onosproject.provider.general.device.impl;
18
Carmelo Cascone158b8c42018-07-04 19:42:37 +020019import com.fasterxml.jackson.databind.JsonNode;
20import com.fasterxml.jackson.databind.ObjectMapper;
21import com.fasterxml.jackson.databind.node.ObjectNode;
Andrea Campanella241896c2017-05-10 13:11:04 -070022import com.google.common.annotations.Beta;
Andrea Campanellabc112a92017-06-26 19:06:43 +020023import com.google.common.collect.ImmutableSet;
24import com.google.common.collect.Maps;
Carmelo Cascone158b8c42018-07-04 19:42:37 +020025import com.google.common.util.concurrent.Striped;
Andrea Campanella241896c2017-05-10 13:11:04 -070026import org.apache.felix.scr.annotations.Activate;
27import org.apache.felix.scr.annotations.Component;
28import org.apache.felix.scr.annotations.Deactivate;
Andrea Campanella19090322017-08-22 10:31:37 +020029import org.apache.felix.scr.annotations.Modified;
30import org.apache.felix.scr.annotations.Property;
Andrea Campanella241896c2017-05-10 13:11:04 -070031import org.apache.felix.scr.annotations.Reference;
32import org.apache.felix.scr.annotations.ReferenceCardinality;
33import org.onlab.packet.ChassisId;
34import org.onlab.util.ItemNotFoundException;
Andrea Campanella19090322017-08-22 10:31:37 +020035import org.onlab.util.Tools;
Andrea Campanella4929a812017-10-09 18:38:23 +020036import org.onosproject.cfg.ComponentConfigService;
Andrea Campanella14e196d2017-07-24 18:11:36 +020037import org.onosproject.cluster.ClusterService;
38import org.onosproject.cluster.LeadershipService;
Andrea Campanella241896c2017-05-10 13:11:04 -070039import org.onosproject.core.CoreService;
Andrea Campanella14e196d2017-07-24 18:11:36 +020040import org.onosproject.mastership.MastershipService;
Andrea Campanella241896c2017-05-10 13:11:04 -070041import org.onosproject.net.AnnotationKeys;
42import org.onosproject.net.DefaultAnnotations;
43import org.onosproject.net.Device;
44import org.onosproject.net.DeviceId;
45import org.onosproject.net.MastershipRole;
46import org.onosproject.net.PortNumber;
Carmelo Cascone87892e22017-11-13 16:01:29 -080047import org.onosproject.net.behaviour.PiPipelineProgrammable;
Andrea Campanella241896c2017-05-10 13:11:04 -070048import org.onosproject.net.behaviour.PortAdmin;
49import org.onosproject.net.config.ConfigFactory;
50import org.onosproject.net.config.NetworkConfigEvent;
51import org.onosproject.net.config.NetworkConfigListener;
52import org.onosproject.net.config.NetworkConfigRegistry;
53import org.onosproject.net.config.basics.BasicDeviceConfig;
54import org.onosproject.net.config.basics.SubjectFactories;
55import org.onosproject.net.device.DefaultDeviceDescription;
Carmelo Casconee5b28722018-06-22 17:28:28 +020056import org.onosproject.net.device.DeviceAgentEvent;
57import org.onosproject.net.device.DeviceAgentListener;
Andrea Campanella241896c2017-05-10 13:11:04 -070058import org.onosproject.net.device.DeviceDescription;
59import org.onosproject.net.device.DeviceDescriptionDiscovery;
60import org.onosproject.net.device.DeviceEvent;
61import org.onosproject.net.device.DeviceHandshaker;
62import org.onosproject.net.device.DeviceListener;
63import org.onosproject.net.device.DeviceProvider;
64import org.onosproject.net.device.DeviceProviderRegistry;
65import org.onosproject.net.device.DeviceProviderService;
66import org.onosproject.net.device.DeviceService;
67import org.onosproject.net.device.PortDescription;
68import org.onosproject.net.device.PortStatistics;
69import org.onosproject.net.device.PortStatisticsDiscovery;
70import org.onosproject.net.driver.Behaviour;
71import org.onosproject.net.driver.DefaultDriverData;
72import org.onosproject.net.driver.DefaultDriverHandler;
73import org.onosproject.net.driver.Driver;
74import org.onosproject.net.driver.DriverData;
75import org.onosproject.net.driver.DriverService;
Carmelo Cascone59f57de2017-07-11 19:55:09 -040076import org.onosproject.net.pi.model.PiPipeconf;
Andrea Campanellabc112a92017-06-26 19:06:43 +020077import org.onosproject.net.pi.model.PiPipeconfId;
Carmelo Cascone39c28ca2017-11-15 13:03:57 -080078import org.onosproject.net.pi.service.PiPipeconfConfig;
79import org.onosproject.net.pi.service.PiPipeconfService;
Andrea Campanella241896c2017-05-10 13:11:04 -070080import org.onosproject.net.provider.AbstractProvider;
81import org.onosproject.net.provider.ProviderId;
82import org.onosproject.provider.general.device.api.GeneralProviderDeviceConfig;
Andrea Campanella19090322017-08-22 10:31:37 +020083import org.osgi.service.component.ComponentContext;
Andrea Campanella241896c2017-05-10 13:11:04 -070084import org.slf4j.Logger;
85
Ray Milkeyb68bbbc2017-12-18 10:05:49 -080086import java.security.SecureRandom;
Andrea Campanella241896c2017-05-10 13:11:04 -070087import java.util.ArrayList;
88import java.util.Collection;
Andrea Campanellabc112a92017-06-26 19:06:43 +020089import java.util.Collections;
Andrea Campanella19090322017-08-22 10:31:37 +020090import java.util.Dictionary;
Andrea Campanella241896c2017-05-10 13:11:04 -070091import java.util.List;
Thomas Vachuska5b38dc02018-05-10 15:24:40 -070092import java.util.Map;
Andrea Campanellabc112a92017-06-26 19:06:43 +020093import java.util.Set;
Andrea Campanella241896c2017-05-10 13:11:04 -070094import java.util.concurrent.CompletableFuture;
Andrea Campanella19090322017-08-22 10:31:37 +020095import java.util.concurrent.ConcurrentHashMap;
Andrea Campanellabc112a92017-06-26 19:06:43 +020096import java.util.concurrent.ConcurrentMap;
97import java.util.concurrent.CopyOnWriteArraySet;
Andrea Campanella241896c2017-05-10 13:11:04 -070098import java.util.concurrent.ExecutionException;
Carmelo Casconee5b28722018-06-22 17:28:28 +020099import java.util.concurrent.ExecutorService;
Andrea Campanella241896c2017-05-10 13:11:04 -0700100import java.util.concurrent.ScheduledExecutorService;
Andrea Campanella19090322017-08-22 10:31:37 +0200101import java.util.concurrent.ScheduledFuture;
Andrea Campanella241896c2017-05-10 13:11:04 -0700102import java.util.concurrent.TimeUnit;
103import java.util.concurrent.TimeoutException;
Andrea Campanellabc112a92017-06-26 19:06:43 +0200104import java.util.concurrent.locks.Lock;
Carmelo Cascone158b8c42018-07-04 19:42:37 +0200105import java.util.function.Supplier;
Andrea Campanella241896c2017-05-10 13:11:04 -0700106
Carmelo Casconee5b28722018-06-22 17:28:28 +0200107import static java.util.concurrent.Executors.newFixedThreadPool;
Andrea Campanella241896c2017-05-10 13:11:04 -0700108import static java.util.concurrent.Executors.newScheduledThreadPool;
Carmelo Cascone158b8c42018-07-04 19:42:37 +0200109import static java.util.concurrent.Executors.newSingleThreadScheduledExecutor;
Andrea Campanella241896c2017-05-10 13:11:04 -0700110import static org.onlab.util.Tools.groupedThreads;
111import static org.onosproject.net.device.DeviceEvent.Type;
112import static org.slf4j.LoggerFactory.getLogger;
113
114/**
Carmelo Cascone96beb6f2018-06-27 18:07:12 +0200115 * Provider which uses drivers to detect device and do initial handshake and
116 * channel establishment with devices. Any other provider specific operation is
117 * also delegated to the DeviceHandshaker driver.
Andrea Campanella241896c2017-05-10 13:11:04 -0700118 */
119@Beta
120@Component(immediate = true)
121public class GeneralDeviceProvider extends AbstractProvider
122 implements DeviceProvider {
Carmelo Casconee5b28722018-06-22 17:28:28 +0200123
Carmelo Casconee5b28722018-06-22 17:28:28 +0200124 private static final String DRIVER = "driver";
Andrea Campanella14e196d2017-07-24 18:11:36 +0200125
Andrea Campanella241896c2017-05-10 13:11:04 -0700126 private final Logger log = getLogger(getClass());
127
128 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
Carmelo Casconee5b28722018-06-22 17:28:28 +0200129 private DeviceProviderRegistry providerRegistry;
Andrea Campanella241896c2017-05-10 13:11:04 -0700130
131 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
Carmelo Casconee5b28722018-06-22 17:28:28 +0200132 private ComponentConfigService componentConfigService;
Andrea Campanella4929a812017-10-09 18:38:23 +0200133
134 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
Carmelo Casconee5b28722018-06-22 17:28:28 +0200135 private NetworkConfigRegistry cfgService;
Andrea Campanella241896c2017-05-10 13:11:04 -0700136
137 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
Carmelo Casconee5b28722018-06-22 17:28:28 +0200138 private CoreService coreService;
Andrea Campanella241896c2017-05-10 13:11:04 -0700139
140 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
Carmelo Casconee5b28722018-06-22 17:28:28 +0200141 private DeviceService deviceService;
Andrea Campanella241896c2017-05-10 13:11:04 -0700142
143 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
Carmelo Casconee5b28722018-06-22 17:28:28 +0200144 private DriverService driverService;
Andrea Campanella241896c2017-05-10 13:11:04 -0700145
146 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
Carmelo Casconee5b28722018-06-22 17:28:28 +0200147 private MastershipService mastershipService;
Andrea Campanella14e196d2017-07-24 18:11:36 +0200148
149 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
Carmelo Casconee5b28722018-06-22 17:28:28 +0200150 private PiPipeconfService piPipeconfService;
Andrea Campanella241896c2017-05-10 13:11:04 -0700151
Andrea Campanella14e196d2017-07-24 18:11:36 +0200152 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
Carmelo Casconee5b28722018-06-22 17:28:28 +0200153 private ClusterService clusterService;
Andrea Campanella14e196d2017-07-24 18:11:36 +0200154
155 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
Carmelo Casconee5b28722018-06-22 17:28:28 +0200156 private LeadershipService leadershipService;
Andrea Campanella14e196d2017-07-24 18:11:36 +0200157
Carmelo Cascone158b8c42018-07-04 19:42:37 +0200158 private static final String STATS_POLL_FREQUENCY = "deviceStatsPollFrequency";
159 private static final int DEFAULT_STATS_POLL_FREQUENCY = 10;
160 @Property(name = STATS_POLL_FREQUENCY, intValue = DEFAULT_STATS_POLL_FREQUENCY,
Andrea Campanella19090322017-08-22 10:31:37 +0200161 label = "Configure poll frequency for port status and statistics; " +
162 "default is 10 sec")
Carmelo Cascone158b8c42018-07-04 19:42:37 +0200163 private int statsPollFrequency = DEFAULT_STATS_POLL_FREQUENCY;
Andrea Campanella19090322017-08-22 10:31:37 +0200164
Carmelo Cascone158b8c42018-07-04 19:42:37 +0200165 private static final String PROBE_FREQUENCY = "deviceProbeFrequency";
166 private static final int DEFAULT_PROBE_FREQUENCY = 10;
167 @Property(name = PROBE_FREQUENCY, intValue = DEFAULT_PROBE_FREQUENCY,
168 label = "Configure probe frequency for checking device availability; " +
Andrea Campanella1e573442018-05-17 17:07:13 +0200169 "default is 10 sec")
Carmelo Cascone158b8c42018-07-04 19:42:37 +0200170 private int probeFrequency = DEFAULT_PROBE_FREQUENCY;
171
172 private static final String OP_TIMEOUT_SHORT = "deviceOperationTimeoutShort";
173 private static final int DEFAULT_OP_TIMEOUT_SHORT = 10;
174 @Property(name = OP_TIMEOUT_SHORT, intValue = DEFAULT_OP_TIMEOUT_SHORT,
175 label = "Configure timeout in seconds for device operations " +
176 "that are supposed to take a short time " +
177 "(e.g. checking device reachability); default is 10 seconds")
178 private int opTimeoutShort = DEFAULT_OP_TIMEOUT_SHORT;
179
180 private static final String OP_TIMEOUT_LONG = "deviceOperationTimeoutLong";
181 private static final int DEFAULT_OP_TIMEOUT_LONG = 60;
182 @Property(name = OP_TIMEOUT_LONG, intValue = DEFAULT_OP_TIMEOUT_LONG,
183 label = "Configure timeout in seconds for device operations " +
184 "that are supposed to take a relatively long time " +
185 "(e.g. pushing a large pipeline configuration with slow " +
186 "network); default is 60 seconds")
187 private int opTimeoutLong = DEFAULT_OP_TIMEOUT_LONG;
Andrea Campanella1e573442018-05-17 17:07:13 +0200188
Carmelo Casconee5b28722018-06-22 17:28:28 +0200189 private static final String APP_NAME = "org.onosproject.generaldeviceprovider";
190 private static final String URI_SCHEME = "device";
191 private static final String CFG_SCHEME = "generalprovider";
Andrea Campanella241896c2017-05-10 13:11:04 -0700192 private static final String DEVICE_PROVIDER_PACKAGE = "org.onosproject.general.provider.device";
193 private static final int CORE_POOL_SIZE = 10;
194 private static final String UNKNOWN = "unknown";
Andrea Campanella19090322017-08-22 10:31:37 +0200195
Andrea Campanellabc112a92017-06-26 19:06:43 +0200196 //FIXME this will be removed when the configuration is synced at the source.
197 private static final Set<String> PIPELINE_CONFIGURABLE_PROTOCOLS = ImmutableSet.of("p4runtime");
Andrea Campanellabc112a92017-06-26 19:06:43 +0200198
199 //FIXME to be removed when netcfg will issue device events in a bundle or
200 //ensures all configuration needed is present
201 private Set<DeviceId> deviceConfigured = new CopyOnWriteArraySet<>();
202 private Set<DeviceId> driverConfigured = new CopyOnWriteArraySet<>();
203 private Set<DeviceId> pipelineConfigured = new CopyOnWriteArraySet<>();
Andrea Campanella241896c2017-05-10 13:11:04 -0700204
Thomas Vachuska5b38dc02018-05-10 15:24:40 -0700205 private final Map<DeviceId, DeviceHandshaker> handshakers = Maps.newConcurrentMap();
Carmelo Casconee5b28722018-06-22 17:28:28 +0200206 private final Map<DeviceId, MastershipRole> requestedRoles = Maps.newConcurrentMap();
Carmelo Cascone158b8c42018-07-04 19:42:37 +0200207 private final Striped<Lock> deviceLocks = Striped.lock(30);
Andrea Campanella241896c2017-05-10 13:11:04 -0700208
Carmelo Casconee5b28722018-06-22 17:28:28 +0200209 private ExecutorService connectionExecutor
210 = newFixedThreadPool(CORE_POOL_SIZE, groupedThreads(
Carmelo Cascone96beb6f2018-06-27 18:07:12 +0200211 "onos/generaldeviceprovider-device-connect", "%d", log));
Carmelo Cascone158b8c42018-07-04 19:42:37 +0200212 private ScheduledExecutorService statsExecutor
Carmelo Casconee5b28722018-06-22 17:28:28 +0200213 = newScheduledThreadPool(CORE_POOL_SIZE, groupedThreads(
Carmelo Cascone158b8c42018-07-04 19:42:37 +0200214 "onos/generaldeviceprovider-stats-poll", "%d", log));
215 private ConcurrentMap<DeviceId, ScheduledFuture<?>> statsPollingTasks = new ConcurrentHashMap<>();
216 private ScheduledExecutorService probeExecutor
217 = newSingleThreadScheduledExecutor(groupedThreads(
218 "onos/generaldeviceprovider-probe-", "%d", log));
219 private ScheduledFuture<?> probeTask = null;
Carmelo Casconee5b28722018-06-22 17:28:28 +0200220
221 private DeviceProviderService providerService;
Andrea Campanella241896c2017-05-10 13:11:04 -0700222 private InternalDeviceListener deviceListener = new InternalDeviceListener();
223
Carmelo Casconee5b28722018-06-22 17:28:28 +0200224 private final ConfigFactory factory =
Andrea Campanella241896c2017-05-10 13:11:04 -0700225 new ConfigFactory<DeviceId, GeneralProviderDeviceConfig>(
226 SubjectFactories.DEVICE_SUBJECT_FACTORY,
227 GeneralProviderDeviceConfig.class, CFG_SCHEME) {
228 @Override
229 public GeneralProviderDeviceConfig createConfig() {
230 return new GeneralProviderDeviceConfig();
231 }
232 };
233
Carmelo Casconee5b28722018-06-22 17:28:28 +0200234 private final NetworkConfigListener cfgListener = new InternalNetworkConfigListener();
235 private final DeviceAgentListener deviceAgentListener = new InternalDeviceAgentListener();
Andrea Campanella241896c2017-05-10 13:11:04 -0700236
237
238 @Activate
Andrea Campanella1e573442018-05-17 17:07:13 +0200239 public void activate(ComponentContext context) {
Andrea Campanella241896c2017-05-10 13:11:04 -0700240 providerService = providerRegistry.register(this);
Andrea Campanella4929a812017-10-09 18:38:23 +0200241 componentConfigService.registerProperties(getClass());
Andrea Campanella241896c2017-05-10 13:11:04 -0700242 coreService.registerApplication(APP_NAME);
243 cfgService.registerConfigFactory(factory);
244 cfgService.addListener(cfgListener);
245 deviceService.addListener(deviceListener);
Thomas Vachuska5b38dc02018-05-10 15:24:40 -0700246 handshakers.clear();
Andrea Campanella241896c2017-05-10 13:11:04 -0700247 //This will fail if ONOS has CFG and drivers which depend on this provider
248 // are activated, failing due to not finding the driver.
249 cfgService.getSubjects(DeviceId.class, GeneralProviderDeviceConfig.class)
Carmelo Cascone158b8c42018-07-04 19:42:37 +0200250 .forEach(this::triggerConnect);
Andrea Campanella1e573442018-05-17 17:07:13 +0200251 //Initiating a periodic check to see if any device is available again and reconnect it.
Carmelo Cascone158b8c42018-07-04 19:42:37 +0200252 rescheduleProbeTask();
Andrea Campanella1e573442018-05-17 17:07:13 +0200253 modified(context);
Andrea Campanella241896c2017-05-10 13:11:04 -0700254 log.info("Started");
255 }
256
Andrea Campanella19090322017-08-22 10:31:37 +0200257 @Modified
258 public void modified(ComponentContext context) {
Carmelo Cascone158b8c42018-07-04 19:42:37 +0200259 if (context == null) {
260 return;
Andrea Campanella19090322017-08-22 10:31:37 +0200261 }
262
Carmelo Cascone158b8c42018-07-04 19:42:37 +0200263 Dictionary<?, ?> properties = context.getProperties();
264 final int oldStatsPollFrequency = statsPollFrequency;
265 statsPollFrequency = Tools.getIntegerProperty(
266 properties, STATS_POLL_FREQUENCY, DEFAULT_STATS_POLL_FREQUENCY);
267 log.info("Configured. {} is configured to {} seconds",
268 STATS_POLL_FREQUENCY, statsPollFrequency);
269 final int oldProbeFrequency = probeFrequency;
270 probeFrequency = Tools.getIntegerProperty(
271 properties, PROBE_FREQUENCY, DEFAULT_PROBE_FREQUENCY);
272 log.info("Configured. {} is configured to {} seconds",
273 PROBE_FREQUENCY, probeFrequency);
274 opTimeoutShort = Tools.getIntegerProperty(
275 properties, OP_TIMEOUT_SHORT, DEFAULT_OP_TIMEOUT_SHORT);
276 log.info("Configured. {} is configured to {} seconds",
277 OP_TIMEOUT_SHORT, opTimeoutShort);
278 opTimeoutLong = Tools.getIntegerProperty(
279 properties, OP_TIMEOUT_LONG, DEFAULT_OP_TIMEOUT_LONG);
280 log.info("Configured. {} is configured to {} seconds",
281 OP_TIMEOUT_LONG, opTimeoutLong);
282
283 if (oldStatsPollFrequency != statsPollFrequency) {
284 rescheduleStatsPollingTasks();
Andrea Campanella19090322017-08-22 10:31:37 +0200285 }
Carmelo Cascone158b8c42018-07-04 19:42:37 +0200286
287 if (oldProbeFrequency != probeFrequency) {
288 rescheduleProbeTask();
289 }
290 }
291
292 private synchronized void rescheduleProbeTask() {
293 if (probeTask != null) {
294 probeTask.cancel(false);
295 }
296 probeTask = probeExecutor.scheduleAtFixedRate(
297 this::triggerProbeAllDevices, probeFrequency,
298 probeFrequency, TimeUnit.SECONDS);
Andrea Campanella19090322017-08-22 10:31:37 +0200299 }
300
Andrea Campanella241896c2017-05-10 13:11:04 -0700301 @Deactivate
302 public void deactivate() {
Carmelo Cascone158b8c42018-07-04 19:42:37 +0200303 statsExecutor.shutdown();
304 probeExecutor.shutdown();
Andrea Campanella4929a812017-10-09 18:38:23 +0200305 componentConfigService.unregisterProperties(getClass(), false);
Andrea Campanella241896c2017-05-10 13:11:04 -0700306 cfgService.removeListener(cfgListener);
307 //Not Removing the device so they can still be used from other driver providers
308 //cfgService.getSubjects(DeviceId.class, GeneralProviderDeviceConfig.class)
309 // .forEach(did -> connectionExecutor.execute(() -> disconnectDevice(did)));
310 connectionExecutor.shutdown();
311 deviceService.removeListener(deviceListener);
312 providerRegistry.unregister(this);
Thomas Vachuska5b38dc02018-05-10 15:24:40 -0700313 handshakers.clear();
Andrea Campanella241896c2017-05-10 13:11:04 -0700314 providerService = null;
315 cfgService.unregisterConfigFactory(factory);
316 log.info("Stopped");
317 }
318
319 public GeneralDeviceProvider() {
320 super(new ProviderId(URI_SCHEME, DEVICE_PROVIDER_PACKAGE));
321 }
322
323
324 @Override
325 public void triggerProbe(DeviceId deviceId) {
Carmelo Cascone158b8c42018-07-04 19:42:37 +0200326 connectionExecutor.execute(withDeviceLock(
327 () -> doDeviceProbe(deviceId), deviceId));
Andrea Campanella241896c2017-05-10 13:11:04 -0700328 }
329
330 @Override
331 public void roleChanged(DeviceId deviceId, MastershipRole newRole) {
Andrea Campanella14e196d2017-07-24 18:11:36 +0200332 log.info("Received role {} for device {}", newRole, deviceId);
Carmelo Casconee5b28722018-06-22 17:28:28 +0200333 requestedRoles.put(deviceId, newRole);
Carmelo Cascone158b8c42018-07-04 19:42:37 +0200334 connectionExecutor.execute(() -> doRoleChanged(deviceId, newRole));
Carmelo Casconee5b28722018-06-22 17:28:28 +0200335 }
336
337 private void doRoleChanged(DeviceId deviceId, MastershipRole newRole) {
Carmelo Cascone96beb6f2018-06-27 18:07:12 +0200338 final DeviceHandshaker handshaker = getHandshaker(deviceId);
Carmelo Casconee5b28722018-06-22 17:28:28 +0200339 if (handshaker == null) {
Carmelo Cascone96beb6f2018-06-27 18:07:12 +0200340 log.error("Null handshaker. Unable to notify new role {} to {}",
341 newRole, deviceId);
Carmelo Casconee5b28722018-06-22 17:28:28 +0200342 return;
343 }
344 handshaker.roleChanged(newRole);
Andrea Campanella241896c2017-05-10 13:11:04 -0700345 }
346
347 @Override
348 public boolean isReachable(DeviceId deviceId) {
Andrea Campanella14e196d2017-07-24 18:11:36 +0200349 log.debug("Testing reachability for device {}", deviceId);
Carmelo Cascone96beb6f2018-06-27 18:07:12 +0200350 final DeviceHandshaker handshaker = getHandshaker(deviceId);
Andrea Campanellac1ecdd02018-01-12 12:48:24 +0100351 if (handshaker == null) {
352 return false;
353 }
Carmelo Cascone96beb6f2018-06-27 18:07:12 +0200354 return getFutureWithDeadline(
355 handshaker.isReachable(), "checking reachability",
Carmelo Cascone158b8c42018-07-04 19:42:37 +0200356 deviceId, false, opTimeoutShort);
Andrea Campanella241896c2017-05-10 13:11:04 -0700357 }
358
359 @Override
360 public void changePortState(DeviceId deviceId, PortNumber portNumber,
361 boolean enable) {
Carmelo Cascone158b8c42018-07-04 19:42:37 +0200362 connectionExecutor.execute(
363 () -> doChangePortState(deviceId, portNumber, enable));
364 }
365
366 private void doChangePortState(DeviceId deviceId, PortNumber portNumber,
367 boolean enable) {
Carmelo Cascone96beb6f2018-06-27 18:07:12 +0200368 if (!deviceService.getDevice(deviceId).is(PortAdmin.class)) {
369 log.warn("Missing PortAdmin behaviour on {}, aborting port state change",
370 deviceId);
371 return;
Andrea Campanella241896c2017-05-10 13:11:04 -0700372 }
Carmelo Cascone158b8c42018-07-04 19:42:37 +0200373 final PortAdmin portAdmin = deviceService.getDevice(deviceId)
374 .as(PortAdmin.class);
375 final CompletableFuture<Boolean> modifyTask = enable
Carmelo Cascone96beb6f2018-06-27 18:07:12 +0200376 ? portAdmin.enable(portNumber)
377 : portAdmin.disable(portNumber);
Carmelo Cascone158b8c42018-07-04 19:42:37 +0200378 final String descr = (enable ? "enabling" : "disabling") + " port " + portNumber;
379 getFutureWithDeadline(
380 modifyTask, descr, deviceId, null, opTimeoutShort);
Andrea Campanella241896c2017-05-10 13:11:04 -0700381 }
382
Thomas Vachuska5b38dc02018-05-10 15:24:40 -0700383 @Override
384 public void triggerDisconnect(DeviceId deviceId) {
Andrea Campanella1e573442018-05-17 17:07:13 +0200385 log.debug("Triggering disconnection of device {}", deviceId);
Carmelo Cascone158b8c42018-07-04 19:42:37 +0200386 connectionExecutor.execute(withDeviceLock(
387 () -> doDisconnectDevice(deviceId), deviceId));
Thomas Vachuska5b38dc02018-05-10 15:24:40 -0700388 }
389
Andrea Campanella241896c2017-05-10 13:11:04 -0700390 private DeviceHandshaker getHandshaker(DeviceId deviceId) {
Thomas Vachuska5b38dc02018-05-10 15:24:40 -0700391 return handshakers.computeIfAbsent(deviceId, id -> {
392 Driver driver = getDriver(deviceId);
Carmelo Cascone158b8c42018-07-04 19:42:37 +0200393 return driver == null ? null : getBehaviour(
394 driver, DeviceHandshaker.class,
395 new DefaultDriverData(driver, deviceId));
Thomas Vachuska5b38dc02018-05-10 15:24:40 -0700396 });
Andrea Campanella241896c2017-05-10 13:11:04 -0700397 }
398
Andrea Campanella241896c2017-05-10 13:11:04 -0700399 private Driver getDriver(DeviceId deviceId) {
Andrea Campanella241896c2017-05-10 13:11:04 -0700400 try {
Carmelo Cascone158b8c42018-07-04 19:42:37 +0200401 // DriverManager checks first using basic device config.
402 return driverService.getDriver(deviceId);
Andrea Campanella241896c2017-05-10 13:11:04 -0700403 } catch (ItemNotFoundException e) {
Carmelo Cascone158b8c42018-07-04 19:42:37 +0200404 log.error("Driver not found for {}", deviceId);
405 return null;
Andrea Campanella241896c2017-05-10 13:11:04 -0700406 }
Andrea Campanella241896c2017-05-10 13:11:04 -0700407 }
408
Andrea Campanella241896c2017-05-10 13:11:04 -0700409 private <T extends Behaviour> T getBehaviour(Driver driver, Class<T> type,
410 DriverData data) {
Carmelo Cascone158b8c42018-07-04 19:42:37 +0200411 // Allows obtaining behavior implementations before the device is pushed
412 // to the core.
Andrea Campanellac1ecdd02018-01-12 12:48:24 +0100413 if (driver != null && driver.hasBehaviour(type)) {
Andrea Campanella241896c2017-05-10 13:11:04 -0700414 DefaultDriverHandler handler = new DefaultDriverHandler(data);
415 return driver.createBehaviour(handler, type);
416 } else {
417 return null;
418 }
419 }
420
Carmelo Cascone158b8c42018-07-04 19:42:37 +0200421 private void doConnectDevice(DeviceId deviceId) {
422 // Some operations can be performed by one node only.
423 final boolean isLocalLeader = leadershipService.runForLeadership(
424 GeneralProviderDeviceConfig.class.getName() + deviceId)
425 .leader().nodeId().equals(clusterService.getLocalNode().id());
426
Carmelo Cascone96beb6f2018-06-27 18:07:12 +0200427 if (deviceService.getDevice(deviceId) != null
428 && deviceService.isAvailable(deviceId)) {
429 log.info("Device {} is already connected to ONOS and is available",
430 deviceId);
431 return;
432 }
433 // Retrieve config
434 final GeneralProviderDeviceConfig providerConfig = cfgService.getConfig(
435 deviceId, GeneralProviderDeviceConfig.class);
436 final BasicDeviceConfig basicDeviceConfig = cfgService.getConfig(
437 deviceId, BasicDeviceConfig.class);
Andrea Campanella241896c2017-05-10 13:11:04 -0700438 if (providerConfig == null || basicDeviceConfig == null) {
Carmelo Cascone96beb6f2018-06-27 18:07:12 +0200439 log.error("Configuration missing, cannot connect to {}. " +
440 "basicDeviceConfig={}, generalProvider={}",
441 deviceId, basicDeviceConfig, providerConfig);
442 return;
443 }
444 log.info("Initiating connection to device {} with driver {} ... asMaster={}",
Carmelo Cascone158b8c42018-07-04 19:42:37 +0200445 deviceId, basicDeviceConfig.driver(), isLocalLeader);
Carmelo Cascone96beb6f2018-06-27 18:07:12 +0200446 // Get handshaker, driver and driver data.
447 final DeviceHandshaker handshaker = getHandshaker(deviceId);
448 if (handshaker == null) {
449 log.error("Missing DeviceHandshaker behavior for {}, aborting connection",
450 deviceId);
451 return;
452 }
453 final Driver driver = handshaker.handler().driver();
454 // Enhance driver data with info in GDP config.
455 augmentConfigData(providerConfig, handshaker.data());
456 final DriverData driverData = handshaker.data();
457 // Start connection via handshaker.
458 final Boolean connectSuccess = getFutureWithDeadline(
459 handshaker.connect(), "initiating connection",
Carmelo Cascone158b8c42018-07-04 19:42:37 +0200460 deviceId, null, opTimeoutShort);
Carmelo Cascone96beb6f2018-06-27 18:07:12 +0200461 if (connectSuccess == null) {
462 // Error logged by getFutureWithDeadline().
463 return;
464 } else if (!connectSuccess) {
465 log.warn("Unable to connect to {}", deviceId);
466 return;
467 }
468 // Handle pipeconf (if device is capable)
Carmelo Cascone158b8c42018-07-04 19:42:37 +0200469 if (!handlePipeconf(deviceId, driver, driverData, isLocalLeader)) {
Carmelo Cascone96beb6f2018-06-27 18:07:12 +0200470 // We already logged the error.
Carmelo Cascone158b8c42018-07-04 19:42:37 +0200471 getFutureWithDeadline(
472 handshaker.disconnect(), "performing disconnection",
473 deviceId, null, opTimeoutShort);
Carmelo Cascone96beb6f2018-06-27 18:07:12 +0200474 return;
475 }
476 // Add device agent listener.
477 handshaker.addDeviceAgentListener(deviceAgentListener);
478 // All good. Notify core (if master).
Carmelo Cascone158b8c42018-07-04 19:42:37 +0200479 if (isLocalLeader) {
Carmelo Cascone96beb6f2018-06-27 18:07:12 +0200480 advertiseDevice(deviceId, driver, providerConfig, driverData);
Andrea Campanella241896c2017-05-10 13:11:04 -0700481 }
482 }
483
Andrea Campanella14e196d2017-07-24 18:11:36 +0200484
Carmelo Cascone96beb6f2018-06-27 18:07:12 +0200485 private void advertiseDevice(DeviceId deviceId, Driver driver,
486 GeneralProviderDeviceConfig providerConfig,
487 DriverData driverData) {
488 // Obtain device and port description and advertise device to core.
489 DeviceDescription description = null;
490 final List<PortDescription> ports;
491
492 final DeviceDescriptionDiscovery deviceDiscovery = getBehaviour(
493 driver, DeviceDescriptionDiscovery.class, driverData);
494
495 if (deviceDiscovery != null) {
496 description = deviceDiscovery.discoverDeviceDetails();
497 ports = deviceDiscovery.discoverPortDetails();
498 } else {
499 log.warn("Missing DeviceDescriptionDiscovery behavior for {}, " +
500 "no update for description or ports.", deviceId);
501 ports = new ArrayList<>();
Andrea Campanella14e196d2017-07-24 18:11:36 +0200502 }
Andrea Campanella14e196d2017-07-24 18:11:36 +0200503
Carmelo Cascone96beb6f2018-06-27 18:07:12 +0200504 if (description == null) {
505 // Generate one here.
506 // FIXME: a behavior impl should not return a null description
507 // (e.g. as GnmiDeviceDescriptionDiscovery). This case should apply
508 // only if a the behavior is not available.
509 description = new DefaultDeviceDescription(
510 deviceId.uri(), Device.Type.SWITCH,
511 driver.manufacturer(), driver.hwVersion(),
512 driver.swVersion(), UNKNOWN,
513 new ChassisId(), true,
514 DefaultAnnotations.builder()
515 .set(AnnotationKeys.PROTOCOL,
516 providerConfig.protocolsInfo().keySet().toString())
517 .build());
518 }
519
520 providerService.deviceConnected(deviceId, description);
521 providerService.updatePorts(deviceId, ports);
Andrea Campanella14e196d2017-07-24 18:11:36 +0200522 }
523
Carmelo Cascone59f57de2017-07-11 19:55:09 -0400524 /**
Carmelo Cascone96beb6f2018-06-27 18:07:12 +0200525 * Handles the case of a device that is pipeline programmable. Returns true
526 * if the operation wa successful and the device can be registered to the
527 * core, false otherwise.
Carmelo Cascone59f57de2017-07-11 19:55:09 -0400528 */
Carmelo Cascone96beb6f2018-06-27 18:07:12 +0200529 private boolean handlePipeconf(DeviceId deviceId, Driver driver,
Carmelo Cascone158b8c42018-07-04 19:42:37 +0200530 DriverData driverData, boolean asMaster) {
Carmelo Cascone96beb6f2018-06-27 18:07:12 +0200531 final PiPipelineProgrammable pipelineProg = getBehaviour(
532 driver, PiPipelineProgrammable.class, driverData);
Carmelo Cascone59f57de2017-07-11 19:55:09 -0400533 if (pipelineProg == null) {
534 // Device is not pipeline programmable.
535 return true;
536 }
537
Carmelo Cascone96beb6f2018-06-27 18:07:12 +0200538 final PiPipeconf pipeconf = getPipeconf(deviceId, pipelineProg);
539 if (pipeconf == null) {
540 return false;
541 }
542 final PiPipeconfId pipeconfId = pipeconf.id();
Andrea Campanella14e196d2017-07-24 18:11:36 +0200543
Carmelo Cascone158b8c42018-07-04 19:42:37 +0200544 final String mergedDriverName = piPipeconfService.mergeDriver(
545 deviceId, pipeconfId);
546 if (mergedDriverName == null) {
547 log.error("Unable to get merged driver for {} and {}, aborting device discovery",
548 deviceId, pipeconfId);
549 return false;
Carmelo Cascone96beb6f2018-06-27 18:07:12 +0200550 }
551
Carmelo Cascone158b8c42018-07-04 19:42:37 +0200552 if (!asMaster) {
553 // From now one only the master.
554 return true;
555 }
556
557 if (!setDriverViaCfg(deviceId, mergedDriverName)) {
558 return false;
559 }
560
561 // FIXME: we just introduced a race condition as it might happen that a
562 // node does not receive the new cfg (with the merged driver) before the
563 // device is advertised to the core. Perhaps we should be waiting for a
564 // NetworkConfig event signaling that the driver has been updated on all
565 // nodes? The effect is mitigated by deploying the pipeconf (slow
566 // operation), after calling setDriverViaCfg().
567
568 piPipeconfService.bindToDevice(pipeconfId, deviceId);
569
570 final Boolean deploySuccess = getFutureWithDeadline(
571 pipelineProg.deployPipeconf(pipeconf),
572 "deploying pipeconf", deviceId, null,
573 opTimeoutLong);
574 if (deploySuccess == null) {
Carmelo Cascone96beb6f2018-06-27 18:07:12 +0200575 // Error logged by getFutureWithDeadline().
576 return false;
Carmelo Cascone158b8c42018-07-04 19:42:37 +0200577 } else if (!deploySuccess) {
578 log.error("Unable to deploy pipeconf {} to {}, aborting device discovery",
Carmelo Cascone96beb6f2018-06-27 18:07:12 +0200579 pipeconfId, deviceId);
Andrea Campanella14e196d2017-07-24 18:11:36 +0200580 return false;
581 }
582
583 return true;
584 }
585
Carmelo Cascone158b8c42018-07-04 19:42:37 +0200586 private boolean setDriverViaCfg(DeviceId deviceId, String driverName) {
587 BasicDeviceConfig cfg = cfgService.getConfig(deviceId, BasicDeviceConfig.class);
588 if (cfg == null) {
589 log.error("Unable to get basic device config for {}, aborting device discovery",
590 deviceId);
591 return false;
592 }
593 ObjectNode newCfg = (ObjectNode) cfg.node();
594 newCfg = newCfg.put(DRIVER, driverName);
595 ObjectMapper mapper = new ObjectMapper();
596 JsonNode newCfgNode = mapper.convertValue(newCfg, JsonNode.class);
597 cfgService.applyConfig(deviceId, BasicDeviceConfig.class, newCfgNode);
598 return true;
599 }
600
Andrea Campanella14e196d2017-07-24 18:11:36 +0200601 private PiPipeconf getPipeconf(DeviceId deviceId, PiPipelineProgrammable pipelineProg) {
Carmelo Cascone92044522018-06-29 19:00:59 +0200602 PiPipeconfId pipeconfId = getPipeconfFromCfg(deviceId);
603 if (pipeconfId == null || pipeconfId.id().isEmpty()) {
604 // No pipeconf has been provided in the cfg.
Carmelo Cascone59f57de2017-07-11 19:55:09 -0400605 // Check if device driver provides a default one.
606 if (pipelineProg.getDefaultPipeconf().isPresent()) {
Carmelo Cascone92044522018-06-29 19:00:59 +0200607 final PiPipeconf defaultPipeconf = pipelineProg.getDefaultPipeconf().get();
Carmelo Cascone59f57de2017-07-11 19:55:09 -0400608 log.info("Using default pipeconf {} for {}", defaultPipeconf.id(), deviceId);
Carmelo Cascone92044522018-06-29 19:00:59 +0200609 pipeconfId = defaultPipeconf.id();
Carmelo Cascone59f57de2017-07-11 19:55:09 -0400610 } else {
Carmelo Cascone92044522018-06-29 19:00:59 +0200611 log.warn("Device {} is pipeline programmable but no pipeconf can be associated to it", deviceId);
Carmelo Cascone59f57de2017-07-11 19:55:09 -0400612 return null;
613 }
Carmelo Cascone1fb27d32017-08-25 20:40:20 +0200614 }
Carmelo Cascone92044522018-06-29 19:00:59 +0200615 // Check if registered
Carmelo Cascone1fb27d32017-08-25 20:40:20 +0200616 if (!piPipeconfService.getPipeconf(pipeconfId).isPresent()) {
617 log.warn("Pipeconf {} is not registered", pipeconfId);
Andrea Campanella14e196d2017-07-24 18:11:36 +0200618 return null;
Carmelo Cascone59f57de2017-07-11 19:55:09 -0400619 }
Andrea Campanella14e196d2017-07-24 18:11:36 +0200620 return piPipeconfService.getPipeconf(pipeconfId).get();
Carmelo Cascone59f57de2017-07-11 19:55:09 -0400621 }
622
Carmelo Cascone158b8c42018-07-04 19:42:37 +0200623 private void doDisconnectDevice(DeviceId deviceId) {
624 log.info("Initiating disconnection from {}...", deviceId);
Carmelo Cascone96beb6f2018-06-27 18:07:12 +0200625 // Remove from core (if master)
Carmelo Cascone158b8c42018-07-04 19:42:37 +0200626 if (mastershipService.isLocalMaster(deviceId)
627 && deviceService.isAvailable(deviceId)) {
Carmelo Cascone96beb6f2018-06-27 18:07:12 +0200628 providerService.deviceDisconnected(deviceId);
629 }
Carmelo Cascone158b8c42018-07-04 19:42:37 +0200630 cancelStatsPolling(deviceId);
Carmelo Cascone96beb6f2018-06-27 18:07:12 +0200631 // Perform disconnection with device.
632 final DeviceHandshaker handshaker = handshakers.remove(deviceId);
Carmelo Casconee5b28722018-06-22 17:28:28 +0200633 if (handshaker == null) {
Carmelo Cascone96beb6f2018-06-27 18:07:12 +0200634 // Gracefully ignore
635 log.warn("Missing DeviceHandshaker behavior for {}, " +
636 "no guarantees of complete disconnection",
637 deviceId);
Carmelo Cascone158b8c42018-07-04 19:42:37 +0200638 return;
Andrea Campanella241896c2017-05-10 13:11:04 -0700639 }
Carmelo Cascone96beb6f2018-06-27 18:07:12 +0200640 handshaker.removeDeviceAgentListener(deviceAgentListener);
Carmelo Cascone158b8c42018-07-04 19:42:37 +0200641 final boolean disconnectSuccess = getFutureWithDeadline(
642 handshaker.disconnect(), "performing disconnection",
643 deviceId, false, opTimeoutShort);
644 if (!disconnectSuccess) {
645 log.warn("Unable to disconnect from {}", deviceId);
646 }
Andrea Campanella241896c2017-05-10 13:11:04 -0700647 }
648
Carmelo Cascone96beb6f2018-06-27 18:07:12 +0200649 // Needed to catch the exception in the executors since are not rethrown otherwise.
Andrea Campanella241896c2017-05-10 13:11:04 -0700650 private Runnable exceptionSafe(Runnable runnable) {
651 return () -> {
652 try {
653 runnable.run();
654 } catch (Exception e) {
655 log.error("Unhandled Exception", e);
656 }
657 };
658 }
659
Carmelo Cascone158b8c42018-07-04 19:42:37 +0200660 private <U> U withDeviceLock(Supplier<U> task, DeviceId deviceId) {
661 final Lock lock = deviceLocks.get(deviceId);
662 lock.lock();
663 try {
664 return task.get();
665 } finally {
666 lock.unlock();
667 }
668 }
669
670 private Runnable withDeviceLock(Runnable task, DeviceId deviceId) {
671 // Wrapper of withDeviceLock(Supplier, ...) for void tasks.
672 return () -> withDeviceLock(() -> {
673 task.run();
674 return null;
675 }, deviceId);
Carmelo Cascone96beb6f2018-06-27 18:07:12 +0200676 }
677
Andrea Campanella241896c2017-05-10 13:11:04 -0700678 private void updatePortStatistics(DeviceId deviceId) {
Andrea Campanellace111932017-09-18 16:59:56 +0900679 Device device = deviceService.getDevice(deviceId);
Carmelo Cascone158b8c42018-07-04 19:42:37 +0200680 if (device != null && deviceService.isAvailable(deviceId) &&
Andrea Campanellace111932017-09-18 16:59:56 +0900681 device.is(PortStatisticsDiscovery.class)) {
682 Collection<PortStatistics> statistics = device.as(PortStatisticsDiscovery.class)
683 .discoverPortStatistics();
684 //updating statistcs only if not empty
685 if (!statistics.isEmpty()) {
686 providerService.updatePortStatistics(deviceId, statistics);
687 }
688 } else {
689 log.debug("Can't update port statistics for device {}", deviceId);
Andrea Campanella19090322017-08-22 10:31:37 +0200690 }
691 }
692
Carmelo Casconee5b28722018-06-22 17:28:28 +0200693 private boolean notMyScheme(DeviceId deviceId) {
694 return !deviceId.uri().getScheme().equals(URI_SCHEME);
Andrea Campanella241896c2017-05-10 13:11:04 -0700695 }
696
Carmelo Cascone158b8c42018-07-04 19:42:37 +0200697 private void triggerConnect(DeviceId deviceId) {
698 connectionExecutor.execute(withDeviceLock(
699 () -> doConnectDevice(deviceId), deviceId));
Carmelo Cascone96beb6f2018-06-27 18:07:12 +0200700 }
701
Andrea Campanella241896c2017-05-10 13:11:04 -0700702 /**
703 * Listener for configuration events.
704 */
705 private class InternalNetworkConfigListener implements NetworkConfigListener {
706
Andrea Campanella241896c2017-05-10 13:11:04 -0700707 @Override
708 public void event(NetworkConfigEvent event) {
709 DeviceId deviceId = (DeviceId) event.subject();
710 //Assuming that the deviceId comes with uri 'device:'
Carmelo Casconee5b28722018-06-22 17:28:28 +0200711 if (notMyScheme(deviceId)) {
Andrea Campanella241896c2017-05-10 13:11:04 -0700712 // not under my scheme, skipping
713 log.debug("{} is not my scheme, skipping", deviceId);
714 return;
715 }
Carmelo Cascone158b8c42018-07-04 19:42:37 +0200716 final boolean configComplete = withDeviceLock(
717 () -> isDeviceConfigComplete(event, deviceId), deviceId);
718 if (!configComplete) {
Carmelo Cascone96beb6f2018-06-27 18:07:12 +0200719 // Still waiting for some configuration.
Andrea Campanellabc112a92017-06-26 19:06:43 +0200720 return;
721 }
Carmelo Cascone96beb6f2018-06-27 18:07:12 +0200722 // Good to go.
Carmelo Cascone158b8c42018-07-04 19:42:37 +0200723 triggerConnect(deviceId);
Carmelo Cascone96beb6f2018-06-27 18:07:12 +0200724 cleanUpConfigInfo(deviceId);
Andrea Campanella14e196d2017-07-24 18:11:36 +0200725 }
726
Carmelo Cascone96beb6f2018-06-27 18:07:12 +0200727 private boolean isDeviceConfigComplete(NetworkConfigEvent event, DeviceId deviceId) {
728 // FIXME to be removed when netcfg will issue device events in a bundle or
Andrea Campanellabc112a92017-06-26 19:06:43 +0200729 // ensure all configuration needed is present
Carmelo Cascone158b8c42018-07-04 19:42:37 +0200730 if (event.configClass().equals(GeneralProviderDeviceConfig.class)) {
731 //FIXME we currently assume that p4runtime devices are pipeline configurable.
732 //If we want to connect a p4runtime device with no pipeline
733 if (event.config().isPresent()) {
734 deviceConfigured.add(deviceId);
735 final boolean isNotPipelineConfigurable = Collections.disjoint(
736 ImmutableSet.copyOf(event.config().get().node().fieldNames()),
737 PIPELINE_CONFIGURABLE_PROTOCOLS);
738 if (isNotPipelineConfigurable) {
739 // Skip waiting for a pipeline if we can't support it.
Andrea Campanellabc112a92017-06-26 19:06:43 +0200740 pipelineConfigured.add(deviceId);
741 }
742 }
Carmelo Cascone158b8c42018-07-04 19:42:37 +0200743 } else if (event.configClass().equals(BasicDeviceConfig.class)) {
744 if (event.config().isPresent() && event.config().get().node().has(DRIVER)) {
745 driverConfigured.add(deviceId);
Andrea Campanellabc112a92017-06-26 19:06:43 +0200746 }
Carmelo Cascone158b8c42018-07-04 19:42:37 +0200747 } else if (event.configClass().equals(PiPipeconfConfig.class)) {
748 if (event.config().isPresent()
749 && event.config().get().node().has(PiPipeconfConfig.PIPIPECONFID)) {
750 pipelineConfigured.add(deviceId);
751 }
Andrea Campanella241896c2017-05-10 13:11:04 -0700752 }
Carmelo Cascone158b8c42018-07-04 19:42:37 +0200753
754 if (deviceConfigured.contains(deviceId)
755 && driverConfigured.contains(deviceId)
756 && pipelineConfigured.contains(deviceId)) {
757 return true;
758 } else {
759 if (deviceConfigured.contains(deviceId) && driverConfigured.contains(deviceId)) {
760 log.debug("Waiting for pipeline configuration for device {}", deviceId);
761 } else if (pipelineConfigured.contains(deviceId) && driverConfigured.contains(deviceId)) {
762 log.debug("Waiting for device configuration for device {}", deviceId);
763 } else if (pipelineConfigured.contains(deviceId) && deviceConfigured.contains(deviceId)) {
764 log.debug("Waiting for driver configuration for device {}", deviceId);
765 } else if (driverConfigured.contains(deviceId)) {
766 log.debug("Only driver configuration for device {}", deviceId);
767 } else if (deviceConfigured.contains(deviceId)) {
768 log.debug("Only device configuration for device {}", deviceId);
769 }
770 }
771 return false;
Andrea Campanella241896c2017-05-10 13:11:04 -0700772 }
773
774 @Override
775 public boolean isRelevant(NetworkConfigEvent event) {
Andrea Campanellabc112a92017-06-26 19:06:43 +0200776 return (event.configClass().equals(GeneralProviderDeviceConfig.class) ||
777 event.configClass().equals(BasicDeviceConfig.class) ||
778 event.configClass().equals(PiPipeconfConfig.class)) &&
Andrea Campanella241896c2017-05-10 13:11:04 -0700779 (event.type() == NetworkConfigEvent.Type.CONFIG_ADDED ||
780 event.type() == NetworkConfigEvent.Type.CONFIG_UPDATED);
781 }
782 }
783
Carmelo Cascone96beb6f2018-06-27 18:07:12 +0200784 private void augmentConfigData(GeneralProviderDeviceConfig providerConfig, DriverData driverData) {
Andrea Campanella14e196d2017-07-24 18:11:36 +0200785 //Storing deviceKeyId and all other config values
786 // as data in the driver with protocol_<info>
787 // name as the key. e.g protocol_ip
788 providerConfig.protocolsInfo()
789 .forEach((protocol, deviceInfoConfig) -> {
790 deviceInfoConfig.configValues()
791 .forEach((k, v) -> driverData.set(protocol + "_" + k, v));
792 driverData.set(protocol + "_key", deviceInfoConfig.deviceKeyId());
793 });
794 }
795
796 private void cleanUpConfigInfo(DeviceId deviceId) {
Andrea Campanellabc112a92017-06-26 19:06:43 +0200797 deviceConfigured.remove(deviceId);
798 driverConfigured.remove(deviceId);
799 pipelineConfigured.remove(deviceId);
Andrea Campanellabc112a92017-06-26 19:06:43 +0200800 }
801
Carmelo Cascone158b8c42018-07-04 19:42:37 +0200802 private void startStatsPolling(DeviceId deviceId, boolean withRandomDelay) {
803 statsPollingTasks.compute(deviceId, (did, oldTask) -> {
804 if (oldTask != null) {
805 oldTask.cancel(false);
806 }
807 final int delay = withRandomDelay
808 ? new SecureRandom().nextInt(10) : 0;
809 return statsExecutor.scheduleAtFixedRate(
810 exceptionSafe(() -> updatePortStatistics(deviceId)),
811 delay, statsPollFrequency, TimeUnit.SECONDS);
812 });
Andrea Campanella19090322017-08-22 10:31:37 +0200813 }
814
Carmelo Cascone158b8c42018-07-04 19:42:37 +0200815 private void cancelStatsPolling(DeviceId deviceId) {
816 statsPollingTasks.computeIfPresent(deviceId, (did, task) -> {
817 task.cancel(false);
818 return null;
819 });
820 }
821
822 private void rescheduleStatsPollingTasks() {
823 statsPollingTasks.keySet().forEach(deviceId -> {
824 // startStatsPolling cancels old one if present.
825 startStatsPolling(deviceId, true);
826 });
827 }
828
829 private void triggerProbeAllDevices() {
830 // Async trigger a task for all devices in the cfg.
831 cfgService.getSubjects(DeviceId.class, GeneralProviderDeviceConfig.class)
832 .forEach(deviceId -> connectionExecutor.execute(withDeviceLock(
833 () -> doDeviceProbe(deviceId), deviceId)));
Andrea Campanella1e573442018-05-17 17:07:13 +0200834 }
835
Carmelo Cascone92044522018-06-29 19:00:59 +0200836 private PiPipeconfId getPipeconfFromCfg(DeviceId deviceId) {
837 PiPipeconfConfig config = cfgService.getConfig(
838 deviceId, PiPipeconfConfig.class);
839 if (config == null) {
840 return null;
841 }
842 return config.piPipeconfId();
843 }
844
Carmelo Cascone158b8c42018-07-04 19:42:37 +0200845 private void doDeviceProbe(DeviceId deviceId) {
846 if (!configIsPresent(deviceId)) {
847 return;
848 }
849 final boolean isAvailable = deviceService.getDevice(deviceId) != null
850 && deviceService.isAvailable(deviceId);
851 final boolean isLocalMaster = mastershipService.isLocalMaster(deviceId);
852 if (isAvailable) {
853 if (!isLocalMaster) {
854 return;
855 }
856 if (!isReachable(deviceId)) {
857 log.info("Disconnecting available but unreachable device {}...",
858 deviceId);
859 triggerDisconnect(deviceId);
860 }
861 } else {
862 // We do not check for reachability using isReachable()
863 // since the behaviour of this method can vary depending on protocol
864 // nuances. We leave this check to the device handshaker at later
865 // stages of the connection process.
866 triggerConnect(deviceId);
Andrea Campanella1e573442018-05-17 17:07:13 +0200867 }
868 }
869
870 private boolean configIsPresent(DeviceId deviceId) {
Carmelo Cascone96beb6f2018-06-27 18:07:12 +0200871 final boolean present =
872 cfgService.getConfig(deviceId, GeneralProviderDeviceConfig.class) != null
873 && cfgService.getConfig(deviceId, BasicDeviceConfig.class) != null;
Andrea Campanella1e573442018-05-17 17:07:13 +0200874 if (!present) {
875 log.warn("Configuration for device {} is not complete", deviceId);
876 }
877 return present;
878 }
879
Carmelo Casconee5b28722018-06-22 17:28:28 +0200880 private void handleChannelClosed(DeviceId deviceId) {
Carmelo Cascone96beb6f2018-06-27 18:07:12 +0200881 log.info("Disconnecting device {}, due to channel closed event",
882 deviceId);
Carmelo Cascone158b8c42018-07-04 19:42:37 +0200883 triggerDisconnect(deviceId);
Carmelo Casconee5b28722018-06-22 17:28:28 +0200884 }
885
886 private void handleMastershipResponse(DeviceId deviceId, MastershipRole response) {
887 //Notify core about response.
888 if (!requestedRoles.containsKey(deviceId)) {
889 return;
890 }
891 providerService.receivedRoleReply(deviceId, requestedRoles.get(deviceId), response);
Carmelo Cascone158b8c42018-07-04 19:42:37 +0200892 if (response.equals(MastershipRole.MASTER)) {
893 startStatsPolling(deviceId, false);
894 } else {
895 cancelStatsPolling(deviceId);
Carmelo Casconee5b28722018-06-22 17:28:28 +0200896 }
897 }
898
Carmelo Cascone96beb6f2018-06-27 18:07:12 +0200899 private <U> U getFutureWithDeadline(CompletableFuture<U> future, String opDescription,
Carmelo Cascone158b8c42018-07-04 19:42:37 +0200900 DeviceId deviceId, U defaultValue, int timeout) {
Carmelo Cascone96beb6f2018-06-27 18:07:12 +0200901 try {
Carmelo Cascone158b8c42018-07-04 19:42:37 +0200902 return future.get(timeout, TimeUnit.SECONDS);
Carmelo Cascone96beb6f2018-06-27 18:07:12 +0200903 } catch (InterruptedException e) {
904 log.error("Thread interrupted while {} on {}", opDescription, deviceId);
905 Thread.currentThread().interrupt();
906 } catch (ExecutionException e) {
907 log.error("Exception while {} on {}", opDescription, deviceId, e.getCause());
908 } catch (TimeoutException e) {
909 log.error("Operation TIMEOUT while {} on {}", opDescription, deviceId);
910 }
911 return defaultValue;
912 }
913
Andrea Campanella241896c2017-05-10 13:11:04 -0700914 /**
915 * Listener for core device events.
916 */
917 private class InternalDeviceListener implements DeviceListener {
918 @Override
919 public void event(DeviceEvent event) {
Andrea Campanellace111932017-09-18 16:59:56 +0900920 DeviceId deviceId = event.subject().id();
Thomas Vachuska5b38dc02018-05-10 15:24:40 -0700921 // For now this is scheduled periodically, when streaming API will
922 // be available we check and base it on the streaming API (e.g. gNMI)
923 if (mastershipService.isLocalMaster(deviceId)) {
Carmelo Cascone158b8c42018-07-04 19:42:37 +0200924 startStatsPolling(deviceId, true);
Andrea Campanella241896c2017-05-10 13:11:04 -0700925 }
926 }
927
928 @Override
929 public boolean isRelevant(DeviceEvent event) {
Thomas Vachuska5b38dc02018-05-10 15:24:40 -0700930 return event.type() == Type.DEVICE_ADDED &&
931 event.subject().id().toString().startsWith(URI_SCHEME.toLowerCase());
Andrea Campanella241896c2017-05-10 13:11:04 -0700932 }
933 }
Andrea Campanella1e573442018-05-17 17:07:13 +0200934
935 /**
Carmelo Casconee5b28722018-06-22 17:28:28 +0200936 * Listener for device agent events.
Andrea Campanella1e573442018-05-17 17:07:13 +0200937 */
Carmelo Casconee5b28722018-06-22 17:28:28 +0200938 private class InternalDeviceAgentListener implements DeviceAgentListener {
Andrea Campanella1e573442018-05-17 17:07:13 +0200939
940 @Override
Carmelo Casconee5b28722018-06-22 17:28:28 +0200941 public void event(DeviceAgentEvent event) {
Andrea Campanella1e573442018-05-17 17:07:13 +0200942 DeviceId deviceId = event.subject();
Carmelo Casconee5b28722018-06-22 17:28:28 +0200943 switch (event.type()) {
944 case CHANNEL_OPEN:
945 // Ignore.
946 break;
947 case CHANNEL_CLOSED:
948 handleChannelClosed(deviceId);
949 break;
950 case CHANNEL_ERROR:
951 // TODO evaluate other reaction to channel error.
952 log.warn("Received CHANNEL_ERROR from {}. Is the channel still open?",
953 deviceId);
954 break;
955 case ROLE_MASTER:
956 handleMastershipResponse(deviceId, MastershipRole.MASTER);
957 break;
958 case ROLE_STANDBY:
959 handleMastershipResponse(deviceId, MastershipRole.STANDBY);
960 break;
961 case ROLE_NONE:
962 handleMastershipResponse(deviceId, MastershipRole.NONE);
963 break;
964 default:
965 log.warn("Unrecognized device agent event {}", event.type());
Andrea Campanella1e573442018-05-17 17:07:13 +0200966 }
Andrea Campanella1e573442018-05-17 17:07:13 +0200967 }
968
969 }
Andrea Campanella241896c2017-05-10 13:11:04 -0700970}