Refactoring Netconf Device Provider to simplify device connection and avoid calling behaviours when the session is not yet established
Change-Id: I7d3b1efc6194629ba77918dfd759781f7ccf1b60
diff --git a/protocols/netconf/ctl/src/main/java/org/onosproject/netconf/ctl/impl/NetconfControllerImpl.java b/protocols/netconf/ctl/src/main/java/org/onosproject/netconf/ctl/impl/NetconfControllerImpl.java
index 19197e7..1ff38db 100644
--- a/protocols/netconf/ctl/src/main/java/org/onosproject/netconf/ctl/impl/NetconfControllerImpl.java
+++ b/protocols/netconf/ctl/src/main/java/org/onosproject/netconf/ctl/impl/NetconfControllerImpl.java
@@ -21,6 +21,7 @@
import org.bouncycastle.jce.provider.BouncyCastleProvider;
import org.onlab.packet.IpAddress;
import org.onosproject.cfg.ComponentConfigService;
+import org.onosproject.mastership.MastershipService;
import org.onosproject.net.AnnotationKeys;
import org.onosproject.net.Device;
import org.onosproject.net.DeviceId;
@@ -104,6 +105,9 @@
@Reference(cardinality = ReferenceCardinality.MANDATORY)
protected NetworkConfigRegistry netCfgService;
+ @Reference(cardinality = ReferenceCardinality.MANDATORY)
+ protected MastershipService mastershipService;
+
public static final Logger log = LoggerFactory
.getLogger(NetconfControllerImpl.class);
@@ -283,7 +287,7 @@
@Override
public void disconnectDevice(DeviceId deviceId, boolean remove) {
if (!netconfDeviceMap.containsKey(deviceId)) {
- log.warn("Device {} is not present", deviceId);
+ log.debug("Device {} is not present", deviceId);
} else {
stopDevice(deviceId, remove);
}
@@ -356,7 +360,8 @@
@Override
public void event(NetconfDeviceOutputEvent event) {
DeviceId did = event.getDeviceInfo().getDeviceId();
- if (event.type().equals(NetconfDeviceOutputEvent.Type.DEVICE_UNREGISTERED)) {
+ if (event.type().equals(NetconfDeviceOutputEvent.Type.DEVICE_UNREGISTERED) ||
+ !mastershipService.isLocalMaster(did)) {
removeDevice(did);
} else if (event.type().equals(NetconfDeviceOutputEvent.Type.SESSION_CLOSED)) {
log.info("Trying to reestablish connection with device {}", did);
diff --git a/protocols/netconf/ctl/src/main/java/org/onosproject/netconf/ctl/impl/NetconfSessionMinaImpl.java b/protocols/netconf/ctl/src/main/java/org/onosproject/netconf/ctl/impl/NetconfSessionMinaImpl.java
index 9a70597..e73a4c6 100644
--- a/protocols/netconf/ctl/src/main/java/org/onosproject/netconf/ctl/impl/NetconfSessionMinaImpl.java
+++ b/protocols/netconf/ctl/src/main/java/org/onosproject/netconf/ctl/impl/NetconfSessionMinaImpl.java
@@ -35,6 +35,7 @@
import org.bouncycastle.openssl.jcajce.JcaPEMKeyConverter;
import org.onlab.osgi.DefaultServiceDirectory;
import org.onlab.osgi.ServiceDirectory;
+import org.onlab.util.ItemNotFoundException;
import org.onlab.util.SharedExecutors;
import org.onosproject.net.DeviceId;
import org.onosproject.net.driver.Driver;
@@ -188,17 +189,22 @@
public Set<String> getClientCapabilites(DeviceId deviceId) {
Set<String> capabilities = new LinkedHashSet<>();
DriverService driverService = directory.get(DriverService.class);
- Driver driver = driverService.getDriver(deviceId);
- if (driver == null) {
+ try {
+ Driver driver = driverService.getDriver(deviceId);
+ if (driver == null) {
+ return capabilities;
+ }
+ String clientCapabilities = driver.getProperty(NETCONF_CLIENT_CAPABILITY);
+ if (clientCapabilities == null) {
+ return capabilities;
+ }
+ String[] textStr = clientCapabilities.split("\\|");
+ capabilities.addAll(Arrays.asList(textStr));
+ return capabilities;
+ } catch (ItemNotFoundException e) {
+ log.warn("Driver for device {} currently not available", deviceId);
return capabilities;
}
- String clientCapabilities = driver.getProperty(NETCONF_CLIENT_CAPABILITY);
- if (clientCapabilities == null) {
- return capabilities;
- }
- String[] textStr = clientCapabilities.split("\\|");
- capabilities.addAll(Arrays.asList(textStr));
- return capabilities;
}
private void startConnection() throws NetconfException {
diff --git a/protocols/netconf/ctl/src/test/java/org/onosproject/netconf/ctl/impl/NetconfControllerImplTest.java b/protocols/netconf/ctl/src/test/java/org/onosproject/netconf/ctl/impl/NetconfControllerImplTest.java
index e503584..bbac123 100644
--- a/protocols/netconf/ctl/src/test/java/org/onosproject/netconf/ctl/impl/NetconfControllerImplTest.java
+++ b/protocols/netconf/ctl/src/test/java/org/onosproject/netconf/ctl/impl/NetconfControllerImplTest.java
@@ -26,6 +26,8 @@
import org.onlab.packet.IpAddress;
import org.onosproject.cfg.ComponentConfigAdapter;
import org.onosproject.cfg.ComponentConfigService;
+import org.onosproject.mastership.MastershipService;
+import org.onosproject.mastership.MastershipServiceAdapter;
import org.onosproject.net.DeviceId;
import org.onosproject.net.config.Config;
import org.onosproject.net.config.ConfigApplyDelegate;
@@ -117,6 +119,7 @@
private static DeviceService deviceService = new NetconfDeviceServiceMock();
private static DeviceKeyService deviceKeyService = new NetconfDeviceKeyServiceMock();
private final NetworkConfigRegistry netCfgService = new MockNetworkConfigRegistry();
+ private final MastershipService mastershipService = new MockmastershipService();
private final ComponentContext context = new MockComponentContext();
@@ -128,6 +131,7 @@
ctrl.deviceService = deviceService;
ctrl.deviceKeyService = deviceKeyService;
ctrl.netCfgService = netCfgService;
+ ctrl.mastershipService = mastershipService;
NetconfControllerImpl.netconfConnectTimeout = NETCONF_CONNECT_TIMEOUT_DEFAULT;
NetconfControllerImpl.netconfIdleTimeout = NETCONF_IDLE_TIMEOUT_DEFAULT;
NetconfControllerImpl.netconfReplyTimeout = NETCONF_REPLY_TIMEOUT_DEFAULT;
@@ -536,4 +540,11 @@
public void onApply(Config configFile) {
}
}
+
+ private class MockmastershipService extends MastershipServiceAdapter {
+ @Override
+ public boolean isLocalMaster(DeviceId deviceId) {
+ return true;
+ }
+ }
}
diff --git a/providers/netconf/device/src/main/java/org/onosproject/provider/netconf/device/impl/NetconfDeviceProvider.java b/providers/netconf/device/src/main/java/org/onosproject/provider/netconf/device/impl/NetconfDeviceProvider.java
index 12325c3..cb61636 100644
--- a/providers/netconf/device/src/main/java/org/onosproject/provider/netconf/device/impl/NetconfDeviceProvider.java
+++ b/providers/netconf/device/src/main/java/org/onosproject/provider/netconf/device/impl/NetconfDeviceProvider.java
@@ -16,14 +16,12 @@
package org.onosproject.provider.netconf.device.impl;
-import com.google.common.base.Objects;
import com.google.common.base.Preconditions;
-import com.google.common.util.concurrent.Striped;
import org.apache.commons.lang3.tuple.Triple;
+import com.google.common.util.concurrent.Striped;
import org.onlab.packet.ChassisId;
import org.onlab.util.Tools;
import org.onosproject.cfg.ComponentConfigService;
-import org.onosproject.core.ApplicationId;
import org.onosproject.core.CoreService;
import org.onosproject.mastership.MastershipService;
import org.onosproject.net.AnnotationKeys;
@@ -52,6 +50,12 @@
import org.onosproject.net.device.DeviceService;
import org.onosproject.net.device.PortStatistics;
import org.onosproject.net.device.PortStatisticsDiscovery;
+import org.onosproject.net.driver.DefaultDriverData;
+import org.onosproject.net.driver.DefaultDriverHandler;
+import org.onosproject.net.driver.Driver;
+import org.onosproject.net.driver.DriverData;
+import org.onosproject.net.driver.DriverHandler;
+import org.onosproject.net.driver.DriverService;
import org.onosproject.net.key.DeviceKey;
import org.onosproject.net.key.DeviceKeyAdminService;
import org.onosproject.net.key.DeviceKeyId;
@@ -74,8 +78,6 @@
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.Socket;
-import java.net.URI;
-import java.net.URISyntaxException;
import java.util.Collection;
import java.util.Dictionary;
import java.util.Map;
@@ -83,6 +85,7 @@
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
@@ -94,8 +97,11 @@
import static java.util.concurrent.Executors.newScheduledThreadPool;
import static org.onlab.util.Tools.groupedThreads;
-import static org.onosproject.provider.netconf.device.impl.OsgiPropertyConstants.*;
import static org.onosproject.netconf.NetconfDeviceInfo.extractIpPortPath;
+import static org.onosproject.provider.netconf.device.impl.OsgiPropertyConstants.MAX_RETRIES;
+import static org.onosproject.provider.netconf.device.impl.OsgiPropertyConstants.MAX_RETRIES_DEFAULT;
+import static org.onosproject.provider.netconf.device.impl.OsgiPropertyConstants.POLL_FREQUENCY_SECONDS;
+import static org.onosproject.provider.netconf.device.impl.OsgiPropertyConstants.POLL_FREQUENCY_SECONDS_DEFAULT;
import static org.slf4j.LoggerFactory.getLogger;
/**
@@ -127,6 +133,9 @@
protected DeviceService deviceService;
@Reference(cardinality = ReferenceCardinality.MANDATORY)
+ protected DriverService driverService;
+
+ @Reference(cardinality = ReferenceCardinality.MANDATORY)
protected DeviceKeyAdminService deviceKeyAdminService;
@Reference(cardinality = ReferenceCardinality.MANDATORY)
@@ -142,30 +151,31 @@
private static final String UNKNOWN = "unknown";
protected static final String ISNULL = "NetconfDeviceInfo is null";
private static final String IPADDRESS = "ipaddress";
- private static final String NETCONF = "netconf";
private static final String PORT = "port";
private static final String PATH = "path";
private static final int CORE_POOL_SIZE = 10;
- /** Configure poll frequency for port status and statistics; default is 30 sec. */
+ /**
+ * Configure poll frequency for port status and statistics; default is 30 sec.
+ */
private int pollFrequency = POLL_FREQUENCY_SECONDS_DEFAULT;
- /** Configure maximum allowed number of retries for obtaining list of ports; default is 5 times. */
+ /**
+ * Configure maximum allowed number of retries for obtaining list of ports; default is 5 times.
+ */
private int maxRetries = MAX_RETRIES_DEFAULT;
- protected ExecutorService executor =
- Executors.newFixedThreadPool(5, groupedThreads("onos/netconfdeviceprovider",
- "device-installer-%d", log));
- protected ScheduledExecutorService connectionExecutor
- = newScheduledThreadPool(CORE_POOL_SIZE,
- groupedThreads("onos/netconfdeviceprovider",
- "connection-executor-%d", log));
+ protected ExecutorService connectionExecutor = Executors.newFixedThreadPool(CORE_POOL_SIZE,
+ groupedThreads("onos/netconfDeviceProviderConnection",
+ "connection-executor-%d", log));
+ protected ScheduledExecutorService pollingExecutor = newScheduledThreadPool(CORE_POOL_SIZE,
+ groupedThreads("onos/netconfDeviceProviderPoll",
+ "polling-executor-%d", log));
protected DeviceProviderService providerService;
- private NetconfDeviceListener innerNodeListener = new InnerNetconfDeviceListener();
- private InternalDeviceListener deviceListener = new InternalDeviceListener();
private final Map<DeviceId, AtomicInteger> retriedPortDiscoveryMap = new ConcurrentHashMap<>();
protected ScheduledFuture<?> scheduledTask;
+ private final Striped<Lock> deviceLocks = Striped.lock(30);
protected final ConfigFactory factory =
// TODO consider moving Config registration to NETCONF ctl bundle
@@ -179,9 +189,9 @@
};
protected final NetworkConfigListener cfgListener = new InternalNetworkConfigListener();
- private ApplicationId appId;
+ private NetconfDeviceListener innerNodeListener = new InnerNetconfDeviceListener();
+ private InternalDeviceListener deviceListener = new InternalDeviceListener();
private boolean active;
- private final Striped<Lock> deviceLocks = Striped.lock(30);
@Activate
@@ -189,12 +199,12 @@
active = true;
componentConfigService.registerProperties(getClass());
providerService = providerRegistry.register(this);
- appId = coreService.registerApplication(APP_NAME);
+ coreService.registerApplication(APP_NAME);
cfgService.registerConfigFactory(factory);
cfgService.addListener(cfgListener);
controller.addDeviceListener(innerNodeListener);
deviceService.addListener(deviceListener);
- executor.execute(NetconfDeviceProvider.this::connectDevices);
+ pollingExecutor.execute(NetconfDeviceProvider.this::connectDevices);
modified(context);
log.info("Started");
}
@@ -202,6 +212,7 @@
@Deactivate
public void deactivate() {
+ cfgService.removeListener(cfgListener);
componentConfigService.unregisterProperties(getClass(), false);
deviceService.removeListener(deviceListener);
active = false;
@@ -210,13 +221,13 @@
controller.disconnectDevice(id, true);
});
controller.removeDeviceListener(innerNodeListener);
- deviceService.removeListener(deviceListener);
providerRegistry.unregister(this);
providerService = null;
retriedPortDiscoveryMap.clear();
cfgService.unregisterConfigFactory(factory);
scheduledTask.cancel(true);
- executor.shutdown();
+ connectionExecutor.shutdown();
+ pollingExecutor.shutdown();
log.info("Stopped");
}
@@ -226,7 +237,7 @@
if (context != null) {
Dictionary<?, ?> properties = context.getProperties();
pollFrequency = Tools.getIntegerProperty(properties, POLL_FREQUENCY_SECONDS,
- POLL_FREQUENCY_SECONDS_DEFAULT);
+ POLL_FREQUENCY_SECONDS_DEFAULT);
log.info("Configured. Poll frequency is configured to {} seconds", pollFrequency);
maxRetries = Tools.getIntegerProperty(properties, MAX_RETRIES, MAX_RETRIES_DEFAULT);
@@ -242,44 +253,26 @@
super(new ProviderId(SCHEME_NAME, DEVICE_PROVIDER_PACKAGE));
}
- // Checks connection to devices in the config file
- // every DEFAULT_POLL_FREQUENCY_SECONDS seconds.
- private ScheduledFuture schedulePolling() {
- return connectionExecutor.scheduleAtFixedRate(exceptionSafe(this::checkAndUpdateDevices),
- pollFrequency / 10,
- pollFrequency, TimeUnit.SECONDS);
- }
-
- private Runnable exceptionSafe(Runnable runnable) {
- return new Runnable() {
-
- @Override
- public void run() {
- try {
- runnable.run();
- } catch (Exception e) {
- log.error("Unhandled Exception", e);
- }
- }
- };
- }
-
@Override
public void triggerProbe(DeviceId deviceId) {
- // TODO: This will be implemented later.
- log.debug("Should be triggering probe on device {}", deviceId);
+ //Not implemented, unused in netconf cases.
+ log.debug("Probing {} not implemented, not useful for NETCONF", deviceId);
}
@Override
public void roleChanged(DeviceId deviceId, MastershipRole newRole) {
+ log.debug("Request role change {}, {}", deviceId, newRole);
if (active) {
switch (newRole) {
case MASTER:
- withDeviceLock(
- () -> initiateConnection(deviceId, newRole), deviceId).run();
- log.debug("Accepting mastership role change to {} for device {}", newRole, deviceId);
+ if (controller.getNetconfDevice(deviceId) == null) {
+ connectionExecutor.execute(exceptionSafe(() -> withDeviceLock(
+ () -> initiateConnection(deviceId), deviceId).run()));
+ log.debug("Accepting mastership role change to {} for device {}", newRole, deviceId);
+ }
break;
case STANDBY:
+ //TODO this issue a warning on the first election/connection
controller.disconnectDevice(deviceId, false);
providerService.receivedRoleReply(deviceId, newRole, MastershipRole.STANDBY);
//else no-op
@@ -297,11 +290,10 @@
@Override
public boolean isReachable(DeviceId deviceId) {
-
boolean sessionExists =
Optional.ofNullable(controller.getDevicesMap().get(deviceId))
- .map(NetconfDevice::isActive)
- .orElse(false);
+ .map(NetconfDevice::isActive)
+ .orElse(false);
if (sessionExists) {
return true;
}
@@ -338,43 +330,45 @@
}
@Override
- public void changePortState(DeviceId deviceId, PortNumber portNumber,
- boolean enable) {
+ public void changePortState(DeviceId deviceId, PortNumber portNumber, boolean enable) {
Device device = deviceService.getDevice(deviceId);
- if (mastershipService.isLocalMaster(deviceId)) {
- if (device.is(PortAdmin.class)) {
- PortAdmin portAdmin =
- device.as(PortAdmin.class);
- CompletableFuture<Boolean> modified;
- if (enable) {
- modified = portAdmin.enable(portNumber);
- } else {
- modified = portAdmin.disable(portNumber);
- }
- modified.thenAccept(result -> {
- if (result) {
- Port port = deviceService.getPort(deviceId, portNumber);
- //rebuilding port description with admin state changed.
- providerService.portStatusChanged(deviceId,
- DefaultPortDescription.builder()
- .withPortNumber(portNumber)
- .isEnabled(enable)
- .isRemoved(false)
- .type(port.type())
- .portSpeed(port.portSpeed())
- .annotations((SparseAnnotations) port.annotations())
- .build());
- } else {
- log.warn("Your device {} port {} status can't be changed to {}",
- deviceId, portNumber, enable);
- }
- });
- } else {
- log.warn("Device {} does not support Port Admin", deviceId);
- }
- } else {
- log.debug("Not master but {}, not changing port state", mastershipService.getLocalRole(deviceId));
+ if (device == null) {
+ log.error("Device {} is not present in the store", deviceId);
+ return;
}
+ if (!mastershipService.isLocalMaster(deviceId)) {
+ log.info("Not master but {}, not changing port state", mastershipService.getLocalRole(deviceId));
+ return;
+ }
+ if (!device.is(PortAdmin.class)) {
+ log.warn("Device {} does not support Port Admin", deviceId);
+ return;
+ }
+ PortAdmin portAdmin = device.as(PortAdmin.class);
+ CompletableFuture<Boolean> modified;
+ if (enable) {
+ modified = portAdmin.enable(portNumber);
+ } else {
+ modified = portAdmin.disable(portNumber);
+ }
+ modified.thenAccept(result -> {
+ if (result) {
+ Port port = deviceService.getPort(deviceId, portNumber);
+ //rebuilding port description with admin state changed.
+ providerService.portStatusChanged(deviceId,
+ DefaultPortDescription.builder()
+ .withPortNumber(portNumber)
+ .isEnabled(enable)
+ .isRemoved(false)
+ .type(port.type())
+ .portSpeed(port.portSpeed())
+ .annotations((SparseAnnotations) port.annotations())
+ .build());
+ } else {
+ log.warn("Your device {} port {} status can't be changed to {}",
+ deviceId, portNumber, enable);
+ }
+ });
}
@Override
@@ -383,119 +377,180 @@
controller.disconnectDevice(deviceId, true);
}
- private class InnerNetconfDeviceListener implements NetconfDeviceListener {
-
-
- @Override
- public void deviceAdded(DeviceId deviceId) {
- //no-op
- log.debug("Netconf device {} added to Netconf subController", deviceId);
- }
-
- @Override
- public void deviceRemoved(DeviceId deviceId) {
- Preconditions.checkNotNull(deviceId, ISNULL);
-
- if (deviceService.getDevice(deviceId) != null) {
- providerService.deviceDisconnected(deviceId);
- retriedPortDiscoveryMap.remove(deviceId);
- log.debug("Netconf device {} removed from Netconf subController", deviceId);
- } else {
- log.warn("Netconf device {} does not exist in the store, " +
- "it may already have been removed", deviceId);
- }
- }
+ private ScheduledFuture schedulePolling() {
+ return pollingExecutor.scheduleAtFixedRate(exceptionSafe(this::checkAndUpdateDevices),
+ pollFrequency / 10,
+ pollFrequency, TimeUnit.SECONDS);
}
+ private Runnable exceptionSafe(Runnable runnable) {
+ return () -> {
+ try {
+ runnable.run();
+ } catch (Exception e) {
+ log.error("Unhandled Exception", e);
+ }
+ };
+ }
+
+ //Connecting devices with initial config
private void connectDevices() {
- Set<DeviceId> deviceSubjects =
- cfgService.getSubjects(DeviceId.class, NetconfDeviceConfig.class);
+ Set<DeviceId> deviceSubjects = cfgService.getSubjects(DeviceId.class, NetconfDeviceConfig.class);
deviceSubjects.forEach(deviceId -> {
- connectDevice(cfgService.getConfig(deviceId, NetconfDeviceConfig.class));
+ connectionExecutor.execute(exceptionSafe(() -> runElectionFor(deviceId)));
});
}
+ //updating keys and device info
+ private void checkAndUpdateDevices() {
+ Set<DeviceId> deviceSubjects = cfgService.getSubjects(DeviceId.class, NetconfDeviceConfig.class);
+ deviceSubjects.forEach(deviceId -> {
+ log.debug("check and update {}", deviceId);
+ NetconfDeviceConfig config = cfgService.getConfig(deviceId, NetconfDeviceConfig.class);
+ storeDeviceKey(config.sshKey(), config.username(), config.password(), deviceId);
+ discoverOrUpdatePorts(deviceId);
+ });
+ }
- private void connectDevice(NetconfDeviceConfig config) {
- if (config == null) {
- log.warn("connect device invoked with null config");
- return;
+ //Saving device keys in the store
+ private void storeDeviceKey(String sshKey, String username, String password, DeviceId deviceId) {
+ if (sshKey.equals("")) {
+ deviceKeyAdminService.addKey(
+ DeviceKey.createDeviceKeyUsingUsernamePassword(
+ DeviceKeyId.deviceKeyId(deviceId.toString()), null, username, password));
+ } else {
+ deviceKeyAdminService.addKey(
+ DeviceKey.createDeviceKeyUsingSshKey(
+ DeviceKeyId.deviceKeyId(deviceId.toString()), null, username, password, sshKey));
}
- DeviceId deviceId = config.subject();
+ }
+
+ //running an election and applying the role to a given device
+ private void runElectionFor(DeviceId deviceId) {
+ //Triggering an election for the deviceId thus only master will connect
if (!deviceId.uri().getScheme().equals(SCHEME_NAME)) {
// not under my scheme, skipping
log.debug("{} not of schema {}, skipping", deviceId, SCHEME_NAME);
return;
}
+ connectionExecutor.submit(exceptionSafe(() -> {
+ CompletableFuture<MastershipRole> role = mastershipService.requestRoleFor(deviceId);
+ try {
+ roleChanged(deviceId, role.get());
+ } catch (InterruptedException | ExecutionException e) {
+ log.error("Can't get role for {} ", deviceId, e);
+ }
+ }));
+ }
+
+ //initiating the SSh connection the a given device.
+ private void initiateConnection(DeviceId deviceId) {
+
if (!isReachable(deviceId)) {
log.warn("Can't connect to device {}", deviceId);
return;
}
- DeviceDescription deviceDescription = createDeviceRepresentation(deviceId, config);
- log.debug("Connecting NETCONF device {}, on {}:{}{} with username {}",
- deviceId, config.ip(), config.port(),
- (config.path().isPresent() ? "/" + config.path().get() : ""),
- config.username());
- storeDeviceKey(config.sshKey(), config.username(), config.password(), deviceId);
- retriedPortDiscoveryMap.put(deviceId, new AtomicInteger(0));
- if (deviceService.getDevice(deviceId) == null) {
- log.debug("device connected {}", deviceId);
- providerService.deviceConnected(deviceId, deviceDescription);
+
+ try {
+ NetconfDevice deviceNetconf = controller.connectDevice(deviceId);
+ if (deviceNetconf != null) {
+ //storeDeviceKey(config.sshKey(), config.username(), config.password(), deviceId);
+ NetconfDeviceConfig config = cfgService.getConfig(deviceId, NetconfDeviceConfig.class);
+ //getting the device description
+ DeviceDescription deviceDescription = getDeviceDescription(deviceId, config);
+ //connecting device to ONOS
+ log.debug("Connected NETCONF device {}, on {}:{} with username {}",
+ deviceId, config.ip(), config.port(),
+ (config.path().isPresent() ? "/" + config.path().get() : ""),
+ config.username());
+ providerService.deviceConnected(deviceId, deviceDescription);
+ } else {
+ mastershipService.relinquishMastership(deviceId);
+ deviceKeyAdminService.removeKey(DeviceKeyId.deviceKeyId(deviceId.toString()));
+ log.error("Can't connect to NETCONF device {}", deviceId);
+ }
+ } catch (Exception e) {
+ mastershipService.relinquishMastership(deviceId);
+ deviceKeyAdminService.removeKey(DeviceKeyId.deviceKeyId(deviceId.toString()));
+ throw new IllegalStateException(new NetconfException(
+ "Can't connect to NETCONF device " + deviceId, e));
+
+ }
+
+ }
+
+ private DeviceDescription getDeviceDescription(DeviceId deviceId, NetconfDeviceConfig config) {
+ Driver driver = driverService.getDriver(deviceId);
+ if (driver.hasBehaviour(DeviceDescriptionDiscovery.class)) {
+ final DriverData data = new DefaultDriverData(driver, deviceId);
+ final DriverHandler handler = new DefaultDriverHandler(data);
+ //creating the behaviour because the core has yet no notion of device.
+ DeviceDescriptionDiscovery deviceDescriptionDiscovery =
+ driver.createBehaviour(handler, DeviceDescriptionDiscovery.class);
+ return getDeviceRepresentation(deviceId, config, deviceDescriptionDiscovery);
+ } else {
+ return emptyDescription(deviceId, config);
}
}
- private void checkAndUpdateDevice(DeviceId deviceId, DeviceDescription deviceDescription, boolean newlyConnected) {
+ private DeviceDescription emptyDescription(DeviceId deviceId, NetconfDeviceConfig config) {
+ ChassisId cid = new ChassisId();
+ String ipAddress = config.ip().toString();
+ DefaultAnnotations.Builder annotations = DefaultAnnotations.builder()
+ .set(IPADDRESS, ipAddress)
+ .set(PORT, String.valueOf(config.port()))
+ .set(AnnotationKeys.PROTOCOL, SCHEME_NAME.toUpperCase());
+ if (config.path().isPresent()) {
+ annotations.set(PATH, config.path().get());
+ }
+ return new DefaultDeviceDescription(deviceId.uri(), Device.Type.SWITCH,
+ UNKNOWN, UNKNOWN, UNKNOWN, UNKNOWN, cid, true, annotations.build());
+ }
+
+ private DeviceDescription getDeviceRepresentation(DeviceId deviceId, NetconfDeviceConfig config,
+ DeviceDescriptionDiscovery deviceDescriptionDiscovery) {
+ Device device = deviceService.getDevice(deviceId);
+ //handling first time description creation
+ if (deviceService.getDevice(deviceId) == null) {
+ return emptyDescription(deviceId, config);
+ }
+ //getting the old description
+ DeviceDescription oldDescription = new DefaultDeviceDescription(device.id().uri(), device.type(),
+ device.manufacturer(), device.hwVersion(),
+ device.swVersion(), device.serialNumber(),
+ device.chassisId(), (SparseAnnotations) device.annotations());
+
+ DeviceDescription newDescription = deviceDescriptionDiscovery.discoverDeviceDetails();
+ if (newDescription == null) {
+ newDescription = oldDescription;
+ }
+ //merging and returning
+ return new DefaultDeviceDescription(newDescription, true,
+ DefaultAnnotations.merge((DefaultAnnotations) newDescription.annotations(),
+ oldDescription.annotations()));
+ }
+
+
+ private void discoverOrUpdatePorts(DeviceId deviceId) {
+ retriedPortDiscoveryMap.put(deviceId, new AtomicInteger(0));
+ AtomicInteger count = retriedPortDiscoveryMap.get(deviceId);
+ //TODO this does not enable port discovery if port changes.
Device device = deviceService.getDevice(deviceId);
if (device == null) {
- log.debug("Device {} has not been added to store, since it's not reachable", deviceId);
+ log.debug("Cant' reach device {}, not updating ports", deviceId);
return;
}
- boolean isReachable = isReachable(deviceId);
- if (!isReachable && deviceService.isAvailable(deviceId)) {
- providerService.deviceDisconnected(deviceId);
- return;
- } else if (newlyConnected && mastershipService.isLocalMaster(deviceId)) {
- updateDeviceDescription(deviceId, deviceDescription, device);
- }
- if (isReachable && deviceService.isAvailable(deviceId) &&
- mastershipService.isLocalMaster(deviceId)) {
- //if ports are not discovered, retry the discovery
- AtomicInteger count = retriedPortDiscoveryMap.get(deviceId);
- if (deviceService.getPorts(deviceId).isEmpty() &&
- count != null && count.getAndIncrement() < maxRetries) {
- discoverPorts(deviceId);
+ if (deviceService.getPorts(deviceId).isEmpty()
+ && count != null && count.getAndIncrement() < maxRetries) {
+ if (device.is(DeviceDescriptionDiscovery.class)) {
+ providerService.updatePorts(deviceId,
+ device.as(DeviceDescriptionDiscovery.class).discoverPortDetails());
+ } else {
+ log.warn("No DeviceDescirption behaviour for device {}", deviceId);
}
- updatePortStatistics(device);
- }
- }
- private void updateDeviceDescription(DeviceId deviceId, DeviceDescription deviceDescription, Device device) {
- if (device.is(DeviceDescriptionDiscovery.class)) {
- if (mastershipService.isLocalMaster(deviceId)) {
- DeviceDescriptionDiscovery deviceDescriptionDiscovery =
- device.as(DeviceDescriptionDiscovery.class);
- DeviceDescription updatedDeviceDescription =
- deviceDescriptionDiscovery.discoverDeviceDetails();
- if (updatedDeviceDescription != null &&
- !descriptionEquals(device, updatedDeviceDescription)) {
- providerService.deviceConnected(
- deviceId, new DefaultDeviceDescription(
- updatedDeviceDescription, true,
- updatedDeviceDescription.annotations()));
- } else if (updatedDeviceDescription == null) {
- providerService.deviceConnected(
- deviceId, new DefaultDeviceDescription(
- deviceDescription, true,
- deviceDescription.annotations()));
- }
- }
- } else {
- log.warn("No DeviceDescriptionDiscovery behaviour for device {} " +
- "using DefaultDeviceDescription", deviceId);
- providerService.deviceConnected(
- deviceId, new DefaultDeviceDescription(
- deviceDescription, true, deviceDescription.annotations()));
}
+ updatePortStatistics(device);
}
private void updatePortStatistics(Device device) {
@@ -504,129 +559,11 @@
Collection<PortStatistics> portStatistics = d.discoverPortStatistics();
if (portStatistics != null) {
providerService.updatePortStatistics(device.id(),
- portStatistics);
+ portStatistics);
}
} else {
log.debug("No port statistics getter behaviour for device {}",
- device.id());
- }
- }
-
- private boolean descriptionEquals(Device device, DeviceDescription updatedDeviceDescription) {
- return Objects.equal(device.id().uri(), updatedDeviceDescription.deviceUri())
- && Objects.equal(device.type(), updatedDeviceDescription.type())
- && Objects.equal(device.manufacturer(), updatedDeviceDescription.manufacturer())
- && Objects.equal(device.hwVersion(), updatedDeviceDescription.hwVersion())
- && Objects.equal(device.swVersion(), updatedDeviceDescription.swVersion())
- && Objects.equal(device.serialNumber(), updatedDeviceDescription.serialNumber())
- && Objects.equal(device.chassisId(), updatedDeviceDescription.chassisId())
- && Objects.equal(device.annotations(), updatedDeviceDescription.annotations());
- }
-
- private void checkAndUpdateDevices() {
- Set<DeviceId> deviceSubjects =
- cfgService.getSubjects(DeviceId.class, NetconfDeviceConfig.class);
- deviceSubjects.forEach(deviceId -> {
- NetconfDeviceConfig config =
- cfgService.getConfig(deviceId, NetconfDeviceConfig.class);
- DeviceDescription deviceDescription = createDeviceRepresentation(deviceId, config);
- storeDeviceKey(config.sshKey(), config.username(), config.password(), deviceId);
- log.debug("check and update {}", deviceId);
- checkAndUpdateDevice(deviceId, deviceDescription, false);
- });
- }
-
- private DeviceDescription createDeviceRepresentation(DeviceId deviceId, NetconfDeviceConfig config) {
- Preconditions.checkNotNull(deviceId, ISNULL);
- //Netconf configuration object
- ChassisId cid = new ChassisId();
- String ipAddress = config.ip().toString();
- DefaultAnnotations.Builder annotations = DefaultAnnotations.builder()
- .set(IPADDRESS, ipAddress)
- .set(PORT, String.valueOf(config.port()))
- .set(AnnotationKeys.PROTOCOL, SCHEME_NAME.toUpperCase())
- .set(AnnotationKeys.PROVIDER_MARK_ONLINE, String.valueOf(true));
- if (config.path().isPresent()) {
- annotations.set(PATH, config.path().get());
- }
-
- return new DefaultDeviceDescription(
- deviceId.uri(),
- Device.Type.SWITCH,
- UNKNOWN, UNKNOWN,
- UNKNOWN, UNKNOWN,
- cid, false,
- annotations.build());
- }
-
- private void storeDeviceKey(String sshKey, String username, String password, DeviceId deviceId) {
- if (sshKey.equals("")) {
- deviceKeyAdminService.addKey(
- DeviceKey.createDeviceKeyUsingUsernamePassword(
- DeviceKeyId.deviceKeyId(deviceId.toString()),
- null, username, password));
- } else {
- deviceKeyAdminService.addKey(
- DeviceKey.createDeviceKeyUsingSshKey(
- DeviceKeyId.deviceKeyId(deviceId.toString()),
- null, username, password,
- sshKey));
- }
- }
-
- private void initiateConnection(DeviceId deviceId, MastershipRole newRole) {
- try {
- if (isReachable(deviceId)) {
- NetconfDevice device = controller.connectDevice(deviceId);
- if (device != null) {
- providerService.receivedRoleReply(deviceId, newRole, MastershipRole.MASTER);
- } else {
- providerService.receivedRoleReply(deviceId, newRole, MastershipRole.NONE);
- }
-
- }
- } catch (Exception e) {
- if (deviceService.getDevice(deviceId) != null) {
- providerService.deviceDisconnected(deviceId);
- }
- deviceKeyAdminService.removeKey(DeviceKeyId.deviceKeyId(deviceId.toString()));
- throw new IllegalStateException(new NetconfException(
- "Can't connect to NETCONF device " + deviceId, e));
-
- }
- }
-
- private void discoverPorts(DeviceId deviceId) {
- Device device = deviceService.getDevice(deviceId);
- //TODO remove when PortDiscovery is removed from master
- if (device.is(DeviceDescriptionDiscovery.class)) {
- DeviceDescriptionDiscovery deviceDescriptionDiscovery =
- device.as(DeviceDescriptionDiscovery.class);
- providerService.updatePorts(deviceId,
- deviceDescriptionDiscovery.discoverPortDetails());
- } else {
- log.warn("No portGetter behaviour for device {}", deviceId);
- }
-
- // Port statistics discovery
- updatePortStatistics(device);
- }
-
- /**
- * Return the DeviceId about the device containing the URI.
- *
- * @param ip IP address
- * @param port port number
- * @param path path aspect
- * @return DeviceId
- */
- public DeviceId getDeviceId(String ip, int port, Optional<String> path) {
- try {
- return DeviceId.deviceId(new URI(NETCONF, ip + ":" + port +
- (path.isPresent() ? "/" + path : ""), null));
- } catch (URISyntaxException e) {
- throw new IllegalArgumentException("Unable to build deviceID for device "
- + ip + ":" + port + (path.isPresent() ? "/" + path : ""), e);
+ device.id());
}
}
@@ -655,13 +592,12 @@
@Override
public void event(NetworkConfigEvent event) {
- if (event.configClass().equals(NetconfDeviceConfig.class)) {
- executor.execute(() -> connectDevice((NetconfDeviceConfig) event.config().get()));
+ if (event.configClass().equals(NetconfDeviceConfig.class) && event.config().isPresent()) {
+ connectionExecutor.execute(exceptionSafe(() ->
+ runElectionFor((DeviceId) event.config().get().subject())));
} else {
- log.warn("Injecting device via this Json is deprecated, " +
- "please put configuration under devices/ as shown in the wiki");
+ log.warn("Incorrect or absent Class for Netconf Configuration");
}
-
}
@Override
@@ -673,28 +609,38 @@
}
/**
+ * Listener for Netconf Controller Events.
+ */
+ private class InnerNetconfDeviceListener implements NetconfDeviceListener {
+
+ @Override
+ public void deviceAdded(DeviceId deviceId) {
+ //no-op
+ log.debug("Netconf device {} added to Netconf controller", deviceId);
+ }
+
+ @Override
+ public void deviceRemoved(DeviceId deviceId) {
+ Preconditions.checkNotNull(deviceId, ISNULL);
+
+ if (deviceService.getDevice(deviceId) != null) {
+ providerService.deviceDisconnected(deviceId);
+ retriedPortDiscoveryMap.remove(deviceId);
+ log.debug("Netconf device {} removed from Netconf controller", deviceId);
+ } else {
+ log.warn("Netconf device {} does not exist in the store, " +
+ "it may already have been removed", deviceId);
+ }
+ }
+ }
+
+ /**
* Listener for core device events.
*/
private class InternalDeviceListener implements DeviceListener {
@Override
public void event(DeviceEvent event) {
- Device device = event.subject();
- DeviceId deviceId = device.id();
- if (event.type() == DeviceEvent.Type.DEVICE_ADDED && !deviceService.isAvailable(event.subject().id())) {
- try {
- DeviceDescription description = new DefaultDeviceDescription(deviceId.uri(), device.type(),
- device.manufacturer(), device.hwVersion(), device.swVersion(),
- device.serialNumber(), device.chassisId(),
- (SparseAnnotations) device.annotations());
- log.debug("check and update {}", deviceId);
- checkAndUpdateDevice(deviceId, description, true);
- } catch (Exception e) {
- log.error("Unhandled exception checking {}", deviceId, e);
- }
- }
- if (deviceService.isAvailable(event.subject().id())) {
- executor.execute(() -> discoverPorts(event.subject().id()));
- }
+ connectionExecutor.submit(exceptionSafe(() -> discoverOrUpdatePorts(event.subject().id())));
}
@Override
@@ -703,9 +649,6 @@
event.type() != DeviceEvent.Type.DEVICE_AVAILABILITY_CHANGED) {
return false;
}
- if (mastershipService.getMasterFor(event.subject().id()) == null) {
- return true;
- }
return (SCHEME_NAME.equalsIgnoreCase(event.subject().annotations().value(AnnotationKeys.PROTOCOL)) ||
(SCHEME_NAME.equalsIgnoreCase(event.subject().id().uri().getScheme()))) &&
mastershipService.isLocalMaster(event.subject().id());
diff --git a/providers/netconf/device/src/test/java/org/onosproject/provider/netconf/device/impl/NetconfDeviceProviderTest.java b/providers/netconf/device/src/test/java/org/onosproject/provider/netconf/device/impl/NetconfDeviceProviderTest.java
index 39498f3..da820a3 100644
--- a/providers/netconf/device/src/test/java/org/onosproject/provider/netconf/device/impl/NetconfDeviceProviderTest.java
+++ b/providers/netconf/device/src/test/java/org/onosproject/provider/netconf/device/impl/NetconfDeviceProviderTest.java
@@ -34,8 +34,10 @@
import org.onosproject.net.AbstractProjectableModel;
import org.onosproject.net.DefaultAnnotations;
import org.onosproject.net.DefaultDevice;
+import org.onosproject.net.DefaultPort;
import org.onosproject.net.Device;
import org.onosproject.net.DeviceId;
+import org.onosproject.net.Port;
import org.onosproject.net.PortNumber;
import org.onosproject.net.config.Config;
import org.onosproject.net.config.ConfigApplyDelegate;
@@ -75,12 +77,14 @@
import java.io.IOException;
import java.io.InputStream;
import java.util.ArrayList;
+import java.util.Collection;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.concurrent.CopyOnWriteArraySet;
import java.util.concurrent.CountDownLatch;
+import java.util.stream.Collectors;
import static org.easymock.EasyMock.createMock;
import static org.easymock.EasyMock.expect;
@@ -210,7 +214,7 @@
assertTrue("Incorrect config factories", cfgFactories.contains(provider.factory));
assertNotNull("Device listener should be added", deviceService.listener);
assertFalse("Thread to connect device should be running",
- provider.executor.isShutdown() || provider.executor.isTerminated());
+ provider.connectionExecutor.isShutdown() || provider.connectionExecutor.isTerminated());
assertFalse("Scheduled task to update device should be running", provider.scheduledTask.isCancelled());
}
@@ -219,7 +223,7 @@
provider.deactivate();
assertNull("Device listener should be removed", deviceService.listener);
assertFalse("Provider should not be registered", deviceRegistry.getProviders().contains(provider.id()));
- assertTrue("Thread to connect device should be shutdown", provider.executor.isShutdown());
+ assertTrue("Thread to connect device should be shutdown", provider.connectionExecutor.isShutdown());
assertTrue("Scheduled task to update device should be shutdown", provider.scheduledTask.isCancelled());
assertNull("Provider service should be null", provider.providerService);
assertTrue("Network config factories not removed", cfgFactories.isEmpty());
@@ -270,7 +274,7 @@
@Test
public void testDiscoverPortsAfterDeviceAdded() {
- provider.executor = MoreExecutors.newDirectExecutorService();
+ provider.connectionExecutor = MoreExecutors.newDirectExecutorService();
prepareMocks(PORT_COUNT);
deviceService.listener.event(new DeviceEvent(DeviceEvent.Type.DEVICE_ADDED, netconfDevice));
@@ -287,6 +291,13 @@
}
}
+ private List<Port> createMockPorts(Collection<PortDescription> descs, DeviceId deviceId) {
+ Device device = deviceService.getDevice(deviceId);
+ return descs.stream()
+ .map(desc -> new DefaultPort(device, desc.portNumber(), desc.isEnabled(), desc.annotations()))
+ .collect(Collectors.toList());
+ }
+
//TODO: check updates of the device description
@@ -354,6 +365,11 @@
}
@Override
+ public List<Port> getPorts(DeviceId deviceId) {
+ return createMockPorts(providerService.ports.get(deviceId), deviceId);
+ }
+
+ @Override
public void addListener(DeviceListener listener) {
this.listener = listener;
}