[ONOS-7917] Improving device description query in Netconf Device provider
Change-Id: I10ac650e09083fa81e1a54cc8f8e39537dc16cbb
cherry pick changes for netconf connections
Change-Id: I615305a6eee3c291a6d8b31f452fe18116348763
hand touch up on cherry picks and merges for netconfig connections
Change-Id: I35551f569ff272e28a1d39aa27e04a587d5b95d6
diff --git a/protocols/netconf/ctl/src/main/java/org/onosproject/netconf/ctl/impl/NetconfControllerImpl.java b/protocols/netconf/ctl/src/main/java/org/onosproject/netconf/ctl/impl/NetconfControllerImpl.java
index d5f14e0..8a169b9 100644
--- a/protocols/netconf/ctl/src/main/java/org/onosproject/netconf/ctl/impl/NetconfControllerImpl.java
+++ b/protocols/netconf/ctl/src/main/java/org/onosproject/netconf/ctl/impl/NetconfControllerImpl.java
@@ -29,6 +29,7 @@
import org.bouncycastle.jce.provider.BouncyCastleProvider;
import org.onlab.packet.IpAddress;
import org.onosproject.cfg.ComponentConfigService;
+import org.onosproject.mastership.MastershipService;
import org.onosproject.net.AnnotationKeys;
import org.onosproject.net.Device;
import org.onosproject.net.DeviceId;
@@ -115,6 +116,9 @@
@Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
protected NetworkConfigRegistry netCfgService;
+ @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
+ protected MastershipService mastershipService;
+
public static final Logger log = LoggerFactory
.getLogger(NetconfControllerImpl.class);
@@ -320,7 +324,7 @@
@Override
public void disconnectDevice(DeviceId deviceId, boolean remove) {
if (!netconfDeviceMap.containsKey(deviceId)) {
- log.warn("Device {} is not present", deviceId);
+ log.debug("Device {} is not present", deviceId);
} else {
stopDevice(deviceId, remove);
}
@@ -408,7 +412,8 @@
@Override
public void event(NetconfDeviceOutputEvent event) {
DeviceId did = event.getDeviceInfo().getDeviceId();
- if (event.type().equals(NetconfDeviceOutputEvent.Type.DEVICE_UNREGISTERED)) {
+ if (event.type().equals(NetconfDeviceOutputEvent.Type.DEVICE_UNREGISTERED) ||
+ !mastershipService.isLocalMaster(did)) {
removeDevice(did);
} else if (event.type().equals(NetconfDeviceOutputEvent.Type.SESSION_CLOSED)) {
log.info("Trying to reestablish connection with device {}", did);
diff --git a/protocols/netconf/ctl/src/main/java/org/onosproject/netconf/ctl/impl/NetconfSessionMinaImpl.java b/protocols/netconf/ctl/src/main/java/org/onosproject/netconf/ctl/impl/NetconfSessionMinaImpl.java
index c5af629..23932e4 100644
--- a/protocols/netconf/ctl/src/main/java/org/onosproject/netconf/ctl/impl/NetconfSessionMinaImpl.java
+++ b/protocols/netconf/ctl/src/main/java/org/onosproject/netconf/ctl/impl/NetconfSessionMinaImpl.java
@@ -35,6 +35,7 @@
import org.bouncycastle.openssl.jcajce.JcaPEMKeyConverter;
import org.onlab.osgi.DefaultServiceDirectory;
import org.onlab.osgi.ServiceDirectory;
+import org.onlab.util.ItemNotFoundException;
import org.onlab.util.SharedExecutors;
import org.onosproject.net.DeviceId;
import org.onosproject.net.driver.Driver;
@@ -185,17 +186,22 @@
public Set<String> getClientCapabilites(DeviceId deviceId) {
Set<String> capabilities = new LinkedHashSet<>();
DriverService driverService = directory.get(DriverService.class);
- Driver driver = driverService.getDriver(deviceId);
- if (driver == null) {
+ try {
+ Driver driver = driverService.getDriver(deviceId);
+ if (driver == null) {
+ return capabilities;
+ }
+ String clientCapabilities = driver.getProperty(NETCONF_CLIENT_CAPABILITY);
+ if (clientCapabilities == null) {
+ return capabilities;
+ }
+ String[] textStr = clientCapabilities.split("\\|");
+ capabilities.addAll(Arrays.asList(textStr));
+ return capabilities;
+ } catch (ItemNotFoundException e) {
+ log.warn("Driver for device {} currently not available", deviceId);
return capabilities;
}
- String clientCapabilities = driver.getProperty(NETCONF_CLIENT_CAPABILITY);
- if (clientCapabilities == null) {
- return capabilities;
- }
- String[] textStr = clientCapabilities.split("\\|");
- capabilities.addAll(Arrays.asList(textStr));
- return capabilities;
}
private void startConnection() throws NetconfException {
diff --git a/protocols/netconf/ctl/src/test/java/org/onosproject/netconf/ctl/impl/NetconfControllerImplTest.java b/protocols/netconf/ctl/src/test/java/org/onosproject/netconf/ctl/impl/NetconfControllerImplTest.java
index 0062572..dbfd7cf 100644
--- a/protocols/netconf/ctl/src/test/java/org/onosproject/netconf/ctl/impl/NetconfControllerImplTest.java
+++ b/protocols/netconf/ctl/src/test/java/org/onosproject/netconf/ctl/impl/NetconfControllerImplTest.java
@@ -26,6 +26,8 @@
import org.onlab.packet.IpAddress;
import org.onosproject.cfg.ComponentConfigAdapter;
import org.onosproject.cfg.ComponentConfigService;
+import org.onosproject.mastership.MastershipService;
+import org.onosproject.mastership.MastershipServiceAdapter;
import org.onosproject.net.DeviceId;
import org.onosproject.net.config.Config;
import org.onosproject.net.config.ConfigApplyDelegate;
@@ -114,6 +116,7 @@
private static DeviceService deviceService = new NetconfDeviceServiceMock();
private static DeviceKeyService deviceKeyService = new NetconfDeviceKeyServiceMock();
private final NetworkConfigRegistry netCfgService = new MockNetworkConfigRegistry();
+ private final MastershipService mastershipService = new MockmastershipService();
private final ComponentContext context = new MockComponentContext();
@@ -125,6 +128,7 @@
ctrl.deviceService = deviceService;
ctrl.deviceKeyService = deviceKeyService;
ctrl.netCfgService = netCfgService;
+ ctrl.mastershipService = mastershipService;
NetconfControllerImpl.netconfConnectTimeout = NetconfControllerImpl.DEFAULT_CONNECT_TIMEOUT_SECONDS;
NetconfControllerImpl.netconfIdleTimeout = NetconfControllerImpl.DEFAULT_IDLE_TIMEOUT_SECONDS;
NetconfControllerImpl.netconfReplyTimeout = NetconfControllerImpl.DEFAULT_REPLY_TIMEOUT_SECONDS;
@@ -533,4 +537,11 @@
public void onApply(Config configFile) {
}
}
+
+ private class MockmastershipService extends MastershipServiceAdapter {
+ @Override
+ public boolean isLocalMaster(DeviceId deviceId) {
+ return true;
+ }
+ }
}
diff --git a/providers/netconf/device/src/main/java/org/onosproject/provider/netconf/device/impl/NetconfDeviceProvider.java b/providers/netconf/device/src/main/java/org/onosproject/provider/netconf/device/impl/NetconfDeviceProvider.java
index e60c8e5..6e017ab 100644
--- a/providers/netconf/device/src/main/java/org/onosproject/provider/netconf/device/impl/NetconfDeviceProvider.java
+++ b/providers/netconf/device/src/main/java/org/onosproject/provider/netconf/device/impl/NetconfDeviceProvider.java
@@ -16,7 +16,6 @@
package org.onosproject.provider.netconf.device.impl;
-import com.google.common.base.Objects;
import com.google.common.base.Preconditions;
import org.apache.commons.lang3.tuple.Triple;
import org.apache.felix.scr.annotations.Activate;
@@ -26,10 +25,10 @@
import org.apache.felix.scr.annotations.Property;
import org.apache.felix.scr.annotations.Reference;
import org.apache.felix.scr.annotations.ReferenceCardinality;
+import com.google.common.util.concurrent.Striped;
import org.onlab.packet.ChassisId;
import org.onlab.util.Tools;
import org.onosproject.cfg.ComponentConfigService;
-import org.onosproject.core.ApplicationId;
import org.onosproject.core.CoreService;
import org.onosproject.mastership.MastershipService;
import org.onosproject.net.AnnotationKeys;
@@ -58,6 +57,12 @@
import org.onosproject.net.device.DeviceService;
import org.onosproject.net.device.PortStatistics;
import org.onosproject.net.device.PortStatisticsDiscovery;
+import org.onosproject.net.driver.DefaultDriverData;
+import org.onosproject.net.driver.DefaultDriverHandler;
+import org.onosproject.net.driver.Driver;
+import org.onosproject.net.driver.DriverData;
+import org.onosproject.net.driver.DriverHandler;
+import org.onosproject.net.driver.DriverService;
import org.onosproject.net.key.DeviceKey;
import org.onosproject.net.key.DeviceKeyAdminService;
import org.onosproject.net.key.DeviceKeyId;
@@ -72,9 +77,8 @@
import org.slf4j.Logger;
import java.io.IOException;
+import java.net.InetSocketAddress;
import java.net.Socket;
-import java.net.URI;
-import java.net.URISyntaxException;
import java.util.Collection;
import java.util.Dictionary;
import java.util.Map;
@@ -82,12 +86,15 @@
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.locks.Lock;
+import java.util.function.Supplier;
import static java.util.concurrent.Executors.newScheduledThreadPool;
import static org.onlab.util.Tools.groupedThreads;
@@ -119,6 +126,9 @@
protected DeviceService deviceService;
@Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
+ protected DriverService driverService;
+
+ @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
protected DeviceKeyAdminService deviceKeyAdminService;
@Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
@@ -134,7 +144,6 @@
private static final String UNKNOWN = "unknown";
protected static final String ISNULL = "NetconfDeviceInfo is null";
private static final String IPADDRESS = "ipaddress";
- private static final String NETCONF = "netconf";
private static final String PORT = "port";
private static final String PATH = "path";
private static final int CORE_POOL_SIZE = 10;
@@ -151,19 +160,17 @@
"default is 5 times")
private int maxRetries = DEFAULT_MAX_RETRIES;
- protected ExecutorService executor =
- Executors.newFixedThreadPool(5, groupedThreads("onos/netconfdeviceprovider",
- "device-installer-%d", log));
- protected ScheduledExecutorService connectionExecutor
- = newScheduledThreadPool(CORE_POOL_SIZE,
- groupedThreads("onos/netconfdeviceprovider",
- "connection-executor-%d", log));
+ protected ExecutorService connectionExecutor = Executors.newFixedThreadPool(CORE_POOL_SIZE,
+ groupedThreads("onos/netconfDeviceProviderConnection",
+ "connection-executor-%d", log));
+ protected ScheduledExecutorService pollingExecutor = newScheduledThreadPool(CORE_POOL_SIZE,
+ groupedThreads("onos/netconfDeviceProviderPoll",
+ "polling-executor-%d", log));
protected DeviceProviderService providerService;
- private NetconfDeviceListener innerNodeListener = new InnerNetconfDeviceListener();
- private InternalDeviceListener deviceListener = new InternalDeviceListener();
private final Map<DeviceId, AtomicInteger> retriedPortDiscoveryMap = new ConcurrentHashMap<>();
protected ScheduledFuture<?> scheduledTask;
+ private final Striped<Lock> deviceLocks = Striped.lock(30);
protected final ConfigFactory factory =
// TODO consider moving Config registration to NETCONF ctl bundle
@@ -177,7 +184,8 @@
};
protected final NetworkConfigListener cfgListener = new InternalNetworkConfigListener();
- private ApplicationId appId;
+ private NetconfDeviceListener innerNodeListener = new InnerNetconfDeviceListener();
+ private InternalDeviceListener deviceListener = new InternalDeviceListener();
private boolean active;
@@ -186,12 +194,12 @@
active = true;
componentConfigService.registerProperties(getClass());
providerService = providerRegistry.register(this);
- appId = coreService.registerApplication(APP_NAME);
+ coreService.registerApplication(APP_NAME);
cfgService.registerConfigFactory(factory);
cfgService.addListener(cfgListener);
controller.addDeviceListener(innerNodeListener);
deviceService.addListener(deviceListener);
- executor.execute(NetconfDeviceProvider.this::connectDevices);
+ pollingExecutor.execute(NetconfDeviceProvider.this::connectDevices);
modified(context);
log.info("Started");
}
@@ -199,6 +207,7 @@
@Deactivate
public void deactivate() {
+ cfgService.removeListener(cfgListener);
componentConfigService.unregisterProperties(getClass(), false);
deviceService.removeListener(deviceListener);
active = false;
@@ -207,13 +216,13 @@
controller.disconnectDevice(id, true);
});
controller.removeDeviceListener(innerNodeListener);
- deviceService.removeListener(deviceListener);
providerRegistry.unregister(this);
providerService = null;
retriedPortDiscoveryMap.clear();
cfgService.unregisterConfigFactory(factory);
scheduledTask.cancel(true);
- executor.shutdown();
+ connectionExecutor.shutdown();
+ pollingExecutor.shutdown();
log.info("Stopped");
}
@@ -240,43 +249,26 @@
super(new ProviderId(SCHEME_NAME, DEVICE_PROVIDER_PACKAGE));
}
- // Checks connection to devices in the config file
- // every DEFAULT_POLL_FREQUENCY_SECONDS seconds.
- private ScheduledFuture schedulePolling() {
- return connectionExecutor.scheduleAtFixedRate(exceptionSafe(this::checkAndUpdateDevices),
- pollFrequency / 10,
- pollFrequency, TimeUnit.SECONDS);
- }
-
- private Runnable exceptionSafe(Runnable runnable) {
- return new Runnable() {
-
- @Override
- public void run() {
- try {
- runnable.run();
- } catch (Exception e) {
- log.error("Unhandled Exception", e);
- }
- }
- };
- }
-
@Override
public void triggerProbe(DeviceId deviceId) {
- // TODO: This will be implemented later.
- log.debug("Should be triggering probe on device {}", deviceId);
+ //Not implemented, unused in netconf cases.
+ log.debug("Probing {} not implemented, not useful for NETCONF", deviceId);
}
@Override
public void roleChanged(DeviceId deviceId, MastershipRole newRole) {
+ log.debug("Request role change {}, {}", deviceId, newRole);
if (active) {
switch (newRole) {
case MASTER:
- initiateConnection(deviceId, newRole);
- log.debug("Accepting mastership role change to {} for device {}", newRole, deviceId);
+ if (controller.getNetconfDevice(deviceId) == null) {
+ connectionExecutor.execute(exceptionSafe(() -> withDeviceLock(
+ () -> initiateConnection(deviceId), deviceId).run()));
+ log.debug("Accepting mastership role change to {} for device {}", newRole, deviceId);
+ }
break;
case STANDBY:
+ //TODO this issue a warning on the first election/connection
controller.disconnectDevice(deviceId, false);
providerService.receivedRoleReply(deviceId, newRole, MastershipRole.STANDBY);
//else no-op
@@ -294,11 +286,10 @@
@Override
public boolean isReachable(DeviceId deviceId) {
-
boolean sessionExists =
Optional.ofNullable(controller.getDevicesMap().get(deviceId))
- .map(NetconfDevice::isActive)
- .orElse(false);
+ .map(NetconfDevice::isActive)
+ .orElse(false);
if (sessionExists) {
return true;
}
@@ -320,9 +311,13 @@
// method to test reachability.
//test connection to device opening a socket to it.
log.debug("Testing reachability for {}:{}", ip, port);
- try (Socket socket = new Socket(ip, port)) {
+ Socket socket = new Socket();
+ try {
+ socket.connect(new InetSocketAddress(ip, port), 1000);
log.debug("rechability of {}, {}, {}", deviceId, socket.isConnected(), !socket.isClosed());
- return socket.isConnected() && !socket.isClosed();
+ boolean isConnected = socket.isConnected() && !socket.isClosed();
+ socket.close();
+ return isConnected;
} catch (IOException e) {
log.info("Device {} is not reachable", deviceId);
log.debug(" error details", e);
@@ -331,43 +326,45 @@
}
@Override
- public void changePortState(DeviceId deviceId, PortNumber portNumber,
- boolean enable) {
+ public void changePortState(DeviceId deviceId, PortNumber portNumber, boolean enable) {
Device device = deviceService.getDevice(deviceId);
- if (mastershipService.isLocalMaster(deviceId)) {
- if (device.is(PortAdmin.class)) {
- PortAdmin portAdmin =
- device.as(PortAdmin.class);
- CompletableFuture<Boolean> modified;
- if (enable) {
- modified = portAdmin.enable(portNumber);
- } else {
- modified = portAdmin.disable(portNumber);
- }
- modified.thenAccept(result -> {
- if (result) {
- Port port = deviceService.getPort(deviceId, portNumber);
- //rebuilding port description with admin state changed.
- providerService.portStatusChanged(deviceId,
- DefaultPortDescription.builder()
- .withPortNumber(portNumber)
- .isEnabled(enable)
- .isRemoved(false)
- .type(port.type())
- .portSpeed(port.portSpeed())
- .annotations((SparseAnnotations) port.annotations())
- .build());
- } else {
- log.warn("Your device {} port {} status can't be changed to {}",
- deviceId, portNumber, enable);
- }
- });
- } else {
- log.warn("Device {} does not support Port Admin", deviceId);
- }
- } else {
- log.debug("Not master but {}, not changing port state", mastershipService.getLocalRole(deviceId));
+ if (device == null) {
+ log.error("Device {} is not present in the store", deviceId);
+ return;
}
+ if (!mastershipService.isLocalMaster(deviceId)) {
+ log.info("Not master but {}, not changing port state", mastershipService.getLocalRole(deviceId));
+ return;
+ }
+ if (!device.is(PortAdmin.class)) {
+ log.warn("Device {} does not support Port Admin", deviceId);
+ return;
+ }
+ PortAdmin portAdmin = device.as(PortAdmin.class);
+ CompletableFuture<Boolean> modified;
+ if (enable) {
+ modified = portAdmin.enable(portNumber);
+ } else {
+ modified = portAdmin.disable(portNumber);
+ }
+ modified.thenAccept(result -> {
+ if (result) {
+ Port port = deviceService.getPort(deviceId, portNumber);
+ //rebuilding port description with admin state changed.
+ providerService.portStatusChanged(deviceId,
+ DefaultPortDescription.builder()
+ .withPortNumber(portNumber)
+ .isEnabled(enable)
+ .isRemoved(false)
+ .type(port.type())
+ .portSpeed(port.portSpeed())
+ .annotations((SparseAnnotations) port.annotations())
+ .build());
+ } else {
+ log.warn("Your device {} port {} status can't be changed to {}",
+ deviceId, portNumber, enable);
+ }
+ });
}
@Override
@@ -376,163 +373,147 @@
controller.disconnectDevice(deviceId, true);
}
- private class InnerNetconfDeviceListener implements NetconfDeviceListener {
-
-
- @Override
- public void deviceAdded(DeviceId deviceId) {
- //no-op
- log.debug("Netconf device {} added to Netconf subController", deviceId);
- }
-
- @Override
- public void deviceRemoved(DeviceId deviceId) {
- Preconditions.checkNotNull(deviceId, ISNULL);
-
- if (deviceService.getDevice(deviceId) != null) {
- providerService.deviceDisconnected(deviceId);
- retriedPortDiscoveryMap.remove(deviceId);
- log.debug("Netconf device {} removed from Netconf subController", deviceId);
- } else {
- log.warn("Netconf device {} does not exist in the store, " +
- "it may already have been removed", deviceId);
- }
- }
+ private ScheduledFuture schedulePolling() {
+ return pollingExecutor.scheduleAtFixedRate(exceptionSafe(this::checkAndUpdateDevices),
+ pollFrequency / 10,
+ pollFrequency, TimeUnit.SECONDS);
}
+ private Runnable exceptionSafe(Runnable runnable) {
+ return () -> {
+ try {
+ runnable.run();
+ } catch (Exception e) {
+ log.error("Unhandled Exception", e);
+ }
+ };
+ }
+
+ //Connecting devices with initial config
private void connectDevices() {
- Set<DeviceId> deviceSubjects =
- cfgService.getSubjects(DeviceId.class, NetconfDeviceConfig.class);
+ Set<DeviceId> deviceSubjects = cfgService.getSubjects(DeviceId.class, NetconfDeviceConfig.class);
deviceSubjects.forEach(deviceId -> {
- connectDevice(cfgService.getConfig(deviceId, NetconfDeviceConfig.class));
+ connectionExecutor.execute(exceptionSafe(() -> runElectionFor(deviceId)));
});
}
+ //updating keys and device info
+ private void checkAndUpdateDevices() {
+ Set<DeviceId> deviceSubjects = cfgService.getSubjects(DeviceId.class, NetconfDeviceConfig.class);
+ deviceSubjects.forEach(deviceId -> {
+ log.debug("check and update {}", deviceId);
+ NetconfDeviceConfig config = cfgService.getConfig(deviceId, NetconfDeviceConfig.class);
+ storeDeviceKey(config.sshKey(), config.username(), config.password(), deviceId);
+ discoverOrUpdatePorts(deviceId);
+ });
+ }
- private void connectDevice(NetconfDeviceConfig config) {
- if (config == null) {
- log.warn("connect device invoked with null config");
- return;
+ //Saving device keys in the store
+ private void storeDeviceKey(String sshKey, String username, String password, DeviceId deviceId) {
+ if (sshKey.equals("")) {
+ deviceKeyAdminService.addKey(
+ DeviceKey.createDeviceKeyUsingUsernamePassword(
+ DeviceKeyId.deviceKeyId(deviceId.toString()), null, username, password));
+ } else {
+ deviceKeyAdminService.addKey(
+ DeviceKey.createDeviceKeyUsingSshKey(
+ DeviceKeyId.deviceKeyId(deviceId.toString()), null, username, password, sshKey));
}
- DeviceId deviceId = config.subject();
+ }
+
+ //running an election and applying the role to a given device
+ private void runElectionFor(DeviceId deviceId) {
+ //Triggering an election for the deviceId thus only master will connect
if (!deviceId.uri().getScheme().equals(SCHEME_NAME)) {
// not under my scheme, skipping
log.debug("{} not of schema {}, skipping", deviceId, SCHEME_NAME);
return;
}
- DeviceDescription deviceDescription = createDeviceRepresentation(deviceId, config);
- log.debug("Connecting NETCONF device {}, on {}:{}{} with username {}",
- deviceId, config.ip(), config.port(),
- (config.path().isPresent() ? "/" + config.path().get() : ""),
- config.username());
- storeDeviceKey(config.sshKey(), config.username(), config.password(), deviceId);
- retriedPortDiscoveryMap.put(deviceId, new AtomicInteger(0));
- if (deviceService.getDevice(deviceId) == null) {
- log.debug("device connected {}", deviceId);
- providerService.deviceConnected(deviceId, deviceDescription);
+ connectionExecutor.submit(exceptionSafe(() -> {
+ CompletableFuture<MastershipRole> role = mastershipService.requestRoleFor(deviceId);
+ try {
+ roleChanged(deviceId, role.get());
+ } catch (InterruptedException | ExecutionException e) {
+ log.error("Can't get role for {} ", deviceId, e);
+ }
+ }));
+ }
+
+ //initiating the SSh connection the a given device.
+ private void initiateConnection(DeviceId deviceId) {
+
+ if (!isReachable(deviceId)) {
+ log.warn("Can't connect to device {}", deviceId);
+ return;
}
+
try {
- log.debug("check and update {}", deviceId);
- checkAndUpdateDevice(deviceId, deviceDescription, true);
+ NetconfDevice deviceNetconf = controller.connectDevice(deviceId);
+ if (deviceNetconf != null) {
+ //storeDeviceKey(config.sshKey(), config.username(), config.password(), deviceId);
+ NetconfDeviceConfig config = cfgService.getConfig(deviceId, NetconfDeviceConfig.class);
+ //getting the device description
+ DeviceDescription deviceDescription = getDeviceDescription(deviceId, config);
+ //connecting device to ONOS
+ log.debug("Connected NETCONF device {}, on {}:{} with username {}",
+ deviceId, config.ip(), config.port(),
+ (config.path().isPresent() ? "/" + config.path().get() : ""),
+ config.username());
+ providerService.deviceConnected(deviceId, deviceDescription);
+ } else {
+ mastershipService.relinquishMastership(deviceId);
+ deviceKeyAdminService.removeKey(DeviceKeyId.deviceKeyId(deviceId.toString()));
+ log.error("Can't connect to NETCONF device {}", deviceId);
+ }
} catch (Exception e) {
- log.error("Unhandled exception checking {}", deviceId, e);
+ mastershipService.relinquishMastership(deviceId);
+ deviceKeyAdminService.removeKey(DeviceKeyId.deviceKeyId(deviceId.toString()));
+ throw new IllegalStateException(new NetconfException(
+ "Can't connect to NETCONF device " + deviceId, e));
+
+ }
+
+ }
+
+ private DeviceDescription getDeviceDescription(DeviceId deviceId, NetconfDeviceConfig config) {
+ Driver driver = driverService.getDriver(deviceId);
+ if (driver.hasBehaviour(DeviceDescriptionDiscovery.class)) {
+ final DriverData data = new DefaultDriverData(driver, deviceId);
+ final DriverHandler handler = new DefaultDriverHandler(data);
+ //creating the behaviour because the core has yet no notion of device.
+ DeviceDescriptionDiscovery deviceDescriptionDiscovery =
+ driver.createBehaviour(handler, DeviceDescriptionDiscovery.class);
+ return getDeviceRepresentation(deviceId, config, deviceDescriptionDiscovery);
+ } else {
+ return existingOrEmptyDescription(deviceId, config);
}
}
- private void checkAndUpdateDevice(DeviceId deviceId, DeviceDescription deviceDescription, boolean newlyConnected) {
+ private DeviceDescription getDeviceRepresentation(DeviceId deviceId, NetconfDeviceConfig config,
+ DeviceDescriptionDiscovery deviceDescriptionDiscovery) {
+
+ DeviceDescription existingOrEmptyDescription = existingOrEmptyDescription(deviceId, config);
+ DeviceDescription newDescription = deviceDescriptionDiscovery.discoverDeviceDetails();
+ if (newDescription == null) {
+ return existingOrEmptyDescription;
+ }
+ //merging and returning
+ return new DefaultDeviceDescription(newDescription, true,
+ DefaultAnnotations.merge((DefaultAnnotations) newDescription.annotations(),
+ existingOrEmptyDescription.annotations()));
+ }
+
+ private DeviceDescription existingOrEmptyDescription(DeviceId deviceId, NetconfDeviceConfig config) {
Device device = deviceService.getDevice(deviceId);
- if (device == null) {
- log.debug("Device {} has not been added to store, since it's not reachable", deviceId);
- return;
- }
- boolean isReachable = isReachable(deviceId);
- if (!isReachable && deviceService.isAvailable(deviceId)) {
- providerService.deviceDisconnected(deviceId);
- return;
- } else if (newlyConnected) {
- updateDeviceDescription(deviceId, deviceDescription, device);
- }
- if (isReachable && deviceService.isAvailable(deviceId) &&
- mastershipService.isLocalMaster(deviceId)) {
- //if ports are not discovered, retry the discovery
- AtomicInteger count = retriedPortDiscoveryMap.get(deviceId);
- if (deviceService.getPorts(deviceId).isEmpty() &&
- count != null && count.getAndIncrement() < maxRetries) {
- discoverPorts(deviceId);
- }
- updatePortStatistics(device);
- }
- }
- private void updateDeviceDescription(DeviceId deviceId, DeviceDescription deviceDescription, Device device) {
- if (device.is(DeviceDescriptionDiscovery.class)) {
- if (mastershipService.isLocalMaster(deviceId)) {
- DeviceDescriptionDiscovery deviceDescriptionDiscovery =
- device.as(DeviceDescriptionDiscovery.class);
- DeviceDescription updatedDeviceDescription =
- deviceDescriptionDiscovery.discoverDeviceDetails();
- if (updatedDeviceDescription != null &&
- !descriptionEquals(device, updatedDeviceDescription)) {
- providerService.deviceConnected(
- deviceId, new DefaultDeviceDescription(
- updatedDeviceDescription, true,
- updatedDeviceDescription.annotations()));
- } else if (updatedDeviceDescription == null) {
- providerService.deviceConnected(
- deviceId, new DefaultDeviceDescription(
- deviceDescription, true,
- deviceDescription.annotations()));
- }
- }
- } else {
- log.warn("No DeviceDescriptionDiscovery behaviour for device {} " +
- "using DefaultDeviceDescription", deviceId);
- providerService.deviceConnected(
- deviceId, new DefaultDeviceDescription(
- deviceDescription, true, deviceDescription.annotations()));
+ if (deviceService.getDevice(deviceId) != null) {
+ //getting the previous description
+ return new DefaultDeviceDescription(device.id().uri(), device.type(),
+ device.manufacturer(), device.hwVersion(),
+ device.swVersion(), device.serialNumber(),
+ device.chassisId(), (SparseAnnotations) device.annotations());
}
- }
- private void updatePortStatistics(Device device) {
- if (device.is(PortStatisticsDiscovery.class)) {
- PortStatisticsDiscovery d = device.as(PortStatisticsDiscovery.class);
- Collection<PortStatistics> portStatistics = d.discoverPortStatistics();
- if (portStatistics != null) {
- providerService.updatePortStatistics(device.id(),
- portStatistics);
- }
- } else {
- log.debug("No port statistics getter behaviour for device {}",
- device.id());
- }
- }
-
- private boolean descriptionEquals(Device device, DeviceDescription updatedDeviceDescription) {
- return Objects.equal(device.id().uri(), updatedDeviceDescription.deviceUri())
- && Objects.equal(device.type(), updatedDeviceDescription.type())
- && Objects.equal(device.manufacturer(), updatedDeviceDescription.manufacturer())
- && Objects.equal(device.hwVersion(), updatedDeviceDescription.hwVersion())
- && Objects.equal(device.swVersion(), updatedDeviceDescription.swVersion())
- && Objects.equal(device.serialNumber(), updatedDeviceDescription.serialNumber())
- && Objects.equal(device.chassisId(), updatedDeviceDescription.chassisId())
- && Objects.equal(device.annotations(), updatedDeviceDescription.annotations());
- }
-
- private void checkAndUpdateDevices() {
- Set<DeviceId> deviceSubjects =
- cfgService.getSubjects(DeviceId.class, NetconfDeviceConfig.class);
- deviceSubjects.forEach(deviceId -> {
- NetconfDeviceConfig config =
- cfgService.getConfig(deviceId, NetconfDeviceConfig.class);
- DeviceDescription deviceDescription = createDeviceRepresentation(deviceId, config);
- storeDeviceKey(config.sshKey(), config.username(), config.password(), deviceId);
- checkAndUpdateDevice(deviceId, deviceDescription, false);
- });
- }
-
- private DeviceDescription createDeviceRepresentation(DeviceId deviceId, NetconfDeviceConfig config) {
- Preconditions.checkNotNull(deviceId, ISNULL);
- //Netconf configuration object
ChassisId cid = new ChassisId();
String ipAddress = config.ip().toString();
DefaultAnnotations.Builder annotations = DefaultAnnotations.builder()
@@ -542,82 +523,65 @@
if (config.path().isPresent()) {
annotations.set(PATH, config.path().get());
}
-
- return new DefaultDeviceDescription(
- deviceId.uri(),
- Device.Type.SWITCH,
- UNKNOWN, UNKNOWN,
- UNKNOWN, UNKNOWN,
- cid, false,
- annotations.build());
+ return new DefaultDeviceDescription(deviceId.uri(), Device.Type.SWITCH,
+ UNKNOWN, UNKNOWN, UNKNOWN, UNKNOWN, cid, true, annotations.build());
}
- private void storeDeviceKey(String sshKey, String username, String password, DeviceId deviceId) {
- if (sshKey.equals("")) {
- deviceKeyAdminService.addKey(
- DeviceKey.createDeviceKeyUsingUsernamePassword(
- DeviceKeyId.deviceKeyId(deviceId.toString()),
- null, username, password));
- } else {
- deviceKeyAdminService.addKey(
- DeviceKey.createDeviceKeyUsingSshKey(
- DeviceKeyId.deviceKeyId(deviceId.toString()),
- null, username, password,
- sshKey));
- }
- }
- private void initiateConnection(DeviceId deviceId, MastershipRole newRole) {
- try {
- if (isReachable(deviceId)) {
- controller.connectDevice(deviceId);
- providerService.receivedRoleReply(deviceId, newRole, MastershipRole.MASTER);
- }
- } catch (Exception e) {
- if (deviceService.getDevice(deviceId) != null) {
- providerService.deviceDisconnected(deviceId);
- }
- deviceKeyAdminService.removeKey(DeviceKeyId.deviceKeyId(deviceId.toString()));
- throw new IllegalStateException(new NetconfException(
- "Can't connect to NETCONF device " + deviceId, e));
-
- }
- }
-
- private void discoverPorts(DeviceId deviceId) {
+ private void discoverOrUpdatePorts(DeviceId deviceId) {
+ retriedPortDiscoveryMap.put(deviceId, new AtomicInteger(0));
+ AtomicInteger count = retriedPortDiscoveryMap.get(deviceId);
+ //TODO this does not enable port discovery if port changes.
Device device = deviceService.getDevice(deviceId);
- //TODO remove when PortDiscovery is removed from master
- if (device.is(DeviceDescriptionDiscovery.class)) {
- DeviceDescriptionDiscovery deviceDescriptionDiscovery =
- device.as(DeviceDescriptionDiscovery.class);
- providerService.updatePorts(deviceId,
- deviceDescriptionDiscovery.discoverPortDetails());
- } else {
- log.warn("No portGetter behaviour for device {}", deviceId);
+ if (device == null) {
+ log.debug("Cant' reach device {}, not updating ports", deviceId);
+ return;
}
+ if (deviceService.getPorts(deviceId).isEmpty()
+ && count != null && count.getAndIncrement() < maxRetries) {
+ if (device.is(DeviceDescriptionDiscovery.class)) {
+ providerService.updatePorts(deviceId,
+ device.as(DeviceDescriptionDiscovery.class).discoverPortDetails());
+ } else {
+ log.warn("No DeviceDescirption behaviour for device {}", deviceId);
+ }
- // Port statistics discovery
+ }
updatePortStatistics(device);
}
- /**
- * Return the DeviceId about the device containing the URI.
- *
- * @param ip IP address
- * @param port port number
- * @param path path aspect
- * @return DeviceId
- */
- public DeviceId getDeviceId(String ip, int port, Optional<String> path) {
- try {
- return DeviceId.deviceId(new URI(NETCONF, ip + ":" + port +
- (path.isPresent() ? "/" + path : ""), null));
- } catch (URISyntaxException e) {
- throw new IllegalArgumentException("Unable to build deviceID for device "
- + ip + ":" + port + (path.isPresent() ? "/" + path : ""), e);
+ private void updatePortStatistics(Device device) {
+ if (device.is(PortStatisticsDiscovery.class)) {
+ PortStatisticsDiscovery d = device.as(PortStatisticsDiscovery.class);
+ Collection<PortStatistics> portStatistics = d.discoverPortStatistics();
+ if (portStatistics != null) {
+ providerService.updatePortStatistics(device.id(),
+ portStatistics);
+ }
+ } else {
+ log.debug("No port statistics getter behaviour for device {}",
+ device.id());
}
}
+ private <U> U withDeviceLock(Supplier<U> task, DeviceId deviceId) {
+ final Lock lock = deviceLocks.get(deviceId);
+ lock.lock();
+ try {
+ return task.get();
+ } finally {
+ lock.unlock();
+ }
+ }
+
+ private Runnable withDeviceLock(Runnable task, DeviceId deviceId) {
+ // Wrapper of withDeviceLock(Supplier, ...) for void tasks.
+ return () -> withDeviceLock(() -> {
+ task.run();
+ return null;
+ }, deviceId);
+ }
+
/**
* Listener for configuration events.
*/
@@ -625,13 +589,12 @@
@Override
public void event(NetworkConfigEvent event) {
- if (event.configClass().equals(NetconfDeviceConfig.class)) {
- executor.execute(() -> connectDevice((NetconfDeviceConfig) event.config().get()));
+ if (event.configClass().equals(NetconfDeviceConfig.class) && event.config().isPresent()) {
+ connectionExecutor.execute(exceptionSafe(() ->
+ runElectionFor((DeviceId) event.config().get().subject())));
} else {
- log.warn("Injecting device via this Json is deprecated, " +
- "please put configuration under devices/ as shown in the wiki");
+ log.warn("Incorrect or absent Class for Netconf Configuration");
}
-
}
@Override
@@ -643,22 +606,46 @@
}
/**
+ * Listener for Netconf Controller Events.
+ */
+ private class InnerNetconfDeviceListener implements NetconfDeviceListener {
+
+ @Override
+ public void deviceAdded(DeviceId deviceId) {
+ //no-op
+ log.debug("Netconf device {} added to Netconf controller", deviceId);
+ }
+
+ @Override
+ public void deviceRemoved(DeviceId deviceId) {
+ Preconditions.checkNotNull(deviceId, ISNULL);
+
+ if (deviceService.getDevice(deviceId) != null) {
+ providerService.deviceDisconnected(deviceId);
+ retriedPortDiscoveryMap.remove(deviceId);
+ log.debug("Netconf device {} removed from Netconf controller", deviceId);
+ } else {
+ log.warn("Netconf device {} does not exist in the store, " +
+ "it may already have been removed", deviceId);
+ }
+ }
+ }
+
+ /**
* Listener for core device events.
*/
private class InternalDeviceListener implements DeviceListener {
@Override
public void event(DeviceEvent event) {
- executor.execute(() -> discoverPorts(event.subject().id()));
+ connectionExecutor.submit(exceptionSafe(() -> discoverOrUpdatePorts(event.subject().id())));
}
@Override
public boolean isRelevant(DeviceEvent event) {
- if (event.type() != DeviceEvent.Type.DEVICE_ADDED) {
+ if (event.type() != DeviceEvent.Type.DEVICE_ADDED &&
+ event.type() != DeviceEvent.Type.DEVICE_AVAILABILITY_CHANGED) {
return false;
}
- if (mastershipService.getMasterFor(event.subject().id()) == null) {
- return true;
- }
return (SCHEME_NAME.equalsIgnoreCase(event.subject().annotations().value(AnnotationKeys.PROTOCOL)) ||
(SCHEME_NAME.equalsIgnoreCase(event.subject().id().uri().getScheme()))) &&
mastershipService.isLocalMaster(event.subject().id());
diff --git a/providers/netconf/device/src/test/java/org/onosproject/provider/netconf/device/impl/NetconfDeviceProviderTest.java b/providers/netconf/device/src/test/java/org/onosproject/provider/netconf/device/impl/NetconfDeviceProviderTest.java
index 53e7838..0d0894c 100644
--- a/providers/netconf/device/src/test/java/org/onosproject/provider/netconf/device/impl/NetconfDeviceProviderTest.java
+++ b/providers/netconf/device/src/test/java/org/onosproject/provider/netconf/device/impl/NetconfDeviceProviderTest.java
@@ -34,8 +34,10 @@
import org.onosproject.net.AbstractProjectableModel;
import org.onosproject.net.DefaultAnnotations;
import org.onosproject.net.DefaultDevice;
+import org.onosproject.net.DefaultPort;
import org.onosproject.net.Device;
import org.onosproject.net.DeviceId;
+import org.onosproject.net.Port;
import org.onosproject.net.PortNumber;
import org.onosproject.net.config.Config;
import org.onosproject.net.config.ConfigApplyDelegate;
@@ -75,12 +77,14 @@
import java.io.IOException;
import java.io.InputStream;
import java.util.ArrayList;
+import java.util.Collection;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.concurrent.CopyOnWriteArraySet;
import java.util.concurrent.CountDownLatch;
+import java.util.stream.Collectors;
import static org.easymock.EasyMock.createMock;
import static org.easymock.EasyMock.expect;
@@ -210,7 +214,7 @@
assertTrue("Incorrect config factories", cfgFactories.contains(provider.factory));
assertNotNull("Device listener should be added", deviceService.listener);
assertFalse("Thread to connect device should be running",
- provider.executor.isShutdown() || provider.executor.isTerminated());
+ provider.connectionExecutor.isShutdown() || provider.connectionExecutor.isTerminated());
assertFalse("Scheduled task to update device should be running", provider.scheduledTask.isCancelled());
}
@@ -219,7 +223,7 @@
provider.deactivate();
assertNull("Device listener should be removed", deviceService.listener);
assertFalse("Provider should not be registered", deviceRegistry.getProviders().contains(provider.id()));
- assertTrue("Thread to connect device should be shutdown", provider.executor.isShutdown());
+ assertTrue("Thread to connect device should be shutdown", provider.connectionExecutor.isShutdown());
assertTrue("Scheduled task to update device should be shutdown", provider.scheduledTask.isCancelled());
assertNull("Provider service should be null", provider.providerService);
assertTrue("Network config factories not removed", cfgFactories.isEmpty());
@@ -263,18 +267,14 @@
assertNotNull(providerService);
assertTrue("Event should be relevant", provider.cfgListener.isRelevant(deviceAddedEvent));
available = true;
- provider.cfgListener.event(deviceAddedEvent);
-
- deviceAdded.await();
- assertEquals("Device should be added", 1, deviceStore.getDeviceCount());
- assertTrue("Device incorrectly added" + NETCONF_DEVICE_ID_STRING,
- devices.containsKey(DeviceId.deviceId(NETCONF_DEVICE_ID_STRING)));
+ assertFalse("Device should not be reachable" + NETCONF_DEVICE_ID_STRING,
+ provider.isReachable(DeviceId.deviceId(NETCONF_DEVICE_ID_STRING)));
devices.clear();
}
@Test
public void testDiscoverPortsAfterDeviceAdded() {
- provider.executor = MoreExecutors.newDirectExecutorService();
+ provider.connectionExecutor = MoreExecutors.newDirectExecutorService();
prepareMocks(PORT_COUNT);
deviceService.listener.event(new DeviceEvent(DeviceEvent.Type.DEVICE_ADDED, netconfDevice));
@@ -291,6 +291,13 @@
}
}
+ private List<Port> createMockPorts(Collection<PortDescription> descs, DeviceId deviceId) {
+ Device device = deviceService.getDevice(deviceId);
+ return descs.stream()
+ .map(desc -> new DefaultPort(device, desc.portNumber(), desc.isEnabled(), desc.annotations()))
+ .collect(Collectors.toList());
+ }
+
//TODO: check updates of the device description
@@ -353,6 +360,11 @@
}
@Override
+ public List<Port> getPorts(DeviceId deviceId) {
+ return createMockPorts(providerService.ports.get(deviceId), deviceId);
+ }
+
+ @Override
public void addListener(DeviceListener listener) {
this.listener = listener;
}