ONOS-7920 - mutex around access to prevent reentrant creation

Change-Id: I9492b2733686e0da4c05f304883f958f92598c79
diff --git a/protocols/netconf/api/src/main/java/org/onosproject/netconf/NetconfDeviceInfo.java b/protocols/netconf/api/src/main/java/org/onosproject/netconf/NetconfDeviceInfo.java
index 930e7a5..3ef4e60 100644
--- a/protocols/netconf/api/src/main/java/org/onosproject/netconf/NetconfDeviceInfo.java
+++ b/protocols/netconf/api/src/main/java/org/onosproject/netconf/NetconfDeviceInfo.java
@@ -295,7 +295,8 @@
      */
     @Override
     public String toString() {
-        return "netconf:" + name + "@" + ipAddress + ":" + port;
+        return "netconf:" + name + "@" + ipAddress + ":" + port +
+            (path.isPresent() ? '/' + path.get() : "");
     }
 
     /**
diff --git a/protocols/netconf/ctl/src/main/java/org/onosproject/netconf/ctl/impl/NetconfControllerImpl.java b/protocols/netconf/ctl/src/main/java/org/onosproject/netconf/ctl/impl/NetconfControllerImpl.java
index 1ff38db..6af2517 100644
--- a/protocols/netconf/ctl/src/main/java/org/onosproject/netconf/ctl/impl/NetconfControllerImpl.java
+++ b/protocols/netconf/ctl/src/main/java/org/onosproject/netconf/ctl/impl/NetconfControllerImpl.java
@@ -60,6 +60,8 @@
 import java.util.concurrent.CopyOnWriteArraySet;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
+import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReentrantLock;
 
 import static org.onlab.util.Tools.get;
 import static org.onlab.util.Tools.getIntegerProperty;
@@ -113,6 +115,8 @@
 
     private Map<DeviceId, NetconfDevice> netconfDeviceMap = new ConcurrentHashMap<>();
 
+    private Map<DeviceId, Lock> netconfCreateMutex = new ConcurrentHashMap<>();
+
     private final NetconfDeviceOutputEventListener downListener = new DeviceDownEventListener();
 
     protected Set<NetconfDeviceListener> netconfDeviceListeners = new CopyOnWriteArraySet<>();
@@ -228,60 +232,86 @@
                 deviceId, NetconfDeviceConfig.class);
         NetconfDeviceInfo deviceInfo = null;
 
-        if (netconfDeviceMap.containsKey(deviceId)) {
-            log.debug("Device {} is already present", deviceId);
-            return netconfDeviceMap.get(deviceId);
-        } else if (netCfg != null) {
-            log.debug("Device {} is present in NetworkConfig", deviceId);
-            deviceInfo = new NetconfDeviceInfo(netCfg);
-        } else {
-            log.debug("Creating NETCONF device {}", deviceId);
-            Device device = deviceService.getDevice(deviceId);
-            String ip, path = null;
-            int port;
-            if (device != null) {
-                ip = device.annotations().value("ipaddress");
-                port = Integer.parseInt(device.annotations().value("port"));
-                path = device.annotations().value("path");
-            } else {
-                Triple<String, Integer, Optional<String>> info = extractIpPortPath(deviceId);
-                ip = info.getLeft();
-                port = info.getMiddle();
-                path = (info.getRight().isPresent() ? info.getRight().get() : null);
-            }
-            try {
-                DeviceKey deviceKey = deviceKeyService.getDeviceKey(
-                        DeviceKeyId.deviceKeyId(deviceId.toString()));
-                if (deviceKey.type() == DeviceKey.Type.USERNAME_PASSWORD) {
-                    UsernamePassword usernamepasswd = deviceKey.asUsernamePassword();
-
-                    deviceInfo = new NetconfDeviceInfo(usernamepasswd.username(),
-                                                       usernamepasswd.password(),
-                                                       IpAddress.valueOf(ip),
-                                                       port,
-                                                       path);
-
-                } else if (deviceKey.type() == DeviceKey.Type.SSL_KEY) {
-                    String username = deviceKey.annotations().value(AnnotationKeys.USERNAME);
-                    String password = deviceKey.annotations().value(AnnotationKeys.PASSWORD);
-                    String sshkey = deviceKey.annotations().value(AnnotationKeys.SSHKEY);
-
-                    deviceInfo = new NetconfDeviceInfo(username,
-                                                       password,
-                                                       IpAddress.valueOf(ip),
-                                                       port,
-                                                       path,
-                                                       sshkey);
-                } else {
-                    log.error("Unknown device key for device {}", deviceId);
-                }
-            } catch (NullPointerException e) {
-                throw new NetconfException("No Device Key for device " + deviceId, e);
+        /*
+         * A bit of an ugly race condition can be found here. It is possible
+         * that this method is called to create a connection to device A and
+         * while that device is in the process of being created another call
+         * to this method for A will be invoked. Since the first call to
+         * create A has not been completed device A is not in the the
+         * netconfDeviceMap yet.
+         *
+         * To prevent this situation a mutex is introduced so that the first
+         * call will be allowed to complete before the second is processed.
+         * The mutex is based on the device ID, so that it should be still
+         * possible to connect to different devices concurrently.
+         */
+        Lock mutex;
+        synchronized (netconfCreateMutex) {
+            mutex = netconfCreateMutex.get(deviceId);
+            if (mutex == null) {
+                mutex = new ReentrantLock();
+                netconfCreateMutex.put(deviceId, mutex);
             }
         }
