Fixed some concurrency issues in NETCONF Session Added Unit Tests

Change-Id: I84fe0c17e3d757948a859f78d01fbb025397a44d
diff --git a/protocols/netconf/ctl/src/main/java/org/onosproject/netconf/ctl/NetconfSessionImpl.java b/protocols/netconf/ctl/src/main/java/org/onosproject/netconf/ctl/NetconfSessionImpl.java
index 26ded2f..07f7c98 100644
--- a/protocols/netconf/ctl/src/main/java/org/onosproject/netconf/ctl/NetconfSessionImpl.java
+++ b/protocols/netconf/ctl/src/main/java/org/onosproject/netconf/ctl/NetconfSessionImpl.java
@@ -31,15 +31,17 @@
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Collections;
-import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.Optional;
 import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.TimeoutException;
 import java.util.concurrent.atomic.AtomicInteger;
+import java.util.regex.Pattern;
+
 
 
 /**
@@ -82,6 +84,8 @@
     private static final String SUBSCRIPTION_SUBTREE_FILTER_OPEN =
             "<filter xmlns:base10=\"urn:ietf:params:xml:ns:netconf:base:1.0\" base10:type=\"subtree\">";
 
+    private static Pattern msgIdPattern = Pattern.compile("(message-id=\"[0-9]+\")");
+
     private final AtomicInteger messageIdInteger = new AtomicInteger(0);
     private Connection netconfConnection;
     private NetconfDeviceInfo deviceInfo;
@@ -101,7 +105,7 @@
         this.netconfConnection = null;
         this.sshSession = null;
         connectionActive = false;
-        replies = new HashMap<>();
+        replies = new ConcurrentHashMap<>();
         errorReplies = new ArrayList<>();
         startConnection();
     }
@@ -158,7 +162,8 @@
             sshSession.startSubSystem("netconf");
             streamHandler = new NetconfStreamThread(sshSession.getStdout(), sshSession.getStdin(),
                                                     sshSession.getStderr(), deviceInfo,
-                                                    new NetconfSessionDelegateImpl());
+                                                    new NetconfSessionDelegateImpl(),
+                                                    replies);
             this.addDeviceOutputListener(new NetconfDeviceOutputEventListenerImpl(deviceInfo));
             sendHello();
         } catch (IOException e) {
@@ -276,22 +281,26 @@
     }
 
     @Override
+    @Deprecated
     public CompletableFuture<String> request(String request) {
-        CompletableFuture<String> ftrep = streamHandler.sendMessage(request);
-        replies.put(messageIdInteger.get(), ftrep);
-        return ftrep;
+        return streamHandler.sendMessage(request);
+    }
+
+    private CompletableFuture<String> request(String request, int messageId) {
+        return streamHandler.sendMessage(request, messageId);
     }
 
     private String sendRequest(String request) throws NetconfException {
         checkAndRestablishSession();
-        request = formatRequestMessageId(request);
+        final int messageId = messageIdInteger.getAndIncrement();
+        request = formatRequestMessageId(request, messageId);
         request = formatXmlHeader(request);
-        CompletableFuture<String> futureReply = request(request);
-        messageIdInteger.incrementAndGet();
+        CompletableFuture<String> futureReply = request(request, messageId);
         int replyTimeout = NetconfControllerImpl.netconfReplyTimeout;
         String rp;
         try {
             rp = futureReply.get(replyTimeout, TimeUnit.SECONDS);
+            replies.remove(messageId);
         } catch (InterruptedException | ExecutionException | TimeoutException e) {
             throw new NetconfException("No matching reply for request " + request, e);
         }
@@ -299,15 +308,15 @@
         return rp.trim();
     }
 
-    private String formatRequestMessageId(String request) {
+    private String formatRequestMessageId(String request, int messageId) {
         if (request.contains(MESSAGE_ID_STRING)) {
-            //FIXME if application provieds his own counting of messages this fails that count
+            //FIXME if application provides his own counting of messages this fails that count
             request = request.replaceFirst(MESSAGE_ID_STRING + EQUAL + NUMBER_BETWEEN_QUOTES_MATCHER,
-                                           MESSAGE_ID_STRING + EQUAL + "\"" + messageIdInteger.get() + "\"");
+                                           MESSAGE_ID_STRING + EQUAL + "\"" + messageId + "\"");
         } else if (!request.contains(MESSAGE_ID_STRING) && !request.contains(HELLO)) {
             //FIXME find out a better way to enforce the presence of message-id
             request = request.replaceFirst(END_OF_RPC_OPEN_TAG, "\" " + MESSAGE_ID_STRING + EQUAL + "\""
-                    + messageIdInteger.get() + "\"" + ">");
+                    + messageId + "\"" + ">");
         }
         return request;
     }
diff --git a/protocols/netconf/ctl/src/main/java/org/onosproject/netconf/ctl/NetconfStreamHandler.java b/protocols/netconf/ctl/src/main/java/org/onosproject/netconf/ctl/NetconfStreamHandler.java
index 1ee7911..98ebf3f 100644
--- a/protocols/netconf/ctl/src/main/java/org/onosproject/netconf/ctl/NetconfStreamHandler.java
+++ b/protocols/netconf/ctl/src/main/java/org/onosproject/netconf/ctl/NetconfStreamHandler.java
@@ -29,12 +29,25 @@
     /**
      * Sends the request on the stream that is used to communicate to and from the device.
      *
+     * If this request does not contain a messageId then this will throw a NoSuchElementException
+     *
      * @param request request to send to the physical device
      * @return a CompletableFuture of type String that will contain the response for the request.
+     * @deprecated - use method with messageId parameter instead
      */
