ONOS-3605 Create a threaded session input stream mechanism and add a listener for events from the device

Change-Id: Ib323487f61d9e595f7ccdc1957a92e58b7002d2a
diff --git a/protocols/netconf/ctl/src/main/java/org/onosproject/netconf/ctl/NetconfControllerImpl.java b/protocols/netconf/ctl/src/main/java/org/onosproject/netconf/ctl/NetconfControllerImpl.java
index 7477238..cdcfbf8 100644
--- a/protocols/netconf/ctl/src/main/java/org/onosproject/netconf/ctl/NetconfControllerImpl.java
+++ b/protocols/netconf/ctl/src/main/java/org/onosproject/netconf/ctl/NetconfControllerImpl.java
@@ -26,11 +26,11 @@
 import org.onosproject.netconf.NetconfDevice;
 import org.onosproject.netconf.NetconfDeviceInfo;
 import org.onosproject.netconf.NetconfDeviceListener;
+import org.onosproject.netconf.NetconfException;
 import org.osgi.service.component.ComponentContext;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import java.io.IOException;
 import java.util.Map;
 import java.util.Set;
 import java.util.concurrent.ConcurrentHashMap;
@@ -90,9 +90,9 @@
     }
 
     @Override
-    public NetconfDevice connectDevice(NetconfDeviceInfo deviceInfo) throws IOException {
+    public NetconfDevice connectDevice(NetconfDeviceInfo deviceInfo) throws NetconfException {
         if (netconfDeviceMap.containsKey(deviceInfo.getDeviceId())) {
-            log.warn("Device {} is already present", deviceInfo);
+            log.info("Device {} is already present", deviceInfo);
             return netconfDeviceMap.get(deviceInfo.getDeviceId());
         } else {
             log.info("Creating NETCONF device {}", deviceInfo);
@@ -109,9 +109,8 @@
         }
     }
 
-    private NetconfDevice createDevice(NetconfDeviceInfo deviceInfo) throws IOException {
-        NetconfDevice netconfDevice = null;
-        netconfDevice = new NetconfDeviceImpl(deviceInfo);
+    private NetconfDevice createDevice(NetconfDeviceInfo deviceInfo) throws NetconfException {
+        NetconfDevice netconfDevice = new NetconfDeviceImpl(deviceInfo);
         for (NetconfDeviceListener l : netconfDeviceListeners) {
             l.deviceAdded(deviceInfo);
         }
diff --git a/protocols/netconf/ctl/src/main/java/org/onosproject/netconf/ctl/NetconfDeviceImpl.java b/protocols/netconf/ctl/src/main/java/org/onosproject/netconf/ctl/NetconfDeviceImpl.java
index 762c21c..5a8f499 100644
--- a/protocols/netconf/ctl/src/main/java/org/onosproject/netconf/ctl/NetconfDeviceImpl.java
+++ b/protocols/netconf/ctl/src/main/java/org/onosproject/netconf/ctl/NetconfDeviceImpl.java
@@ -18,6 +18,7 @@
 
 import org.onosproject.netconf.NetconfDevice;
 import org.onosproject.netconf.NetconfDeviceInfo;
+import org.onosproject.netconf.NetconfException;
 import org.onosproject.netconf.NetconfSession;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -36,12 +37,13 @@
     private boolean deviceState = false;
     private NetconfSession netconfSession;
 
-    public NetconfDeviceImpl(NetconfDeviceInfo deviceInfo) throws IOException {
+    public NetconfDeviceImpl(NetconfDeviceInfo deviceInfo) throws NetconfException {
         netconfDeviceInfo = deviceInfo;
         try {
             netconfSession = new NetconfSessionImpl(netconfDeviceInfo);
         } catch (IOException e) {
-            throw new IOException("Cannot create connection and session", e);
+            throw new NetconfException("Cannot create connection and session for device " +
+                                               deviceInfo, e);
         }
         deviceState = true;
     }
diff --git a/protocols/netconf/ctl/src/main/java/org/onosproject/netconf/ctl/NetconfDeviceOutputEventListenerImpl.java b/protocols/netconf/ctl/src/main/java/org/onosproject/netconf/ctl/NetconfDeviceOutputEventListenerImpl.java
new file mode 100644
index 0000000..ce0c235
--- /dev/null
+++ b/protocols/netconf/ctl/src/main/java/org/onosproject/netconf/ctl/NetconfDeviceOutputEventListenerImpl.java
@@ -0,0 +1,66 @@
+/*
+ * Copyright 2015 Open Networking Laboratory
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.onosproject.netconf.ctl;
+
+import org.onosproject.netconf.NetconfDeviceInfo;
+import org.onosproject.netconf.NetconfDeviceOutputEvent;
+import org.onosproject.netconf.NetconfDeviceOutputEventListener;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Example of a listener for events that happen on a NETCONF session established
+ * with a particular NETCONF device.
+ */
+public class NetconfDeviceOutputEventListenerImpl implements NetconfDeviceOutputEventListener {
+
+    private static final Logger log =
+            LoggerFactory.getLogger(NetconfDeviceOutputEventListenerImpl.class);
+
+    private NetconfDeviceInfo deviceInfo;
+
+    public NetconfDeviceOutputEventListenerImpl(NetconfDeviceInfo deviceInfo) {
+        this.deviceInfo = deviceInfo;
+    }
+
+    @Override
+    public void event(NetconfDeviceOutputEvent event) {
+        switch (event.type()) {
+            case DEVICE_REPLY:
+                log.debug("Device {} has reply: {}", deviceInfo, event.getMessagePayload());
+                break;
+            case DEVICE_NOTIFICATION:
+                log.info("Device {} has notification: {}", deviceInfo, event.getMessagePayload());
+                break;
+            case DEVICE_UNREGISTERED:
+                log.warn("Device {} has closed session", deviceInfo);
+                //TODO tell onos about closed session
+                break;
+            case DEVICE_ERROR:
+                log.warn("Device {} has error: {}", deviceInfo, event.getMessagePayload());
+                break;
+            default:
+                log.warn("Wrong event type {} ", event.type());
+        }
+
+    }
+
+    @Override
+    public boolean isRelevant(NetconfDeviceOutputEvent event) {
+        return deviceInfo.equals(event.getDeviceInfo());
+    }
+}
diff --git a/protocols/netconf/ctl/src/main/java/org/onosproject/netconf/ctl/NetconfSessionDelegate.java b/protocols/netconf/ctl/src/main/java/org/onosproject/netconf/ctl/NetconfSessionDelegate.java
new file mode 100644
index 0000000..1b526db
--- /dev/null
+++ b/protocols/netconf/ctl/src/main/java/org/onosproject/netconf/ctl/NetconfSessionDelegate.java
@@ -0,0 +1,33 @@
+/*
+ * Copyright 2015 Open Networking Laboratory
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.onosproject.netconf.ctl;
+
+import org.onosproject.netconf.NetconfDeviceOutputEvent;
+
+/**
+ * Entity associated with a NetconfSessionImpl and capable of receiving notifications of
+ * events about the session.
+ */
+public interface NetconfSessionDelegate {
+
+        /**
+         * Notifies the delegate via the specified event.
+         *
+         * @param event event generated by the session
+         */
+        void notify(NetconfDeviceOutputEvent event);
+}
diff --git a/protocols/netconf/ctl/src/main/java/org/onosproject/netconf/ctl/NetconfSessionImpl.java b/protocols/netconf/ctl/src/main/java/org/onosproject/netconf/ctl/NetconfSessionImpl.java
index 795211e..7f544dc 100644
--- a/protocols/netconf/ctl/src/main/java/org/onosproject/netconf/ctl/NetconfSessionImpl.java
+++ b/protocols/netconf/ctl/src/main/java/org/onosproject/netconf/ctl/NetconfSessionImpl.java
@@ -18,56 +18,70 @@
 
 import ch.ethz.ssh2.Connection;
 import ch.ethz.ssh2.Session;
-import ch.ethz.ssh2.StreamGobbler;
 import com.google.common.base.Preconditions;
 import org.onosproject.netconf.NetconfDeviceInfo;
+import org.onosproject.netconf.NetconfDeviceOutputEvent;
+import org.onosproject.netconf.NetconfDeviceOutputEventListener;
+import org.onosproject.netconf.NetconfException;
 import org.onosproject.netconf.NetconfSession;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import java.io.BufferedReader;
 import java.io.IOException;
-import java.io.InputStreamReader;
 import java.io.PrintWriter;
-import java.io.StringWriter;
 import java.util.Collections;
+import java.util.HashMap;
 import java.util.List;
+import java.util.Map;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.atomic.AtomicInteger;
+
 
 /**
  * Implementation of a NETCONF session to talk to a device.
  */
 public class NetconfSessionImpl implements NetconfSession {
 
-    public static final Logger log = LoggerFactory
+    private static final Logger log = LoggerFactory
             .getLogger(NetconfSessionImpl.class);
+
+
     private static final int CONNECTION_TIMEOUT = 0;
+    private static final String ENDPATTERN = "]]>]]>";
+    private static final AtomicInteger MESSAGE_ID_INTEGER = new AtomicInteger(0);
+    private static final String MESSAGE_ID_STRING = "message-id";
+    private static final String HELLO = "hello";
+    private static final String NEW_LINE = "\n";
 
 
     private Connection netconfConnection;
     private NetconfDeviceInfo deviceInfo;
     private Session sshSession;
     private boolean connectionActive;
-    private BufferedReader bufferReader = null;
     private PrintWriter out = null;
-    private int messageID = 0;
-    //TODO inject these capabilites from yang model provided by app
     private List<String> deviceCapabilities =
             Collections.singletonList("urn:ietf:params:netconf:base:1.0");
     private String serverCapabilities;
-    private String endpattern = "]]>]]>";
+    private NetconfStreamHandler t;
+    private Map<Integer, CompletableFuture<String>> replies;
 
 
-    public NetconfSessionImpl(NetconfDeviceInfo deviceInfo) throws IOException {
+    public NetconfSessionImpl(NetconfDeviceInfo deviceInfo) throws NetconfException {
         this.deviceInfo = deviceInfo;
         connectionActive = false;
+        replies = new HashMap<>();
         startConnection();
     }
 
 
-    private void startConnection() throws IOException {
+    private void startConnection() throws NetconfException {
         if (!connectionActive) {
             netconfConnection = new Connection(deviceInfo.ip().toString(), deviceInfo.port());
-            netconfConnection.connect(null, CONNECTION_TIMEOUT, 0);
+            try {
+                netconfConnection.connect(null, CONNECTION_TIMEOUT, 5000);
+            } catch (IOException e) {
+                throw new NetconfException("Cannot open a connection with device " + deviceInfo, e);
+            }
             boolean isAuthenticated;
             try {
                 if (deviceInfo.getKeyFile() != null) {
@@ -75,39 +89,49 @@
                             deviceInfo.name(), deviceInfo.getKeyFile(),
                             deviceInfo.password());
                 } else {
-                    log.info("authenticate with username {} and password {}",
-                             deviceInfo.name(), deviceInfo.password());
+                    log.debug("Authenticating to device {} with username {}",
+                              deviceInfo.getDeviceId(), deviceInfo.name());
                     isAuthenticated = netconfConnection.authenticateWithPassword(
                             deviceInfo.name(), deviceInfo.password());
                 }
             } catch (IOException e) {
-                throw new IOException("Authentication connection failed:" +
-                                              e.getMessage());
+                log.error("Authentication connection to device " +
+                                  deviceInfo.getDeviceId() + " failed:" +
+                                  e.getMessage());
+                throw new NetconfException("Authentication connection to device " +
+                                                   deviceInfo.getDeviceId() + " failed", e);
             }
 
             connectionActive = true;
             Preconditions.checkArgument(isAuthenticated,
-                                        "Authentication password and username failed");
+                                        // Guava uses %s templates; extra args are appended, so omit the password.
+                                        "Authentication to device %s with username %s Failed",
+                                        deviceInfo.getDeviceId(),
+                                        deviceInfo.name());
             startSshSession();
         }
     }
 
-    private void startSshSession() throws IOException {
+    private void startSshSession() throws NetconfException {
         try {
             sshSession = netconfConnection.openSession();
             sshSession.startSubSystem("netconf");
-            bufferReader = new BufferedReader(new InputStreamReader(new StreamGobbler(
-                    sshSession.getStdout())));
             out = new PrintWriter(sshSession.getStdin());
+            t = new NetconfStreamThread(sshSession.getStdout(), sshSession.getStdin(),
+                                        sshSession.getStderr(), deviceInfo,
+                                        new NetconfSessionDelegateImpl());
+            this.addDeviceOutputListener(new NetconfDeviceOutputEventListenerImpl(deviceInfo));
             sendHello();
         } catch (IOException e) {
-            throw new IOException("Failed to create ch.ethz.ssh2.Session session:" +
-                                          e.getMessage());
+            log.error("Failed to create ch.ethz.ssh2.Session session:" +
+                              e.getMessage());
+            throw new NetconfException("Failed to create ch.ethz.ssh2.Session session with device " +
+                                               deviceInfo, e);
         }
     }
 
     private void sendHello() throws IOException {
-        serverCapabilities = doRequest(createHelloString());
+        serverCapabilities = sendRequest(createHelloString());
     }
 
     private String createHelloString() {
@@ -119,58 +143,68 @@
                 cap -> hellobuffer.append("    <capability>" + cap + "</capability>\n"));
         hellobuffer.append("  </capabilities>\n");
         hellobuffer.append("</hello>\n");
-        hellobuffer.append(endpattern);
+        hellobuffer.append(ENDPATTERN);
         return hellobuffer.toString();
 
     }
 
-    @Override
-    public String doRPC(String request) throws IOException {
-        String reply = doRequest(request + "\n" + endpattern);
-        return checkReply(reply) ? reply : "ERROR " + reply;
-    }
-
-    private String doRequest(String request) throws IOException {
-        //log.info("sshState " + sshSession.getState() + "request" + request);
-        checkAndRestablishSession();
-        //log.info("sshState after" + sshSession.getState());
-        out.print(request);
-        out.flush();
-        messageID++;
-        return readOne();
-    }
-
-    private void checkAndRestablishSession() throws IOException {
+    private void checkAndRestablishSession() throws NetconfException {
         if (sshSession.getState() != 2) {
             try {
                 startSshSession();
             } catch (IOException e) {
-                log.info("the connection had to be reopened");
+                log.debug("The connection with {} had to be reopened", deviceInfo.getDeviceId());
                 try {
                     startConnection();
                 } catch (IOException e2) {
                     log.error("No connection {} for device, exception {}", netconfConnection, e2);
-                    throw new IOException(e.getMessage());
-                    //TODO remove device from ONOS
+                    throw new NetconfException("Cannot re-open the connection with device " + deviceInfo, e);
                 }
             }
         }
     }
 
     @Override
-    public String get(String request) throws IOException {
-        return doRPC(request);
+    public String requestSync(String request) throws NetconfException {
+        String reply = sendRequest(request + NEW_LINE + ENDPATTERN);
+        return checkReply(reply) ? reply : "ERROR " + reply;
     }
 
     @Override
-    public String getConfig(String targetConfiguration) throws IOException {
+    public CompletableFuture<String> request(String request) {
+        CompletableFuture<String> ftrep = t.sendMessage(request);
+        replies.put(MESSAGE_ID_INTEGER.get(), ftrep);
+        return ftrep;
+    }
+
+    private String sendRequest(String request) throws NetconfException {
+        checkAndRestablishSession();
+        //FIXME find out a better way to enforce the presence of message-id
+        if (!request.contains(MESSAGE_ID_STRING) && !request.contains(HELLO)) {
+            request = request.replaceFirst("\">", "\" message-id=\""
+                    + MESSAGE_ID_INTEGER.get() + "\"" + ">");
+        }
+        CompletableFuture<String> futureReply = request(request);
+        MESSAGE_ID_INTEGER.incrementAndGet();
+        String rp = futureReply.join();
+        log.debug("Reply from device {}", rp);
+        return rp;
+    }
+
+    @Override
+    public String get(String request) throws NetconfException {
+        return requestSync(request);
+    }
+
+    @Override
+    public String getConfig(String targetConfiguration) throws NetconfException {
         return getConfig(targetConfiguration, null);
     }
 
     @Override
-    public String getConfig(String targetConfiguration, String configurationSchema) throws IOException {
+    public String getConfig(String targetConfiguration, String configurationSchema) throws NetconfException {
         StringBuilder rpc = new StringBuilder("<?xml version=\"1.0\" encoding=\"UTF-8\"?>");
-        rpc.append("<rpc message-id=\"" + messageID + "\"  "
+        rpc.append("<rpc message-id=\"" + MESSAGE_ID_INTEGER.get() + "\"  "
                            + "xmlns=\"urn:ietf:params:xml:ns:netconf:base:1.0\">\n");
         rpc.append("<get-config>\n");
         rpc.append("<source>\n");
@@ -183,23 +217,23 @@
         }
         rpc.append("</get-config>\n");
         rpc.append("</rpc>\n");
-        rpc.append(endpattern);
-        String reply = doRequest(rpc.toString());
+        rpc.append(ENDPATTERN);
+        String reply = sendRequest(rpc.toString());
         return checkReply(reply) ? reply : "ERROR " + reply;
     }
 
     @Override
-    public boolean editConfig(String newConfiguration) throws IOException {
-        newConfiguration = newConfiguration + endpattern;
-        return checkReply(doRequest(newConfiguration));
+    public boolean editConfig(String newConfiguration) throws NetconfException {
+        newConfiguration = newConfiguration + ENDPATTERN;
+        return checkReply(sendRequest(newConfiguration));
     }
 
     @Override
     public boolean editConfig(String targetConfiguration, String mode, String newConfiguration)
-            throws IOException {
+            throws NetconfException {
         newConfiguration = newConfiguration.trim();
         StringBuilder rpc = new StringBuilder("<?xml version=\"1.0\" encoding=\"UTF-8\"?>");
-        rpc.append("<rpc message-id=\"" + messageID + "\"  "
+        rpc.append("<rpc message-id=\"" + MESSAGE_ID_INTEGER.get() + "\"  "
                            + "xmlns=\"urn:ietf:params:xml:ns:netconf:base:1.0\">\n");
         rpc.append("<edit-config>");
         rpc.append("<target>");
@@ -213,13 +247,13 @@
         rpc.append("</config>");
         rpc.append("</edit-config>");
         rpc.append("</rpc>");
-        rpc.append(endpattern);
-        return checkReply(doRequest(rpc.toString()));
+        rpc.append(ENDPATTERN);
+        return checkReply(sendRequest(rpc.toString()));
     }
 
     @Override
     public boolean copyConfig(String targetConfiguration, String newConfiguration)
-            throws IOException {
+            throws NetconfException {
         newConfiguration = newConfiguration.trim();
         if (!newConfiguration.startsWith("<configuration>")) {
             newConfiguration = "<configuration>" + newConfiguration
@@ -237,12 +271,12 @@
         rpc.append("</source>");
         rpc.append("</copy-config>");
         rpc.append("</rpc>");
-        rpc.append(endpattern);
-        return checkReply(doRequest(rpc.toString()));
+        rpc.append(ENDPATTERN);
+        return checkReply(sendRequest(rpc.toString()));
     }
 
     @Override
-    public boolean deleteConfig(String targetConfiguration) throws IOException {
+    public boolean deleteConfig(String targetConfiguration) throws NetconfException {
         if (targetConfiguration.equals("running")) {
             log.warn("Target configuration for delete operation can't be \"running\"",
                      targetConfiguration);
@@ -257,12 +291,12 @@
         rpc.append("</target>");
         rpc.append("</delete-config>");
         rpc.append("</rpc>");
-        rpc.append(endpattern);
-        return checkReply(doRequest(rpc.toString()));
+        rpc.append(ENDPATTERN);
+        return checkReply(sendRequest(rpc.toString()));
     }
 
     @Override
-    public boolean lock() throws IOException {
+    public boolean lock() throws NetconfException {
         StringBuilder rpc = new StringBuilder("<?xml version=\"1.0\" " +
                                                       "encoding=\"UTF-8\"?>");
         rpc.append("<rpc>");
@@ -272,12 +306,12 @@
         rpc.append("</target>");
         rpc.append("</lock>");
         rpc.append("</rpc>");
-        rpc.append(endpattern);
-        return checkReply(doRequest(rpc.toString()));
+        rpc.append(ENDPATTERN);
+        return checkReply(sendRequest(rpc.toString()));
     }
 
     @Override
-    public boolean unlock() throws IOException {
+    public boolean unlock() throws NetconfException {
         StringBuilder rpc = new StringBuilder("<?xml version=\"1.0\" " +
                                                       "encoding=\"UTF-8\"?>");
         rpc.append("<rpc>");
@@ -287,16 +321,16 @@
         rpc.append("</target>");
         rpc.append("</unlock>");
         rpc.append("</rpc>");
-        rpc.append(endpattern);
-        return checkReply(doRequest(rpc.toString()));
+        rpc.append(ENDPATTERN);
+        return checkReply(sendRequest(rpc.toString()));
     }
 
     @Override
-    public boolean close() throws IOException {
+    public boolean close() throws NetconfException {
         return close(false);
     }
 
-    private boolean close(boolean force) throws IOException {
+    private boolean close(boolean force) throws NetconfException {
         StringBuilder rpc = new StringBuilder();
         rpc.append("<rpc>");
         if (force) {
@@ -306,8 +340,8 @@
         }
         rpc.append("<close-configuration/>");
         rpc.append("</rpc>");
-        rpc.append(endpattern);
-        return checkReply(doRequest(rpc.toString())) || close(true);
+        rpc.append(ENDPATTERN);
+        return checkReply(sendRequest(rpc.toString())) || close(true);
     }
 
     @Override
@@ -335,7 +369,17 @@
         deviceCapabilities = capabilities;
     }
 
-    private boolean checkReply(String reply) {
+    @Override
+    public void addDeviceOutputListener(NetconfDeviceOutputEventListener listener) {
+        t.addDeviceEventListener(listener);
+    }
+
+    @Override
+    public void removeDeviceOutputListener(NetconfDeviceOutputEventListener listener) {
+        t.removeDeviceEventListener(listener);
+    }
+
+    private boolean checkReply(String reply) throws NetconfException {
         if (reply != null) {
             if (!reply.contains("<rpc-error>")) {
                 return true;
@@ -345,36 +389,18 @@
                 return true;
             }
         }
-        log.warn("Error in reply {}", reply);
+        log.warn("Device {} has error in reply {}", deviceInfo, reply);
         return false;
     }
 
-    private String readOne() throws IOException {
-        //TODO try a simple string
-        final StringWriter reply = new StringWriter();
-        while (true) {
-            int charRead = bufferReader.read();
-            if (charRead == -1) {
-                throw new IOException("Session closed");
-            }
+    public class NetconfSessionDelegateImpl implements NetconfSessionDelegate {
 
-            for (int i = 0; i < endpattern.length(); i++) {
-                if (charRead == endpattern.charAt(i)) {
-                    if (i < endpattern.length() - 1) {
-                        charRead = bufferReader.read();
-                    } else {
-                        return reply.getBuffer().toString();
-                    }
-                } else {
-                    String s = endpattern.substring(0, i);
-                    for (int j = 0; i < s.length(); j++) {
-                        reply.write(s.charAt(j));
-                    }
-                    reply.write(charRead);
-                    break;
-                }
-            }
+        @Override
+        public void notify(NetconfDeviceOutputEvent event) {
+            CompletableFuture<String> completedReply = replies.get(event.getMessageID());
+            completedReply.complete(event.getMessagePayload());
         }
     }
 
+
 }
diff --git a/protocols/netconf/ctl/src/main/java/org/onosproject/netconf/ctl/NetconfStreamHandler.java b/protocols/netconf/ctl/src/main/java/org/onosproject/netconf/ctl/NetconfStreamHandler.java
new file mode 100644
index 0000000..4416e0a
--- /dev/null
+++ b/protocols/netconf/ctl/src/main/java/org/onosproject/netconf/ctl/NetconfStreamHandler.java
@@ -0,0 +1,49 @@
+/*
+ * Copyright 2015 Open Networking Laboratory
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.onosproject.netconf.ctl;
+
+import org.onosproject.netconf.NetconfDeviceOutputEventListener;
+
+import java.util.concurrent.CompletableFuture;
+
+/**
+ * Interface representing an object that performs all the I/O on a NETCONF session
+ * with a device.
+ */
+public interface NetconfStreamHandler {
+    /**
+     * Sends the request on the stream that is used to communicate to and from the device.
+     *
+     * @param request request to send to the physical device
+     * @return a CompletableFuture of type String that will contain the response for the request.
+     */
+    CompletableFuture<String> sendMessage(String request);
+
+    /**
+     * Adds a listener for netconf events on the handled stream.
+     *
+     * @param listener Netconf device event listener
+     */
+    void addDeviceEventListener(NetconfDeviceOutputEventListener listener);
+
+    /**
+     * Removes a listener for netconf events on the handled stream.
+     *
+     * @param listener Netconf device event listener
+     */
+    void removeDeviceEventListener(NetconfDeviceOutputEventListener listener);
+}
diff --git a/protocols/netconf/ctl/src/main/java/org/onosproject/netconf/ctl/NetconfStreamThread.java b/protocols/netconf/ctl/src/main/java/org/onosproject/netconf/ctl/NetconfStreamThread.java
new file mode 100644
index 0000000..6169c3d
--- /dev/null
+++ b/protocols/netconf/ctl/src/main/java/org/onosproject/netconf/ctl/NetconfStreamThread.java
@@ -0,0 +1,225 @@
+/*
+ * Copyright 2015 Open Networking Laboratory
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.onosproject.netconf.ctl;
+
+import com.google.common.base.Preconditions;
+import com.google.common.collect.Lists;
+import org.onosproject.netconf.NetconfDeviceInfo;
+import org.onosproject.netconf.NetconfDeviceOutputEvent;
+import org.onosproject.netconf.NetconfDeviceOutputEventListener;
+import org.onosproject.netconf.NetconfException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.BufferedReader;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.InputStreamReader;
+import java.io.OutputStream;
+import java.io.OutputStreamWriter;
+import java.io.PrintWriter;
+import java.nio.charset.StandardCharsets;
+import java.util.List;
+import java.util.concurrent.CompletableFuture;
+
+/**
+ * Thread that gets spawned each time a session is established and handles all the input
+ * and output from the session's streams to and from the NETCONF device the session is
+ * established with.
+ * <p>
+ * Every piece of session state (streams, device info, delegate, listeners) is held per
+ * instance, so several sessions can run side by side without overwriting each other.
+ * The listener list is copy-on-write because listeners are registered from other threads
+ * while this thread iterates over them to deliver events.
+ */
+public class NetconfStreamThread extends Thread implements NetconfStreamHandler {
+
+    private static final Logger log = LoggerFactory
+            .getLogger(NetconfStreamThread.class);
+    private static final String HELLO = "hello";
+    private static final String END_PATTERN = "]]>]]>";
+    private static final String RPC_REPLY = "rpc-reply";
+    private static final String RPC_ERROR = "rpc-error";
+    // No closing '>' so that an element carrying attributes (e.g. xmlns) is matched too.
+    private static final String NOTIFICATION_LABEL = "<notification";
+    private static final String MESSAGE_ID_ATTRIBUTE = "message-id=";
+    // Message id reported for hello messages, which carry no message-id attribute.
+    private static final int HELLO_MESSAGE_ID = 0;
+    // Message id reported when a message carries no usable message-id attribute.
+    private static final int NO_MESSAGE_ID = -1;
+
+    private final BufferedReader inputReader;
+    private final PrintWriter outputStream;
+    private final NetconfDeviceInfo netconfDeviceInfo;
+    private final NetconfSessionDelegate sessionDelegate;
+    private final List<NetconfDeviceOutputEventListener> netconfDeviceEventListeners
+            = Lists.newCopyOnWriteArrayList();
+
+    /**
+     * Creates the stream thread for one session and starts it immediately.
+     * <p>
+     * All fields are assigned before {@link #start()} is invoked, so the reader thread
+     * always observes a fully initialised instance.
+     *
+     * @param in         stream carrying the messages sent by the device
+     * @param out        stream used to send requests to the device
+     * @param err        error stream of the session; currently not read
+     * @param deviceInfo information about the device this session belongs to
+     * @param delegate   delegate notified of every reply received from the device
+     * @throws NullPointerException if {@code in} or {@code out} is null
+     */
+    public NetconfStreamThread(final InputStream in, final OutputStream out,
+                               final InputStream err, NetconfDeviceInfo deviceInfo,
+                               NetconfSessionDelegate delegate) {
+        Preconditions.checkNotNull(in, "Session input stream cannot be null");
+        Preconditions.checkNotNull(out, "Session output stream cannot be null");
+        // NETCONF messages are UTF-8 encoded XML; do not rely on the platform default charset.
+        inputReader = new BufferedReader(new InputStreamReader(in, StandardCharsets.UTF_8));
+        outputStream = new PrintWriter(new OutputStreamWriter(out, StandardCharsets.UTF_8));
+        netconfDeviceInfo = deviceInfo;
+        sessionDelegate = delegate;
+        log.debug("Stream thread for device {} session started", deviceInfo);
+        start();
+    }
+
+    /**
+     * Writes the request to the device and flushes the stream.
+     * <p>
+     * NOTE(review): the returned future is a fresh instance that nothing in this class
+     * completes; replies are delivered through the session delegate and the registered
+     * listeners instead. Callers must not block on the returned future.
+     *
+     * @param request request to send to the physical device
+     * @return a new, not yet completed CompletableFuture
+     */
+    @Override
+    public CompletableFuture<String> sendMessage(String request) {
+        outputStream.print(request);
+        outputStream.flush();
+        return new CompletableFuture<>();
+    }
+
+    /**
+     * State machine recognising the NETCONF end-of-message marker {@code ]]>]]>} one
+     * character at a time. On a mismatch each state falls back to the longest prefix of
+     * the marker that is still a suffix of the characters seen so far, so that inputs
+     * such as {@code ]]]>]]>} are still detected.
+     */
+    public enum NetconfMessageState {
+
+        /** Nothing of the marker has been matched yet. */
+        NO_MATCHING_PATTERN {
+            @Override
+            NetconfMessageState evaluateChar(char c) {
+                if (c == ']') {
+                    return FIRST_BRAKET;
+                } else {
+                    return this;
+                }
+            }
+        },
+        /** Matched so far: {@code ]}. */
+        FIRST_BRAKET {
+            @Override
+            NetconfMessageState evaluateChar(char c) {
+                if (c == ']') {
+                    return SECOND_BRAKET;
+                } else {
+                    return NO_MATCHING_PATTERN;
+                }
+            }
+        },
+        /** Matched so far: {@code ]]}. */
+        SECOND_BRAKET {
+            @Override
+            NetconfMessageState evaluateChar(char c) {
+                if (c == '>') {
+                    return FIRST_BIGGER;
+                } else if (c == ']') {
+                    // "]]]" still ends with "]]", so the partial match is kept.
+                    return this;
+                } else {
+                    return NO_MATCHING_PATTERN;
+                }
+            }
+        },
+        /** Matched so far: {@code ]]>}. */
+        FIRST_BIGGER {
+            @Override
+            NetconfMessageState evaluateChar(char c) {
+                if (c == ']') {
+                    return THIRD_BRAKET;
+                } else {
+                    return NO_MATCHING_PATTERN;
+                }
+            }
+        },
+        /** Matched so far: {@code ]]>]}. */
+        THIRD_BRAKET {
+            @Override
+            NetconfMessageState evaluateChar(char c) {
+                if (c == ']') {
+                    return ENDING_BIGGER;
+                } else {
+                    return NO_MATCHING_PATTERN;
+                }
+            }
+        },
+        /** Matched so far: {@code ]]>]]}. */
+        ENDING_BIGGER {
+            @Override
+            NetconfMessageState evaluateChar(char c) {
+                if (c == '>') {
+                    return END_PATTERN;
+                } else if (c == ']') {
+                    // "]]>]]]" ends with "]]", which is again a prefix of the marker.
+                    return SECOND_BRAKET;
+                } else {
+                    return NO_MATCHING_PATTERN;
+                }
+            }
+        },
+        /** The complete marker has just been read. */
+        END_PATTERN {
+            @Override
+            NetconfMessageState evaluateChar(char c) {
+                // A new message starts here; the character may already open a new marker.
+                return NO_MATCHING_PATTERN.evaluateChar(c);
+            }
+        };
+
+        /**
+         * Computes the state reached after reading one more character.
+         *
+         * @param c character just read from the device
+         * @return the next state of the marker recognition
+         */
+        abstract NetconfMessageState evaluateChar(char c);
+    }
+
+    /**
+     * Reads the device stream character by character, cuts it into messages at every
+     * end-of-message marker and dispatches each message. When the stream ends, the
+     * registered listeners receive a DEVICE_UNREGISTERED event and the thread terminates.
+     *
+     * @throws RuntimeException wrapping a NetconfException if reading the stream fails
+     */
+    @Override
+    public void run() {
+        NetconfMessageState state = NetconfMessageState.NO_MATCHING_PATTERN;
+        StringBuilder deviceReplyBuilder = new StringBuilder();
+        try {
+            while (true) {
+                int cInt = inputReader.read();
+                if (cInt == -1) {
+                    // End of stream: tell the listeners and stop without treating the
+                    // EOF marker as a character of the message.
+                    notifyListeners(new NetconfDeviceOutputEvent(
+                            NetconfDeviceOutputEvent.Type.DEVICE_UNREGISTERED,
+                            null, null, NO_MESSAGE_ID, netconfDeviceInfo));
+                    return;
+                }
+                char c = (char) cInt;
+                state = state.evaluateChar(c);
+                deviceReplyBuilder.append(c);
+                if (state == NetconfMessageState.END_PATTERN) {
+                    // The buffer ends with the marker that was just recognised; drop it.
+                    deviceReplyBuilder.setLength(
+                            deviceReplyBuilder.length() - END_PATTERN.length());
+                    dispatch(deviceReplyBuilder.toString());
+                    deviceReplyBuilder.setLength(0);
+                }
+            }
+        } catch (IOException e) {
+            log.warn("Error in reading from the session for device {}", netconfDeviceInfo, e);
+            //TODO should we send a socket closed message to listeners ?
+            throw new RuntimeException(new NetconfException(
+                    "Error in reading from the session for device " + netconfDeviceInfo, e));
+        }
+    }
+
+    /**
+     * Classifies one complete message and forwards it: replies (including hello and
+     * rpc-error) go to the session delegate and to the listeners, notifications go to
+     * the listeners only, anything else is logged.
+     *
+     * @param deviceReply message received from the device, without the end marker
+     */
+    private void dispatch(String deviceReply) {
+        if (deviceReply.contains(RPC_REPLY) ||
+                deviceReply.contains(RPC_ERROR) ||
+                deviceReply.contains(HELLO)) {
+            NetconfDeviceOutputEvent event = new NetconfDeviceOutputEvent(
+                    NetconfDeviceOutputEvent.Type.DEVICE_REPLY,
+                    null, deviceReply, getMsgId(deviceReply), netconfDeviceInfo);
+            sessionDelegate.notify(event);
+            notifyListeners(event);
+        } else if (deviceReply.contains(NOTIFICATION_LABEL)) {
+            notifyListeners(new NetconfDeviceOutputEvent(
+                    NetconfDeviceOutputEvent.Type.DEVICE_NOTIFICATION,
+                    null, deviceReply, getMsgId(deviceReply), netconfDeviceInfo));
+        } else {
+            log.info("Error on reply from device {} ", deviceReply);
+        }
+    }
+
+    /**
+     * Delivers the event to every registered listener.
+     *
+     * @param event event to deliver
+     */
+    private void notifyListeners(NetconfDeviceOutputEvent event) {
+        netconfDeviceEventListeners.forEach(listener -> listener.event(event));
+    }
+
+    /**
+     * Extracts the numeric value of the {@code message-id} attribute from a message.
+     * The complete quoted value is parsed, so ids of any number of digits are supported.
+     *
+     * @param reply message received from the device
+     * @return the message id; 0 for a hello message without a message id; -1 when the
+     *         message carries no message id or the id is not a number
+     */
+    private static int getMsgId(String reply) {
+        int attribute = reply.indexOf(MESSAGE_ID_ATTRIBUTE);
+        if (attribute >= 0) {
+            int open = attribute + MESSAGE_ID_ATTRIBUTE.length();
+            if (open < reply.length()) {
+                char quote = reply.charAt(open);
+                int close = reply.indexOf(quote, open + 1);
+                // XML attribute values may be delimited by double or single quotes.
+                if ((quote == '"' || quote == '\'') && close > open) {
+                    String messageId = reply.substring(open + 1, close).trim();
+                    try {
+                        return Integer.parseInt(messageId);
+                    } catch (NumberFormatException e) {
+                        log.warn("Error in retrieving the message id, not a number: {}",
+                                 messageId);
+                    }
+                }
+            }
+        }
+        return reply.contains(HELLO) ? HELLO_MESSAGE_ID : NO_MESSAGE_ID;
+    }
+
+    /**
+     * Registers a listener for the events of this session; a listener that is already
+     * registered is not added a second time.
+     *
+     * @param listener NETCONF device event listener to register
+     */
+    @Override
+    public void addDeviceEventListener(NetconfDeviceOutputEventListener listener) {
+        if (!netconfDeviceEventListeners.contains(listener)) {
+            netconfDeviceEventListeners.add(listener);
+        }
+    }
+
+    /**
+     * Unregisters a listener from the events of this session.
+     *
+     * @param listener NETCONF device event listener to unregister
+     */
+    @Override
+    public void removeDeviceEventListener(NetconfDeviceOutputEventListener listener) {
+        netconfDeviceEventListeners.remove(listener);
+    }
+}