-        NetconfDevice netconfDevicedevice = createDevice(deviceInfo);
-        netconfDevicedevice.getSession().addDeviceOutputListener(downListener);
-        return netconfDevicedevice;
+        mutex.lock();
+        try {
+            if (netconfDeviceMap.containsKey(deviceId)) {
+                log.debug("Device {} is already present", deviceId);
+                return netconfDeviceMap.get(deviceId);
+            } else if (netCfg != null) {
+                log.debug("Device {} is present in NetworkConfig", deviceId);
+                deviceInfo = new NetconfDeviceInfo(netCfg);
+            } else {
+                log.debug("Creating NETCONF device {}", deviceId);
+                Device device = deviceService.getDevice(deviceId);
+                String ip, path = null;
+                int port;
+                if (device != null) {
+                    ip = device.annotations().value("ipaddress");
+                    port = Integer.parseInt(device.annotations().value("port"));
+                    path = device.annotations().value("path");
+                } else {
+                    Triple<String, Integer, Optional<String>> info = extractIpPortPath(deviceId);
+                    ip = info.getLeft();
+                    port = info.getMiddle();
+                    path = (info.getRight().isPresent() ? info.getRight().get() : null);
+                }
+                try {
+                    DeviceKey deviceKey = deviceKeyService.getDeviceKey(
+                            DeviceKeyId.deviceKeyId(deviceId.toString()));
+                    if (deviceKey.type() == DeviceKey.Type.USERNAME_PASSWORD) {
+                        UsernamePassword usernamepasswd = deviceKey.asUsernamePassword();
+
+                        deviceInfo = new NetconfDeviceInfo(usernamepasswd.username(),
+                                                           usernamepasswd.password(),
+                                                           IpAddress.valueOf(ip),
+                                                           port,
+                                                           path);
+
+                    } else if (deviceKey.type() == DeviceKey.Type.SSL_KEY) {
+                        String username = deviceKey.annotations().value(AnnotationKeys.USERNAME);
+                        String password = deviceKey.annotations().value(AnnotationKeys.PASSWORD);
+                        String sshkey = deviceKey.annotations().value(AnnotationKeys.SSHKEY);
+
+                        deviceInfo = new NetconfDeviceInfo(username,
+                                                           password,
+                                                           IpAddress.valueOf(ip),
+                                                           port,
+                                                           path,
+                                                           sshkey);
+                    } else {
+                        log.error("Unknown device key for device {}", deviceId);
+                    }
+                } catch (NullPointerException e) {
+                    throw new NetconfException("No Device Key for device " + deviceId, e);
+                }
+            }
+            NetconfDevice netconfDevicedevice = createDevice(deviceInfo);
+            netconfDevicedevice.getSession().addDeviceOutputListener(downListener);
+            return netconfDevicedevice;
+        } finally {
+            mutex.unlock();
+        }
     }
 
     @Override
