blob: 21c6ae719129459128c11f3d4a44e9dbc71137fb [file] [log] [blame]
/*
* Copyright 2017-present Open Networking Foundation
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.onosproject.provider.general.device.impl;
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.node.ObjectNode;
import com.google.common.annotations.Beta;
import com.google.common.collect.ImmutableSet;
import com.google.common.collect.Maps;
import com.google.common.util.concurrent.Striped;
import org.apache.felix.scr.annotations.Activate;
import org.apache.felix.scr.annotations.Component;
import org.apache.felix.scr.annotations.Deactivate;
import org.apache.felix.scr.annotations.Modified;
import org.apache.felix.scr.annotations.Property;
import org.apache.felix.scr.annotations.Reference;
import org.apache.felix.scr.annotations.ReferenceCardinality;
import org.onlab.packet.ChassisId;
import org.onlab.util.ItemNotFoundException;
import org.onlab.util.Tools;
import org.onosproject.cfg.ComponentConfigService;
import org.onosproject.cluster.ClusterService;
import org.onosproject.cluster.LeadershipService;
import org.onosproject.core.CoreService;
import org.onosproject.mastership.MastershipService;
import org.onosproject.net.AnnotationKeys;
import org.onosproject.net.DefaultAnnotations;
import org.onosproject.net.Device;
import org.onosproject.net.DeviceId;
import org.onosproject.net.MastershipRole;
import org.onosproject.net.PortNumber;
import org.onosproject.net.behaviour.PiPipelineProgrammable;
import org.onosproject.net.behaviour.PortAdmin;
import org.onosproject.net.config.ConfigFactory;
import org.onosproject.net.config.NetworkConfigEvent;
import org.onosproject.net.config.NetworkConfigListener;
import org.onosproject.net.config.NetworkConfigRegistry;
import org.onosproject.net.config.basics.BasicDeviceConfig;
import org.onosproject.net.config.basics.SubjectFactories;
import org.onosproject.net.device.DefaultDeviceDescription;
import org.onosproject.net.device.DeviceAgentEvent;
import org.onosproject.net.device.DeviceAgentListener;
import org.onosproject.net.device.DeviceDescription;
import org.onosproject.net.device.DeviceDescriptionDiscovery;
import org.onosproject.net.device.DeviceEvent;
import org.onosproject.net.device.DeviceHandshaker;
import org.onosproject.net.device.DeviceListener;
import org.onosproject.net.device.DeviceProvider;
import org.onosproject.net.device.DeviceProviderRegistry;
import org.onosproject.net.device.DeviceProviderService;
import org.onosproject.net.device.DeviceService;
import org.onosproject.net.device.PortDescription;
import org.onosproject.net.device.PortStatistics;
import org.onosproject.net.device.PortStatisticsDiscovery;
import org.onosproject.net.driver.Behaviour;
import org.onosproject.net.driver.DefaultDriverData;
import org.onosproject.net.driver.DefaultDriverHandler;
import org.onosproject.net.driver.Driver;
import org.onosproject.net.driver.DriverData;
import org.onosproject.net.driver.DriverService;
import org.onosproject.net.pi.model.PiPipeconf;
import org.onosproject.net.pi.model.PiPipeconfId;
import org.onosproject.net.pi.service.PiPipeconfConfig;
import org.onosproject.net.pi.service.PiPipeconfService;
import org.onosproject.net.provider.AbstractProvider;
import org.onosproject.net.provider.ProviderId;
import org.onosproject.provider.general.device.api.GeneralProviderDeviceConfig;
import org.osgi.service.component.ComponentContext;
import org.slf4j.Logger;
import java.security.SecureRandom;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.Dictionary;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.CopyOnWriteArraySet;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.locks.Lock;
import java.util.function.Supplier;
import static java.util.concurrent.Executors.newFixedThreadPool;
import static java.util.concurrent.Executors.newScheduledThreadPool;
import static java.util.concurrent.Executors.newSingleThreadScheduledExecutor;
import static org.onlab.util.Tools.groupedThreads;
import static org.onosproject.net.device.DeviceEvent.Type;
import static org.slf4j.LoggerFactory.getLogger;
/**
* Provider which uses drivers to detect device and do initial handshake and
* channel establishment with devices. Any other provider specific operation is
* also delegated to the DeviceHandshaker driver.
*/
@Beta
@Component(immediate = true)
public class GeneralDeviceProvider extends AbstractProvider
implements DeviceProvider {
private static final String DRIVER = "driver";
private final Logger log = getLogger(getClass());
@Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
private DeviceProviderRegistry providerRegistry;
@Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
private ComponentConfigService componentConfigService;
@Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
private NetworkConfigRegistry cfgService;
@Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
private CoreService coreService;
@Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
private DeviceService deviceService;
@Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
private DriverService driverService;
@Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
private MastershipService mastershipService;
@Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
private PiPipeconfService piPipeconfService;
@Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
private ClusterService clusterService;
@Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
private LeadershipService leadershipService;
private static final String STATS_POLL_FREQUENCY = "deviceStatsPollFrequency";
private static final int DEFAULT_STATS_POLL_FREQUENCY = 10;
@Property(name = STATS_POLL_FREQUENCY, intValue = DEFAULT_STATS_POLL_FREQUENCY,
label = "Configure poll frequency for port status and statistics; " +
"default is 10 sec")
private int statsPollFrequency = DEFAULT_STATS_POLL_FREQUENCY;
private static final String PROBE_FREQUENCY = "deviceProbeFrequency";
private static final int DEFAULT_PROBE_FREQUENCY = 10;
@Property(name = PROBE_FREQUENCY, intValue = DEFAULT_PROBE_FREQUENCY,
label = "Configure probe frequency for checking device availability; " +
"default is 10 sec")
private int probeFrequency = DEFAULT_PROBE_FREQUENCY;
private static final String OP_TIMEOUT_SHORT = "deviceOperationTimeoutShort";
private static final int DEFAULT_OP_TIMEOUT_SHORT = 10;
@Property(name = OP_TIMEOUT_SHORT, intValue = DEFAULT_OP_TIMEOUT_SHORT,
label = "Configure timeout in seconds for device operations " +
"that are supposed to take a short time " +
"(e.g. checking device reachability); default is 10 seconds")
private int opTimeoutShort = DEFAULT_OP_TIMEOUT_SHORT;
private static final String OP_TIMEOUT_LONG = "deviceOperationTimeoutLong";
private static final int DEFAULT_OP_TIMEOUT_LONG = 60;
@Property(name = OP_TIMEOUT_LONG, intValue = DEFAULT_OP_TIMEOUT_LONG,
label = "Configure timeout in seconds for device operations " +
"that are supposed to take a relatively long time " +
"(e.g. pushing a large pipeline configuration with slow " +
"network); default is 60 seconds")
private int opTimeoutLong = DEFAULT_OP_TIMEOUT_LONG;
private static final String APP_NAME = "org.onosproject.generaldeviceprovider";
private static final String URI_SCHEME = "device";
private static final String CFG_SCHEME = "generalprovider";
private static final String DEVICE_PROVIDER_PACKAGE = "org.onosproject.general.provider.device";
private static final int CORE_POOL_SIZE = 10;
private static final String UNKNOWN = "unknown";
//FIXME this will be removed when the configuration is synced at the source.
private static final Set<String> PIPELINE_CONFIGURABLE_PROTOCOLS = ImmutableSet.of("p4runtime");
//FIXME to be removed when netcfg will issue device events in a bundle or
//ensures all configuration needed is present
private Set<DeviceId> deviceConfigured = new CopyOnWriteArraySet<>();
private Set<DeviceId> driverConfigured = new CopyOnWriteArraySet<>();
private Set<DeviceId> pipelineConfigured = new CopyOnWriteArraySet<>();
private final Map<DeviceId, DeviceHandshaker> handshakers = Maps.newConcurrentMap();
private final Map<DeviceId, MastershipRole> requestedRoles = Maps.newConcurrentMap();
private final Striped<Lock> deviceLocks = Striped.lock(30);
private ExecutorService connectionExecutor
= newFixedThreadPool(CORE_POOL_SIZE, groupedThreads(
"onos/generaldeviceprovider-device-connect", "%d", log));
private ScheduledExecutorService statsExecutor
= newScheduledThreadPool(CORE_POOL_SIZE, groupedThreads(
"onos/generaldeviceprovider-stats-poll", "%d", log));
private ConcurrentMap<DeviceId, ScheduledFuture<?>> statsPollingTasks = new ConcurrentHashMap<>();
private ScheduledExecutorService probeExecutor
= newSingleThreadScheduledExecutor(groupedThreads(
"onos/generaldeviceprovider-probe-", "%d", log));
private ScheduledFuture<?> probeTask = null;
private DeviceProviderService providerService;
private InternalDeviceListener deviceListener = new InternalDeviceListener();
private final ConfigFactory factory =
new ConfigFactory<DeviceId, GeneralProviderDeviceConfig>(
SubjectFactories.DEVICE_SUBJECT_FACTORY,
GeneralProviderDeviceConfig.class, CFG_SCHEME) {
@Override
public GeneralProviderDeviceConfig createConfig() {
return new GeneralProviderDeviceConfig();
}
};
private final NetworkConfigListener cfgListener = new InternalNetworkConfigListener();
private final DeviceAgentListener deviceAgentListener = new InternalDeviceAgentListener();
@Activate
public void activate(ComponentContext context) {
providerService = providerRegistry.register(this);
componentConfigService.registerProperties(getClass());
coreService.registerApplication(APP_NAME);
cfgService.registerConfigFactory(factory);
cfgService.addListener(cfgListener);
deviceService.addListener(deviceListener);
handshakers.clear();
//This will fail if ONOS has CFG and drivers which depend on this provider
// are activated, failing due to not finding the driver.
cfgService.getSubjects(DeviceId.class, GeneralProviderDeviceConfig.class)
.forEach(this::triggerConnect);
//Initiating a periodic check to see if any device is available again and reconnect it.
rescheduleProbeTask();
modified(context);
log.info("Started");
}
@Modified
public void modified(ComponentContext context) {
if (context == null) {
return;
}
Dictionary<?, ?> properties = context.getProperties();
final int oldStatsPollFrequency = statsPollFrequency;
statsPollFrequency = Tools.getIntegerProperty(
properties, STATS_POLL_FREQUENCY, DEFAULT_STATS_POLL_FREQUENCY);
log.info("Configured. {} is configured to {} seconds",
STATS_POLL_FREQUENCY, statsPollFrequency);
final int oldProbeFrequency = probeFrequency;
probeFrequency = Tools.getIntegerProperty(
properties, PROBE_FREQUENCY, DEFAULT_PROBE_FREQUENCY);
log.info("Configured. {} is configured to {} seconds",
PROBE_FREQUENCY, probeFrequency);
opTimeoutShort = Tools.getIntegerProperty(
properties, OP_TIMEOUT_SHORT, DEFAULT_OP_TIMEOUT_SHORT);
log.info("Configured. {} is configured to {} seconds",
OP_TIMEOUT_SHORT, opTimeoutShort);
opTimeoutLong = Tools.getIntegerProperty(
properties, OP_TIMEOUT_LONG, DEFAULT_OP_TIMEOUT_LONG);
log.info("Configured. {} is configured to {} seconds",
OP_TIMEOUT_LONG, opTimeoutLong);
if (oldStatsPollFrequency != statsPollFrequency) {
rescheduleStatsPollingTasks();
}
if (oldProbeFrequency != probeFrequency) {
rescheduleProbeTask();
}
}
private synchronized void rescheduleProbeTask() {
if (probeTask != null) {
probeTask.cancel(false);
}
probeTask = probeExecutor.scheduleAtFixedRate(
this::triggerProbeAllDevices, probeFrequency,
probeFrequency, TimeUnit.SECONDS);
}
@Deactivate
public void deactivate() {
statsExecutor.shutdown();
probeExecutor.shutdown();
componentConfigService.unregisterProperties(getClass(), false);
cfgService.removeListener(cfgListener);
//Not Removing the device so they can still be used from other driver providers
//cfgService.getSubjects(DeviceId.class, GeneralProviderDeviceConfig.class)
// .forEach(did -> connectionExecutor.execute(() -> disconnectDevice(did)));
connectionExecutor.shutdown();
deviceService.removeListener(deviceListener);
providerRegistry.unregister(this);
handshakers.clear();
providerService = null;
cfgService.unregisterConfigFactory(factory);
log.info("Stopped");
}
public GeneralDeviceProvider() {
super(new ProviderId(URI_SCHEME, DEVICE_PROVIDER_PACKAGE));
}
@Override
public void triggerProbe(DeviceId deviceId) {
connectionExecutor.execute(withDeviceLock(
() -> doDeviceProbe(deviceId), deviceId));
}
@Override
public void roleChanged(DeviceId deviceId, MastershipRole newRole) {
log.info("Received role {} for device {}", newRole, deviceId);
requestedRoles.put(deviceId, newRole);
connectionExecutor.execute(() -> doRoleChanged(deviceId, newRole));
}
private void doRoleChanged(DeviceId deviceId, MastershipRole newRole) {
final DeviceHandshaker handshaker = getHandshaker(deviceId);
if (handshaker == null) {
log.error("Null handshaker. Unable to notify new role {} to {}",
newRole, deviceId);
return;
}
handshaker.roleChanged(newRole);
}
@Override
public boolean isReachable(DeviceId deviceId) {
log.debug("Testing reachability for device {}", deviceId);
final DeviceHandshaker handshaker = getHandshaker(deviceId);
if (handshaker == null) {
return false;
}
return getFutureWithDeadline(
handshaker.isReachable(), "checking reachability",
deviceId, false, opTimeoutShort);
}
@Override
public void changePortState(DeviceId deviceId, PortNumber portNumber,
boolean enable) {
connectionExecutor.execute(
() -> doChangePortState(deviceId, portNumber, enable));
}
private void doChangePortState(DeviceId deviceId, PortNumber portNumber,
boolean enable) {
if (!deviceService.getDevice(deviceId).is(PortAdmin.class)) {
log.warn("Missing PortAdmin behaviour on {}, aborting port state change",
deviceId);
return;
}
final PortAdmin portAdmin = deviceService.getDevice(deviceId)
.as(PortAdmin.class);
final CompletableFuture<Boolean> modifyTask = enable
? portAdmin.enable(portNumber)
: portAdmin.disable(portNumber);
final String descr = (enable ? "enabling" : "disabling") + " port " + portNumber;
getFutureWithDeadline(
modifyTask, descr, deviceId, null, opTimeoutShort);
}
@Override
public void triggerDisconnect(DeviceId deviceId) {
log.debug("Triggering disconnection of device {}", deviceId);
connectionExecutor.execute(withDeviceLock(
() -> doDisconnectDevice(deviceId), deviceId));
}
private DeviceHandshaker getHandshaker(DeviceId deviceId) {
return handshakers.computeIfAbsent(deviceId, id -> {
Driver driver = getDriver(deviceId);
return driver == null ? null : getBehaviour(
driver, DeviceHandshaker.class,
new DefaultDriverData(driver, deviceId));
});
}
private Driver getDriver(DeviceId deviceId) {
try {
// DriverManager checks first using basic device config.
return driverService.getDriver(deviceId);
} catch (ItemNotFoundException e) {
log.error("Driver not found for {}", deviceId);
return null;
}
}
private <T extends Behaviour> T getBehaviour(Driver driver, Class<T> type,
DriverData data) {
// Allows obtaining behavior implementations before the device is pushed
// to the core.
if (driver != null && driver.hasBehaviour(type)) {
DefaultDriverHandler handler = new DefaultDriverHandler(data);
return driver.createBehaviour(handler, type);
} else {
return null;
}
}
private void doConnectDevice(DeviceId deviceId) {
// Some operations can be performed by one node only.
final boolean isLocalLeader = leadershipService.runForLeadership(
GeneralProviderDeviceConfig.class.getName() + deviceId)
.leader().nodeId().equals(clusterService.getLocalNode().id());
if (deviceService.getDevice(deviceId) != null
&& deviceService.isAvailable(deviceId)) {
log.info("Device {} is already connected to ONOS and is available",
deviceId);
return;
}
// Retrieve config
final GeneralProviderDeviceConfig providerConfig = cfgService.getConfig(
deviceId, GeneralProviderDeviceConfig.class);
final BasicDeviceConfig basicDeviceConfig = cfgService.getConfig(
deviceId, BasicDeviceConfig.class);
if (providerConfig == null || basicDeviceConfig == null) {
log.error("Configuration missing, cannot connect to {}. " +
"basicDeviceConfig={}, generalProvider={}",
deviceId, basicDeviceConfig, providerConfig);
return;
}
log.info("Initiating connection to device {} with driver {} ... asMaster={}",
deviceId, basicDeviceConfig.driver(), isLocalLeader);
// Get handshaker, driver and driver data.
final DeviceHandshaker handshaker = getHandshaker(deviceId);
if (handshaker == null) {
log.error("Missing DeviceHandshaker behavior for {}, aborting connection",
deviceId);
return;
}
final Driver driver = handshaker.handler().driver();
// Enhance driver data with info in GDP config.
augmentConfigData(providerConfig, handshaker.data());
final DriverData driverData = handshaker.data();
// Start connection via handshaker.
final Boolean connectSuccess = getFutureWithDeadline(
handshaker.connect(), "initiating connection",
deviceId, null, opTimeoutShort);
if (connectSuccess == null) {
// Error logged by getFutureWithDeadline().
return;
} else if (!connectSuccess) {
log.warn("Unable to connect to {}", deviceId);
return;
}
// Handle pipeconf (if device is capable)
if (!handlePipeconf(deviceId, driver, driverData, isLocalLeader)) {
// We already logged the error.
getFutureWithDeadline(
handshaker.disconnect(), "performing disconnection",
deviceId, null, opTimeoutShort);
return;
}
// Add device agent listener.
handshaker.addDeviceAgentListener(deviceAgentListener);
// All good. Notify core (if master).
if (isLocalLeader) {
advertiseDevice(deviceId, driver, providerConfig, driverData);
}
}
private void advertiseDevice(DeviceId deviceId, Driver driver,
GeneralProviderDeviceConfig providerConfig,
DriverData driverData) {
// Obtain device and port description and advertise device to core.
DeviceDescription description = null;
final List<PortDescription> ports;
final DeviceDescriptionDiscovery deviceDiscovery = getBehaviour(
driver, DeviceDescriptionDiscovery.class, driverData);
if (deviceDiscovery != null) {
description = deviceDiscovery.discoverDeviceDetails();
ports = deviceDiscovery.discoverPortDetails();
} else {
log.warn("Missing DeviceDescriptionDiscovery behavior for {}, " +
"no update for description or ports.", deviceId);
ports = new ArrayList<>();
}
if (description == null) {
// Generate one here.
// FIXME: a behavior impl should not return a null description
// (e.g. as GnmiDeviceDescriptionDiscovery). This case should apply
// only if a the behavior is not available.
description = new DefaultDeviceDescription(
deviceId.uri(), Device.Type.SWITCH,
driver.manufacturer(), driver.hwVersion(),
driver.swVersion(), UNKNOWN,
new ChassisId(), true,
DefaultAnnotations.builder()
.set(AnnotationKeys.PROTOCOL,
providerConfig.protocolsInfo().keySet().toString())
.build());
}
providerService.deviceConnected(deviceId, description);
providerService.updatePorts(deviceId, ports);
}
/**
* Handles the case of a device that is pipeline programmable. Returns true
* if the operation wa successful and the device can be registered to the
* core, false otherwise.
*/
private boolean handlePipeconf(DeviceId deviceId, Driver driver,
DriverData driverData, boolean asMaster) {
final PiPipelineProgrammable pipelineProg = getBehaviour(
driver, PiPipelineProgrammable.class, driverData);
if (pipelineProg == null) {
// Device is not pipeline programmable.
return true;
}
final PiPipeconf pipeconf = getPipeconf(deviceId, pipelineProg);
if (pipeconf == null) {
return false;
}
final PiPipeconfId pipeconfId = pipeconf.id();
// To be removed in change #19606
// final String mergedDriverName = piPipeconfService.mergeDriver(
// deviceId, pipeconfId);
// if (mergedDriverName == null) {
// log.error("Unable to get merged driver for {} and {}, aborting device discovery",
// deviceId, pipeconfId);
// return false;
// }
if (!asMaster) {
// From now one only the master.
return true;
}
// if (!setDriverViaCfg(deviceId, mergedDriverName)) {
// return false;
// }
// FIXME: we just introduced a race condition as it might happen that a
// node does not receive the new cfg (with the merged driver) before the
// device is advertised to the core. Perhaps we should be waiting for a
// NetworkConfig event signaling that the driver has been updated on all
// nodes? The effect is mitigated by deploying the pipeconf (slow
// operation), after calling setDriverViaCfg().
piPipeconfService.bindToDevice(pipeconfId, deviceId);
final Boolean deploySuccess = getFutureWithDeadline(
pipelineProg.deployPipeconf(pipeconf),
"deploying pipeconf", deviceId, null,
opTimeoutLong);
if (deploySuccess == null) {
// Error logged by getFutureWithDeadline().
return false;
} else if (!deploySuccess) {
log.error("Unable to deploy pipeconf {} to {}, aborting device discovery",
pipeconfId, deviceId);
return false;
}
return true;
}
private boolean setDriverViaCfg(DeviceId deviceId, String driverName) {
BasicDeviceConfig cfg = cfgService.getConfig(deviceId, BasicDeviceConfig.class);
if (cfg == null) {
log.error("Unable to get basic device config for {}, aborting device discovery",
deviceId);
return false;
}
ObjectNode newCfg = (ObjectNode) cfg.node();
newCfg = newCfg.put(DRIVER, driverName);
ObjectMapper mapper = new ObjectMapper();
JsonNode newCfgNode = mapper.convertValue(newCfg, JsonNode.class);
cfgService.applyConfig(deviceId, BasicDeviceConfig.class, newCfgNode);
return true;
}
private PiPipeconf getPipeconf(DeviceId deviceId, PiPipelineProgrammable pipelineProg) {
PiPipeconfId pipeconfId = getPipeconfFromCfg(deviceId);
if (pipeconfId == null || pipeconfId.id().isEmpty()) {
// No pipeconf has been provided in the cfg.
// Check if device driver provides a default one.
if (pipelineProg.getDefaultPipeconf().isPresent()) {
final PiPipeconf defaultPipeconf = pipelineProg.getDefaultPipeconf().get();
log.info("Using default pipeconf {} for {}", defaultPipeconf.id(), deviceId);
pipeconfId = defaultPipeconf.id();
} else {
log.warn("Device {} is pipeline programmable but no pipeconf can be associated to it", deviceId);
return null;
}
}
// Check if registered
if (!piPipeconfService.getPipeconf(pipeconfId).isPresent()) {
log.warn("Pipeconf {} is not registered", pipeconfId);
return null;
}
return piPipeconfService.getPipeconf(pipeconfId).get();
}
private void doDisconnectDevice(DeviceId deviceId) {
log.debug("Initiating disconnection from {}...", deviceId);
final DeviceHandshaker handshaker = handshakers.remove(deviceId);
final boolean isAvailable = deviceService.isAvailable(deviceId);
// Signal disconnection to core (if master).
if (isAvailable && mastershipService.isLocalMaster(deviceId)) {
providerService.deviceDisconnected(deviceId);
}
// Cancel tasks.
cancelStatsPolling(deviceId);
// Disconnect device.
if (handshaker == null) {
if (isAvailable) {
// If not available don't bother logging. We are probably
// invoking this method multiple times for the same device.
log.warn("Missing DeviceHandshaker behavior for {}, " +
"no guarantees of complete disconnection",
deviceId);
}
return;
}
handshaker.removeDeviceAgentListener(deviceAgentListener);
final boolean disconnectSuccess = getFutureWithDeadline(
handshaker.disconnect(), "performing disconnection",
deviceId, false, opTimeoutShort);
if (!disconnectSuccess) {
log.warn("Unable to disconnect from {}", deviceId);
}
}
// Needed to catch the exception in the executors since are not rethrown otherwise.
private Runnable exceptionSafe(Runnable runnable) {
return () -> {
try {
runnable.run();
} catch (Exception e) {
log.error("Unhandled Exception", e);
}
};
}
private <U> U withDeviceLock(Supplier<U> task, DeviceId deviceId) {
final Lock lock = deviceLocks.get(deviceId);
lock.lock();
try {
return task.get();
} finally {
lock.unlock();
}
}
private Runnable withDeviceLock(Runnable task, DeviceId deviceId) {
// Wrapper of withDeviceLock(Supplier, ...) for void tasks.
return () -> withDeviceLock(() -> {
task.run();
return null;
}, deviceId);
}
private void updatePortStatistics(DeviceId deviceId) {
Device device = deviceService.getDevice(deviceId);
if (device != null && deviceService.isAvailable(deviceId) &&
device.is(PortStatisticsDiscovery.class)) {
Collection<PortStatistics> statistics = device.as(PortStatisticsDiscovery.class)
.discoverPortStatistics();
//updating statistcs only if not empty
if (!statistics.isEmpty()) {
providerService.updatePortStatistics(deviceId, statistics);
}
} else {
log.debug("Can't update port statistics for device {}", deviceId);
}
}
private boolean notMyScheme(DeviceId deviceId) {
return !deviceId.uri().getScheme().equals(URI_SCHEME);
}
private void triggerConnect(DeviceId deviceId) {
connectionExecutor.execute(withDeviceLock(
() -> doConnectDevice(deviceId), deviceId));
}
/**
* Listener for configuration events.
*/
private class InternalNetworkConfigListener implements NetworkConfigListener {
@Override
public void event(NetworkConfigEvent event) {
connectionExecutor.execute(() -> consumeConfigEvent(event));
}
@Override
public boolean isRelevant(NetworkConfigEvent event) {
return (event.configClass().equals(GeneralProviderDeviceConfig.class) ||
event.configClass().equals(BasicDeviceConfig.class) ||
event.configClass().equals(PiPipeconfConfig.class)) &&
(event.type() == NetworkConfigEvent.Type.CONFIG_ADDED ||
event.type() == NetworkConfigEvent.Type.CONFIG_UPDATED);
}
private void consumeConfigEvent(NetworkConfigEvent event) {
DeviceId deviceId = (DeviceId) event.subject();
//Assuming that the deviceId comes with uri 'device:'
if (notMyScheme(deviceId)) {
// not under my scheme, skipping
log.debug("{} is not my scheme, skipping", deviceId);
return;
}
final boolean configComplete = withDeviceLock(
() -> isDeviceConfigComplete(event, deviceId), deviceId);
if (!configComplete) {
// Still waiting for some configuration.
return;
}
// Good to go.
triggerConnect(deviceId);
cleanUpConfigInfo(deviceId);
}
private boolean isDeviceConfigComplete(NetworkConfigEvent event, DeviceId deviceId) {
// FIXME to be removed when netcfg will issue device events in a bundle or
// ensure all configuration needed is present
if (event.configClass().equals(GeneralProviderDeviceConfig.class)) {
//FIXME we currently assume that p4runtime devices are pipeline configurable.
//If we want to connect a p4runtime device with no pipeline
if (event.config().isPresent()) {
deviceConfigured.add(deviceId);
final boolean isNotPipelineConfigurable = Collections.disjoint(
ImmutableSet.copyOf(event.config().get().node().fieldNames()),
PIPELINE_CONFIGURABLE_PROTOCOLS);
if (isNotPipelineConfigurable) {
// Skip waiting for a pipeline if we can't support it.
pipelineConfigured.add(deviceId);
}
}
} else if (event.configClass().equals(BasicDeviceConfig.class)) {
if (event.config().isPresent() && event.config().get().node().has(DRIVER)) {
driverConfigured.add(deviceId);
}
} else if (event.configClass().equals(PiPipeconfConfig.class)) {
if (event.config().isPresent()
&& event.config().get().node().has(PiPipeconfConfig.PIPIPECONFID)) {
pipelineConfigured.add(deviceId);
}
}
if (deviceConfigured.contains(deviceId)
&& driverConfigured.contains(deviceId)
&& pipelineConfigured.contains(deviceId)) {
return true;
} else {
if (deviceConfigured.contains(deviceId) && driverConfigured.contains(deviceId)) {
log.debug("Waiting for pipeline configuration for device {}", deviceId);
} else if (pipelineConfigured.contains(deviceId) && driverConfigured.contains(deviceId)) {
log.debug("Waiting for device configuration for device {}", deviceId);
} else if (pipelineConfigured.contains(deviceId) && deviceConfigured.contains(deviceId)) {
log.debug("Waiting for driver configuration for device {}", deviceId);
} else if (driverConfigured.contains(deviceId)) {
log.debug("Only driver configuration for device {}", deviceId);
} else if (deviceConfigured.contains(deviceId)) {
log.debug("Only device configuration for device {}", deviceId);
}
}
return false;
}
}
private void augmentConfigData(GeneralProviderDeviceConfig providerConfig, DriverData driverData) {
//Storing deviceKeyId and all other config values
// as data in the driver with protocol_<info>
// name as the key. e.g protocol_ip
providerConfig.protocolsInfo()
.forEach((protocol, deviceInfoConfig) -> {
deviceInfoConfig.configValues()
.forEach((k, v) -> driverData.set(protocol + "_" + k, v));
driverData.set(protocol + "_key", deviceInfoConfig.deviceKeyId());
});
}
private void cleanUpConfigInfo(DeviceId deviceId) {
deviceConfigured.remove(deviceId);
driverConfigured.remove(deviceId);
pipelineConfigured.remove(deviceId);
}
private void startStatsPolling(DeviceId deviceId, boolean withRandomDelay) {
statsPollingTasks.compute(deviceId, (did, oldTask) -> {
if (oldTask != null) {
oldTask.cancel(false);
}
final int delay = withRandomDelay
? new SecureRandom().nextInt(10) : 0;
return statsExecutor.scheduleAtFixedRate(
exceptionSafe(() -> updatePortStatistics(deviceId)),
delay, statsPollFrequency, TimeUnit.SECONDS);
});
}
private void cancelStatsPolling(DeviceId deviceId) {
statsPollingTasks.computeIfPresent(deviceId, (did, task) -> {
task.cancel(false);
return null;
});
}
private void rescheduleStatsPollingTasks() {
statsPollingTasks.keySet().forEach(deviceId -> {
// startStatsPolling cancels old one if present.
startStatsPolling(deviceId, true);
});
}
private void triggerProbeAllDevices() {
// Async trigger a task for all devices in the cfg.
cfgService.getSubjects(DeviceId.class, GeneralProviderDeviceConfig.class)
.forEach(deviceId -> connectionExecutor.execute(withDeviceLock(
() -> doDeviceProbe(deviceId), deviceId)));
}
private PiPipeconfId getPipeconfFromCfg(DeviceId deviceId) {
PiPipeconfConfig config = cfgService.getConfig(
deviceId, PiPipeconfConfig.class);
if (config == null) {
return null;
}
return config.piPipeconfId();
}
private void doDeviceProbe(DeviceId deviceId) {
if (!configIsPresent(deviceId)) {
return;
}
final boolean isAvailable = deviceService.getDevice(deviceId) != null
&& deviceService.isAvailable(deviceId);
final boolean isLocalMaster = mastershipService.isLocalMaster(deviceId);
if (isAvailable) {
if (!isLocalMaster) {
return;
}
if (!isReachable(deviceId)) {
log.info("Disconnecting available but unreachable device {}...",
deviceId);
triggerDisconnect(deviceId);
}
} else {
// We do not check for reachability using isReachable()
// since the behaviour of this method can vary depending on protocol
// nuances. We leave this check to the device handshaker at later
// stages of the connection process.
triggerConnect(deviceId);
}
}
private boolean configIsPresent(DeviceId deviceId) {
final boolean present =
cfgService.getConfig(deviceId, GeneralProviderDeviceConfig.class) != null
&& cfgService.getConfig(deviceId, BasicDeviceConfig.class) != null;
if (!present) {
log.warn("Configuration for device {} is not complete", deviceId);
}
return present;
}
private void handleChannelClosed(DeviceId deviceId) {
log.info("Disconnecting device {}, due to channel closed event",
deviceId);
triggerDisconnect(deviceId);
}
private void handleMastershipResponse(DeviceId deviceId, MastershipRole response) {
//Notify core about response.
if (!requestedRoles.containsKey(deviceId)) {
return;
}
providerService.receivedRoleReply(deviceId, requestedRoles.get(deviceId), response);
if (response.equals(MastershipRole.MASTER)) {
startStatsPolling(deviceId, false);
} else {
cancelStatsPolling(deviceId);
}
}
private <U> U getFutureWithDeadline(CompletableFuture<U> future, String opDescription,
DeviceId deviceId, U defaultValue, int timeout) {
try {
return future.get(timeout, TimeUnit.SECONDS);
} catch (InterruptedException e) {
log.error("Thread interrupted while {} on {}", opDescription, deviceId);
Thread.currentThread().interrupt();
} catch (ExecutionException e) {
log.error("Exception while {} on {}", opDescription, deviceId, e.getCause());
} catch (TimeoutException e) {
log.error("Operation TIMEOUT while {} on {}", opDescription, deviceId);
}
return defaultValue;
}
/**
* Listener for core device events.
*/
private class InternalDeviceListener implements DeviceListener {
@Override
public void event(DeviceEvent event) {
DeviceId deviceId = event.subject().id();
// For now this is scheduled periodically, when streaming API will
// be available we check and base it on the streaming API (e.g. gNMI)
if (mastershipService.isLocalMaster(deviceId)) {
startStatsPolling(deviceId, true);
}
}
@Override
public boolean isRelevant(DeviceEvent event) {
return event.type() == Type.DEVICE_ADDED &&
event.subject().id().toString().startsWith(URI_SCHEME.toLowerCase());
}
}
/**
* Listener for device agent events.
*/
private class InternalDeviceAgentListener implements DeviceAgentListener {
@Override
public void event(DeviceAgentEvent event) {
DeviceId deviceId = event.subject();
switch (event.type()) {
case CHANNEL_OPEN:
// Ignore.
break;
case CHANNEL_CLOSED:
handleChannelClosed(deviceId);
break;
case CHANNEL_ERROR:
// TODO evaluate other reaction to channel error.
log.warn("Received CHANNEL_ERROR from {}. Is the channel still open?",
deviceId);
handleChannelClosed(deviceId);
break;
case ROLE_MASTER:
handleMastershipResponse(deviceId, MastershipRole.MASTER);
break;
case ROLE_STANDBY:
handleMastershipResponse(deviceId, MastershipRole.STANDBY);
break;
case ROLE_NONE:
handleMastershipResponse(deviceId, MastershipRole.NONE);
break;
default:
log.warn("Unrecognized device agent event {}", event.type());
}
}
}
}