ONOS-3605 Create thread Session input stream mechanism, adding listener for events from the device

Change-Id: Ib323487f61d9e595f7ccdc1957a92e58b7002d2a
diff --git a/protocols/netconf/api/src/main/java/org/onosproject/netconf/NetconfController.java b/protocols/netconf/api/src/main/java/org/onosproject/netconf/NetconfController.java
index ea07728..37311b1 100644
--- a/protocols/netconf/api/src/main/java/org/onosproject/netconf/NetconfController.java
+++ b/protocols/netconf/api/src/main/java/org/onosproject/netconf/NetconfController.java
@@ -19,7 +19,6 @@
 import org.onlab.packet.IpAddress;
 import org.onosproject.net.DeviceId;
 
-import java.io.IOException;
 import java.util.Map;
 
 /**
@@ -48,8 +47,9 @@
      *
      * @param deviceInfo info about the device to add
      * @return NetconfDevice Netconf device
+     * @throws NetconfException when device is not available
      */
-    NetconfDevice connectDevice(NetconfDeviceInfo deviceInfo) throws IOException;
+    NetconfDevice connectDevice(NetconfDeviceInfo deviceInfo) throws NetconfException;
 
     /**
      * Removes a Netconf device.
diff --git a/protocols/netconf/api/src/main/java/org/onosproject/netconf/NetconfDeviceOutputEvent.java b/protocols/netconf/api/src/main/java/org/onosproject/netconf/NetconfDeviceOutputEvent.java
new file mode 100644
index 0000000..0a140f1
--- /dev/null
+++ b/protocols/netconf/api/src/main/java/org/onosproject/netconf/NetconfDeviceOutputEvent.java
@@ -0,0 +1,105 @@
+/*
+ * Copyright 2015 Open Networking Laboratory
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.onosproject.netconf;
+
+import org.onosproject.event.AbstractEvent;
+
+/**
+ * Describes network configuration event.
+ */
+public final class NetconfDeviceOutputEvent extends
+        AbstractEvent<NetconfDeviceOutputEvent.Type, Object> {
+
+    private final String messagePayload;
+    private final int messageID;
+    private final NetconfDeviceInfo deviceInfo;
+
+    /**
+     * Type of network configuration events.
+     */
+    public enum Type {
+        /**
+         * Signifies that sent a reply to a request.
+         */
+        DEVICE_REPLY,
+
+        /**
+         * Signifies that the device sent a notification.
+         */
+        DEVICE_NOTIFICATION,
+
+        /**
+         * Signifies that the device is not reachable.
+         */
+        DEVICE_UNREGISTERED,
+
+        /**
+         * Signifies that the device has encountered an error.
+         */
+        DEVICE_ERROR,
+
+    }
+
+    /**
+     * Creates an event of a given type and for the specified subject and the
+     * current time.
+     *
+     * @param type              event type
+     * @param subject           event subject
+     * @param payload           message from the device
+     * @param msgID             id of the message related to the event
+     * @param netconfDeviceInfo device of event
+     */
+    public NetconfDeviceOutputEvent(Type type, Object subject, String payload, int msgID,
+                                    NetconfDeviceInfo netconfDeviceInfo) {
+        super(type, subject);
+        messagePayload = payload;
+        this.messageID = msgID;
+        deviceInfo = netconfDeviceInfo;
+    }
+
+    /**
+     * Creates an event of a given type and for the specified subject and time.
+     *
+     * @param type              event type
+     * @param subject           event subject
+     * @param payload           message from the device
+     * @param msgID             id of the message related to the event
+     * @param netconfDeviceInfo device of event
+     * @param msgID             id of the message related to the event
+     * @param time              occurrence time
+     */
+    public NetconfDeviceOutputEvent(Type type, Object subject, String payload, int msgID,
+                                    NetconfDeviceInfo netconfDeviceInfo, long time) {
+        super(type, subject, time);
+        messagePayload = payload;
+        deviceInfo = netconfDeviceInfo;
+        this.messageID = msgID;
+    }
+
+    public String getMessagePayload() {
+        return messagePayload;
+    }
+
+    public NetconfDeviceInfo getDeviceInfo() {
+        return deviceInfo;
+    }
+
+    public Integer getMessageID() {
+        return messageID;
+    }
+}
diff --git a/protocols/netconf/api/src/main/java/org/onosproject/netconf/NetconfDeviceOutputEventListener.java b/protocols/netconf/api/src/main/java/org/onosproject/netconf/NetconfDeviceOutputEventListener.java
new file mode 100644
index 0000000..e33cd9d
--- /dev/null
+++ b/protocols/netconf/api/src/main/java/org/onosproject/netconf/NetconfDeviceOutputEventListener.java
@@ -0,0 +1,25 @@
+/*
+ * Copyright 2016 Open Networking Laboratory
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.onosproject.netconf;
+
+import org.onosproject.event.EventListener;
+
+/**
+ * Interface for Netconf device output Listeners.
+ */
+public interface NetconfDeviceOutputEventListener extends EventListener<NetconfDeviceOutputEvent> {
+}
diff --git a/protocols/netconf/api/src/main/java/org/onosproject/netconf/NetconfException.java b/protocols/netconf/api/src/main/java/org/onosproject/netconf/NetconfException.java
new file mode 100644
index 0000000..4b46a4b
--- /dev/null
+++ b/protocols/netconf/api/src/main/java/org/onosproject/netconf/NetconfException.java
@@ -0,0 +1,42 @@
+/*
+ * Copyright 2016 Open Networking Laboratory
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.onosproject.netconf;
+
+import java.io.IOException;
+
+/**
+ * Represents class of errors related to NETCONF SB protocol.
+ */
+public class NetconfException extends IOException {
+    /**
+     * Constructs an exception with the specified message.
+     *
+     * @param message the message describing the specific nature of the error
+     */
+    public NetconfException(String message) {
+        super(message);
+    }
+
+    /**
+     * Constructs an exception with the specified message and the underlying cause.
+     *
+     * @param message the message describing the specific nature of the error
+     * @param cause   the underlying cause of this error
+     */
+    public NetconfException(String message, Throwable cause) {
+        super(message, cause);
+    }
+}
diff --git a/protocols/netconf/api/src/main/java/org/onosproject/netconf/NetconfSession.java b/protocols/netconf/api/src/main/java/org/onosproject/netconf/NetconfSession.java
index 2f02125..bb5e996 100644
--- a/protocols/netconf/api/src/main/java/org/onosproject/netconf/NetconfSession.java
+++ b/protocols/netconf/api/src/main/java/org/onosproject/netconf/NetconfSession.java
@@ -16,8 +16,8 @@
 
 package org.onosproject.netconf;
 