@@ -294,7 +324,22 @@
     }
 
     private void stopDevice(DeviceId deviceId, boolean remove) {
-        NetconfDevice nc = netconfDeviceMap.remove(deviceId);
+        Lock mutex;
+        synchronized (netconfCreateMutex) {
+            mutex = netconfCreateMutex.remove(deviceId);
+        }
+        NetconfDevice nc;
+        if (mutex == null) {
+            log.warn("Unexpected stoping a device that has no lock");
+            nc = netconfDeviceMap.remove(deviceId);
+        } else {
+            mutex.lock();
+            try {
+                nc = netconfDeviceMap.remove(deviceId);
+            } finally {
+                mutex.unlock();
+            }
+        }
         if (nc != null) {
             nc.disconnect();
         }
diff --git a/protocols/netconf/ctl/src/main/java/org/onosproject/netconf/ctl/impl/NetconfSessionMinaImpl.java b/protocols/netconf/ctl/src/main/java/org/onosproject/netconf/ctl/impl/NetconfSessionMinaImpl.java
index 03b20e3..36c9ec1 100644
--- a/protocols/netconf/ctl/src/main/java/org/onosproject/netconf/ctl/impl/NetconfSessionMinaImpl.java
+++ b/protocols/netconf/ctl/src/main/java/org/onosproject/netconf/ctl/impl/NetconfSessionMinaImpl.java
@@ -498,6 +498,9 @@
     private void cleanUp() {
         //makes sure everything is at a clean state.
         replies.clear();
+        if (streamHandler != null) {
+            streamHandler.close();
+        }
     }
 
     @Override
diff --git a/protocols/netconf/ctl/src/main/java/org/onosproject/netconf/ctl/impl/NetconfStreamHandler.java b/protocols/netconf/ctl/src/main/java/org/onosproject/netconf/ctl/impl/NetconfStreamHandler.java
index 5fdbaae..cea726f 100644
--- a/protocols/netconf/ctl/src/main/java/org/onosproject/netconf/ctl/impl/NetconfStreamHandler.java
+++ b/protocols/netconf/ctl/src/main/java/org/onosproject/netconf/ctl/impl/NetconfStreamHandler.java
@@ -69,4 +69,9 @@
      *                             else, stops action based off notifications
      */
     void setEnableNotifications(boolean enableNotifications);
+
+    /**
+     * Closes the stream handler releasing any resources.
+     */
+    void close();
 }
diff --git a/protocols/netconf/ctl/src/main/java/org/onosproject/netconf/ctl/impl/NetconfStreamThread.java b/protocols/netconf/ctl/src/main/java/org/onosproject/netconf/ctl/impl/NetconfStreamThread.java
index ab639e9..c559a03 100644
--- a/protocols/netconf/ctl/src/main/java/org/onosproject/netconf/ctl/impl/NetconfStreamThread.java
+++ b/protocols/netconf/ctl/src/main/java/org/onosproject/netconf/ctl/impl/NetconfStreamThread.java
@@ -30,6 +30,7 @@
 import java.io.OutputStream;
 import java.io.OutputStreamWriter;
 import java.nio.charset.StandardCharsets;
+import java.nio.channels.ClosedByInterruptException;
 import java.io.InputStreamReader;
 import java.io.IOException;
 import java.io.UnsupportedEncodingException;
@@ -247,7 +248,7 @@
         try {
             boolean socketClosed = false;
             StringBuilder deviceReplyBuilder = new StringBuilder();
-            while (!socketClosed) {
+            while (!socketClosed && !this.isInterrupted()) {
                 int cInt = bufferReader.read();
                 if (cInt == -1) {
                     log.debug("Netconf device {}  sent error char in session," +
@@ -288,6 +289,8 @@
                     }
                 }
             }
+        } catch (ClosedByInterruptException i) {
+            log.debug("Connection to device {} was terminated on request", netconfDeviceInfo.toString());
         } catch (IOException e) {
             log.warn("Error in reading from the session for device {} ", netconfDeviceInfo, e);
             throw new IllegalStateException(new NetconfException("Error in reading from the session for device {}" +
@@ -296,6 +299,10 @@
         }
     }
 
+    public void close() {
+        close("on request");
+    }
+
     private void close(String deviceReply) {
         log.debug("Netconf device {} socketClosed = true DEVICE_UNREGISTERED {}",
                 netconfDeviceInfo, deviceReply);
@@ -403,4 +410,4 @@
     public void setEnableNotifications(boolean enableNotifications) {
         this.enableNotifications = enableNotifications;
     }
-}
\ No newline at end of file
+}