Netconf device provider to handle series of consecutive Mastership
requests properly
Change-Id: I1b81faae97b364aaae86128e629adff13f103dab
diff --git a/providers/netconf/device/src/main/java/org/onosproject/provider/netconf/device/impl/NetconfDeviceProvider.java b/providers/netconf/device/src/main/java/org/onosproject/provider/netconf/device/impl/NetconfDeviceProvider.java
index 8281f4e..e54e90f 100644
--- a/providers/netconf/device/src/main/java/org/onosproject/provider/netconf/device/impl/NetconfDeviceProvider.java
+++ b/providers/netconf/device/src/main/java/org/onosproject/provider/netconf/device/impl/NetconfDeviceProvider.java
@@ -18,6 +18,7 @@
import com.google.common.base.Objects;
import com.google.common.base.Preconditions;
+import com.google.common.util.concurrent.Striped;
import org.apache.felix.scr.annotations.Activate;
import org.apache.felix.scr.annotations.Component;
import org.apache.felix.scr.annotations.Deactivate;
@@ -71,6 +72,7 @@
import org.slf4j.Logger;
import java.io.IOException;
+import java.net.InetSocketAddress;
import java.net.Socket;
import java.net.URI;
import java.net.URISyntaxException;
@@ -88,6 +90,8 @@
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.locks.Lock;
+import java.util.function.Supplier;
import static java.util.concurrent.Executors.newScheduledThreadPool;
import static org.onlab.util.Tools.groupedThreads;
@@ -177,6 +181,7 @@
protected final NetworkConfigListener cfgListener = new InternalNetworkConfigListener();
private ApplicationId appId;
private boolean active;
+ private final Striped<Lock> deviceLocks = Striped.lock(30);
@Activate
@@ -271,7 +276,8 @@
if (active) {
switch (newRole) {
case MASTER:
- initiateConnection(deviceId, newRole);
+ withDeviceLock(
+ () -> initiateConnection(deviceId, newRole), deviceId).run();
log.debug("Accepting mastership role change to {} for device {}", newRole, deviceId);
break;
case STANDBY:
@@ -327,9 +333,13 @@
// method to test reachability.
//test connection to device opening a socket to it.
log.debug("Testing reachability for {}:{}", ip, port);
- try (Socket socket = new Socket(ip, port)) {
+ Socket socket = new Socket();
+ try {
+ socket.connect(new InetSocketAddress(ip, port), 1000);
log.debug("rechability of {}, {}, {}", deviceId, socket.isConnected(), !socket.isClosed());
- return socket.isConnected() && !socket.isClosed();
+ boolean isConnected = socket.isConnected() && !socket.isClosed();
+ socket.close();
+ return isConnected;
} catch (IOException e) {
log.info("Device {} is not reachable", deviceId);
log.debug(" error details", e);
@@ -426,6 +436,10 @@
log.trace("{} not my scheme, skipping", deviceId);
return;
}
+ if (!isReachable(deviceId)) {
+ log.warn("Can't connect to device {}", deviceId);
+ return;
+ }
DeviceDescription deviceDescription = createDeviceRepresentation(deviceId, config);
log.debug("Connecting NETCONF device {}, on {}:{} with username {}",
deviceId, config.ip(), config.port(), config.username());
@@ -434,11 +448,6 @@
if (deviceService.getDevice(deviceId) == null) {
providerService.deviceConnected(deviceId, deviceDescription);
}
- try {
- checkAndUpdateDevice(deviceId, deviceDescription, true);
- } catch (Exception e) {
- log.error("Unhandled exception checking {}", deviceId, e);
- }
}
private void checkAndUpdateDevice(DeviceId deviceId, DeviceDescription deviceDescription, boolean newlyConnected) {
@@ -451,7 +460,7 @@
if (!isReachable && deviceService.isAvailable(deviceId)) {
providerService.deviceDisconnected(deviceId);
return;
- } else if (newlyConnected) {
+ } else if (newlyConnected && mastershipService.isLocalMaster(deviceId)) {
updateDeviceDescription(deviceId, deviceDescription, device);
}
if (isReachable && deviceService.isAvailable(deviceId) &&
@@ -540,6 +549,7 @@
.set(IPADDRESS, ipAddress)
.set(PORT, String.valueOf(config.port()))
.set(AnnotationKeys.PROTOCOL, SCHEME_NAME.toUpperCase())
+ .set(AnnotationKeys.PROVIDER_MARK_ONLINE, String.valueOf(true))
.build();
return new DefaultDeviceDescription(
deviceId.uri(),
@@ -568,8 +578,13 @@
private void initiateConnection(DeviceId deviceId, MastershipRole newRole) {
try {
if (isReachable(deviceId)) {
- controller.connectDevice(deviceId);
- providerService.receivedRoleReply(deviceId, newRole, MastershipRole.MASTER);
+ NetconfDevice device = controller.connectDevice(deviceId);
+ if (device != null) {
+ providerService.receivedRoleReply(deviceId, newRole, MastershipRole.MASTER);
+ } else {
+ providerService.receivedRoleReply(deviceId, newRole, MastershipRole.NONE);
+ }
+
}
} catch (Exception e) {
if (deviceService.getDevice(deviceId) != null) {
@@ -614,6 +629,24 @@
}
}
+ private <U> U withDeviceLock(Supplier<U> task, DeviceId deviceId) {
+ final Lock lock = deviceLocks.get(deviceId);
+ lock.lock();
+ try {
+ return task.get();
+ } finally {
+ lock.unlock();
+ }
+ }
+
+ private Runnable withDeviceLock(Runnable task, DeviceId deviceId) {
+ // Wrapper of withDeviceLock(Supplier, ...) for void tasks.
+ return () -> withDeviceLock(() -> {
+ task.run();
+ return null;
+ }, deviceId);
+ }
+
/**
* Listener for configuration events.
*/
@@ -645,12 +678,23 @@
private class InternalDeviceListener implements DeviceListener {
@Override
public void event(DeviceEvent event) {
- executor.execute(() -> discoverPorts(event.subject().id()));
+ DeviceId deviceId = event.subject().id();
+ if (event.type() == DeviceEvent.Type.DEVICE_ADDED && !deviceService.isAvailable(event.subject().id())) {
+ try {
+ checkAndUpdateDevice(deviceId, null, true);
+ } catch (Exception e) {
+ log.error("Unhandled exception checking {}", deviceId, e);
+ }
+ }
+ if (deviceService.isAvailable(event.subject().id())) {
+ executor.execute(() -> discoverPorts(event.subject().id()));
+ }
}
@Override
public boolean isRelevant(DeviceEvent event) {
- if (event.type() != DeviceEvent.Type.DEVICE_ADDED) {
+ if (event.type() != DeviceEvent.Type.DEVICE_ADDED &&
+ event.type() != DeviceEvent.Type.DEVICE_AVAILABILITY_CHANGED) {
return false;
}
if (mastershipService.getMasterFor(event.subject().id()) == null) {
diff --git a/providers/netconf/device/src/test/java/org/onosproject/provider/netconf/device/impl/NetconfDeviceProviderTest.java b/providers/netconf/device/src/test/java/org/onosproject/provider/netconf/device/impl/NetconfDeviceProviderTest.java
index 53e7838..39498f3 100644
--- a/providers/netconf/device/src/test/java/org/onosproject/provider/netconf/device/impl/NetconfDeviceProviderTest.java
+++ b/providers/netconf/device/src/test/java/org/onosproject/provider/netconf/device/impl/NetconfDeviceProviderTest.java
@@ -263,12 +263,8 @@
assertNotNull(providerService);
assertTrue("Event should be relevant", provider.cfgListener.isRelevant(deviceAddedEvent));
available = true;
- provider.cfgListener.event(deviceAddedEvent);
-
- deviceAdded.await();
- assertEquals("Device should be added", 1, deviceStore.getDeviceCount());
- assertTrue("Device incorrectly added" + NETCONF_DEVICE_ID_STRING,
- devices.containsKey(DeviceId.deviceId(NETCONF_DEVICE_ID_STRING)));
+ assertFalse("Device should not be reachable" + NETCONF_DEVICE_ID_STRING,
+ provider.isReachable(DeviceId.deviceId(NETCONF_DEVICE_ID_STRING)));
devices.clear();
}
@@ -341,6 +337,11 @@
DeviceListener listener = null;
@Override
+ public boolean isAvailable(DeviceId deviceId) {
+ return true;
+ }
+
+ @Override
public Device getDevice(DeviceId deviceId) {
if (deviceId.toString().equals(NETCONF_DEVICE_ID_STRING)) {
return null;