+    @Deprecated
     CompletableFuture<String> sendMessage(String request);
 
     /**
+     * Sends the request on the stream that is used to communicate to and from the device.
+     *
+     * @param request request to send to the physical device
+     * @param messageId The identifier of the message - should be unique for the session
+     * @return a CompletableFuture of type String that will contain the response for the request.
+     */
+    CompletableFuture<String> sendMessage(String request, int messageId);
+
+    /**
      * Adds a listener for netconf events on the handled stream.
      *
      * @param listener Netconf device event listener
diff --git a/protocols/netconf/ctl/src/main/java/org/onosproject/netconf/ctl/NetconfStreamThread.java b/protocols/netconf/ctl/src/main/java/org/onosproject/netconf/ctl/NetconfStreamThread.java
index 184a074..4d713f6 100644
--- a/protocols/netconf/ctl/src/main/java/org/onosproject/netconf/ctl/NetconfStreamThread.java
+++ b/protocols/netconf/ctl/src/main/java/org/onosproject/netconf/ctl/NetconfStreamThread.java
@@ -32,6 +32,7 @@
 import java.io.OutputStream;
 import java.io.PrintWriter;
 import java.util.List;
+import java.util.Map;
 import java.util.Optional;
 import java.util.concurrent.CompletableFuture;
 import java.util.regex.Matcher;
@@ -63,26 +64,41 @@
     private List<NetconfDeviceOutputEventListener> netconfDeviceEventListeners
             = Lists.newCopyOnWriteArrayList();
     private boolean enableNotifications = true;
+    private Map<Integer, CompletableFuture<String>> replies;
 
     public NetconfStreamThread(final InputStream in, final OutputStream out,
                                final InputStream err, NetconfDeviceInfo deviceInfo,
-                               NetconfSessionDelegate delegate) {
+                               NetconfSessionDelegate delegate,
+                               Map<Integer, CompletableFuture<String>> replies) {
         this.in = in;
         this.err = err;
         outputStream = new PrintWriter(out);
         netconfDeviceInfo = deviceInfo;
         state = NetconfMessageState.NO_MATCHING_PATTERN;
         sessionDelegate = delegate;
+        this.replies = replies;
         log.debug("Stream thread for device {} session started", deviceInfo);
         start();
     }
 
     @Override
     public CompletableFuture<String> sendMessage(String request) {
+        Optional<Integer> messageId = getMsgId(request);
+        return sendMessage(request, messageId.get());
+    }
+
+    @Override
+    public CompletableFuture<String> sendMessage(String request, int messageId) {
         log.debug("Sending message {} to device {}", request, netconfDeviceInfo);
-        outputStream.print(request);
-        outputStream.flush();
-        return new CompletableFuture<>();
+        CompletableFuture<String> cf = new CompletableFuture<>();
+        replies.put(messageId, cf);
+
+        synchronized (outputStream) {
+            outputStream.print(request);
+            outputStream.flush();
+        }
+
+        return cf;
     }
 
     public enum NetconfMessageState {
@@ -230,7 +246,7 @@
             }
     }
 
-    private static Optional<Integer> getMsgId(String reply) {
+    protected static Optional<Integer> getMsgId(String reply) {
         Matcher matcher = MSGID_PATTERN.matcher(reply);
         if (matcher.find()) {
             Integer messageId = Integer.parseInt(matcher.group(1));