blob: e63e6e07b5b88e9ceb991e744f23dc2060749ecd [file] [log] [blame]
/*
* Copyright 2016-present Open Networking Foundation
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.onosproject.provider.rest.device.impl;
import com.google.common.base.Objects;
import com.google.common.collect.ImmutableList;
import org.onlab.packet.ChassisId;
import org.onlab.util.SharedExecutors;
import org.onlab.util.SharedScheduledExecutorService;
import org.onlab.util.SharedScheduledExecutors;
import org.onlab.util.Tools;
import org.onosproject.cfg.ComponentConfigService;
import org.onosproject.core.ApplicationId;
import org.onosproject.core.CoreService;
import org.onosproject.net.AnnotationKeys;
import org.onosproject.net.DefaultAnnotations;
import org.onosproject.net.Device;
import org.onosproject.net.DeviceId;
import org.onosproject.net.MastershipRole;
import org.onosproject.net.PortNumber;
import org.onosproject.net.SparseAnnotations;
import org.onosproject.net.behaviour.DevicesDiscovery;
import org.onosproject.net.behaviour.PortAdmin;
import org.onosproject.net.config.ConfigFactory;
import org.onosproject.net.config.NetworkConfigEvent;
import org.onosproject.net.config.NetworkConfigListener;
import org.onosproject.net.config.NetworkConfigRegistry;
import org.onosproject.net.config.basics.SubjectFactories;
import org.onosproject.net.device.DefaultDeviceDescription;
import org.onosproject.net.device.DeviceDescription;
import org.onosproject.net.device.DeviceDescriptionDiscovery;
import org.onosproject.net.device.DeviceProvider;
import org.onosproject.net.device.DeviceProviderRegistry;
import org.onosproject.net.device.DeviceProviderService;
import org.onosproject.net.device.DeviceService;
import org.onosproject.net.device.PortStatistics;
import org.onosproject.net.device.PortStatisticsDiscovery;
import org.onosproject.net.driver.DefaultDriverData;
import org.onosproject.net.driver.DefaultDriverHandler;
import org.onosproject.net.driver.Driver;
import org.onosproject.net.driver.DriverData;
import org.onosproject.net.driver.DriverHandler;
import org.onosproject.net.driver.DriverService;
import org.onosproject.net.provider.AbstractProvider;
import org.onosproject.net.provider.ProviderId;
import org.onosproject.protocol.rest.DefaultRestSBDevice;
import org.onosproject.protocol.rest.RestSBController;
import org.onosproject.protocol.rest.RestSBDevice;
import org.osgi.service.component.ComponentContext;
import org.osgi.service.component.annotations.Activate;
import org.osgi.service.component.annotations.Component;
import org.osgi.service.component.annotations.Deactivate;
import org.osgi.service.component.annotations.Modified;
import org.osgi.service.component.annotations.Reference;
import org.osgi.service.component.annotations.ReferenceCardinality;
import org.slf4j.Logger;
import javax.ws.rs.ProcessingException;
import javax.ws.rs.core.MediaType;
import java.util.Collection;
import java.util.Dictionary;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.concurrent.Callable;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.stream.Collectors;
import static com.google.common.base.Preconditions.checkNotNull;
import static org.onlab.util.Tools.groupedThreads;
import static org.onosproject.net.config.NetworkConfigEvent.Type.CONFIG_ADDED;
import static org.onosproject.net.config.NetworkConfigEvent.Type.CONFIG_REMOVED;
import static org.onosproject.net.config.NetworkConfigEvent.Type.CONFIG_UPDATED;
import static org.slf4j.LoggerFactory.getLogger;
/**
* Provider for devices that use REST as means of configuration communication.
*/
@Component(immediate = true, service = DeviceProvider.class)
public class RestDeviceProvider extends AbstractProvider
implements DeviceProvider {
private static final String APP_NAME = "org.onosproject.restsb";
protected static final String REST = "rest";
private static final String PROVIDER = "org.onosproject.provider.rest.device";
private static final String IPADDRESS = "ipaddress";
private static final String ISNOTNULL = "Rest device is not null";
private static final String UNKNOWN = "unknown";
private static final String POLL_FREQUENCY = "pollFrequency";
private static final int REST_TIMEOUT_SEC = 5;
private static final int EXECUTOR_THREAD_POOL_SIZE = 8;
private final Logger log = getLogger(getClass());
@Reference(cardinality = ReferenceCardinality.MANDATORY)
protected DeviceProviderRegistry providerRegistry;
@Reference(cardinality = ReferenceCardinality.MANDATORY)
protected RestSBController controller;
@Reference(cardinality = ReferenceCardinality.MANDATORY)
protected NetworkConfigRegistry netCfgService;
@Reference(cardinality = ReferenceCardinality.MANDATORY)
protected ComponentConfigService compCfgService;
@Reference(cardinality = ReferenceCardinality.MANDATORY)
protected CoreService coreService;
@Reference(cardinality = ReferenceCardinality.MANDATORY)
protected DeviceService deviceService;
@Reference(cardinality = ReferenceCardinality.MANDATORY)
protected DriverService driverService;
private static final int DEFAULT_POLL_FREQUENCY_SECONDS = 30;
//@Property(name = POLL_FREQUENCY, intValue = DEFAULT_POLL_FREQUENCY_SECONDS,
// label = "Configure poll frequency for port status and statistics; " +
// "default is 30 seconds")
private int pollFrequency = DEFAULT_POLL_FREQUENCY_SECONDS;
private DeviceProviderService providerService;
private ApplicationId appId;
private ExecutorService executor;
private final SharedScheduledExecutorService portStatisticsExecutor =
SharedScheduledExecutors.getPoolThreadExecutor();
private final List<ConfigFactory> factories = ImmutableList.of(
new ConfigFactory<DeviceId, RestDeviceConfig>(SubjectFactories.DEVICE_SUBJECT_FACTORY,
RestDeviceConfig.class,
REST) {
@Override
public RestDeviceConfig createConfig() {
return new RestDeviceConfig();
}
});
private final NetworkConfigListener configListener = new InternalNetworkConfigListener();
private ScheduledFuture<?> scheduledTask;
@Activate
public void activate() {
appId = coreService.registerApplication(APP_NAME);
compCfgService.registerProperties(getClass());
providerService = providerRegistry.register(this);
factories.forEach(netCfgService::registerConfigFactory);
executor = Executors.newFixedThreadPool(
EXECUTOR_THREAD_POOL_SIZE, groupedThreads("onos/restsbprovider", "device-installer-%d", log)
);
netCfgService.addListener(configListener);
executor.execute(RestDeviceProvider.this::createAndConnectDevices);
scheduledTask = schedulePolling();
log.info("Started");
}
@Modified
public void modified(ComponentContext context) {
int previousPollFrequency = pollFrequency;
if (context != null) {
Dictionary<?, ?> properties = context.getProperties();
pollFrequency = Tools.getIntegerProperty(properties, POLL_FREQUENCY,
DEFAULT_POLL_FREQUENCY_SECONDS);
log.info("Configured. Poll frequency is configured to {} seconds", pollFrequency);
}
// Re-schedule only if frequency has changed
if (!scheduledTask.isCancelled() && (previousPollFrequency != pollFrequency)) {
log.info("Re-scheduling port statistics task with frequency {} seconds", pollFrequency);
scheduledTask.cancel(true);
scheduledTask = schedulePolling();
}
}
@Deactivate
public void deactivate() {
compCfgService.unregisterProperties(getClass(), false);
netCfgService.removeListener(configListener);
providerRegistry.unregister(this);
providerService = null;
factories.forEach(netCfgService::unregisterConfigFactory);
scheduledTask.cancel(true);
executor.shutdown();
log.info("Stopped");
}
public RestDeviceProvider() {
super(new ProviderId(REST, PROVIDER));
}
@Override
public void triggerProbe(DeviceId deviceId) {
// TODO: This will be implemented later.
log.info("Triggering probe on device {}", deviceId);
}
@Override
public void roleChanged(DeviceId deviceId, MastershipRole newRole) {
// TODO: This will be implemented later.
}
@Override
public boolean isReachable(DeviceId deviceId) {
RestSBDevice restDevice = controller.getDevice(deviceId);
if (restDevice == null) {
restDevice = controller.getProxySBDevice(deviceId);
if (restDevice == null) {
log.debug("the requested device id: " +
deviceId.toString() +
" is not associated to any REST or REST " +
"proxy Device");
return false;
}
}
return restDevice.isActive();
}
private void deviceAdded(RestSBDevice restSBDev) {
checkNotNull(restSBDev, ISNOTNULL);
Driver driver = driverService.getDriver(restSBDev.manufacturer().get(),
restSBDev.hwVersion().get(),
restSBDev.swVersion().get());
// Check if the server is controlling a single or multiple devices
if (restSBDev.isProxy()) {
if (driver != null && driver.hasBehaviour(DevicesDiscovery.class)) {
DevicesDiscovery devicesDiscovery = devicesDiscovery(restSBDev, driver);
Set<DeviceId> deviceIds = devicesDiscovery.deviceIds();
restSBDev.setActive(true);
deviceIds.forEach(deviceId -> {
controller.addProxiedDevice(deviceId, restSBDev);
DeviceDescription devDesc = devicesDiscovery.deviceDetails(deviceId);
checkNotNull(devDesc, "DeviceDescription cannot be null");
providerService.deviceConnected(deviceId, mergeAnn(restSBDev.deviceId(), devDesc));
if (driver.hasBehaviour(DeviceDescriptionDiscovery.class)) {
DriverHandler h = driverService.createHandler(deviceId);
DeviceDescriptionDiscovery devDisc = h.behaviour(DeviceDescriptionDiscovery.class);
providerService.updatePorts(deviceId, devDisc.discoverPortDetails());
}
checkAndUpdateDevice(deviceId);
});
} else {
log.warn("Driver not found for {}", restSBDev);
}
} else {
DeviceId deviceId = restSBDev.deviceId();
if (driver != null && driver.hasBehaviour(DevicesDiscovery.class)) {
restSBDev.setActive(true);
DevicesDiscovery devicesDiscovery = devicesDiscovery(restSBDev, driver);
DeviceDescription deviceDescription = devicesDiscovery.deviceDetails(deviceId);
checkNotNull(deviceDescription, "DeviceDescription cannot be null");
providerService.deviceConnected(deviceId, deviceDescription);
if (driver.hasBehaviour(DeviceDescriptionDiscovery.class)) {
DriverHandler h = driverService.createHandler(deviceId);
DeviceDescriptionDiscovery deviceDiscovery = h.behaviour(DeviceDescriptionDiscovery.class);
providerService.updatePorts(deviceId, deviceDiscovery.discoverPortDetails());
}
} else {
ChassisId cid = new ChassisId();
String ipAddress = restSBDev.ip().toString();
SparseAnnotations annotations = DefaultAnnotations.builder()
.set(IPADDRESS, ipAddress)
.set(AnnotationKeys.PROTOCOL, REST.toUpperCase())
.build();
DeviceDescription deviceDescription = new DefaultDeviceDescription(
deviceId.uri(),
Device.Type.SWITCH,
UNKNOWN, UNKNOWN,
UNKNOWN, UNKNOWN,
cid,
annotations);
restSBDev.setActive(true);
providerService.deviceConnected(deviceId, deviceDescription);
}
checkAndUpdateDevice(deviceId);
}
}
private DefaultDeviceDescription mergeAnn(DeviceId devId, DeviceDescription desc) {
return new DefaultDeviceDescription(
desc,
DefaultAnnotations.merge(
DefaultAnnotations.builder()
.set(AnnotationKeys.PROTOCOL, REST.toUpperCase())
// The rest server added as annotation to the device
.set(AnnotationKeys.REST_SERVER, devId.toString())
.build(),
desc.annotations()));
}
private DevicesDiscovery devicesDiscovery(RestSBDevice restSBDevice, Driver driver) {
DriverData driverData = new DefaultDriverData(driver, restSBDevice.deviceId());
DevicesDiscovery devicesDiscovery = driver.createBehaviour(driverData, DevicesDiscovery.class);
devicesDiscovery.setHandler(new DefaultDriverHandler(driverData));
return devicesDiscovery;
}
private void checkAndUpdateDevice(DeviceId deviceId) {
if (deviceService.getDevice(deviceId) == null) {
log.warn("Device {} has not been added to store, maybe due to a problem in connectivity", deviceId);
} else {
boolean isReachable = isReachable(deviceId);
if (isReachable && deviceService.isAvailable(deviceId)) {
Device device = deviceService.getDevice(deviceId);
if (device.is(DeviceDescriptionDiscovery.class)) {
DeviceDescriptionDiscovery deviceDescriptionDiscovery =
device.as(DeviceDescriptionDiscovery.class);
DeviceDescription updatedDeviceDescription =
deviceDescriptionDiscovery.discoverDeviceDetails();
if (updatedDeviceDescription != null &&
!descriptionEquals(device, updatedDeviceDescription)) {
providerService.deviceConnected(
deviceId,
new DefaultDeviceDescription(
updatedDeviceDescription, true,
updatedDeviceDescription.annotations()));
//if ports are not discovered, retry the discovery
if (deviceService.getPorts(deviceId).isEmpty()) {
discoverPorts(deviceId);
}
}
} else {
log.warn("No DeviceDescriptionDiscovery behaviour for device {}", deviceId);
}
} else if (!isReachable && deviceService.isAvailable(deviceId)) {
providerService.deviceDisconnected(deviceId);
}
}
}
private boolean descriptionEquals(Device device, DeviceDescription updatedDeviceDescription) {
return Objects.equal(device.id().uri(), updatedDeviceDescription.deviceUri())
&& Objects.equal(device.type(), updatedDeviceDescription.type())
&& Objects.equal(device.manufacturer(), updatedDeviceDescription.manufacturer())
&& Objects.equal(device.hwVersion(), updatedDeviceDescription.hwVersion())
&& Objects.equal(device.swVersion(), updatedDeviceDescription.swVersion())
&& Objects.equal(device.serialNumber(), updatedDeviceDescription.serialNumber())
&& Objects.equal(device.chassisId(), updatedDeviceDescription.chassisId())
&& Objects.equal(device.annotations(), updatedDeviceDescription.annotations());
}
private void deviceRemoved(DeviceId deviceId) {
checkNotNull(deviceId, ISNOTNULL);
providerService.deviceDisconnected(deviceId);
controller.getProxiedDevices(deviceId).forEach(device -> {
controller.removeProxiedDevice(device);
providerService.deviceDisconnected(device);
});
controller.removeDevice(deviceId);
}
//Method to connect devices provided via net-cfg under devices/ tree
private void createAndConnectDevices() {
Set<DeviceId> deviceSubjects =
netCfgService.getSubjects(DeviceId.class, RestDeviceConfig.class);
connectDevices(deviceSubjects.stream()
.filter(deviceId -> deviceService.getDevice(deviceId) == null)
.map(deviceId -> {
RestDeviceConfig config =
netCfgService.getConfig(deviceId, RestDeviceConfig.class);
return toInactiveRestSBDevice(config);
}).collect(Collectors.toSet()));
}
private RestSBDevice toInactiveRestSBDevice(RestDeviceConfig config) {
return new DefaultRestSBDevice(config.ip(),
config.port(),
config.username(),
config.password(),
config.protocol(),
config.url(),
false,
config.testUrl(),
config.manufacturer(),
config.hwVersion(),
config.swVersion(),
config.isProxy(),
config.authenticationScheme(),
config.token()
);
}
private void connectDevices(Set<RestSBDevice> devices) {
//Precomputing the devices to be removed
Set<RestSBDevice> toBeRemoved = new HashSet<>(controller.getDevices().values());
toBeRemoved.removeAll(devices);
//Adding new devices
devices.stream()
.filter(device -> {
device.setActive(false);
controller.addDevice(device);
return testDeviceConnection(device);
})
.forEach(this::deviceAdded);
//Removing devices not wanted anymore
toBeRemoved.forEach(device -> deviceRemoved(device.deviceId()));
}
private void connectDevice(RestSBDevice device) {
// TODO borrowed from above,
// not sure why setting it to inactive
device.setActive(false);
controller.addDevice(device);
if (testDeviceConnection(device)) {
deviceAdded(device);
}
}
private ScheduledFuture schedulePolling() {
return portStatisticsExecutor.scheduleAtFixedRate(this::executePortStatisticsUpdate,
pollFrequency / 2, pollFrequency,
TimeUnit.SECONDS);
}
private void executePortStatisticsUpdate() {
controller.getDevices().keySet().forEach(this::updatePortStatistics);
}
private void updatePortStatistics(DeviceId deviceId) {
Device device = deviceService.getDevice(deviceId);
checkNotNull(device, "device cannot be null");
if (device.is(PortStatisticsDiscovery.class)) {
PortStatisticsDiscovery portStatisticsDiscovery = device.as(PortStatisticsDiscovery.class);
Collection<PortStatistics> portStatistics = portStatisticsDiscovery.discoverPortStatistics();
if (portStatistics != null && !portStatistics.isEmpty()) {
providerService.updatePortStatistics(deviceId, portStatistics);
}
} else {
log.debug("No port statistics getter behaviour for device {}", deviceId);
}
}
private void discoverPorts(DeviceId deviceId) {
Device device = deviceService.getDevice(deviceId);
DeviceDescriptionDiscovery deviceDescriptionDiscovery = device.as(DeviceDescriptionDiscovery.class);
providerService.updatePorts(deviceId, deviceDescriptionDiscovery.discoverPortDetails());
}
private boolean testDeviceConnection(RestSBDevice dev) {
try {
Callable<Boolean> connectionSuccess;
if (dev.testUrl().isPresent()) {
connectionSuccess = () ->
controller.get(dev.deviceId(), dev.testUrl().get(), MediaType.APPLICATION_JSON_TYPE) != null;
} else {
connectionSuccess = () ->
controller.get(dev.deviceId(), "", MediaType.APPLICATION_JSON_TYPE) != null;
}
Future<Boolean> future = executor.submit(connectionSuccess);
try {
return future.get(REST_TIMEOUT_SEC, TimeUnit.SECONDS);
} catch (TimeoutException ex) {
log.warn("Connection to device {} timed out: {}", dev.deviceId(), ex.getMessage());
return false;
} catch (InterruptedException ex) {
log.warn("Connection to device {} interrupted: {}", dev.deviceId(), ex.getMessage());
Thread.currentThread().interrupt();
return false;
} catch (ExecutionException ex) {
log.warn("Connection to device {} had an execution exception.", dev.deviceId(), ex);
return false;
}
} catch (ProcessingException e) {
log.warn("Cannot connect to device {}", dev, e);
}
return false;
}
private class InternalNetworkConfigListener implements NetworkConfigListener {
@Override
public void event(NetworkConfigEvent event) {
if (!isRelevant(event)) {
log.warn("Irrelevant network configuration event: {}", event);
return;
}
ExecutorService bg = SharedExecutors.getSingleThreadExecutor();
if (event.type() == CONFIG_REMOVED) {
DeviceId did = (DeviceId) event.subject();
bg.execute(() -> deviceRemoved(did));
} else {
// CONFIG_ADDED or CONFIG_UPDATED
RestDeviceConfig cfg = (RestDeviceConfig) event.config().get();
RestSBDevice restSBDevice = toInactiveRestSBDevice(cfg);
bg.execute(() -> connectDevice(restSBDevice));
}
}
@Override
public boolean isRelevant(NetworkConfigEvent event) {
return event.configClass().equals(RestDeviceConfig.class) &&
(event.type() == CONFIG_ADDED ||
event.type() == CONFIG_UPDATED ||
event.type() == CONFIG_REMOVED);
}
}
@Override
public void changePortState(DeviceId deviceId, PortNumber portNumber,
boolean enable) {
Device device = deviceService.getDevice(deviceId);
if (device != null) {
if (device.is(PortAdmin.class)) {
PortAdmin portAdmin = device.as(PortAdmin.class);
CompletableFuture<Boolean> modified;
if (enable) {
modified = portAdmin.enable(portNumber);
} else {
modified = portAdmin.disable(portNumber);
}
modified.thenAcceptAsync(result -> {
if (!result) {
log.warn("Device {} port {} state can't be changed to {}",
deviceId, portNumber, enable);
}
});
} else {
log.warn("Device {} does not support PortAdmin behavior", deviceId);
}
} else {
log.warn("unable to get the device {}, port {} state can't be changed to {}",
deviceId, portNumber, enable);
}
}
}