-import java.io.IOException;
 import java.util.List;
+import java.util.concurrent.CompletableFuture;
 
 /**
  * NETCONF session object that allows NETCONF operations on top with the physical
@@ -27,28 +27,45 @@
 public interface NetconfSession {
 
     /**
+     * Executes an asynchronous RPC to the server and obtains a future to be completed.
+     *
+     * @param request the XML containing the RPC for the server.
+     * @return Server response or ERROR
+     * @throws NetconfException when there is a problem in the communication process on
+     * the underlying connection
+     */
+    CompletableFuture<String> request(String request) throws NetconfException;
+
+
+    /**
      * Retrives the requested configuration, different from get-config.
      *
      * @param request the XML containing the request to the server.
      * @return device running configuration
+     * @throws NetconfException when there is a problem in the communication process on
+     * the underlying connection
      */
-    String get(String request) throws IOException;
+    String get(String request) throws NetconfException;
 
     /**
-     * Executes an RPC to the server.
+     * Executes an synchronous RPC to the server.
      *
      * @param request the XML containing the RPC for the server.
      * @return Server response or ERROR
+     * @throws NetconfException when there is a problem in the communication process on
+     * the underlying connection
      */
-    String doRPC(String request) throws IOException;
+    String requestSync(String request) throws NetconfException;
 
     /**
      * Retrives the specified configuration.
      *
      * @param targetConfiguration the type of configuration to retrieve.
      * @return specified configuration.
+     * @throws NetconfException when there is a problem in the communication process on
+     * the underlying connection
      */
-    String getConfig(String targetConfiguration) throws IOException;
+    String getConfig(String targetConfiguration) throws NetconfException;
 
     /**
      * Retrives part of the specivied configuration based on the filterSchema.
@@ -57,28 +74,35 @@
      * @param configurationFilterSchema XML schema to filter the configuration
      *                                  elements we are interested in
      * @return device running configuration.
+     * @throws NetconfException when there is a problem in the communication process on
+     * the underlying connection
      */
     String getConfig(String targetConfiguration, String configurationFilterSchema)
-            throws IOException;
+            throws NetconfException;
 
     /**
      * Retrives part of the specified configuration based on the filterSchema.
      *
      * @param newConfiguration configuration to set
      * @return true if the configuration was edited correctly
+     * @throws NetconfException when there is a problem in the communication process on
+     * the underlying connection
      */
 
-    boolean editConfig(String newConfiguration) throws IOException;
+    boolean editConfig(String newConfiguration) throws NetconfException;
 
     /**
      * Retrives part of the specified configuration based on the filterSchema.
+     *
      * @param targetConfiguration the targetConfiguration to change
-     * @param mode selected mode to change the configuration
-     * @param newConfiguration configuration to set
+     * @param mode                selected mode to change the configuration
+     * @param newConfiguration    configuration to set
      * @return true if the configuration was edited correctly
+     * @throws NetconfException when there is a problem in the communication process on
+     * the underlying connection
      */
     boolean editConfig(String targetConfiguration, String mode, String newConfiguration)
-            throws IOException;
+            throws NetconfException;
 
     /**
      * Copies the new configuration, an Url or a complete configuration xml tree
@@ -88,39 +112,49 @@
      * @param targetConfiguration the type of configuration to retrieve.
      * @param newConfiguration    configuration to set
      * @return true if the configuration was copied correctly
+     * @throws NetconfException when there is a problem in the communication process on
+     * the underlying connection
      */
     boolean copyConfig(String targetConfiguration, String newConfiguration)
-            throws IOException;
+            throws NetconfException;
 
     /**
      * Deletes part of the specified configuration based on the filterSchema.
      *
      * @param targetConfiguration the name of the configuration to delete
      * @return true if the configuration was copied correctly
+     * @throws NetconfException when there is a problem in the communication process on
+     * the underlying connection
      */
-    boolean deleteConfig(String targetConfiguration) throws IOException;
+    boolean deleteConfig(String targetConfiguration) throws NetconfException;
 
     /**
      * Locks the candidate configuration.
      *
      * @return true if successful.
+     * @throws NetconfException when there is a problem in the communication process on
+     * the underlying connection
      */
-    boolean lock() throws IOException;
+    boolean lock() throws NetconfException;
 
     /**
      * Unlocks the candidate configuration.
      *
      * @return true if successful.
+     * @throws NetconfException when there is a problem in the communication process on
+     * the underlying connection
      */
-    boolean unlock() throws IOException;
+    boolean unlock() throws NetconfException;
 
     /**
      * Closes the Netconf session with the device.
      * the first time it tries gracefully, then kills it forcefully
      *
      * @return true if closed
+     * @throws NetconfException when there is a problem in the communication process on
+     * the underlying connection
      */
-    boolean close() throws IOException;
+    boolean close() throws NetconfException;
 
     /**
      * Gets the session ID of the Netconf session.
@@ -137,10 +171,24 @@
     String getServerCapabilities();
 
     /**
-     * Sets the device capabilities.
+     * Sets the ONOS side capabilities.
      *
      * @param capabilities list of capabilities the device has.
      */
     void setDeviceCapabilities(List<String> capabilities);
 
+    /**
+     * Remove a listener from the underlying stream handler implementation.
+     *
+     * @param listener event listener.
+     */
+    void addDeviceOutputListener(NetconfDeviceOutputEventListener listener);
+
+    /**
+     * Remove a listener from the underlying stream handler implementation.
+     *
+     * @param listener event listener.
+     */
+    void removeDeviceOutputListener(NetconfDeviceOutputEventListener listener);
+
 }
