Fixed some concurrency issues in NETCONF Session Added Unit Tests

Change-Id: I84fe0c17e3d757948a859f78d01fbb025397a44d
diff --git a/protocols/netconf/ctl/src/main/java/org/onosproject/netconf/ctl/NetconfStreamThread.java b/protocols/netconf/ctl/src/main/java/org/onosproject/netconf/ctl/NetconfStreamThread.java
index 184a074..4d713f6 100644
--- a/protocols/netconf/ctl/src/main/java/org/onosproject/netconf/ctl/NetconfStreamThread.java
+++ b/protocols/netconf/ctl/src/main/java/org/onosproject/netconf/ctl/NetconfStreamThread.java
@@ -32,6 +32,7 @@
 import java.io.OutputStream;
 import java.io.PrintWriter;
 import java.util.List;
+import java.util.Map;
 import java.util.Optional;
 import java.util.concurrent.CompletableFuture;
 import java.util.regex.Matcher;
@@ -63,26 +64,41 @@
     private List<NetconfDeviceOutputEventListener> netconfDeviceEventListeners
             = Lists.newCopyOnWriteArrayList();
     private boolean enableNotifications = true;
+    private Map<Integer, CompletableFuture<String>> replies;
 
     public NetconfStreamThread(final InputStream in, final OutputStream out,
                                final InputStream err, NetconfDeviceInfo deviceInfo,
-                               NetconfSessionDelegate delegate) {
+                               NetconfSessionDelegate delegate,
+                               Map<Integer, CompletableFuture<String>> replies) {
         this.in = in;
         this.err = err;
         outputStream = new PrintWriter(out);
         netconfDeviceInfo = deviceInfo;
         state = NetconfMessageState.NO_MATCHING_PATTERN;
         sessionDelegate = delegate;
+        this.replies = replies;
         log.debug("Stream thread for device {} session started", deviceInfo);
         start();
     }
 
     @Override
     public CompletableFuture<String> sendMessage(String request) {
+        Optional<Integer> messageId = getMsgId(request);
+        return sendMessage(request, messageId.get());
+    }
+
+    @Override
+    public CompletableFuture<String> sendMessage(String request, int messageId) {
         log.debug("Sending message {} to device {}", request, netconfDeviceInfo);
-        outputStream.print(request);
-        outputStream.flush();
-        return new CompletableFuture<>();
+        CompletableFuture<String> cf = new CompletableFuture<>();
+        replies.put(messageId, cf);
+
+        synchronized (outputStream) {
+            outputStream.print(request);
+            outputStream.flush();
+        }
+
+        return cf;
     }
 
     public enum NetconfMessageState {
@@ -230,7 +246,7 @@
             }
     }
 
-    private static Optional<Integer> getMsgId(String reply) {
+    protected static Optional<Integer> getMsgId(String reply) {
         Matcher matcher = MSGID_PATTERN.matcher(reply);
         if (matcher.find()) {
             Integer messageId = Integer.parseInt(matcher.group(1));