ONOS-7920 - mutex around access to prevent reentrant creation
Change-Id: I9492b2733686e0da4c05f304883f958f92598c79
diff --git a/protocols/netconf/api/src/main/java/org/onosproject/netconf/NetconfDeviceInfo.java b/protocols/netconf/api/src/main/java/org/onosproject/netconf/NetconfDeviceInfo.java
index 930e7a5..3ef4e60 100644
--- a/protocols/netconf/api/src/main/java/org/onosproject/netconf/NetconfDeviceInfo.java
+++ b/protocols/netconf/api/src/main/java/org/onosproject/netconf/NetconfDeviceInfo.java
@@ -295,7 +295,8 @@
*/
@Override
public String toString() {
- return "netconf:" + name + "@" + ipAddress + ":" + port;
+ return "netconf:" + name + "@" + ipAddress + ":" + port +
+ (path.isPresent() ? '/' + path.get() : "");
}
/**
diff --git a/protocols/netconf/ctl/src/main/java/org/onosproject/netconf/ctl/impl/NetconfControllerImpl.java b/protocols/netconf/ctl/src/main/java/org/onosproject/netconf/ctl/impl/NetconfControllerImpl.java
index 7afc841..d5f14e0 100644
--- a/protocols/netconf/ctl/src/main/java/org/onosproject/netconf/ctl/impl/NetconfControllerImpl.java
+++ b/protocols/netconf/ctl/src/main/java/org/onosproject/netconf/ctl/impl/NetconfControllerImpl.java
@@ -61,6 +61,8 @@
import java.util.concurrent.CopyOnWriteArraySet;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
+import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReentrantLock;
import static org.onlab.util.Tools.get;
import static org.onlab.util.Tools.getIntegerProperty;
@@ -118,6 +120,8 @@
private Map<DeviceId, NetconfDevice> netconfDeviceMap = new ConcurrentHashMap<>();
+ private Map<DeviceId, Lock> netconfCreateMutex = new ConcurrentHashMap<>();
+
private final NetconfDeviceOutputEventListener downListener = new DeviceDownEventListener();
protected Set<NetconfDeviceListener> netconfDeviceListeners = new CopyOnWriteArraySet<>();
@@ -231,60 +235,86 @@
deviceId, NetconfDeviceConfig.class);
NetconfDeviceInfo deviceInfo = null;
- if (netconfDeviceMap.containsKey(deviceId)) {
- log.debug("Device {} is already present", deviceId);
- return netconfDeviceMap.get(deviceId);
- } else if (netCfg != null) {
- log.debug("Device {} is present in NetworkConfig", deviceId);
- deviceInfo = new NetconfDeviceInfo(netCfg);
- } else {
- log.debug("Creating NETCONF device {}", deviceId);
- Device device = deviceService.getDevice(deviceId);
- String ip, path = null;
- int port;
- if (device != null) {
- ip = device.annotations().value("ipaddress");
- port = Integer.parseInt(device.annotations().value("port"));
- path = device.annotations().value("path");
- } else {
- Triple<String, Integer, Optional<String>> info = extractIpPortPath(deviceId);
- ip = info.getLeft();
- port = info.getMiddle();
- path = (info.getRight().isPresent() ? info.getRight().get() : null);
- }
- try {
- DeviceKey deviceKey = deviceKeyService.getDeviceKey(
- DeviceKeyId.deviceKeyId(deviceId.toString()));
- if (deviceKey.type() == DeviceKey.Type.USERNAME_PASSWORD) {
- UsernamePassword usernamepasswd = deviceKey.asUsernamePassword();
-
- deviceInfo = new NetconfDeviceInfo(usernamepasswd.username(),
- usernamepasswd.password(),
- IpAddress.valueOf(ip),
- port,
- path);
-
- } else if (deviceKey.type() == DeviceKey.Type.SSL_KEY) {
- String username = deviceKey.annotations().value(AnnotationKeys.USERNAME);
- String password = deviceKey.annotations().value(AnnotationKeys.PASSWORD);
- String sshkey = deviceKey.annotations().value(AnnotationKeys.SSHKEY);
-
- deviceInfo = new NetconfDeviceInfo(username,
- password,
- IpAddress.valueOf(ip),
- port,
- path,
- sshkey);
- } else {
- log.error("Unknown device key for device {}", deviceId);
- }
- } catch (NullPointerException e) {
- throw new NetconfException("No Device Key for device " + deviceId, e);
+ /*
+ * A bit of an ugly race condition can be found here. It is possible
+ * that this method is called to create a connection to device A and
+ * while that device is in the process of being created another call
+ * to this method for A will be invoked. Since the first call to
+ * create A has not been completed device A is not in the the
+ * netconfDeviceMap yet.
+ *
+ * To prevent this situation a mutex is introduced so that the first
+ * call will be allowed to complete before the second is processed.
+ * The mutex is based on the device ID, so that it should be still
+ * possible to connect to different devices concurrently.
+ */
+ Lock mutex;
+ synchronized (netconfCreateMutex) {
+ mutex = netconfCreateMutex.get(deviceId);
+ if (mutex == null) {
+ mutex = new ReentrantLock();
+ netconfCreateMutex.put(deviceId, mutex);
}
}
- NetconfDevice netconfDevicedevice = createDevice(deviceInfo);
- netconfDevicedevice.getSession().addDeviceOutputListener(downListener);
- return netconfDevicedevice;
+ mutex.lock();
+ try {
+ if (netconfDeviceMap.containsKey(deviceId)) {
+ log.debug("Device {} is already present", deviceId);
+ return netconfDeviceMap.get(deviceId);
+ } else if (netCfg != null) {
+ log.debug("Device {} is present in NetworkConfig", deviceId);
+ deviceInfo = new NetconfDeviceInfo(netCfg);
+ } else {
+ log.debug("Creating NETCONF device {}", deviceId);
+ Device device = deviceService.getDevice(deviceId);
+ String ip, path = null;
+ int port;
+ if (device != null) {
+ ip = device.annotations().value("ipaddress");
+ port = Integer.parseInt(device.annotations().value("port"));
+ path = device.annotations().value("path");
+ } else {
+ Triple<String, Integer, Optional<String>> info = extractIpPortPath(deviceId);
+ ip = info.getLeft();
+ port = info.getMiddle();
+ path = (info.getRight().isPresent() ? info.getRight().get() : null);
+ }
+ try {
+ DeviceKey deviceKey = deviceKeyService.getDeviceKey(
+ DeviceKeyId.deviceKeyId(deviceId.toString()));
+ if (deviceKey.type() == DeviceKey.Type.USERNAME_PASSWORD) {
+ UsernamePassword usernamepasswd = deviceKey.asUsernamePassword();
+
+ deviceInfo = new NetconfDeviceInfo(usernamepasswd.username(),
+ usernamepasswd.password(),
+ IpAddress.valueOf(ip),
+ port,
+ path);
+
+ } else if (deviceKey.type() == DeviceKey.Type.SSL_KEY) {
+ String username = deviceKey.annotations().value(AnnotationKeys.USERNAME);
+ String password = deviceKey.annotations().value(AnnotationKeys.PASSWORD);
+ String sshkey = deviceKey.annotations().value(AnnotationKeys.SSHKEY);
+
+ deviceInfo = new NetconfDeviceInfo(username,
+ password,
+ IpAddress.valueOf(ip),
+ port,
+ path,
+ sshkey);
+ } else {
+ log.error("Unknown device key for device {}", deviceId);
+ }
+ } catch (NullPointerException e) {
+ throw new NetconfException("No Device Key for device " + deviceId, e);
+ }
+ }
+ NetconfDevice netconfDevicedevice = createDevice(deviceInfo);
+ netconfDevicedevice.getSession().addDeviceOutputListener(downListener);
+ return netconfDevicedevice;
+ } finally {
+ mutex.unlock();
+ }
}
@Override
@@ -297,7 +327,22 @@
}
private void stopDevice(DeviceId deviceId, boolean remove) {
- NetconfDevice nc = netconfDeviceMap.remove(deviceId);
+ Lock mutex;
+ synchronized (netconfCreateMutex) {
+ mutex = netconfCreateMutex.remove(deviceId);
+ }
+ NetconfDevice nc;
+ if (mutex == null) {
+ log.warn("Unexpected stoping a device that has no lock");
+ nc = netconfDeviceMap.remove(deviceId);
+ } else {
+ mutex.lock();
+ try {
+ nc = netconfDeviceMap.remove(deviceId);
+ } finally {
+ mutex.unlock();
+ }
+ }
if (nc != null) {
nc.disconnect();
}
diff --git a/protocols/netconf/ctl/src/main/java/org/onosproject/netconf/ctl/impl/NetconfSessionMinaImpl.java b/protocols/netconf/ctl/src/main/java/org/onosproject/netconf/ctl/impl/NetconfSessionMinaImpl.java
index 0834a7e..c5af629 100644
--- a/protocols/netconf/ctl/src/main/java/org/onosproject/netconf/ctl/impl/NetconfSessionMinaImpl.java
+++ b/protocols/netconf/ctl/src/main/java/org/onosproject/netconf/ctl/impl/NetconfSessionMinaImpl.java
@@ -440,6 +440,9 @@
private void cleanUp() {
//makes sure everything is at a clean state.
replies.clear();
+ if (streamHandler != null) {
+ streamHandler.close();
+ }
}
@Override
diff --git a/protocols/netconf/ctl/src/main/java/org/onosproject/netconf/ctl/impl/NetconfStreamHandler.java b/protocols/netconf/ctl/src/main/java/org/onosproject/netconf/ctl/impl/NetconfStreamHandler.java
index 5fdbaae..cea726f 100644
--- a/protocols/netconf/ctl/src/main/java/org/onosproject/netconf/ctl/impl/NetconfStreamHandler.java
+++ b/protocols/netconf/ctl/src/main/java/org/onosproject/netconf/ctl/impl/NetconfStreamHandler.java
@@ -69,4 +69,9 @@
* else, stops action based off notifications
*/
void setEnableNotifications(boolean enableNotifications);
+
+ /**
+ * Closes the stream handler releasing any resources.
+ */
+ void close();
}
diff --git a/protocols/netconf/ctl/src/main/java/org/onosproject/netconf/ctl/impl/NetconfStreamThread.java b/protocols/netconf/ctl/src/main/java/org/onosproject/netconf/ctl/impl/NetconfStreamThread.java
index 343db0d..0c57810 100644
--- a/protocols/netconf/ctl/src/main/java/org/onosproject/netconf/ctl/impl/NetconfStreamThread.java
+++ b/protocols/netconf/ctl/src/main/java/org/onosproject/netconf/ctl/impl/NetconfStreamThread.java
@@ -30,6 +30,7 @@
import java.io.OutputStream;
import java.io.OutputStreamWriter;
import java.nio.charset.StandardCharsets;
+import java.nio.channels.ClosedByInterruptException;
import java.io.InputStreamReader;
import java.io.IOException;
import java.io.UnsupportedEncodingException;
@@ -247,7 +248,7 @@
try {
boolean socketClosed = false;
StringBuilder deviceReplyBuilder = new StringBuilder();
- while (!socketClosed) {
+ while (!socketClosed && !this.isInterrupted()) {
int cInt = bufferReader.read();
if (cInt == -1) {
log.debug("Netconf device {} sent error char in session," +
@@ -288,6 +289,8 @@
}
}
}
+ } catch (ClosedByInterruptException i) {
+ log.debug("Connection to device {} was terminated on request", netconfDeviceInfo.toString());
} catch (IOException e) {
log.warn("Error in reading from the session for device {} ", netconfDeviceInfo, e);
throw new IllegalStateException(new NetconfException("Error in reading from the session for device {}" +
@@ -296,6 +299,10 @@
}
}
+ public void close() {
+ close("on request");
+ }
+
private void close(String deviceReply) {
log.debug("Netconf device {} socketClosed = true DEVICE_UNREGISTERED {}",
netconfDeviceInfo, deviceReply);
@@ -403,4 +410,4 @@
public void setEnableNotifications(boolean enableNotifications) {
this.enableNotifications = enableNotifications;
}
-}
\ No newline at end of file
+}