diff --git a/protocols/netconf/ctl/src/main/java/org/onosproject/netconf/ctl/NetconfControllerImpl.java b/protocols/netconf/ctl/src/main/java/org/onosproject/netconf/ctl/NetconfControllerImpl.java
index 7477238..cdcfbf8 100644
--- a/protocols/netconf/ctl/src/main/java/org/onosproject/netconf/ctl/NetconfControllerImpl.java
+++ b/protocols/netconf/ctl/src/main/java/org/onosproject/netconf/ctl/NetconfControllerImpl.java
@@ -26,11 +26,11 @@
 import org.onosproject.netconf.NetconfDevice;
 import org.onosproject.netconf.NetconfDeviceInfo;
 import org.onosproject.netconf.NetconfDeviceListener;
+import org.onosproject.netconf.NetconfException;
 import org.osgi.service.component.ComponentContext;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import java.io.IOException;
 import java.util.Map;
 import java.util.Set;
 import java.util.concurrent.ConcurrentHashMap;
@@ -90,9 +90,9 @@
     }
 
     @Override
-    public NetconfDevice connectDevice(NetconfDeviceInfo deviceInfo) throws IOException {
+    public NetconfDevice connectDevice(NetconfDeviceInfo deviceInfo) throws NetconfException {
         if (netconfDeviceMap.containsKey(deviceInfo.getDeviceId())) {
-            log.warn("Device {} is already present", deviceInfo);
+            log.info("Device {} is already present", deviceInfo);
             return netconfDeviceMap.get(deviceInfo.getDeviceId());
         } else {
             log.info("Creating NETCONF device {}", deviceInfo);
@@ -109,9 +109,8 @@
         }
     }
 
-    private NetconfDevice createDevice(NetconfDeviceInfo deviceInfo) throws IOException {
-        NetconfDevice netconfDevice = null;
-        netconfDevice = new NetconfDeviceImpl(deviceInfo);
+    private NetconfDevice createDevice(NetconfDeviceInfo deviceInfo) throws NetconfException {
+        NetconfDevice netconfDevice = new NetconfDeviceImpl(deviceInfo);
         for (NetconfDeviceListener l : netconfDeviceListeners) {
             l.deviceAdded(deviceInfo);
         }
diff --git a/protocols/netconf/ctl/src/main/java/org/onosproject/netconf/ctl/NetconfDeviceImpl.java b/protocols/netconf/ctl/src/main/java/org/onosproject/netconf/ctl/NetconfDeviceImpl.java
index 762c21c..5a8f499 100644
--- a/protocols/netconf/ctl/src/main/java/org/onosproject/netconf/ctl/NetconfDeviceImpl.java
+++ b/protocols/netconf/ctl/src/main/java/org/onosproject/netconf/ctl/NetconfDeviceImpl.java
@@ -18,6 +18,7 @@
 
 import org.onosproject.netconf.NetconfDevice;
 import org.onosproject.netconf.NetconfDeviceInfo;
+import org.onosproject.netconf.NetconfException;
 import org.onosproject.netconf.NetconfSession;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -36,12 +37,13 @@
     private boolean deviceState = false;
     private NetconfSession netconfSession;
 
-    public NetconfDeviceImpl(NetconfDeviceInfo deviceInfo) throws IOException {
+    public NetconfDeviceImpl(NetconfDeviceInfo deviceInfo) throws NetconfException {
         netconfDeviceInfo = deviceInfo;
         try {
             netconfSession = new NetconfSessionImpl(netconfDeviceInfo);
         } catch (IOException e) {
-            throw new IOException("Cannot create connection and session", e);
+            throw new NetconfException("Cannot create connection and session for device " +
+                                               deviceInfo, e);
         }
         deviceState = true;
     }
diff --git a/protocols/netconf/ctl/src/main/java/org/onosproject/netconf/ctl/NetconfDeviceOutputEventListenerImpl.java b/protocols/netconf/ctl/src/main/java/org/onosproject/netconf/ctl/NetconfDeviceOutputEventListenerImpl.java
new file mode 100644
index 0000000..ce0c235
--- /dev/null
+++ b/protocols/netconf/ctl/src/main/java/org/onosproject/netconf/ctl/NetconfDeviceOutputEventListenerImpl.java
@@ -0,0 +1,66 @@
+/*
+ * Copyright 2015 Open Networking Laboratory
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.onosproject.netconf.ctl;
+
+import org.onosproject.netconf.NetconfDeviceInfo;
+import org.onosproject.netconf.NetconfDeviceOutputEvent;
+import org.onosproject.netconf.NetconfDeviceOutputEventListener;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Example of a listener for events that happen a Netconf session established
+ * for a particular NETCONF device.
+ */
+public class NetconfDeviceOutputEventListenerImpl implements NetconfDeviceOutputEventListener {
+
+    private static final Logger log =
+            LoggerFactory.getLogger(NetconfDeviceOutputEventListenerImpl.class);
+
+    private NetconfDeviceInfo deviceInfo;
+
+    public NetconfDeviceOutputEventListenerImpl(NetconfDeviceInfo deviceInfo) {
+        this.deviceInfo = deviceInfo;
+    }
+
+    @Override
+    public void event(NetconfDeviceOutputEvent event) {
+        switch (event.type()) {
+            case DEVICE_REPLY:
+                log.debug("Device {} has reply: {}", deviceInfo, event.getMessagePayload());
+                break;
+            case DEVICE_NOTIFICATION:
+                log.info("Device {} has notification: {}", deviceInfo, event.getMessagePayload());
+                break;
+            case DEVICE_UNREGISTERED:
+                log.warn("Device {} has closed session", deviceInfo);
+                //TODO tell onos about closed session
+                break;
+            case DEVICE_ERROR:
+                log.warn("Device {} has error: {}", deviceInfo, event.getMessagePayload());
+                break;
+            default:
+                log.warn("Wrong event type {} ", event.type());
+        }
+
+    }
+
+    @Override
+    public boolean isRelevant(NetconfDeviceOutputEvent event) {
+        return deviceInfo.equals(event.getDeviceInfo());
+    }
+}
diff --git a/protocols/netconf/ctl/src/main/java/org/onosproject/netconf/ctl/NetconfSessionDelegate.java b/protocols/netconf/ctl/src/main/java/org/onosproject/netconf/ctl/NetconfSessionDelegate.java
new file mode 100644
index 0000000..1b526db
--- /dev/null
+++ b/protocols/netconf/ctl/src/main/java/org/onosproject/netconf/ctl/NetconfSessionDelegate.java
@@ -0,0 +1,33 @@
+/*
+ * Copyright 2015 Open Networking Laboratory
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.onosproject.netconf.ctl;
+
+import org.onosproject.netconf.NetconfDeviceOutputEvent;
+
+/**
+ * Entity associated with a NetconfSessionImpl and capable of receiving notifications of
+ * events about the session.
+ */
+public interface NetconfSessionDelegate {
+
+        /**
+         * Notifies the delegate via the specified event.
+         *
+         * @param event store generated event
+         */
+        void notify(NetconfDeviceOutputEvent event);
+}
diff --git a/protocols/netconf/ctl/src/main/java/org/onosproject/netconf/ctl/NetconfSessionImpl.java b/protocols/netconf/ctl/src/main/java/org/onosproject/netconf/ctl/NetconfSessionImpl.java
index 795211e..7f544dc 100644
--- a/protocols/netconf/ctl/src/main/java/org/onosproject/netconf/ctl/NetconfSessionImpl.java
+++ b/protocols/netconf/ctl/src/main/java/org/onosproject/netconf/ctl/NetconfSessionImpl.java
@@ -18,56 +18,70 @@
 
 import ch.ethz.ssh2.Connection;
 import ch.ethz.ssh2.Session;
-import ch.ethz.ssh2.StreamGobbler;
 import com.google.common.base.Preconditions;
 import org.onosproject.netconf.NetconfDeviceInfo;
+import org.onosproject.netconf.NetconfDeviceOutputEvent;
+import org.onosproject.netconf.NetconfDeviceOutputEventListener;
+import org.onosproject.netconf.NetconfException;
 import org.onosproject.netconf.NetconfSession;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import java.io.BufferedReader;
 import java.io.IOException;
-import java.io.InputStreamReader;
 import java.io.PrintWriter;
-import java.io.StringWriter;
 import java.util.Collections;
+import java.util.HashMap;
 import java.util.List;
+import java.util.Map;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.atomic.AtomicInteger;
+
 
 /**
  * Implementation of a NETCONF session to talk to a device.
  */
 public class NetconfSessionImpl implements NetconfSession {
 
-    public static final Logger log = LoggerFactory
+    private static final Logger log = LoggerFactory
             .getLogger(NetconfSessionImpl.class);
+
+
     private static final int CONNECTION_TIMEOUT = 0;
+    private static final String ENDPATTERN = "]]>]]>";
+    private static final AtomicInteger MESSAGE_ID_INTEGER = new AtomicInteger(0);
+    private static final String MESSAGE_ID_STRING = "message-id";
+    private static final String HELLO = "hello";
+    private static final String NEW_LINE = "\n";
 
 
     private Connection netconfConnection;
     private NetconfDeviceInfo deviceInfo;
     private Session sshSession;
     private boolean connectionActive;
-    private BufferedReader bufferReader = null;
     private PrintWriter out = null;
-    private int messageID = 0;
-    //TODO inject these capabilites from yang model provided by app
     private List<String> deviceCapabilities =
             Collections.singletonList("urn:ietf:params:netconf:base:1.0");
     private String serverCapabilities;
-    private String endpattern = "]]>]]>";
+    private NetconfStreamHandler t;
+    private Map<Integer, CompletableFuture<String>> replies;
 
 
-    public NetconfSessionImpl(NetconfDeviceInfo deviceInfo) throws IOException {
+    public NetconfSessionImpl(NetconfDeviceInfo deviceInfo) throws NetconfException {
         this.deviceInfo = deviceInfo;
         connectionActive = false;
+        replies = new HashMap<>();
         startConnection();
     }
 
 
-    private void startConnection() throws IOException {
+    private void startConnection() throws NetconfException {
         if (!connectionActive) {
             netconfConnection = new Connection(deviceInfo.ip().toString(), deviceInfo.port());
-            netconfConnection.connect(null, CONNECTION_TIMEOUT, 0);
+            try {
+                netconfConnection.connect(null, CONNECTION_TIMEOUT, 5000);
+            } catch (IOException e) {
+                throw new NetconfException("Cannot open a connection with device" + deviceInfo, e);
+            }
             boolean isAuthenticated;
             try {
                 if (deviceInfo.getKeyFile() != null) {
@@ -75,39 +89,49 @@
                             deviceInfo.name(), deviceInfo.getKeyFile(),
                             deviceInfo.password());
                 } else {
-                    log.info("authenticate with username {} and password {}",
-                             deviceInfo.name(), deviceInfo.password());
+                    log.debug("Authenticating to device {} with username {}",
+                              deviceInfo.getDeviceId(), deviceInfo.name(), deviceInfo.password());
                     isAuthenticated = netconfConnection.authenticateWithPassword(
                             deviceInfo.name(), deviceInfo.password());
                 }
             } catch (IOException e) {
-                throw new IOException("Authentication connection failed:" +
-                                              e.getMessage());
+                log.error("Authentication connection to device " +
+                                  deviceInfo.getDeviceId() + " failed:" +
+                                  e.getMessage());
+                throw new NetconfException("Authentication connection to device " +
+                                                   deviceInfo.getDeviceId() + " failed", e);
             }
 
             connectionActive = true;
             Preconditions.checkArgument(isAuthenticated,
-                                        "Authentication password and username failed");
+                                        "Authentication to device {} with username " +
+                                                "{} Failed",
+                                        deviceInfo.getDeviceId(), deviceInfo.name(),
+                                        deviceInfo.password());
             startSshSession();
         }
     }
 
-    private void startSshSession() throws IOException {
+    private void startSshSession() throws NetconfException {
         try {
             sshSession = netconfConnection.openSession();
             sshSession.startSubSystem("netconf");
-            bufferReader = new BufferedReader(new InputStreamReader(new StreamGobbler(
-                    sshSession.getStdout())));
             out = new PrintWriter(sshSession.getStdin());
+            t = new NetconfStreamThread(sshSession.getStdout(), sshSession.getStdin(),
+                                        sshSession.getStderr(), deviceInfo,
+                                        new NetconfSessionDelegateImpl());
+            this.addDeviceOutputListener(new NetconfDeviceOutputEventListenerImpl(deviceInfo));
             sendHello();
         } catch (IOException e) {
-            throw new IOException("Failed to create ch.ethz.ssh2.Session session:" +
-                                          e.getMessage());
+            log.error("Failed to create ch.ethz.ssh2.Session session:" +
+                              e.getMessage());
+            throw new NetconfException("Failed to create ch.ethz.ssh2.Session session with device" +
+                                               deviceInfo, e);
         }
     }
 
     private void sendHello() throws IOException {
-        serverCapabilities = doRequest(createHelloString());
+        serverCapabilities = sendRequest(createHelloString());
     }
 
     private String createHelloString() {
@@ -119,58 +143,68 @@
                 cap -> hellobuffer.append("    <capability>" + cap + "</capability>\n"));
         hellobuffer.append("  </capabilities>\n");
         hellobuffer.append("</hello>\n");
-        hellobuffer.append(endpattern);
+        hellobuffer.append(ENDPATTERN);
         return hellobuffer.toString();
 
     }
 
-    @Override
-    public String doRPC(String request) throws IOException {
-        String reply = doRequest(request + "\n" + endpattern);
-        return checkReply(reply) ? reply : "ERROR " + reply;
-    }
-
-    private String doRequest(String request) throws IOException {
-        //log.info("sshState " + sshSession.getState() + "request" + request);
-        checkAndRestablishSession();
-        //log.info("sshState after" + sshSession.getState());
-        out.print(request);
-        out.flush();
-        messageID++;
-        return readOne();
-    }
-
-    private void checkAndRestablishSession() throws IOException {
+    private void checkAndRestablishSession() throws NetconfException {
         if (sshSession.getState() != 2) {
             try {
                 startSshSession();
             } catch (IOException e) {
-                log.info("the connection had to be reopened");
+                log.debug("The connection with {} had to be reopened", deviceInfo.getDeviceId());
                 try {
                     startConnection();
                 } catch (IOException e2) {
                     log.error("No connection {} for device, exception {}", netconfConnection, e2);
-                    throw new IOException(e.getMessage());
-                    //TODO remove device from ONOS
+                    throw new NetconfException("Cannot re-open the connection with device" + deviceInfo, e);
                 }
             }
         }
     }
 
     @Override
-    public String get(String request) throws IOException {
-        return doRPC(request);
+    public String requestSync(String request) throws NetconfException {
+        String reply = sendRequest(request + NEW_LINE + ENDPATTERN);
+        return checkReply(reply) ? reply : "ERROR " + reply;
     }
 
     @Override
-    public String getConfig(String targetConfiguration) throws IOException {
+    public CompletableFuture<String> request(String request) {
+        CompletableFuture<String> ftrep = t.sendMessage(request);
+        replies.put(MESSAGE_ID_INTEGER.get(), ftrep);
+        return ftrep;
+    }
+
+    private String sendRequest(String request) throws NetconfException {
+        checkAndRestablishSession();
+        //FIXME find out a better way to enforce the presence of message-id
+        if (!request.contains(MESSAGE_ID_STRING) && !request.contains(HELLO)) {
+            request = request.replaceFirst("\">", "\" message-id=\""
+                    + MESSAGE_ID_INTEGER.get() + "\"" + ">");
+        }
+        CompletableFuture<String> futureReply = request(request);
+        MESSAGE_ID_INTEGER.incrementAndGet();
+        String rp = futureReply.join();
+        log.debug("Reply from device {}", rp);
+        return rp;
+    }
+
+    @Override
+    public String get(String request) throws NetconfException {
+        return requestSync(request);
+    }
+
+    @Override
+    public String getConfig(String targetConfiguration) throws NetconfException {
         return getConfig(targetConfiguration, null);
     }
 
     @Override
-    public String getConfig(String targetConfiguration, String configurationSchema) throws IOException {
+    public String getConfig(String targetConfiguration, String configurationSchema) throws NetconfException {
         StringBuilder rpc = new StringBuilder("<?xml version=\"1.0\" encoding=\"UTF-8\"?>");
-        rpc.append("<rpc message-id=\"" + messageID + "\"  "
+        rpc.append("<rpc message-id=\"" + MESSAGE_ID_INTEGER.get() + "\"  "
                            + "xmlns=\"urn:ietf:params:xml:ns:netconf:base:1.0\">\n");
         rpc.append("<get-config>\n");
         rpc.append("<source>\n");
@@ -183,23 +217,23 @@
         }
         rpc.append("</get-config>\n");
         rpc.append("</rpc>\n");
-        rpc.append(endpattern);
-        String reply = doRequest(rpc.toString());
+        rpc.append(ENDPATTERN);
+        String reply = sendRequest(rpc.toString());
         return checkReply(reply) ? reply : "ERROR " + reply;
     }
 
     @Override
-    public boolean editConfig(String newConfiguration) throws IOException {
-        newConfiguration = newConfiguration + endpattern;
-        return checkReply(doRequest(newConfiguration));
+    public boolean editConfig(String newConfiguration) throws NetconfException {
+        newConfiguration = newConfiguration + ENDPATTERN;
+        return checkReply(sendRequest(newConfiguration));
     }
 
     @Override
     public boolean editConfig(String targetConfiguration, String mode, String newConfiguration)
-            throws IOException {
+            throws NetconfException {
         newConfiguration = newConfiguration.trim();
         StringBuilder rpc = new StringBuilder("<?xml version=\"1.0\" encoding=\"UTF-8\"?>");
-        rpc.append("<rpc message-id=\"" + messageID + "\"  "
+        rpc.append("<rpc message-id=\"" + MESSAGE_ID_INTEGER.get() + "\"  "
                            + "xmlns=\"urn:ietf:params:xml:ns:netconf:base:1.0\">\n");
         rpc.append("<edit-config>");
         rpc.append("<target>");
@@ -213,13 +247,13 @@
         rpc.append("</config>");
         rpc.append("</edit-config>");
         rpc.append("</rpc>");
-        rpc.append(endpattern);
-        return checkReply(doRequest(rpc.toString()));
+        rpc.append(ENDPATTERN);
+        return checkReply(sendRequest(rpc.toString()));
     }
 
     @Override
     public boolean copyConfig(String targetConfiguration, String newConfiguration)
-            throws IOException {
+            throws NetconfException {
         newConfiguration = newConfiguration.trim();
         if (!newConfiguration.startsWith("<configuration>")) {
             newConfiguration = "<configuration>" + newConfiguration
@@ -237,12 +271,12 @@
         rpc.append("</source>");
         rpc.append("</copy-config>");
         rpc.append("</rpc>");
-        rpc.append(endpattern);
-        return checkReply(doRequest(rpc.toString()));
+        rpc.append(ENDPATTERN);
+        return checkReply(sendRequest(rpc.toString()));
     }
 
     @Override
-    public boolean deleteConfig(String targetConfiguration) throws IOException {
+    public boolean deleteConfig(String targetConfiguration) throws NetconfException {
         if (targetConfiguration.equals("running")) {
             log.warn("Target configuration for delete operation can't be \"running\"",
                      targetConfiguration);
@@ -257,12 +291,12 @@
         rpc.append("</target>");
         rpc.append("</delete-config>");
         rpc.append("</rpc>");
-        rpc.append(endpattern);
-        return checkReply(doRequest(rpc.toString()));
+        rpc.append(ENDPATTERN);
+        return checkReply(sendRequest(rpc.toString()));
     }
 
     @Override
-    public boolean lock() throws IOException {
+    public boolean lock() throws NetconfException {
         StringBuilder rpc = new StringBuilder("<?xml version=\"1.0\" " +
                                                       "encoding=\"UTF-8\"?>");
         rpc.append("<rpc>");
@@ -272,12 +306,12 @@
         rpc.append("</target>");
         rpc.append("</lock>");
         rpc.append("</rpc>");
-        rpc.append(endpattern);
-        return checkReply(doRequest(rpc.toString()));
+        rpc.append(ENDPATTERN);
+        return checkReply(sendRequest(rpc.toString()));
     }
 
     @Override
-    public boolean unlock() throws IOException {
+    public boolean unlock() throws NetconfException {
         StringBuilder rpc = new StringBuilder("<?xml version=\"1.0\" " +
                                                       "encoding=\"UTF-8\"?>");
         rpc.append("<rpc>");
@@ -287,16 +321,16 @@
         rpc.append("</target>");
         rpc.append("</unlock>");
         rpc.append("</rpc>");
-        rpc.append(endpattern);
-        return checkReply(doRequest(rpc.toString()));
+        rpc.append(ENDPATTERN);
+        return checkReply(sendRequest(rpc.toString()));
     }
 
     @Override
-    public boolean close() throws IOException {
+    public boolean close() throws NetconfException {
         return close(false);
     }
 
-    private boolean close(boolean force) throws IOException {
+    private boolean close(boolean force) throws NetconfException {
         StringBuilder rpc = new StringBuilder();
         rpc.append("<rpc>");
         if (force) {
@@ -306,8 +340,8 @@
         }
         rpc.append("<close-configuration/>");
         rpc.append("</rpc>");
-        rpc.append(endpattern);
-        return checkReply(doRequest(rpc.toString())) || close(true);
+        rpc.append(ENDPATTERN);
+        return checkReply(sendRequest(rpc.toString())) || close(true);
     }
 
     @Override
@@ -335,7 +369,17 @@
         deviceCapabilities = capabilities;
     }
 
-    private boolean checkReply(String reply) {
+    @Override
+    public void addDeviceOutputListener(NetconfDeviceOutputEventListener listener) {
+        t.addDeviceEventListener(listener);
+    }
+
+    @Override
+    public void removeDeviceOutputListener(NetconfDeviceOutputEventListener listener) {
+        t.removeDeviceEventListener(listener);
+    }
+
+    private boolean checkReply(String reply) throws NetconfException {
         if (reply != null) {
             if (!reply.contains("<rpc-error>")) {
                 return true;
@@ -345,36 +389,18 @@
                 return true;
             }
         }
-        log.warn("Error in reply {}", reply);
+        log.warn("Device " + deviceInfo + "has error in reply {}", reply);
         return false;
     }
 
-    private String readOne() throws IOException {
-        //TODO try a simple string
-        final StringWriter reply = new StringWriter();
-        while (true) {
-            int charRead = bufferReader.read();
-            if (charRead == -1) {
-                throw new IOException("Session closed");
-            }
+    public class NetconfSessionDelegateImpl implements NetconfSessionDelegate {
 
-            for (int i = 0; i < endpattern.length(); i++) {
-                if (charRead == endpattern.charAt(i)) {
-                    if (i < endpattern.length() - 1) {
-                        charRead = bufferReader.read();
-                    } else {
-                        return reply.getBuffer().toString();
-                    }
-                } else {
-                    String s = endpattern.substring(0, i);
-                    for (int j = 0; i < s.length(); j++) {
-                        reply.write(s.charAt(j));
-                    }
-                    reply.write(charRead);
-                    break;
-                }
-            }
+        @Override
+        public void notify(NetconfDeviceOutputEvent event) {
+            CompletableFuture<String> completedReply = replies.get(event.getMessageID());
+            completedReply.complete(event.getMessagePayload());
         }
     }
 
+
 }
diff --git a/protocols/netconf/ctl/src/main/java/org/onosproject/netconf/ctl/NetconfStreamHandler.java b/protocols/netconf/ctl/src/main/java/org/onosproject/netconf/ctl/NetconfStreamHandler.java
new file mode 100644
index 0000000..4416e0a
--- /dev/null
+++ b/protocols/netconf/ctl/src/main/java/org/onosproject/netconf/ctl/NetconfStreamHandler.java
@@ -0,0 +1,49 @@
+/*
+ * Copyright 2015 Open Networking Laboratory
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.onosproject.netconf.ctl;
+
+import org.onosproject.netconf.NetconfDeviceOutputEventListener;
+
+import java.util.concurrent.CompletableFuture;
+
+/**
+ * Interface to represent an objects that does all the IO on a NETCONF session
+ * with a device.
+ */
+public interface NetconfStreamHandler {
+    /**
+     * Sends the request on the stream that is used to communicate to and from the device.
+     *
+     * @param request request to send to the physical device
+     * @return a CompletableFuture of type String that will contain the response for the request.
+     */
+    CompletableFuture<String> sendMessage(String request);
+
+    /**
+     * Adds a listener for netconf events on the handled stream.
+     *
+     * @param listener Netconf device event listener
+     */
+    void addDeviceEventListener(NetconfDeviceOutputEventListener listener);
+
+    /**
+     * Removes a listener for netconf events on the handled stream.
+     *
+     * @param listener Netconf device event listener
+     */
+    void removeDeviceEventListener(NetconfDeviceOutputEventListener listener);
+}
diff --git a/protocols/netconf/ctl/src/main/java/org/onosproject/netconf/ctl/NetconfStreamThread.java b/protocols/netconf/ctl/src/main/java/org/onosproject/netconf/ctl/NetconfStreamThread.java
new file mode 100644
index 0000000..6169c3d
--- /dev/null
+++ b/protocols/netconf/ctl/src/main/java/org/onosproject/netconf/ctl/NetconfStreamThread.java
@@ -0,0 +1,225 @@
+/*
+ * Copyright 2015 Open Networking Laboratory
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.onosproject.netconf.ctl;
+
+import com.google.common.base.Preconditions;
+import com.google.common.collect.Lists;
+import org.onosproject.netconf.NetconfDeviceInfo;
+import org.onosproject.netconf.NetconfDeviceOutputEvent;
+import org.onosproject.netconf.NetconfDeviceOutputEventListener;
+import org.onosproject.netconf.NetconfException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.BufferedReader;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.InputStreamReader;
+import java.io.OutputStream;
+import java.io.PrintWriter;
+import java.util.List;
+import java.util.concurrent.CompletableFuture;
+
+/**
+ * Thread that gets spawned each time a session is established and handles all the input
+ * and output from the session's streams to and from the NETCONF device the session is
+ * established with.
+ */
+public class NetconfStreamThread extends Thread implements NetconfStreamHandler {
+
+    private static final Logger log = LoggerFactory
+            .getLogger(NetconfStreamThread.class);
+    private static final String HELLO = "hello";
+    private static final String END_PATTERN = "]]>]]>";
+    private static final String RPC_REPLY = "rpc-reply";
+    private static final String RPC_ERROR = "rpc-error";
+    private static final String NOTIFICATION_LABEL = "<notification>";
+
+    private static PrintWriter outputStream;
+    private static NetconfDeviceInfo netconfDeviceInfo;
+    private static NetconfSessionDelegate sessionDelegate;
+    private static NetconfMessageState state;
+    private static List<NetconfDeviceOutputEventListener> netconfDeviceEventListeners
+            = Lists.newArrayList();
+
+    public NetconfStreamThread(final InputStream in, final OutputStream out,
+                               final InputStream err, NetconfDeviceInfo deviceInfo,
+                               NetconfSessionDelegate delegate) {
+        super(handler(in, err));
+        outputStream = new PrintWriter(out);
+        netconfDeviceInfo = deviceInfo;
+        state = NetconfMessageState.NO_MATCHING_PATTERN;
+        sessionDelegate = delegate;
+        log.debug("Stream thread for device {} session started", deviceInfo);
+        start();
+    }
+
+    @Override
+    public CompletableFuture<String> sendMessage(String request) {
+        outputStream.print(request);
+        outputStream.flush();
+        return new CompletableFuture<>();
+    }
+
+    public enum NetconfMessageState {
+
+        NO_MATCHING_PATTERN {
+            @Override
+            NetconfMessageState evaluateChar(char c) {
+                if (c == ']') {
+                    return FIRST_BRAKET;
+                } else {
+                    return this;
+                }
+            }
+        },
+        FIRST_BRAKET {
+            @Override
+            NetconfMessageState evaluateChar(char c) {
+                if (c == ']') {
+                    return SECOND_BRAKET;
+                } else {
+                    return NO_MATCHING_PATTERN;
+                }
+            }
+        },
+        SECOND_BRAKET {
+            @Override
+            NetconfMessageState evaluateChar(char c) {
+                if (c == '>') {
+                    return FIRST_BIGGER;
+                } else {
+                    return NO_MATCHING_PATTERN;
+                }
+            }
+        },
+        FIRST_BIGGER {
+            @Override
+            NetconfMessageState evaluateChar(char c) {
+                if (c == ']') {
+                    return THIRD_BRAKET;
+                } else {
+                    return NO_MATCHING_PATTERN;
+                }
+            }
+        },
+        THIRD_BRAKET {
+            @Override
+            NetconfMessageState evaluateChar(char c) {
+                if (c == ']') {
+                    return ENDING_BIGGER;
+                } else {
+                    return NO_MATCHING_PATTERN;
+                }
+            }
+        },
+        ENDING_BIGGER {
+            @Override
+            NetconfMessageState evaluateChar(char c) {
+                if (c == '>') {
+                    return END_PATTERN;
+                } else {
+                    return NO_MATCHING_PATTERN;
+                }
+            }
+        },
+        END_PATTERN {
+            @Override
+            NetconfMessageState evaluateChar(char c) {
+                return NO_MATCHING_PATTERN;
+            }
+        };
+
+        abstract NetconfMessageState evaluateChar(char c);
+    }
+
+    private static Runnable handler(final InputStream in, final InputStream err) {
+        BufferedReader bufferReader = new BufferedReader(new InputStreamReader(in));
+        return () -> {
+            try {
+                boolean socketClosed = false;
+                StringBuilder deviceReplyBuilder = new StringBuilder();
+                while (!socketClosed) {
+                    int cInt = bufferReader.read();
+                    if (cInt == -1) {
+                        socketClosed = true;
+                        NetconfDeviceOutputEvent event = new NetconfDeviceOutputEvent(
+                                NetconfDeviceOutputEvent.Type.DEVICE_UNREGISTERED,
+                                null, null, -1, netconfDeviceInfo);
+                        netconfDeviceEventListeners.forEach(
+                                listener -> listener.event(event));
+                    }
+                    char c = (char) cInt;
+                    state = state.evaluateChar(c);
+                    deviceReplyBuilder.append(c);
+                    if (state == NetconfMessageState.END_PATTERN) {
+                        String deviceReply = deviceReplyBuilder.toString()
+                                .replace(END_PATTERN, "");
+                        if (deviceReply.contains(RPC_REPLY) ||
+                                deviceReply.contains(RPC_ERROR) ||
+                                deviceReply.contains(HELLO)) {
+                            NetconfDeviceOutputEvent event = new NetconfDeviceOutputEvent(
+                                    NetconfDeviceOutputEvent.Type.DEVICE_REPLY,
+                                    null, deviceReply, getMsgId(deviceReply), netconfDeviceInfo);
+                            sessionDelegate.notify(event);
+                            netconfDeviceEventListeners.forEach(
+                                    listener -> listener.event(event));
+                        } else if (deviceReply.contains(NOTIFICATION_LABEL)) {
+                            final String finalDeviceReply = deviceReply;
+                            netconfDeviceEventListeners.forEach(
+                                    listener -> listener.event(new NetconfDeviceOutputEvent(
+                                            NetconfDeviceOutputEvent.Type.DEVICE_NOTIFICATION,
+                                            null, finalDeviceReply, getMsgId(finalDeviceReply), netconfDeviceInfo)));
+                        } else {
+                            log.info("Error on replay from device {} ", deviceReply);
+                        }
+                        deviceReplyBuilder.setLength(0);
+                    }
+                }
+            } catch (IOException e) {
+                log.warn("Error in reading from the session for device " + netconfDeviceInfo, e);
+                throw new RuntimeException(new NetconfException("Error in reading from the session for device {}" +
+                                                                        netconfDeviceInfo, e));
+                //TODO should we send a socket closed message to listeners ?
+            }
+        };
+    }
+
+    private static int getMsgId(String reply) {
+        if (!reply.contains(HELLO)) {
+            String[] outer = reply.split("message-id=");
+            Preconditions.checkArgument(outer.length != 1,
+                                        "Error in retrieving the message id");
+            String messageID = outer[1].substring(0, 3).replace("\"", "");
+            Preconditions.checkNotNull(Integer.parseInt(messageID),
+                                       "Error in retrieving the message id");
+            return Integer.parseInt(messageID);
+        } else {
+            return 0;
+        }
+    }
+
+    public void addDeviceEventListener(NetconfDeviceOutputEventListener listener) {
+        if (!netconfDeviceEventListeners.contains(listener)) {
+            netconfDeviceEventListeners.add(listener);
+        }
+    }
+
+    public void removeDeviceEventListener(NetconfDeviceOutputEventListener listener) {
+        netconfDeviceEventListeners.remove(listener);
+    }
+}
diff --git a/providers/netconf/device/src/main/java/org/onosproject/provider/netconf/device/impl/NetconfDeviceProvider.java b/providers/netconf/device/src/main/java/org/onosproject/provider/netconf/device/impl/NetconfDeviceProvider.java
index 0aea67e..ea3bfcc 100644
--- a/providers/netconf/device/src/main/java/org/onosproject/provider/netconf/device/impl/NetconfDeviceProvider.java
+++ b/providers/netconf/device/src/main/java/org/onosproject/provider/netconf/device/impl/NetconfDeviceProvider.java
@@ -65,7 +65,7 @@
     protected DeviceProviderRegistry providerRegistry;
 
     @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
-    protected NetconfController controller; //where is initiated ?
+    protected NetconfController controller;
 
     @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
     protected NetworkConfigRegistry cfgService;
@@ -73,11 +73,13 @@
     @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
     protected CoreService coreService;
 
+    private static final String APP_NAME = "org.onosproject.netconf";
+    private static final String SCHEME_NAME = "netconf";
+    private static final String DEVICE_PROVIDER_PACKAGE = "org.onosproject.netconf.provider.device";
+    private static final String UNKNOWN = "unknown";
 
     private DeviceProviderService providerService;
     private NetconfDeviceListener innerNodeListener = new InnerNetconfDeviceListener();
-    protected static final String ISNOTNULL = "NetconfDeviceInfo is not null";
-    private static final String UNKNOWN = "unknown";
 
     private final ConfigFactory factory =
             new ConfigFactory<ApplicationId, NetconfProviderConfig>(APP_SUBJECT_FACTORY,
@@ -96,10 +98,10 @@
     @Activate
     public void activate() {
         providerService = providerRegistry.register(this);
+        appId = coreService.registerApplication(APP_NAME);
         cfgService.registerConfigFactory(factory);
         cfgService.addListener(cfgLister);
         controller.addDeviceListener(innerNodeListener);
-        appId = coreService.registerApplication("org.onosproject.netconf");
         connectDevices();
         log.info("Started");
     }
@@ -110,11 +112,12 @@
         providerRegistry.unregister(this);
         providerService = null;
         cfgService.unregisterConfigFactory(factory);
+        controller.removeDeviceListener(innerNodeListener);
         log.info("Stopped");
     }
 
     public NetconfDeviceProvider() {
-        super(new ProviderId("netconf", "org.onosproject.netconf.provider.device"));
+        super(new ProviderId(SCHEME_NAME, DEVICE_PROVIDER_PACKAGE));
     }
 
     @Override
@@ -142,15 +145,18 @@
 
     private class InnerNetconfDeviceListener implements NetconfDeviceListener {
 
+        private static final String IPADDRESS = "ipaddress";
+        protected static final String ISNULL = "NetconfDeviceInfo is null";
+
         @Override
         public void deviceAdded(NetconfDeviceInfo nodeId) {
-            Preconditions.checkNotNull(nodeId, ISNOTNULL);
+            Preconditions.checkNotNull(nodeId, ISNULL);
             DeviceId deviceId = nodeId.getDeviceId();
             //Netconf configuration object
             ChassisId cid = new ChassisId();
             String ipAddress = nodeId.ip().toString();
             SparseAnnotations annotations = DefaultAnnotations.builder()
-                    .set("ipaddress", ipAddress).build();
+                    .set(IPADDRESS, ipAddress).build();
             DeviceDescription deviceDescription = new DefaultDeviceDescription(
                     deviceId.uri(),
                     Device.Type.SWITCH,
@@ -164,7 +170,7 @@
 
         @Override
         public void deviceRemoved(NetconfDeviceInfo nodeId) {
-            Preconditions.checkNotNull(nodeId, ISNOTNULL);
+            Preconditions.checkNotNull(nodeId, ISNULL);
             DeviceId deviceId = nodeId.getDeviceId();
             providerService.deviceDisconnected(deviceId);
 
@@ -184,7 +190,7 @@
                                                                        addr.ip(),
                                                                        addr.port()));
                                      } catch (IOException e) {
-                                         log.warn("Can't connect to NETCONF " +
+                                         log.info("Can't connect to NETCONF " +
                                                           "device on {}:{}",
                                                   addr.ip(),
                                                   addr.port());