ONOS-3839 Fixing errors in sending requests and hanging on future.join

Change-Id: I6da5bf1ff728efeb0d531cf7f04f6bf49f11a0a9
diff --git a/protocols/netconf/ctl/src/main/java/org/onosproject/netconf/ctl/NetconfStreamThread.java b/protocols/netconf/ctl/src/main/java/org/onosproject/netconf/ctl/NetconfStreamThread.java
index 6169c3d..9376667 100644
--- a/protocols/netconf/ctl/src/main/java/org/onosproject/netconf/ctl/NetconfStreamThread.java
+++ b/protocols/netconf/ctl/src/main/java/org/onosproject/netconf/ctl/NetconfStreamThread.java
@@ -49,17 +49,20 @@
     private static final String RPC_ERROR = "rpc-error";
     private static final String NOTIFICATION_LABEL = "<notification>";
 
-    private static PrintWriter outputStream;
-    private static NetconfDeviceInfo netconfDeviceInfo;
-    private static NetconfSessionDelegate sessionDelegate;
-    private static NetconfMessageState state;
-    private static List<NetconfDeviceOutputEventListener> netconfDeviceEventListeners
+    private PrintWriter outputStream;
+    private final InputStream err;
+    private final InputStream in;
+    private NetconfDeviceInfo netconfDeviceInfo;
+    private NetconfSessionDelegate sessionDelegate;
+    private NetconfMessageState state;
+    private  List<NetconfDeviceOutputEventListener> netconfDeviceEventListeners
             = Lists.newArrayList();
 
     public NetconfStreamThread(final InputStream in, final OutputStream out,
                                final InputStream err, NetconfDeviceInfo deviceInfo,
                                NetconfSessionDelegate delegate) {
-        super(handler(in, err));
+        this.in = in;
+        this.err = err;
         outputStream = new PrintWriter(out);
         netconfDeviceInfo = deviceInfo;
         state = NetconfMessageState.NO_MATCHING_PATTERN;
@@ -70,6 +73,7 @@
 
     @Override
     public CompletableFuture<String> sendMessage(String request) {
+        log.debug("Sending message {} to device {}", request, netconfDeviceInfo);
         outputStream.print(request);
         outputStream.flush();
         return new CompletableFuture<>();
@@ -147,9 +151,8 @@
         abstract NetconfMessageState evaluateChar(char c);
     }
 
-    private static Runnable handler(final InputStream in, final InputStream err) {
+    public void run() {
         BufferedReader bufferReader = new BufferedReader(new InputStreamReader(in));
-        return () -> {
             try {
                 boolean socketClosed = false;
                 StringBuilder deviceReplyBuilder = new StringBuilder();
@@ -157,6 +160,7 @@
                     int cInt = bufferReader.read();
                     if (cInt == -1) {
                         socketClosed = true;
+                        log.debug("char {} ", cInt); // log the -1 just read; another read() at EOF could throw
                         NetconfDeviceOutputEvent event = new NetconfDeviceOutputEvent(
                                 NetconfDeviceOutputEvent.Type.DEVICE_UNREGISTERED,
                                 null, null, -1, netconfDeviceInfo);
@@ -167,27 +171,36 @@
                     state = state.evaluateChar(c);
                     deviceReplyBuilder.append(c);
                     if (state == NetconfMessageState.END_PATTERN) {
-                        String deviceReply = deviceReplyBuilder.toString()
-                                .replace(END_PATTERN, "");
-                        if (deviceReply.contains(RPC_REPLY) ||
-                                deviceReply.contains(RPC_ERROR) ||
-                                deviceReply.contains(HELLO)) {
+                        String deviceReply = deviceReplyBuilder.toString();
+                        if (deviceReply.equals(END_PATTERN)) {
                             NetconfDeviceOutputEvent event = new NetconfDeviceOutputEvent(
-                                    NetconfDeviceOutputEvent.Type.DEVICE_REPLY,
-                                    null, deviceReply, getMsgId(deviceReply), netconfDeviceInfo);
-                            sessionDelegate.notify(event);
+                                    NetconfDeviceOutputEvent.Type.DEVICE_UNREGISTERED,
+                                    null, null, -1, netconfDeviceInfo);
                             netconfDeviceEventListeners.forEach(
                                     listener -> listener.event(event));
-                        } else if (deviceReply.contains(NOTIFICATION_LABEL)) {
-                            final String finalDeviceReply = deviceReply;
-                            netconfDeviceEventListeners.forEach(
-                                    listener -> listener.event(new NetconfDeviceOutputEvent(
-                                            NetconfDeviceOutputEvent.Type.DEVICE_NOTIFICATION,
-                                            null, finalDeviceReply, getMsgId(finalDeviceReply), netconfDeviceInfo)));
                         } else {
-                            log.info("Error on replay from device {} ", deviceReply);
+                            deviceReply = deviceReply.replace(END_PATTERN, "");
+                            if (deviceReply.contains(RPC_REPLY) ||
+                                    deviceReply.contains(RPC_ERROR) ||
+                                    deviceReply.contains(HELLO)) {
+                                NetconfDeviceOutputEvent event = new NetconfDeviceOutputEvent(
+                                        NetconfDeviceOutputEvent.Type.DEVICE_REPLY,
+                                        null, deviceReply, getMsgId(deviceReply), netconfDeviceInfo);
+                                sessionDelegate.notify(event);
+                                netconfDeviceEventListeners.forEach(
+                                        listener -> listener.event(event));
+                            } else if (deviceReply.contains(NOTIFICATION_LABEL)) {
+                                final String finalDeviceReply = deviceReply;
+                                netconfDeviceEventListeners.forEach(
+                                        listener -> listener.event(new NetconfDeviceOutputEvent(
+                                                NetconfDeviceOutputEvent.Type.DEVICE_NOTIFICATION,
+                                                null, finalDeviceReply, getMsgId(finalDeviceReply),
+                                                netconfDeviceInfo)));
+                            } else {
+                                log.info("Error on replay from device {} ", deviceReply);
+                            }
+                            deviceReplyBuilder.setLength(0);
                         }
-                        deviceReplyBuilder.setLength(0);
                     }
                 }
             } catch (IOException e) {
@@ -196,7 +209,6 @@
                                                                         netconfDeviceInfo, e));
                 //TODO should we send a socket closed message to listeners ?
             }
-        };
     }
 
     private static int getMsgId(String reply) {