Move internal classes under .impl package (2/2)

- adding back classes in old location marked as Deprecated

Change-Id: I27a6adf9faac5b6c7882d233494dfc994f7e978f
diff --git a/protocols/netconf/ctl/src/main/java/org/onosproject/netconf/ctl/DefaultNetconfDevice.java b/protocols/netconf/ctl/src/main/java/org/onosproject/netconf/ctl/DefaultNetconfDevice.java
new file mode 100644
index 0000000..0740cc4
--- /dev/null
+++ b/protocols/netconf/ctl/src/main/java/org/onosproject/netconf/ctl/DefaultNetconfDevice.java
@@ -0,0 +1,98 @@
+/*
+ * Copyright 2016-present Open Networking Laboratory
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.onosproject.netconf.ctl;
+
+import org.onosproject.netconf.NetconfDevice;
+import org.onosproject.netconf.NetconfDeviceInfo;
+import org.onosproject.netconf.NetconfException;
+import org.onosproject.netconf.NetconfSession;
+import org.onosproject.netconf.NetconfSessionFactory;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+
+/**
+ * Default implementation of a NETCONF device.
+ *
+ * @deprecated in 1.10.0
+ */
+@Deprecated
+public class DefaultNetconfDevice implements NetconfDevice {
+
+    public static final Logger log = LoggerFactory
+            .getLogger(DefaultNetconfDevice.class);
+
+    private NetconfDeviceInfo netconfDeviceInfo;
+    private boolean deviceState = true;
+    protected NetconfSessionFactory sessionFactory = new SshNetconfSessionFactory();
+    private NetconfSession netconfSession;
+
+    /**
+     * Creates a new default NETCONF device with the information provided.
+     * The device gets created only if no exception is thrown while connecting to
+     * it and establishing the NETCONF session.
+     * @param deviceInfo information about the device to be created.
+     * @throws NetconfException if there are problems in creating or establishing
+     * the underlying NETCONF connection and session.
+     */
+    public DefaultNetconfDevice(NetconfDeviceInfo deviceInfo)
+            throws NetconfException {
+        netconfDeviceInfo = deviceInfo;
+        try {
+            netconfSession = sessionFactory.createNetconfSession(deviceInfo);
+        } catch (IOException e) {
+            deviceState = false;
+            throw new NetconfException("Cannot create connection and session for device " +
+                                               deviceInfo, e);
+        }
+    }
+
+    @Override
+    public boolean isActive() {
+        return deviceState;
+    }
+
+    @Override
+    public NetconfSession getSession() {
+        return netconfSession;
+    }
+
+    @Override
+    public void disconnect() {
+        deviceState = false;
+        try {
+            netconfSession.close();
+        } catch (IOException e) {
+            log.warn("Cannot communicate with the device {} session already closed", netconfDeviceInfo);
+        }
+    }
+
+    @Override
+    public NetconfDeviceInfo getDeviceInfo() {
+        return netconfDeviceInfo;
+    }
+
+    public class SshNetconfSessionFactory implements NetconfSessionFactory {
+
+        @Override
+        public NetconfSession createNetconfSession(NetconfDeviceInfo netconfDeviceInfo) throws NetconfException {
+            return new NetconfSessionImpl(netconfDeviceInfo);
+        }
+    }
+
+}
diff --git a/protocols/netconf/ctl/src/main/java/org/onosproject/netconf/ctl/NetconfControllerImpl.java b/protocols/netconf/ctl/src/main/java/org/onosproject/netconf/ctl/NetconfControllerImpl.java
new file mode 100644
index 0000000..0747f8f
--- /dev/null
+++ b/protocols/netconf/ctl/src/main/java/org/onosproject/netconf/ctl/NetconfControllerImpl.java
@@ -0,0 +1,344 @@
+/*
+ * Copyright 2015-present Open Networking Laboratory
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.onosproject.netconf.ctl;
+
+import org.apache.felix.scr.annotations.Activate;
+import org.apache.felix.scr.annotations.Component;
+import org.apache.felix.scr.annotations.Deactivate;
+import org.apache.felix.scr.annotations.Modified;
+import org.apache.felix.scr.annotations.Property;
+import org.apache.felix.scr.annotations.Reference;
+import org.apache.felix.scr.annotations.ReferenceCardinality;
+import org.apache.felix.scr.annotations.Service;
+import org.onlab.packet.IpAddress;
+import org.onosproject.cfg.ComponentConfigService;
+import org.onosproject.net.AnnotationKeys;
+import org.onosproject.net.Device;
+import org.onosproject.net.DeviceId;
+import org.onosproject.net.device.DeviceService;
+import org.onosproject.net.key.DeviceKey;
+import org.onosproject.net.key.DeviceKeyId;
+import org.onosproject.net.key.DeviceKeyService;
+import org.onosproject.net.key.UsernamePassword;
+import org.onosproject.netconf.NetconfController;
+import org.onosproject.netconf.NetconfDevice;
+import org.onosproject.netconf.NetconfDeviceFactory;
+import org.onosproject.netconf.NetconfDeviceInfo;
+import org.onosproject.netconf.NetconfDeviceListener;
+import org.onosproject.netconf.NetconfDeviceOutputEvent;
+import org.onosproject.netconf.NetconfDeviceOutputEventListener;
+import org.onosproject.netconf.NetconfException;
+import org.osgi.service.component.ComponentContext;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Arrays;
+import java.util.Dictionary;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.CopyOnWriteArraySet;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+
+import static com.google.common.base.Strings.isNullOrEmpty;
+import static org.onlab.util.Tools.get;
+import static org.onlab.util.Tools.groupedThreads;
+
+/**
+ * The implementation of NetconfController.
+ *
+ * @deprecated in 1.10.0
+ */
+@Deprecated
+@Component(immediate = false, enabled = false)
+@Service
+public class NetconfControllerImpl implements NetconfController {
+    private static final int DEFAULT_CONNECT_TIMEOUT_SECONDS = 5;
+    private static final String PROP_NETCONF_CONNECT_TIMEOUT = "netconfConnectTimeout";
+    @Property(name = PROP_NETCONF_CONNECT_TIMEOUT, intValue = DEFAULT_CONNECT_TIMEOUT_SECONDS,
+            label = "Time (in seconds) to wait for a NETCONF connect.")
+    protected static int netconfConnectTimeout = DEFAULT_CONNECT_TIMEOUT_SECONDS;
+
+    private static final String PROP_NETCONF_REPLY_TIMEOUT = "netconfReplyTimeout";
+    private static final int DEFAULT_REPLY_TIMEOUT_SECONDS = 5;
+    @Property(name = PROP_NETCONF_REPLY_TIMEOUT, intValue = DEFAULT_REPLY_TIMEOUT_SECONDS,
+            label = "Time (in seconds) waiting for a NetConf reply")
+    protected static int netconfReplyTimeout = DEFAULT_REPLY_TIMEOUT_SECONDS;
+
+    @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
+    protected ComponentConfigService cfgService;
+
+    @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
+    protected DeviceService deviceService;
+
+    @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
+    protected DeviceKeyService deviceKeyService;
+
+    public static final Logger log = LoggerFactory
+            .getLogger(NetconfControllerImpl.class);
+
+    private Map<DeviceId, NetconfDevice> netconfDeviceMap = new ConcurrentHashMap<>();
+
+    private final NetconfDeviceOutputEventListener downListener = new DeviceDownEventListener();
+
+    protected Set<NetconfDeviceListener> netconfDeviceListeners = new CopyOnWriteArraySet<>();
+    protected NetconfDeviceFactory deviceFactory = new DefaultNetconfDeviceFactory();
+
+    protected final ExecutorService executor =
+            Executors.newCachedThreadPool(groupedThreads("onos/netconfdevicecontroller",
+                                                           "connection-reopen-%d", log));
+
+    @Activate
+    public void activate(ComponentContext context) {
+        cfgService.registerProperties(getClass());
+        modified(context);
+        log.info("Started");
+    }
+
+    @Deactivate
+    public void deactivate() {
+        cfgService.unregisterProperties(getClass(), false);
+        netconfDeviceMap.clear();
+        log.info("Stopped");
+    }
+
+    @Modified
+    public void modified(ComponentContext context) {
+        if (context == null) {
+            netconfReplyTimeout = DEFAULT_REPLY_TIMEOUT_SECONDS;
+            netconfConnectTimeout = DEFAULT_CONNECT_TIMEOUT_SECONDS;
+            log.info("No component configuration");
+            return;
+        }
+
+        Dictionary<?, ?> properties = context.getProperties();
+
+        int newNetconfReplyTimeout;
+        int newNetconfConnectTimeout;
+        try {
+            String s = get(properties, PROP_NETCONF_REPLY_TIMEOUT);
+            newNetconfReplyTimeout = isNullOrEmpty(s) ?
+                    netconfReplyTimeout : Integer.parseInt(s.trim());
+
+            s = get(properties, PROP_NETCONF_CONNECT_TIMEOUT);
+            newNetconfConnectTimeout = isNullOrEmpty(s) ?
+                    netconfConnectTimeout : Integer.parseInt(s.trim());
+
+        } catch (NumberFormatException e) {
+            log.warn("Component configuration had invalid value", e);
+            return;
+        }
+
+        if (newNetconfConnectTimeout < 0) {
+            log.warn("netconfConnectTimeout is invalid - less than 0");
+            return;
+        } else if (newNetconfReplyTimeout <= 0) {
+            log.warn("netconfReplyTimeout is invalid - 0 or less.");
+            return;
+        }
+
+        netconfReplyTimeout = newNetconfReplyTimeout;
+        netconfConnectTimeout = newNetconfConnectTimeout;
+        log.info("Settings: {} = {}, {} = {}",
+                 PROP_NETCONF_REPLY_TIMEOUT, netconfReplyTimeout, PROP_NETCONF_CONNECT_TIMEOUT, netconfConnectTimeout);
+    }
+
+    @Override
+    public void addDeviceListener(NetconfDeviceListener listener) {
+        if (!netconfDeviceListeners.contains(listener)) {
+            netconfDeviceListeners.add(listener);
+        }
+    }
+
+    @Override
+    public void removeDeviceListener(NetconfDeviceListener listener) {
+        netconfDeviceListeners.remove(listener);
+    }
+
+    @Override
+    public NetconfDevice getNetconfDevice(DeviceId deviceInfo) {
+        return netconfDeviceMap.get(deviceInfo);
+    }
+
+    @Override
+    public NetconfDevice getNetconfDevice(IpAddress ip, int port) {
+        for (DeviceId info : netconfDeviceMap.keySet()) {
+            if (info.uri().getSchemeSpecificPart().equals(ip.toString() + ":" + port)) {
+                return netconfDeviceMap.get(info);
+            }
+        }
+        return null;
+    }
+
+    @Override
+    public NetconfDevice connectDevice(DeviceId deviceId) throws NetconfException {
+        if (netconfDeviceMap.containsKey(deviceId)) {
+            log.debug("Device {} is already present", deviceId);
+            return netconfDeviceMap.get(deviceId);
+        } else {
+            log.debug("Creating NETCONF device {}", deviceId);
+            Device device = deviceService.getDevice(deviceId);
+            String ip;
+            int port;
+            if (device != null) {
+                ip = device.annotations().value("ipaddress");
+                port = Integer.parseInt(device.annotations().value("port"));
+            } else {
+                String[] info = deviceId.toString().split(":");
+                if (info.length == 3) {
+                    ip = info[1];
+                    port = Integer.parseInt(info[2]);
+                } else {
+                    ip = Arrays.asList(info).stream().filter(el -> !el.equals(info[0])
+                            && !el.equals(info[info.length - 1]))
+                            .reduce((t, u) -> t + ":" + u)
+                            .get();
+                    log.debug("ip v6 {}", ip);
+                    port = Integer.parseInt(info[info.length - 1]);
+                }
+            }
+            try {
+                DeviceKey deviceKey = deviceKeyService.getDeviceKey(
+                        DeviceKeyId.deviceKeyId(deviceId.toString()));
+                NetconfDeviceInfo deviceInfo = null;
+                if (deviceKey.type() == DeviceKey.Type.USERNAME_PASSWORD) {
+                    UsernamePassword usernamepasswd = deviceKey.asUsernamePassword();
+
+                    deviceInfo = new NetconfDeviceInfo(usernamepasswd.username(),
+                                                       usernamepasswd.password(),
+                                                       IpAddress.valueOf(ip),
+                                                       port);
+
+                } else if (deviceKey.type() == DeviceKey.Type.SSL_KEY) {
+                    String username = deviceKey.annotations().value(AnnotationKeys.USERNAME);
+                    String password = deviceKey.annotations().value(AnnotationKeys.PASSWORD);
+                    String sshkey = deviceKey.annotations().value(AnnotationKeys.SSHKEY);
+
+                    deviceInfo = new NetconfDeviceInfo(username,
+                                                       password,
+                                                       IpAddress.valueOf(ip),
+                                                       port,
+                                                       sshkey);
+                } else {
+                    log.error("Unknown device key for device {}", deviceId);
+                }
+                NetconfDevice netconfDevicedevice = createDevice(deviceInfo);
+                netconfDevicedevice.getSession().addDeviceOutputListener(downListener);
+                return netconfDevicedevice;
+            } catch (NullPointerException e) {
+                throw new NetconfException("No Device Key for device " + deviceId, e);
+            }
+        }
+    }
+
+    @Override
+    public void disconnectDevice(DeviceId deviceId, boolean remove) {
+        if (!netconfDeviceMap.containsKey(deviceId)) {
+            log.warn("Device {} is not present", deviceId);
+        } else {
+            stopDevice(deviceId, remove);
+        }
+    }
+
+    private void stopDevice(DeviceId deviceId, boolean remove) {
+        netconfDeviceMap.get(deviceId).disconnect();
+        netconfDeviceMap.remove(deviceId);
+        if (remove) {
+            for (NetconfDeviceListener l : netconfDeviceListeners) {
+                l.deviceRemoved(deviceId);
+            }
+        }
+    }
+
+    @Override
+    public void removeDevice(DeviceId deviceId) {
+        if (!netconfDeviceMap.containsKey(deviceId)) {
+            log.warn("Device {} is not present", deviceId);
+            for (NetconfDeviceListener l : netconfDeviceListeners) {
+                l.deviceRemoved(deviceId);
+            }
+        } else {
+            netconfDeviceMap.remove(deviceId);
+            for (NetconfDeviceListener l : netconfDeviceListeners) {
+                l.deviceRemoved(deviceId);
+            }
+        }
+    }
+
+    private NetconfDevice createDevice(NetconfDeviceInfo deviceInfo) throws NetconfException {
+        NetconfDevice netconfDevice = deviceFactory.createNetconfDevice(deviceInfo);
+        netconfDeviceMap.put(deviceInfo.getDeviceId(), netconfDevice);
+        for (NetconfDeviceListener l : netconfDeviceListeners) {
+            l.deviceAdded(deviceInfo.getDeviceId());
+        }
+        return netconfDevice;
+    }
+
+
+    @Override
+    public Map<DeviceId, NetconfDevice> getDevicesMap() {
+        return netconfDeviceMap;
+    }
+
+    @Override
+    public Set<DeviceId> getNetconfDevices() {
+        return netconfDeviceMap.keySet();
+    }
+
+
+    //Device factory for the specific NetconfDeviceImpl
+    private class DefaultNetconfDeviceFactory implements NetconfDeviceFactory {
+
+        @Override
+        public NetconfDevice createNetconfDevice(NetconfDeviceInfo netconfDeviceInfo) throws NetconfException {
+            return new DefaultNetconfDevice(netconfDeviceInfo);
+        }
+    }
+
+    //Listener for closed session with devices, gets triggered whe devices goes down
+    // or sends the endpattern ]]>]]>
+    private class DeviceDownEventListener implements NetconfDeviceOutputEventListener {
+
+        @Override
+        public void event(NetconfDeviceOutputEvent event) {
+            DeviceId did = event.getDeviceInfo().getDeviceId();
+            if (event.type().equals(NetconfDeviceOutputEvent.Type.DEVICE_UNREGISTERED)) {
+                removeDevice(did);
+            } else if (event.type().equals(NetconfDeviceOutputEvent.Type.SESSION_CLOSED)) {
+                log.info("Trying to reestablish connection with device {}", did);
+                executor.execute(() -> {
+                    try {
+                        netconfDeviceMap.get(did).getSession().checkAndReestablish();
+                        log.info("Connection with device {} was reestablished", did);
+                    } catch (NetconfException e) {
+                        log.error("The SSH connection with device {} couldn't be " +
+                                          "reestablished due to {}. " +
+                                          "Marking the device as unreachable", e.getMessage());
+                        log.debug("Complete exception: ", e);
+                        removeDevice(did);
+                    }
+                });
+            }
+        }
+
+        @Override
+        public boolean isRelevant(NetconfDeviceOutputEvent event) {
+            return getDevicesMap().containsKey(event.getDeviceInfo().getDeviceId());
+        }
+    }
+}
diff --git a/protocols/netconf/ctl/src/main/java/org/onosproject/netconf/ctl/NetconfSessionDelegate.java b/protocols/netconf/ctl/src/main/java/org/onosproject/netconf/ctl/NetconfSessionDelegate.java
new file mode 100644
index 0000000..305b785
--- /dev/null
+++ b/protocols/netconf/ctl/src/main/java/org/onosproject/netconf/ctl/NetconfSessionDelegate.java
@@ -0,0 +1,36 @@
+/*
+ * Copyright 2015-present Open Networking Laboratory
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.onosproject.netconf.ctl;
+
+import org.onosproject.netconf.NetconfDeviceOutputEvent;
+
+/**
+ * Entity associated with a NetconfSessionImpl and capable of receiving notifications of
+ * events about the session.
+ *
+ * @deprecated in 1.10.0
+ */
+@Deprecated
+public interface NetconfSessionDelegate {
+
+        /**
+         * Notifies the delegate via the specified event.
+         *
+         * @param event store generated event
+         */
+        void notify(NetconfDeviceOutputEvent event);
+}
diff --git a/protocols/netconf/ctl/src/main/java/org/onosproject/netconf/ctl/NetconfSessionImpl.java b/protocols/netconf/ctl/src/main/java/org/onosproject/netconf/ctl/NetconfSessionImpl.java
new file mode 100644
index 0000000..00bbca0
--- /dev/null
+++ b/protocols/netconf/ctl/src/main/java/org/onosproject/netconf/ctl/NetconfSessionImpl.java
@@ -0,0 +1,670 @@
+/*
+ * Copyright 2015-present Open Networking Laboratory
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.onosproject.netconf.ctl;
+
+import com.google.common.annotations.Beta;
+import ch.ethz.ssh2.Connection;
+import ch.ethz.ssh2.Session;
+import ch.ethz.ssh2.channel.Channel;
+import com.google.common.base.Preconditions;
+import org.onosproject.netconf.TargetConfig;
+import org.onosproject.netconf.NetconfDeviceInfo;
+import org.onosproject.netconf.NetconfDeviceOutputEvent;
+import org.onosproject.netconf.NetconfDeviceOutputEventListener;
+import org.onosproject.netconf.NetconfException;
+import org.onosproject.netconf.NetconfSession;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.regex.Pattern;
+
+
+/**
+ * Implementation of a NETCONF session to talk to a device.
+ *
+ * @deprecated in 1.10.0
+ */
+@Deprecated
+public class NetconfSessionImpl implements NetconfSession {
+
+    private static final Logger log = LoggerFactory
+            .getLogger(NetconfSessionImpl.class);
+
+    private static final String ENDPATTERN = "]]>]]>";
+    private static final String MESSAGE_ID_STRING = "message-id";
+    private static final String HELLO = "<hello";
+    private static final String NEW_LINE = "\n";
+    private static final String END_OF_RPC_OPEN_TAG = "\">";
+    private static final String EQUAL = "=";
+    private static final String NUMBER_BETWEEN_QUOTES_MATCHER = "\"+([0-9]+)+\"";
+    private static final String RPC_OPEN = "<rpc ";
+    private static final String RPC_CLOSE = "</rpc>";
+    private static final String GET_OPEN = "<get>";
+    private static final String GET_CLOSE = "</get>";
+    private static final String WITH_DEFAULT_OPEN = "<with-defaults ";
+    private static final String WITH_DEFAULT_CLOSE = "</with-defaults>";
+    private static final String DEFAULT_OPERATION_OPEN = "<default-operation>";
+    private static final String DEFAULT_OPERATION_CLOSE = "</default-operation>";
+    private static final String SUBTREE_FILTER_OPEN = "<filter type=\"subtree\">";
+    private static final String SUBTREE_FILTER_CLOSE = "</filter>";
+    private static final String EDIT_CONFIG_OPEN = "<edit-config>";
+    private static final String EDIT_CONFIG_CLOSE = "</edit-config>";
+    private static final String TARGET_OPEN = "<target>";
+    private static final String TARGET_CLOSE = "</target>";
+    private static final String CONFIG_OPEN = "<config xmlns:nc=\"urn:ietf:params:xml:ns:netconf:base:1.0\">";
+    private static final String CONFIG_CLOSE = "</config>";
+    private static final String XML_HEADER =
+            "<?xml version=\"1.0\" encoding=\"UTF-8\"?>";
+    private static final String NETCONF_BASE_NAMESPACE =
+            "xmlns=\"urn:ietf:params:xml:ns:netconf:base:1.0\"";
+    private static final String NETCONF_WITH_DEFAULTS_NAMESPACE =
+            "xmlns=\"urn:ietf:params:xml:ns:yang:ietf-netconf-with-defaults\"";
+    private static final String SUBSCRIPTION_SUBTREE_FILTER_OPEN =
+            "<filter xmlns:base10=\"urn:ietf:params:xml:ns:netconf:base:1.0\" base10:type=\"subtree\">";
+
+    private static Pattern msgIdPattern = Pattern.compile("(message-id=\"[0-9]+\")");
+
+    private final AtomicInteger messageIdInteger = new AtomicInteger(0);
+    private Connection netconfConnection;
+    private NetconfDeviceInfo deviceInfo;
+    private Session sshSession;
+    private boolean connectionActive;
+    private List<String> deviceCapabilities =
+            Collections.singletonList("urn:ietf:params:netconf:base:1.0");
+    private String serverCapabilities;
+    private NetconfStreamHandler streamHandler;
+    private Map<Integer, CompletableFuture<String>> replies;
+    private List<String> errorReplies;
+    private boolean subscriptionConnected = false;
+    private String notificationFilterSchema = null;
+
+
+    public NetconfSessionImpl(NetconfDeviceInfo deviceInfo) throws NetconfException {
+        this.deviceInfo = deviceInfo;
+        this.netconfConnection = null;
+        this.sshSession = null;
+        connectionActive = false;
+        replies = new ConcurrentHashMap<>();
+        errorReplies = new ArrayList<>();
+        startConnection();
+    }
+
+    private void startConnection() throws NetconfException {
+        if (!connectionActive) {
+            netconfConnection = new Connection(deviceInfo.ip().toString(), deviceInfo.port());
+            int connectTimeout = NetconfControllerImpl.netconfConnectTimeout;
+
+            try {
+                netconfConnection.connect(null, 1000 * connectTimeout, 1000 * connectTimeout);
+            } catch (IOException e) {
+                throw new NetconfException("Cannot open a connection with device " + deviceInfo, e);
+            }
+            boolean isAuthenticated;
+            try {
+                if (deviceInfo.getKeyFile() != null && deviceInfo.getKeyFile().canRead()) {
+                    log.debug("Authenticating with key file to device {} with username {}",
+                              deviceInfo.getDeviceId(), deviceInfo.name());
+                    isAuthenticated = netconfConnection.authenticateWithPublicKey(
+                            deviceInfo.name(), deviceInfo.getKeyFile(),
+                            deviceInfo.password().equals("") ? null : deviceInfo.password());
+                } else if (deviceInfo.getKey() != null) {
+                    log.debug("Authenticating with key to device {} with username {}",
+                              deviceInfo.getDeviceId(), deviceInfo.name());
+                    isAuthenticated = netconfConnection.authenticateWithPublicKey(
+                            deviceInfo.name(), deviceInfo.getKey(),
+                            deviceInfo.password().equals("") ? null : deviceInfo.password());
+                } else {
+                    log.debug("Authenticating to device {} with username {} with password",
+                              deviceInfo.getDeviceId(), deviceInfo.name());
+                    isAuthenticated = netconfConnection.authenticateWithPassword(
+                            deviceInfo.name(), deviceInfo.password());
+                }
+            } catch (IOException e) {
+                log.error("Authentication connection to device {} failed",
+                          deviceInfo.getDeviceId(), e);
+                throw new NetconfException("Authentication connection to device " +
+                                                   deviceInfo.getDeviceId() + " failed", e);
+            }
+
+            connectionActive = true;
+            Preconditions.checkArgument(isAuthenticated,
+                                        "Authentication to device %s with username " +
+                                                "%s failed",
+                                        deviceInfo.getDeviceId(), deviceInfo.name());
+            startSshSession();
+        }
+    }
+
+    private void startSshSession() throws NetconfException {
+        try {
+            sshSession = netconfConnection.openSession();
+            sshSession.startSubSystem("netconf");
+            streamHandler = new NetconfStreamThread(sshSession.getStdout(), sshSession.getStdin(),
+                                                    sshSession.getStderr(), deviceInfo,
+                                                    new NetconfSessionDelegateImpl(),
+                                                    replies);
+            this.addDeviceOutputListener(new NetconfDeviceOutputEventListenerImpl(deviceInfo));
+            sendHello();
+        } catch (IOException e) {
+            log.error("Failed to create ch.ethz.ssh2.Session session {} ", e.getMessage());
+            throw new NetconfException("Failed to create ch.ethz.ssh2.Session session with device" +
+                                               deviceInfo, e);
+        }
+    }
+
+
+    @Beta
+    private void startSubscriptionConnection(String filterSchema) throws NetconfException {
+        if (!serverCapabilities.contains("interleave")) {
+            throw new NetconfException("Device" + deviceInfo + "does not support interleave");
+        }
+        String reply = sendRequest(createSubscriptionString(filterSchema));
+        if (!checkReply(reply)) {
+            throw new NetconfException("Subscription not successful with device "
+                                               + deviceInfo + " with reply " + reply);
+        }
+        subscriptionConnected = true;
+    }
+
+    @Override
+    public void startSubscription() throws NetconfException {
+        if (!subscriptionConnected) {
+            startSubscriptionConnection(null);
+        }
+        streamHandler.setEnableNotifications(true);
+    }
+
+    @Beta
+    @Override
+    public void startSubscription(String filterSchema) throws NetconfException {
+        if (!subscriptionConnected) {
+            notificationFilterSchema = filterSchema;
+            startSubscriptionConnection(filterSchema);
+        }
+        streamHandler.setEnableNotifications(true);
+    }
+
+    @Beta
+    private String createSubscriptionString(String filterSchema) {
+        StringBuilder subscriptionbuffer = new StringBuilder();
+        subscriptionbuffer.append("<rpc xmlns=\"urn:ietf:params:xml:ns:netconf:base:1.0\">\n");
+        subscriptionbuffer.append("  <create-subscription\n");
+        subscriptionbuffer.append("xmlns=\"urn:ietf:params:xml:ns:netconf:notification:1.0\">\n");
+        // FIXME Only subtree filtering supported at the moment.
+        if (filterSchema != null) {
+            subscriptionbuffer.append("    ");
+            subscriptionbuffer.append(SUBSCRIPTION_SUBTREE_FILTER_OPEN).append(NEW_LINE);
+            subscriptionbuffer.append(filterSchema).append(NEW_LINE);
+            subscriptionbuffer.append("    ");
+            subscriptionbuffer.append(SUBTREE_FILTER_CLOSE).append(NEW_LINE);
+        }
+        subscriptionbuffer.append("  </create-subscription>\n");
+        subscriptionbuffer.append("</rpc>\n");
+        subscriptionbuffer.append(ENDPATTERN);
+        return subscriptionbuffer.toString();
+    }
+
+    @Override
+    public void endSubscription() throws NetconfException {
+        if (subscriptionConnected) {
+            streamHandler.setEnableNotifications(false);
+        } else {
+            throw new NetconfException("Subscription does not exist.");
+        }
+    }
+
+    private void sendHello() throws NetconfException {
+        serverCapabilities = sendRequest(createHelloString());
+    }
+
+    private String createHelloString() {
+        StringBuilder hellobuffer = new StringBuilder();
+        hellobuffer.append(XML_HEADER);
+        hellobuffer.append("\n");
+        hellobuffer.append("<hello xmlns=\"urn:ietf:params:xml:ns:netconf:base:1.0\">\n");
+        hellobuffer.append("  <capabilities>\n");
+        deviceCapabilities.forEach(
+                cap -> hellobuffer.append("    <capability>")
+                        .append(cap)
+                        .append("</capability>\n"));
+        hellobuffer.append("  </capabilities>\n");
+        hellobuffer.append("</hello>\n");
+        hellobuffer.append(ENDPATTERN);
+        return hellobuffer.toString();
+
+    }
+
+    @Override
+    public void checkAndReestablish() throws NetconfException {
+        if (sshSession.getState() != Channel.STATE_OPEN) {
+            try {
+                log.debug("Trying to reopen the Sesion with {}", deviceInfo.getDeviceId());
+                startSshSession();
+            } catch (IOException | IllegalStateException e) {
+                log.debug("Trying to reopen the Connection with {}", deviceInfo.getDeviceId());
+                try {
+                    connectionActive = false;
+                    replies.clear();
+                    messageIdInteger.set(0);
+                    startConnection();
+                    if (subscriptionConnected) {
+                        log.debug("Restarting subscription with {}", deviceInfo.getDeviceId());
+                        subscriptionConnected = false;
+                        startSubscription(notificationFilterSchema);
+                    }
+                } catch (IOException e2) {
+                    log.error("No connection {} for device {}", netconfConnection, e.getMessage());
+                    throw new NetconfException("Cannot re-open the connection with device" + deviceInfo, e);
+                }
+            }
+        }
+    }
+
+    @Override
+    public String requestSync(String request) throws NetconfException {
+        if (!request.contains(ENDPATTERN)) {
+            request = request + NEW_LINE + ENDPATTERN;
+        }
+        String reply = sendRequest(request);
+        checkReply(reply);
+        return reply;
+    }
+
+    @Override
+    @Deprecated
+    public CompletableFuture<String> request(String request) {
+        return streamHandler.sendMessage(request);
+    }
+
+    private CompletableFuture<String> request(String request, int messageId) {
+        return streamHandler.sendMessage(request, messageId);
+    }
+
+    private String sendRequest(String request) throws NetconfException {
+        checkAndReestablish();
+        final int messageId = messageIdInteger.getAndIncrement();
+        request = formatRequestMessageId(request, messageId);
+        request = formatXmlHeader(request);
+        CompletableFuture<String> futureReply = request(request, messageId);
+        int replyTimeout = NetconfControllerImpl.netconfReplyTimeout;
+        String rp;
+        try {
+            rp = futureReply.get(replyTimeout, TimeUnit.SECONDS);
+            replies.remove(messageId);
+        } catch (InterruptedException | ExecutionException | TimeoutException e) {
+            throw new NetconfException("No matching reply for request " + request, e);
+        }
+        log.debug("Result {} from request {} to device {}", rp, request, deviceInfo);
+        return rp.trim();
+    }
+
+    private String formatRequestMessageId(String request, int messageId) {
+        if (request.contains(MESSAGE_ID_STRING)) {
+            //FIXME if application provides his own counting of messages this fails that count
+            request = request.replaceFirst(MESSAGE_ID_STRING + EQUAL + NUMBER_BETWEEN_QUOTES_MATCHER,
+                                           MESSAGE_ID_STRING + EQUAL + "\"" + messageId + "\"");
+        } else if (!request.contains(MESSAGE_ID_STRING) && !request.contains(HELLO)) {
+            //FIXME find out a better way to enforce the presence of message-id
+            request = request.replaceFirst(END_OF_RPC_OPEN_TAG, "\" " + MESSAGE_ID_STRING + EQUAL + "\""
+                    + messageId + "\"" + ">");
+        }
+        return request;
+    }
+
+    private String formatXmlHeader(String request) {
+        if (!request.contains(XML_HEADER)) {
+            //FIXME if application provieds his own XML header of different type there is a clash
+            request = XML_HEADER + "\n" + request;
+        }
+        return request;
+    }
+
+    @Override
+    public String doWrappedRpc(String request) throws NetconfException {
+        StringBuilder rpc = new StringBuilder(XML_HEADER);
+        rpc.append(RPC_OPEN);
+        rpc.append(MESSAGE_ID_STRING);
+        rpc.append(EQUAL);
+        rpc.append("\"");
+        rpc.append(messageIdInteger.get());
+        rpc.append("\"  ");
+        rpc.append(NETCONF_BASE_NAMESPACE).append(">\n");
+        rpc.append(request);
+        rpc.append(RPC_CLOSE).append(NEW_LINE);
+        rpc.append(ENDPATTERN);
+        String reply = sendRequest(rpc.toString());
+        checkReply(reply);
+        return reply;
+    }
+
+    @Override
+    public String get(String request) throws NetconfException {
+        return requestSync(request);
+    }
+
+    @Override
+    public String get(String filterSchema, String withDefaultsMode) throws NetconfException {
+        StringBuilder rpc = new StringBuilder(XML_HEADER);
+        rpc.append(RPC_OPEN);
+        rpc.append(MESSAGE_ID_STRING);
+        rpc.append(EQUAL);
+        rpc.append("\"");
+        rpc.append(messageIdInteger.get());
+        rpc.append("\"  ");
+        rpc.append(NETCONF_BASE_NAMESPACE).append(">\n");
+        rpc.append(GET_OPEN).append(NEW_LINE);
+        if (filterSchema != null) {
+            rpc.append(SUBTREE_FILTER_OPEN).append(NEW_LINE);
+            rpc.append(filterSchema).append(NEW_LINE);
+            rpc.append(SUBTREE_FILTER_CLOSE).append(NEW_LINE);
+        }
+        if (withDefaultsMode != null) {
+            rpc.append(WITH_DEFAULT_OPEN).append(NETCONF_WITH_DEFAULTS_NAMESPACE).append(">");
+            rpc.append(withDefaultsMode).append(WITH_DEFAULT_CLOSE).append(NEW_LINE);
+        }
+        rpc.append(GET_CLOSE).append(NEW_LINE);
+        rpc.append(RPC_CLOSE).append(NEW_LINE);
+        rpc.append(ENDPATTERN);
+        String reply = sendRequest(rpc.toString());
+        checkReply(reply);
+        return reply;
+    }
+
+    @Override
+    public String getConfig(TargetConfig netconfTargetConfig) throws NetconfException {
+        return getConfig(netconfTargetConfig, null);
+    }
+
+    @Override
+    public String getConfig(String netconfTargetConfig) throws NetconfException {
+        return getConfig(TargetConfig.toTargetConfig(netconfTargetConfig));
+    }
+
+    @Override
+    public String getConfig(String netconfTargetConfig, String configurationFilterSchema) throws NetconfException {
+        return getConfig(TargetConfig.toTargetConfig(netconfTargetConfig), configurationFilterSchema);
+    }
+
+    @Override
+    public String getConfig(TargetConfig netconfTargetConfig, String configurationSchema) throws NetconfException {
+        StringBuilder rpc = new StringBuilder(XML_HEADER);
+        rpc.append("<rpc ");
+        rpc.append(MESSAGE_ID_STRING);
+        rpc.append(EQUAL);
+        rpc.append("\"");
+        rpc.append(messageIdInteger.get());
+        rpc.append("\"  ");
+        rpc.append("xmlns=\"urn:ietf:params:xml:ns:netconf:base:1.0\">\n");
+        rpc.append("<get-config>\n");
+        rpc.append("<source>\n");
+        rpc.append("<").append(netconfTargetConfig).append("/>");
+        rpc.append("</source>");
+        if (configurationSchema != null) {
+            rpc.append("<filter type=\"subtree\">\n");
+            rpc.append(configurationSchema).append("\n");
+            rpc.append("</filter>\n");
+        }
+        rpc.append("</get-config>\n");
+        rpc.append("</rpc>\n");
+        rpc.append(ENDPATTERN);
+        String reply = sendRequest(rpc.toString());
+        return checkReply(reply) ? reply : "ERROR " + reply;
+    }
+
+    @Override
+    public boolean editConfig(String newConfiguration) throws NetconfException {
+        newConfiguration = newConfiguration + ENDPATTERN;
+        return checkReply(sendRequest(newConfiguration));
+    }
+
+    @Override
+    public boolean editConfig(String netconfTargetConfig, String mode, String newConfiguration)
+            throws NetconfException {
+        return editConfig(TargetConfig.toTargetConfig(netconfTargetConfig), mode, newConfiguration);
+    }
+
+    @Override
+    public boolean editConfig(TargetConfig netconfTargetConfig, String mode, String newConfiguration)
+            throws NetconfException {
+        newConfiguration = newConfiguration.trim();
+        StringBuilder rpc = new StringBuilder(XML_HEADER);
+        rpc.append(RPC_OPEN);
+        rpc.append(MESSAGE_ID_STRING);
+        rpc.append(EQUAL);
+        rpc.append("\"");
+        rpc.append(messageIdInteger.get());
+        rpc.append("\"  ");
+        rpc.append(NETCONF_BASE_NAMESPACE).append(">\n");
+        rpc.append(EDIT_CONFIG_OPEN).append("\n");
+        rpc.append(TARGET_OPEN);
+        rpc.append("<").append(netconfTargetConfig).append("/>");
+        rpc.append(TARGET_CLOSE).append("\n");
+        if (mode != null) {
+            rpc.append(DEFAULT_OPERATION_OPEN);
+            rpc.append(mode);
+            rpc.append(DEFAULT_OPERATION_CLOSE).append("\n");
+        }
+        rpc.append(CONFIG_OPEN).append("\n");
+        rpc.append(newConfiguration);
+        rpc.append(CONFIG_CLOSE).append("\n");
+        rpc.append(EDIT_CONFIG_CLOSE).append("\n");
+        rpc.append(RPC_CLOSE);
+        rpc.append(ENDPATTERN);
+        log.debug(rpc.toString());
+        String reply = sendRequest(rpc.toString());
+        return checkReply(reply);
+    }
+
+    @Override
+    public boolean copyConfig(String netconfTargetConfig, String newConfiguration) throws NetconfException {
+        return copyConfig(TargetConfig.toTargetConfig(netconfTargetConfig), newConfiguration);
+    }
+
+    @Override
+    public boolean copyConfig(TargetConfig netconfTargetConfig, String newConfiguration)
+            throws NetconfException {
+        newConfiguration = newConfiguration.trim();
+        if (!newConfiguration.startsWith("<config>")) {
+            newConfiguration = "<config>" + newConfiguration
+                    + "</config>";
+        }
+        StringBuilder rpc = new StringBuilder(XML_HEADER);
+        rpc.append(RPC_OPEN);
+        rpc.append(NETCONF_BASE_NAMESPACE).append(">\n");
+        rpc.append("<copy-config>");
+        rpc.append("<target>");
+        rpc.append("<").append(netconfTargetConfig).append("/>");
+        rpc.append("</target>");
+        rpc.append("<source>");
+        rpc.append(newConfiguration);
+        rpc.append("</source>");
+        rpc.append("</copy-config>");
+        rpc.append("</rpc>");
+        rpc.append(ENDPATTERN);
+        return checkReply(sendRequest(rpc.toString()));
+    }
+
+    @Override
+    public boolean deleteConfig(String netconfTargetConfig) throws NetconfException {
+        return deleteConfig(TargetConfig.toTargetConfig(netconfTargetConfig));
+    }
+
+    @Override
+    public boolean deleteConfig(TargetConfig netconfTargetConfig) throws NetconfException {
+        if (netconfTargetConfig.equals(TargetConfig.RUNNING)) {
+            log.warn("Target configuration for delete operation can't be \"running\"",
+                     netconfTargetConfig);
+            return false;
+        }
+        StringBuilder rpc = new StringBuilder(XML_HEADER);
+        rpc.append("<rpc>");
+        rpc.append("<delete-config>");
+        rpc.append("<target>");
+        rpc.append("<").append(netconfTargetConfig).append("/>");
+        rpc.append("</target>");
+        rpc.append("</delete-config>");
+        rpc.append("</rpc>");
+        rpc.append(ENDPATTERN);
+        return checkReply(sendRequest(rpc.toString()));
+    }
+
+    @Override
+    public boolean lock(String configType) throws NetconfException {
+        StringBuilder rpc = new StringBuilder(XML_HEADER);
+        rpc.append("<rpc xmlns=\"urn:ietf:params:xml:ns:netconf:base:1.0\">\n");
+        rpc.append("<lock>");
+        rpc.append("<target>");
+        rpc.append("<");
+        rpc.append(configType);
+        rpc.append("/>");
+        rpc.append("</target>");
+        rpc.append("</lock>");
+        rpc.append("</rpc>");
+        rpc.append(ENDPATTERN);
+        String lockReply = sendRequest(rpc.toString());
+        return checkReply(lockReply);
+    }
+
+    @Override
+    public boolean unlock(String configType) throws NetconfException {
+        StringBuilder rpc = new StringBuilder(XML_HEADER);
+        rpc.append("<rpc xmlns=\"urn:ietf:params:xml:ns:netconf:base:1.0\">\n");
+        rpc.append("<unlock>");
+        rpc.append("<target>");
+        rpc.append("<");
+        rpc.append(configType);
+        rpc.append("/>");
+        rpc.append("</target>");
+        rpc.append("</unlock>");
+        rpc.append("</rpc>");
+        rpc.append(ENDPATTERN);
+        String unlockReply = sendRequest(rpc.toString());
+        return checkReply(unlockReply);
+    }
+
+    @Override
+    public boolean lock() throws NetconfException {
+        return lock("running");
+    }
+
+    @Override
+    public boolean unlock() throws NetconfException {
+        return unlock("running");
+    }
+
+    @Override
+    public boolean close() throws NetconfException {
+        return close(false);
+    }
+
+    private boolean close(boolean force) throws NetconfException {
+        StringBuilder rpc = new StringBuilder();
+        rpc.append("<rpc xmlns=\"urn:ietf:params:xml:ns:netconf:base:1.0\">");
+        if (force) {
+            rpc.append("<kill-session/>");
+        } else {
+            rpc.append("<close-session/>");
+        }
+        rpc.append("</rpc>");
+        rpc.append(ENDPATTERN);
+        return checkReply(sendRequest(rpc.toString())) || close(true);
+    }
+
+    @Override
+    public String getSessionId() {
+        if (serverCapabilities.contains("<session-id>")) {
+            String[] outer = serverCapabilities.split("<session-id>");
+            Preconditions.checkArgument(outer.length != 1,
+                                        "Error in retrieving the session id");
+            String[] value = outer[1].split("</session-id>");
+            Preconditions.checkArgument(value.length != 1,
+                                        "Error in retrieving the session id");
+            return value[0];
+        } else {
+            return String.valueOf(-1);
+        }
+    }
+
+    @Override
+    public String getServerCapabilities() {
+        return serverCapabilities;
+    }
+
+    @Override
+    public void setDeviceCapabilities(List<String> capabilities) {
+        deviceCapabilities = capabilities;
+    }
+
+    @Override
+    public void addDeviceOutputListener(NetconfDeviceOutputEventListener listener) {
+        streamHandler.addDeviceEventListener(listener);
+    }
+
+    @Override
+    public void removeDeviceOutputListener(NetconfDeviceOutputEventListener listener) {
+        streamHandler.removeDeviceEventListener(listener);
+    }
+
+    private boolean checkReply(String reply) throws NetconfException {
+        if (reply != null) {
+            if (!reply.contains("<rpc-error>")) {
+                log.debug("Device {} sent reply {}", deviceInfo, reply);
+                return true;
+            } else if (reply.contains("<ok/>")
+                    || (reply.contains("<rpc-error>")
+                    && reply.contains("warning"))) {
+                log.debug("Device {} sent reply {}", deviceInfo, reply);
+                return true;
+            }
+        }
+        log.warn("Device {} has error in reply {}", deviceInfo, reply);
+        return false;
+    }
+
+    public class NetconfSessionDelegateImpl implements NetconfSessionDelegate {
+
+        @Override
+        public void notify(NetconfDeviceOutputEvent event) {
+            Optional<Integer> messageId = event.getMessageID();
+            log.debug("messageID {}, waiting replies messageIDs {}", messageId,
+                      replies.keySet());
+            if (!messageId.isPresent()) {
+                errorReplies.add(event.getMessagePayload());
+                log.error("Device {} sent error reply {}",
+                          event.getDeviceInfo(), event.getMessagePayload());
+                return;
+            }
+            CompletableFuture<String> completedReply =
+                    replies.get(messageId.get());
+            if (completedReply != null) {
+                completedReply.complete(event.getMessagePayload());
+            }
+        }
+    }
+}
diff --git a/protocols/netconf/ctl/src/main/java/org/onosproject/netconf/ctl/NetconfStreamHandler.java b/protocols/netconf/ctl/src/main/java/org/onosproject/netconf/ctl/NetconfStreamHandler.java
new file mode 100644
index 0000000..2f39711
--- /dev/null
+++ b/protocols/netconf/ctl/src/main/java/org/onosproject/netconf/ctl/NetconfStreamHandler.java
@@ -0,0 +1,75 @@
+/*
+ * Copyright 2015-present Open Networking Laboratory
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.onosproject.netconf.ctl;
+
+import com.google.common.annotations.Beta;
+import org.onosproject.netconf.NetconfDeviceOutputEventListener;
+
+import java.util.concurrent.CompletableFuture;
+
+/**
+ * Interface to represent an objects that does all the IO on a NETCONF session
+ * with a device.
+ *
+ * @deprecated in 1.10.0
+ */
+@Deprecated
+public interface NetconfStreamHandler {
+    /**
+     * Sends the request on the stream that is used to communicate to and from the device.
+     *
+     * If this request does not contain a messageId then this will throw a NoSuchElementException
+     *
+     * @param request request to send to the physical device
+     * @return a CompletableFuture of type String that will contain the response for the request.
+     * @deprecated - use method with messageId parameter instead
+     */
+    @Deprecated
+    CompletableFuture<String> sendMessage(String request);
+
+    /**
+     * Sends the request on the stream that is used to communicate to and from the device.
+     *
+     * @param request request to send to the physical device
+     * @param messageId The identifier of the message - should be unique for the session
+     * @return a CompletableFuture of type String that will contain the response for the request.
+     */
+    CompletableFuture<String> sendMessage(String request, int messageId);
+
+    /**
+     * Adds a listener for netconf events on the handled stream.
+     *
+     * @param listener Netconf device event listener
+     */
+    void addDeviceEventListener(NetconfDeviceOutputEventListener listener);
+
+    /**
+     * Removes a listener for netconf events on the handled stream.
+     *
+     * @param listener Netconf device event listener
+     */
+    void removeDeviceEventListener(NetconfDeviceOutputEventListener listener);
+
+    @Beta
+    /**
+     * Sets instance variable that when true allows receipt of notifications.
+     *
+     * @param enableNotifications if true, allows action based off notifications
+     *                             else, stops action based off notifications
+     */
+    void setEnableNotifications(boolean enableNotifications);
+}
diff --git a/protocols/netconf/ctl/src/main/java/org/onosproject/netconf/ctl/NetconfStreamThread.java b/protocols/netconf/ctl/src/main/java/org/onosproject/netconf/ctl/NetconfStreamThread.java
new file mode 100644
index 0000000..ec6346a
--- /dev/null
+++ b/protocols/netconf/ctl/src/main/java/org/onosproject/netconf/ctl/NetconfStreamThread.java
@@ -0,0 +1,283 @@
+/*
+ * Copyright 2015-present Open Networking Laboratory
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.onosproject.netconf.ctl;
+
+import com.google.common.base.Preconditions;
+import com.google.common.collect.Lists;
+import org.onosproject.netconf.NetconfDeviceInfo;
+import org.onosproject.netconf.NetconfDeviceOutputEvent;
+import org.onosproject.netconf.NetconfDeviceOutputEventListener;
+import org.onosproject.netconf.NetconfException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.BufferedReader;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.InputStreamReader;
+import java.io.OutputStream;
+import java.io.PrintWriter;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.concurrent.CompletableFuture;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+
+/**
+ * Thread that gets spawned each time a session is established and handles all the input
+ * and output from the session's streams to and from the NETCONF device the session is
+ * established with.
+ *
+ * @deprecated in 1.10.0
+ */
+@Deprecated
+public class NetconfStreamThread extends Thread implements NetconfStreamHandler {
+
+    private static final Logger log = LoggerFactory
+            .getLogger(NetconfStreamThread.class);
+    private static final String HELLO = "<hello";
+    private static final String END_PATTERN = "]]>]]>";
+    private static final String RPC_REPLY = "rpc-reply";
+    private static final String RPC_ERROR = "rpc-error";
+    private static final String NOTIFICATION_LABEL = "<notification";
+    private static final String MESSAGE_ID = "message-id=";
+    private static final Pattern MSGID_PATTERN = Pattern.compile(MESSAGE_ID + "\"(\\d+)\"");
+
+    private PrintWriter outputStream;
+    private final InputStream err;
+    private final InputStream in;
+    private NetconfDeviceInfo netconfDeviceInfo;
+    private NetconfSessionDelegate sessionDelegate;
+    private NetconfMessageState state;
+    private List<NetconfDeviceOutputEventListener> netconfDeviceEventListeners
+            = Lists.newCopyOnWriteArrayList();
+    private boolean enableNotifications = true;
+    private Map<Integer, CompletableFuture<String>> replies;
+
+    public NetconfStreamThread(final InputStream in, final OutputStream out,
+                               final InputStream err, NetconfDeviceInfo deviceInfo,
+                               NetconfSessionDelegate delegate,
+                               Map<Integer, CompletableFuture<String>> replies) {
+        this.in = in;
+        this.err = err;
+        outputStream = new PrintWriter(out);
+        netconfDeviceInfo = deviceInfo;
+        state = NetconfMessageState.NO_MATCHING_PATTERN;
+        sessionDelegate = delegate;
+        this.replies = replies;
+        log.debug("Stream thread for device {} session started", deviceInfo);
+        start();
+    }
+
+    @Override
+    public CompletableFuture<String> sendMessage(String request) {
+        Optional<Integer> messageId = getMsgId(request);
+        return sendMessage(request, messageId.get());
+    }
+
+    @Override
+    public CompletableFuture<String> sendMessage(String request, int messageId) {
+        log.debug("Sending message {} to device {}", request, netconfDeviceInfo);
+        CompletableFuture<String> cf = new CompletableFuture<>();
+        replies.put(messageId, cf);
+
+        synchronized (outputStream) {
+            outputStream.print(request);
+            outputStream.flush();
+        }
+
+        return cf;
+    }
+
+    public enum NetconfMessageState {
+
+        NO_MATCHING_PATTERN {
+            @Override
+            NetconfMessageState evaluateChar(char c) {
+                if (c == ']') {
+                    return FIRST_BRACKET;
+                } else {
+                    return this;
+                }
+            }
+        },
+        FIRST_BRACKET {
+            @Override
+            NetconfMessageState evaluateChar(char c) {
+                if (c == ']') {
+                    return SECOND_BRACKET;
+                } else {
+                    return NO_MATCHING_PATTERN;
+                }
+            }
+        },
+        SECOND_BRACKET {
+            @Override
+            NetconfMessageState evaluateChar(char c) {
+                if (c == '>') {
+                    return FIRST_BIGGER;
+                } else {
+                    return NO_MATCHING_PATTERN;
+                }
+            }
+        },
+        FIRST_BIGGER {
+            @Override
+            NetconfMessageState evaluateChar(char c) {
+                if (c == ']') {
+                    return THIRD_BRACKET;
+                } else {
+                    return NO_MATCHING_PATTERN;
+                }
+            }
+        },
+        THIRD_BRACKET {
+            @Override
+            NetconfMessageState evaluateChar(char c) {
+                if (c == ']') {
+                    return ENDING_BIGGER;
+                } else {
+                    return NO_MATCHING_PATTERN;
+                }
+            }
+        },
+        ENDING_BIGGER {
+            @Override
+            NetconfMessageState evaluateChar(char c) {
+                if (c == '>') {
+                    return END_PATTERN;
+                } else {
+                    return NO_MATCHING_PATTERN;
+                }
+            }
+        },
+        END_PATTERN {
+            @Override
+            NetconfMessageState evaluateChar(char c) {
+                return NO_MATCHING_PATTERN;
+            }
+        };
+
+        abstract NetconfMessageState evaluateChar(char c);
+    }
+
+    @Override
+    public void run() {
+        BufferedReader bufferReader = new BufferedReader(new InputStreamReader(in));
+            try {
+                boolean socketClosed = false;
+                StringBuilder deviceReplyBuilder = new StringBuilder();
+                while (!socketClosed) {
+                    int cInt = bufferReader.read();
+                    if (cInt == -1) {
+                        log.debug("Netconf device {}  sent error char in session," +
+                                          " will need to be reopend", netconfDeviceInfo);
+                        NetconfDeviceOutputEvent event = new NetconfDeviceOutputEvent(
+                                NetconfDeviceOutputEvent.Type.SESSION_CLOSED,
+                                null, null, Optional.of(-1), netconfDeviceInfo);
+                        netconfDeviceEventListeners.forEach(
+                                listener -> listener.event(event));
+                        socketClosed = true;
+                        log.debug("Netconf device {} ERROR cInt == -1 socketClosed = true", netconfDeviceInfo);
+                    }
+                    char c = (char) cInt;
+                    state = state.evaluateChar(c);
+                    deviceReplyBuilder.append(c);
+                    if (state == NetconfMessageState.END_PATTERN) {
+                        String deviceReply = deviceReplyBuilder.toString();
+                        if (deviceReply.equals(END_PATTERN)) {
+                            socketClosed = true;
+                            log.debug("Netconf device {} socketClosed = true DEVICE_UNREGISTERED {}",
+                                     netconfDeviceInfo, deviceReply);
+                            NetconfDeviceOutputEvent event = new NetconfDeviceOutputEvent(
+                                    NetconfDeviceOutputEvent.Type.DEVICE_UNREGISTERED,
+                                    null, null, Optional.of(-1), netconfDeviceInfo);
+                            netconfDeviceEventListeners.forEach(
+                                    listener -> listener.event(event));
+                            this.interrupt();
+                        } else {
+                            deviceReply = deviceReply.replace(END_PATTERN, "");
+                            if (deviceReply.contains(RPC_REPLY) ||
+                                    deviceReply.contains(RPC_ERROR) ||
+                                    deviceReply.contains(HELLO)) {
+                                log.debug("Netconf device {} sessionDelegate.notify() DEVICE_REPLY {} {}",
+                                    netconfDeviceInfo, getMsgId(deviceReply), deviceReply);
+                                NetconfDeviceOutputEvent event = new NetconfDeviceOutputEvent(
+                                        NetconfDeviceOutputEvent.Type.DEVICE_REPLY,
+                                        null, deviceReply, getMsgId(deviceReply), netconfDeviceInfo);
+                                sessionDelegate.notify(event);
+                                netconfDeviceEventListeners.forEach(
+                                        listener -> listener.event(event));
+                            } else if (deviceReply.contains(NOTIFICATION_LABEL)) {
+                                log.debug("Netconf device {} DEVICE_NOTIFICATION {} {} {}",
+                                         netconfDeviceInfo, enableNotifications,
+                                         getMsgId(deviceReply), deviceReply);
+                                if (enableNotifications) {
+                                    log.debug("dispatching to {} listeners", netconfDeviceEventListeners.size());
+                                    final String finalDeviceReply = deviceReply;
+                                    netconfDeviceEventListeners.forEach(
+                                            listener -> listener.event(new NetconfDeviceOutputEvent(
+                                                    NetconfDeviceOutputEvent.Type.DEVICE_NOTIFICATION,
+                                                    null, finalDeviceReply, getMsgId(finalDeviceReply),
+                                                    netconfDeviceInfo)));
+                                }
+                            } else {
+                                log.debug("Error on reply from device {} {}", netconfDeviceInfo, deviceReply);
+                            }
+                            deviceReplyBuilder.setLength(0);
+                        }
+                    }
+                }
+            } catch (IOException e) {
+                log.warn("Error in reading from the session for device {} ", netconfDeviceInfo, e);
+                throw new RuntimeException(new NetconfException("Error in reading from the session for device {}" +
+                                                                        netconfDeviceInfo, e));
+                //TODO should we send a socket closed message to listeners ?
+            }
+    }
+
+    protected static Optional<Integer> getMsgId(String reply) {
+        Matcher matcher = MSGID_PATTERN.matcher(reply);
+        if (matcher.find()) {
+            Integer messageId = Integer.parseInt(matcher.group(1));
+            Preconditions.checkNotNull(messageId, "Error in retrieving the message id");
+            return Optional.of(messageId);
+        }
+        if (reply.contains(HELLO)) {
+            return Optional.of(0);
+        }
+        return Optional.empty();
+    }
+
+    @Override
+    public void addDeviceEventListener(NetconfDeviceOutputEventListener listener) {
+        if (!netconfDeviceEventListeners.contains(listener)) {
+            netconfDeviceEventListeners.add(listener);
+        }
+    }
+
+    @Override
+    public void removeDeviceEventListener(NetconfDeviceOutputEventListener listener) {
+        netconfDeviceEventListeners.remove(listener);
+    }
+
+    @Override
+    public void setEnableNotifications(boolean enableNotifications) {
+        this.enableNotifications = enableNotifications;
+    }
+}