[ONOS-6376] Netconf ssh connection through Apache Mina library

Change-Id: If69fd89afe3082debc3c28a06debfed53426635c
diff --git a/protocols/netconf/ctl/BUCK b/protocols/netconf/ctl/BUCK
index 9ed9e0c..dbd6e97 100644
--- a/protocols/netconf/ctl/BUCK
+++ b/protocols/netconf/ctl/BUCK
@@ -4,13 +4,13 @@
     '//protocols/netconf/api:onos-protocols-netconf-api',
     '//cli:onos-cli',
     '//lib:org.apache.karaf.shell.console',
+    '//lib:sshd-core',
 ]
 
 TEST_DEPS = [
     '//lib:TEST_ADAPTERS',
     '//utils/osgi:onlab-osgi-tests',
     '//core/api:onos-api-tests',
-    '//lib:sshd-core'
 ]
 
 osgi_jar_with_tests (
diff --git a/protocols/netconf/ctl/pom.xml b/protocols/netconf/ctl/pom.xml
index 40fa0a2..d324191 100644
--- a/protocols/netconf/ctl/pom.xml
+++ b/protocols/netconf/ctl/pom.xml
@@ -64,8 +64,7 @@
         <dependency>
             <groupId>org.apache.sshd</groupId>
             <artifactId>sshd-core</artifactId>
-            <version>0.14.0</version>
-            <scope>test</scope>
+            <version>1.4.0</version>
         </dependency>
 
         <dependency>
diff --git a/protocols/netconf/ctl/src/main/java/org/onosproject/netconf/ctl/impl/DefaultNetconfDevice.java b/protocols/netconf/ctl/src/main/java/org/onosproject/netconf/ctl/impl/DefaultNetconfDevice.java
index c29b895..3836f97 100644
--- a/protocols/netconf/ctl/src/main/java/org/onosproject/netconf/ctl/impl/DefaultNetconfDevice.java
+++ b/protocols/netconf/ctl/src/main/java/org/onosproject/netconf/ctl/impl/DefaultNetconfDevice.java
@@ -36,7 +36,7 @@
 
     private NetconfDeviceInfo netconfDeviceInfo;
     private boolean deviceState = true;
-    protected NetconfSessionFactory sessionFactory = new SshNetconfSessionFactory();
+    private final NetconfSessionFactory sessionFactory;
     private NetconfSession netconfSession;
 
     // will block until hello RPC handshake completes
@@ -51,6 +51,31 @@
     public DefaultNetconfDevice(NetconfDeviceInfo deviceInfo)
             throws NetconfException {
         netconfDeviceInfo = deviceInfo;
+        sessionFactory = new NetconfSessionMinaImpl.MinaSshNetconfSessionFactory();
+        try {
+            netconfSession = sessionFactory.createNetconfSession(deviceInfo);
+        } catch (IOException e) {
+            deviceState = false;
+            throw new NetconfException("Cannot create connection and session for device " +
+                                               deviceInfo, e);
+        }
+    }
+
+    // will block until hello RPC handshake completes
+    /**
+     * Creates a new default NETCONF device with the information provided.
+     * The device gets created only if no exception is thrown while connecting to
+     * it and establishing the NETCONF session.
+     *
+     * @param deviceInfo information about the device to be created.
+     * @param factory the factory used to create the session
+     * @throws NetconfException if there are problems in creating or establishing
+     * the underlying NETCONF connection and session.
+     */
+    public DefaultNetconfDevice(NetconfDeviceInfo deviceInfo, NetconfSessionFactory factory)
+            throws NetconfException {
+        netconfDeviceInfo = deviceInfo;
+        sessionFactory = factory;
         try {
             netconfSession = sessionFactory.createNetconfSession(deviceInfo);
         } catch (IOException e) {
@@ -85,12 +110,5 @@
         return netconfDeviceInfo;
     }
 
-    public class SshNetconfSessionFactory implements NetconfSessionFactory {
-
-        @Override
-        public NetconfSession createNetconfSession(NetconfDeviceInfo netconfDeviceInfo) throws NetconfException {
-            return new NetconfSessionImpl(netconfDeviceInfo);
-        }
-    }
 
 }
diff --git a/protocols/netconf/ctl/src/main/java/org/onosproject/netconf/ctl/impl/NetconfControllerImpl.java b/protocols/netconf/ctl/src/main/java/org/onosproject/netconf/ctl/impl/NetconfControllerImpl.java
index c03a6a4..428c9d2 100644
--- a/protocols/netconf/ctl/src/main/java/org/onosproject/netconf/ctl/impl/NetconfControllerImpl.java
+++ b/protocols/netconf/ctl/src/main/java/org/onosproject/netconf/ctl/impl/NetconfControllerImpl.java
@@ -65,6 +65,9 @@
 @Component(immediate = true)
 @Service
 public class NetconfControllerImpl implements NetconfController {
+
+    private static final String ETHZ_SSH2 = "ethz-ssh2";
+
     private static final int DEFAULT_CONNECT_TIMEOUT_SECONDS = 5;
     private static final String PROP_NETCONF_CONNECT_TIMEOUT = "netconfConnectTimeout";
     @Property(name = PROP_NETCONF_CONNECT_TIMEOUT, intValue = DEFAULT_CONNECT_TIMEOUT_SECONDS,
@@ -77,6 +80,12 @@
             label = "Time (in seconds) waiting for a NetConf reply")
     protected static int netconfReplyTimeout = DEFAULT_REPLY_TIMEOUT_SECONDS;
 
+    private static final String SSH_LIBRARY = "sshLibrary";
+    private static final String APACHE_MINA = "apache_mina";
+    @Property(name = SSH_LIBRARY, value = APACHE_MINA,
+            label = "Ssh Llbrary instead of Apache Mina (i.e. ethz-ssh2")
+    protected static String sshLibrary = APACHE_MINA;
+
     @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
     protected ComponentConfigService cfgService;
 
@@ -98,7 +107,7 @@
 
     protected final ExecutorService executor =
             Executors.newCachedThreadPool(groupedThreads("onos/netconfdevicecontroller",
-                                                           "connection-reopen-%d", log));
+                                                         "connection-reopen-%d", log));
 
     @Activate
     public void activate(ComponentContext context) {
@@ -109,7 +118,12 @@
 
     @Deactivate
     public void deactivate() {
+        netconfDeviceMap.values().forEach(device -> {
+            device.getSession().removeDeviceOutputListener(downListener);
+            device.disconnect();
+        });
         cfgService.unregisterProperties(getClass(), false);
+        netconfDeviceListeners.clear();
         netconfDeviceMap.clear();
         log.info("Stopped");
     }
@@ -119,6 +133,7 @@
         if (context == null) {
             netconfReplyTimeout = DEFAULT_REPLY_TIMEOUT_SECONDS;
             netconfConnectTimeout = DEFAULT_CONNECT_TIMEOUT_SECONDS;
+            sshLibrary = APACHE_MINA;
             log.info("No component configuration");
             return;
         }
@@ -127,6 +142,7 @@
 
         int newNetconfReplyTimeout;
         int newNetconfConnectTimeout;
+        String newSshLibrary;
         try {
             String s = get(properties, PROP_NETCONF_REPLY_TIMEOUT);
             newNetconfReplyTimeout = isNullOrEmpty(s) ?
@@ -136,6 +152,8 @@
             newNetconfConnectTimeout = isNullOrEmpty(s) ?
                     netconfConnectTimeout : Integer.parseInt(s.trim());
 
+            newSshLibrary = get(properties, SSH_LIBRARY);
+
         } catch (NumberFormatException e) {
             log.warn("Component configuration had invalid value", e);
             return;
@@ -151,8 +169,11 @@
 
         netconfReplyTimeout = newNetconfReplyTimeout;
         netconfConnectTimeout = newNetconfConnectTimeout;
-        log.info("Settings: {} = {}, {} = {}",
-                 PROP_NETCONF_REPLY_TIMEOUT, netconfReplyTimeout, PROP_NETCONF_CONNECT_TIMEOUT, netconfConnectTimeout);
+        sshLibrary = newSshLibrary;
+        log.info("Settings: {} = {}, {} = {}, {} = {}",
+                 PROP_NETCONF_REPLY_TIMEOUT, netconfReplyTimeout,
+                 PROP_NETCONF_CONNECT_TIMEOUT, netconfConnectTimeout,
+                 SSH_LIBRARY, sshLibrary);
     }
 
     @Override
@@ -302,7 +323,12 @@
     private class DefaultNetconfDeviceFactory implements NetconfDeviceFactory {
 
         @Override
-        public NetconfDevice createNetconfDevice(NetconfDeviceInfo netconfDeviceInfo) throws NetconfException {
+        public NetconfDevice createNetconfDevice(NetconfDeviceInfo netconfDeviceInfo)
+                throws NetconfException {
+            if (sshLibrary.equals(ETHZ_SSH2)) {
+                return new DefaultNetconfDevice(netconfDeviceInfo,
+                                                new NetconfSessionImpl.SshNetconfSessionFactory());
+            }
             return new DefaultNetconfDevice(netconfDeviceInfo);
         }
     }
@@ -320,8 +346,14 @@
                 log.info("Trying to reestablish connection with device {}", did);
                 executor.execute(() -> {
                     try {
-                        netconfDeviceMap.get(did).getSession().checkAndReestablish();
-                        log.info("Connection with device {} was reestablished", did);
+                        NetconfDevice device = netconfDeviceMap.get(did);
+                        if (device != null) {
+                            device.getSession().checkAndReestablish();
+                            log.info("Connection with device {} was reestablished", did);
+                        } else {
+                            log.warn("The device {} is not in the system", did);
+                        }
+
                     } catch (NetconfException e) {
                         log.error("The SSH connection with device {} couldn't be " +
                                           "reestablished due to {}. " +
diff --git a/protocols/netconf/ctl/src/main/java/org/onosproject/netconf/ctl/impl/NetconfSessionImpl.java b/protocols/netconf/ctl/src/main/java/org/onosproject/netconf/ctl/impl/NetconfSessionImpl.java
index c0fcce3..b808612 100644
--- a/protocols/netconf/ctl/src/main/java/org/onosproject/netconf/ctl/impl/NetconfSessionImpl.java
+++ b/protocols/netconf/ctl/src/main/java/org/onosproject/netconf/ctl/impl/NetconfSessionImpl.java
@@ -24,6 +24,7 @@
 import com.google.common.base.MoreObjects;
 import com.google.common.base.Objects;
 import com.google.common.base.Preconditions;
+import org.onosproject.netconf.NetconfSessionFactory;
 import org.onosproject.netconf.TargetConfig;
 import org.onosproject.netconf.FilteringNetconfDeviceOutputEventListener;
 import org.onosproject.netconf.NetconfDeviceInfo;
@@ -104,7 +105,7 @@
     private static final Pattern SESSION_ID_REGEX_PATTERN = Pattern.compile(SESSION_ID_REGEX);
 
     private String sessionID;
-    private final AtomicInteger messageIdInteger = new AtomicInteger(0);
+    private final AtomicInteger messageIdInteger = new AtomicInteger(1);
     private Connection netconfConnection;
     protected final NetconfDeviceInfo deviceInfo;
     private Session sshSession;
@@ -288,7 +289,7 @@
     }
 
     private void sendHello() throws NetconfException {
-        serverHelloResponseOld = sendRequest(createHelloString());
+        serverHelloResponseOld = sendRequest(createHelloString(), true);
         Matcher capabilityMatcher = CAPABILITY_REGEX_PATTERN.matcher(serverHelloResponseOld);
         while (capabilityMatcher.find()) {
             deviceCapabilities.add(capabilityMatcher.group(1));
@@ -332,7 +333,6 @@
                 try {
                     connectionActive = false;
                     replies.clear();
-                    messageIdInteger.set(0);
                     startConnection();
                     if (subscriptionConnected) {
                         log.debug("Restarting subscription with {}", deviceInfo.getDeviceId());
@@ -368,8 +368,15 @@
     }
 
     private String sendRequest(String request) throws NetconfException {
+        return sendRequest(request, false);
+    }
+
+    private String sendRequest(String request, boolean isHello) throws NetconfException {
         checkAndReestablish();
-        final int messageId = messageIdInteger.getAndIncrement();
+        int messageId = -1;
+        if (!isHello) {
+            messageId = messageIdInteger.getAndIncrement();
+        }
         request = formatRequestMessageId(request, messageId);
         request = formatXmlHeader(request);
         CompletableFuture<String> futureReply = request(request, messageId);
@@ -784,4 +791,12 @@
             }
         }
     }
+
+    public static class SshNetconfSessionFactory implements NetconfSessionFactory {
+
+        @Override
+        public NetconfSession createNetconfSession(NetconfDeviceInfo netconfDeviceInfo) throws NetconfException {
+            return new NetconfSessionImpl(netconfDeviceInfo);
+        }
+    }
 }
diff --git a/protocols/netconf/ctl/src/main/java/org/onosproject/netconf/ctl/impl/NetconfSessionMinaImpl.java b/protocols/netconf/ctl/src/main/java/org/onosproject/netconf/ctl/impl/NetconfSessionMinaImpl.java
new file mode 100644
index 0000000..cdbde45
--- /dev/null
+++ b/protocols/netconf/ctl/src/main/java/org/onosproject/netconf/ctl/impl/NetconfSessionMinaImpl.java
@@ -0,0 +1,838 @@
+/*
+ * Copyright 2015-present Open Networking Laboratory
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.onosproject.netconf.ctl.impl;
+
+import com.google.common.annotations.Beta;
+import com.google.common.base.MoreObjects;
+import com.google.common.base.Objects;
+import com.google.common.collect.ImmutableSet;
+import org.apache.sshd.client.SshClient;
+import org.apache.sshd.client.channel.ClientChannel;
+import org.apache.sshd.client.future.ConnectFuture;
+import org.apache.sshd.client.future.OpenFuture;
+import org.apache.sshd.client.session.ClientSession;
+import org.apache.sshd.server.keyprovider.SimpleGeneratorHostKeyProvider;
+import org.onosproject.netconf.NetconfDeviceInfo;
+import org.onosproject.netconf.NetconfDeviceOutputEvent;
+import org.onosproject.netconf.NetconfDeviceOutputEvent.Type;
+import org.onosproject.netconf.NetconfDeviceOutputEventListener;
+import org.onosproject.netconf.NetconfException;
+import org.onosproject.netconf.NetconfSession;
+import org.onosproject.netconf.NetconfSessionFactory;
+import org.onosproject.netconf.TargetConfig;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.nio.CharBuffer;
+import java.nio.charset.StandardCharsets;
+import java.security.KeyFactory;
+import java.security.KeyPair;
+import java.security.NoSuchAlgorithmException;
+import java.security.PublicKey;
+import java.security.spec.InvalidKeySpecException;
+import java.security.spec.X509EncodedKeySpec;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.LinkedHashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Set;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.CopyOnWriteArrayList;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+
+/**
+ * Implementation of a NETCONF session to talk to a device.
+ */
+public class NetconfSessionMinaImpl implements NetconfSession {
+
+    private static final Logger log = LoggerFactory
+            .getLogger(NetconfSessionMinaImpl.class);
+
+    private static final String ENDPATTERN = "]]>]]>";
+    private static final String MESSAGE_ID_STRING = "message-id";
+    private static final String HELLO = "<hello";
+    private static final String NEW_LINE = "\n";
+    private static final String END_OF_RPC_OPEN_TAG = "\">";
+    private static final String EQUAL = "=";
+    private static final String NUMBER_BETWEEN_QUOTES_MATCHER = "\"+([0-9]+)+\"";
+    private static final String RPC_OPEN = "<rpc ";
+    private static final String RPC_CLOSE = "</rpc>";
+    private static final String GET_OPEN = "<get>";
+    private static final String GET_CLOSE = "</get>";
+    private static final String WITH_DEFAULT_OPEN = "<with-defaults ";
+    private static final String WITH_DEFAULT_CLOSE = "</with-defaults>";
+    private static final String DEFAULT_OPERATION_OPEN = "<default-operation>";
+    private static final String DEFAULT_OPERATION_CLOSE = "</default-operation>";
+    private static final String SUBTREE_FILTER_OPEN = "<filter type=\"subtree\">";
+    private static final String SUBTREE_FILTER_CLOSE = "</filter>";
+    private static final String EDIT_CONFIG_OPEN = "<edit-config>";
+    private static final String EDIT_CONFIG_CLOSE = "</edit-config>";
+    private static final String TARGET_OPEN = "<target>";
+    private static final String TARGET_CLOSE = "</target>";
+    private static final String CONFIG_OPEN = "<config xmlns:nc=\"urn:ietf:params:xml:ns:netconf:base:1.0\">";
+    private static final String CONFIG_CLOSE = "</config>";
+    private static final String XML_HEADER =
+            "<?xml version=\"1.0\" encoding=\"UTF-8\"?>";
+    private static final String NETCONF_BASE_NAMESPACE =
+            "xmlns=\"urn:ietf:params:xml:ns:netconf:base:1.0\"";
+    private static final String NETCONF_WITH_DEFAULTS_NAMESPACE =
+            "xmlns=\"urn:ietf:params:xml:ns:yang:ietf-netconf-with-defaults\"";
+    private static final String SUBSCRIPTION_SUBTREE_FILTER_OPEN =
+            "<filter xmlns:base10=\"urn:ietf:params:xml:ns:netconf:base:1.0\" base10:type=\"subtree\">";
+
+    private static final String INTERLEAVE_CAPABILITY_STRING = "urn:ietf:params:netconf:capability:interleave:1.0";
+
+    private static final String CAPABILITY_REGEX = "<capability>\\s*(.*?)\\s*</capability>";
+    private static final Pattern CAPABILITY_REGEX_PATTERN = Pattern.compile(CAPABILITY_REGEX);
+
+    private static final String SESSION_ID_REGEX = "<session-id>\\s*(.*?)\\s*</session-id>";
+    private static final Pattern SESSION_ID_REGEX_PATTERN = Pattern.compile(SESSION_ID_REGEX);
+    private static final String RSA = "RSA";
+    private static final String DSA = "DSA";
+
+    private String sessionID;
+    private final AtomicInteger messageIdInteger = new AtomicInteger(1);
+    protected final NetconfDeviceInfo deviceInfo;
+    private Iterable<String> onosCapabilities =
+            Collections.singletonList("urn:ietf:params:netconf:base:1.0");
+
+    /* NOTE: the "serverHelloResponseOld" is deprecated in 1.10.0 and should eventually be removed */
+    @Deprecated
+    private String serverHelloResponseOld;
+    private final Set<String> deviceCapabilities = new LinkedHashSet<>();
+    private NetconfStreamHandler streamHandler;
+    private Map<Integer, CompletableFuture<String>> replies;
+    private List<String> errorReplies;
+    private boolean subscriptionConnected = false;
+    private String notificationFilterSchema = null;
+
+    private final Collection<NetconfDeviceOutputEventListener> primaryListeners =
+            new CopyOnWriteArrayList<>();
+    private final Collection<NetconfSession> children =
+            new CopyOnWriteArrayList<>();
+
+
+    private ClientChannel channel = null;
+    private ClientSession session = null;
+    private SshClient client = null;
+
+
+    public NetconfSessionMinaImpl(NetconfDeviceInfo deviceInfo) throws NetconfException {
+        this.deviceInfo = deviceInfo;
+        replies = new ConcurrentHashMap<>();
+        errorReplies = new ArrayList<>();
+        startConnection();
+    }
+
+    private void startConnection() throws NetconfException {
+        try {
+            startClient();
+        } catch (IOException e) {
+            throw new NetconfException("Failed to establish SSH with device " + deviceInfo, e);
+        }
+    }
+
+    private void startClient() throws IOException {
+        client = SshClient.setUpDefaultClient();
+        client.start();
+        client.setKeyPairProvider(new SimpleGeneratorHostKeyProvider());
+        startSession();
+    }
+
+    private void startSession() throws IOException {
+        final ConnectFuture connectFuture;
+        connectFuture = client.connect(deviceInfo.name(),
+                                       deviceInfo.ip().toString(),
+                                       deviceInfo.port())
+                .verify(NetconfControllerImpl.netconfConnectTimeout, TimeUnit.SECONDS);
+        session = connectFuture.getSession();
+        //Using the device ssh key if possible
+        if (deviceInfo.getKey() != null) {
+            ByteBuffer buf = StandardCharsets.UTF_8.encode(CharBuffer.wrap(deviceInfo.getKey()));
+            byte[] byteKey = new byte[buf.limit()];
+            buf.get(byteKey);
+            PublicKey key;
+            try {
+                key = getPublicKey(byteKey, RSA);
+            } catch (NoSuchAlgorithmException | InvalidKeySpecException e) {
+                try {
+                    key = getPublicKey(byteKey, DSA);
+                } catch (NoSuchAlgorithmException | InvalidKeySpecException e1) {
+                    throw new NetconfException("Failed to authenticate session with device " +
+                                                       deviceInfo + "check key to be the " +
+                                                       "proper DSA or RSA key", e1);
+                }
+            }
+            //privateKye can set tu null because is not used by the method.
+            session.addPublicKeyIdentity(new KeyPair(key, null));
+        } else {
+            session.addPasswordIdentity(deviceInfo.password());
+        }
+        session.auth().verify(NetconfControllerImpl.netconfConnectTimeout, TimeUnit.SECONDS);
+        Set<ClientSession.ClientSessionEvent> event = session.waitFor(
+                ImmutableSet.of(ClientSession.ClientSessionEvent.WAIT_AUTH,
+                                ClientSession.ClientSessionEvent.CLOSED,
+                                ClientSession.ClientSessionEvent.AUTHED), 0);
+
+        if (!event.contains(ClientSession.ClientSessionEvent.AUTHED)) {
+            log.debug("Session closed {} {}", event, session.isClosed());
+            throw new NetconfException("Failed to authenticate session with device " +
+                                               deviceInfo + "check the user/pwd or key");
+        }
+        openChannel();
+    }
+
+    private PublicKey getPublicKey(byte[] keyBytes, String type)
+            throws NoSuchAlgorithmException, InvalidKeySpecException {
+
+        X509EncodedKeySpec spec =
+                new X509EncodedKeySpec(keyBytes);
+        KeyFactory kf = KeyFactory.getInstance(type);
+        return kf.generatePublic(spec);
+    }
+
+    private void openChannel() throws IOException {
+        channel = session.createSubsystemChannel("netconf");
+        OpenFuture channelFuture = channel.open();
+        if (channelFuture.await(NetconfControllerImpl.netconfConnectTimeout, TimeUnit.SECONDS)) {
+            if (channelFuture.isOpened()) {
+                streamHandler = new NetconfStreamThread(channel.getInvertedOut(), channel.getInvertedIn(),
+                                                        channel.getInvertedErr(), deviceInfo,
+                                                        new NetconfSessionDelegateImpl(), replies);
+            } else {
+                throw new NetconfException("Failed to open channel with device " +
+                                                   deviceInfo);
+            }
+            sendHello();
+        }
+    }
+
+
+    @Beta
+    protected void startSubscriptionStream(String filterSchema) throws NetconfException {
+        boolean openNewSession = false;
+        if (!deviceCapabilities.contains(INTERLEAVE_CAPABILITY_STRING)) {
+            log.info("Device {} doesn't support interleave, creating child session", deviceInfo);
+            openNewSession = true;
+
+        } else if (subscriptionConnected &&
+                notificationFilterSchema != null &&
+                !Objects.equal(filterSchema, notificationFilterSchema)) {
+            // interleave supported and existing filter is NOT "no filtering"
+            // and was requested with different filtering schema
+            log.info("Cannot use existing session for subscription {} ({})",
+                     deviceInfo, filterSchema);
+            openNewSession = true;
+        }
+
+        if (openNewSession) {
+            log.info("Creating notification session to {} with filter {}",
+                     deviceInfo, filterSchema);
+            NetconfSession child = new NotificationSession(deviceInfo);
+
+            child.addDeviceOutputListener(new NotificationForwarder());
+
+            child.startSubscription(filterSchema);
+            children.add(child);
+            return;
+        }
+
+        // request to start interleaved notification session
+        String reply = sendRequest(createSubscriptionString(filterSchema));
+        if (!checkReply(reply)) {
+            throw new NetconfException("Subscription not successful with device "
+                                               + deviceInfo + " with reply " + reply);
+        }
+        subscriptionConnected = true;
+    }
+
+    @Override
+    public void startSubscription() throws NetconfException {
+        if (!subscriptionConnected) {
+            startSubscriptionStream(null);
+        }
+        streamHandler.setEnableNotifications(true);
+    }
+
+    @Beta
+    @Override
+    public void startSubscription(String filterSchema) throws NetconfException {
+        if (!subscriptionConnected) {
+            notificationFilterSchema = filterSchema;
+            startSubscriptionStream(filterSchema);
+        }
+        streamHandler.setEnableNotifications(true);
+    }
+
+    @Beta
+    protected String createSubscriptionString(String filterSchema) {
+        StringBuilder subscriptionbuffer = new StringBuilder();
+        subscriptionbuffer.append("<rpc xmlns=\"urn:ietf:params:xml:ns:netconf:base:1.0\">\n");
+        subscriptionbuffer.append("  <create-subscription\n");
+        subscriptionbuffer.append("xmlns=\"urn:ietf:params:xml:ns:netconf:notification:1.0\">\n");
+        // FIXME Only subtree filtering supported at the moment.
+        if (filterSchema != null) {
+            subscriptionbuffer.append("    ");
+            subscriptionbuffer.append(SUBSCRIPTION_SUBTREE_FILTER_OPEN).append(NEW_LINE);
+            subscriptionbuffer.append(filterSchema).append(NEW_LINE);
+            subscriptionbuffer.append("    ");
+            subscriptionbuffer.append(SUBTREE_FILTER_CLOSE).append(NEW_LINE);
+        }
+        subscriptionbuffer.append("  </create-subscription>\n");
+        subscriptionbuffer.append("</rpc>\n");
+        subscriptionbuffer.append(ENDPATTERN);
+        return subscriptionbuffer.toString();
+    }
+
+    @Override
+    public void endSubscription() throws NetconfException {
+        if (subscriptionConnected) {
+            streamHandler.setEnableNotifications(false);
+        } else {
+            throw new NetconfException("Subscription does not exist.");
+        }
+    }
+
+    private void sendHello() throws NetconfException {
+        serverHelloResponseOld = sendRequest(createHelloString(), true);
+        Matcher capabilityMatcher = CAPABILITY_REGEX_PATTERN.matcher(serverHelloResponseOld);
+        while (capabilityMatcher.find()) {
+            deviceCapabilities.add(capabilityMatcher.group(1));
+        }
+        sessionID = String.valueOf(-1);
+        Matcher sessionIDMatcher = SESSION_ID_REGEX_PATTERN.matcher(serverHelloResponseOld);
+        if (sessionIDMatcher.find()) {
+            sessionID = sessionIDMatcher.group(1);
+        } else {
+            throw new NetconfException("Missing SessionID in server hello " +
+                                               "reponse.");
+        }
+
+    }
+
+    private String createHelloString() {
+        StringBuilder hellobuffer = new StringBuilder();
+        hellobuffer.append(XML_HEADER);
+        hellobuffer.append("\n");
+        hellobuffer.append("<hello xmlns=\"urn:ietf:params:xml:ns:netconf:base:1.0\">\n");
+        hellobuffer.append("  <capabilities>\n");
+        onosCapabilities.forEach(
+                cap -> hellobuffer.append("    <capability>")
+                        .append(cap)
+                        .append("</capability>\n"));
+        hellobuffer.append("  </capabilities>\n");
+        hellobuffer.append("</hello>\n");
+        hellobuffer.append(ENDPATTERN);
+        return hellobuffer.toString();
+
+    }
+
+    @Override
+    public void checkAndReestablish() throws NetconfException {
+        try {
+            if (client.isClosed()) {
+                log.debug("Trying to restart the whole SSH connection with {}", deviceInfo.getDeviceId());
+                cleanUp();
+                startConnection();
+            } else if (session.isClosed()) {
+                log.debug("Trying to restart the session with {}", session, deviceInfo.getDeviceId());
+                cleanUp();
+                startSession();
+            } else if (channel.isClosed()) {
+                log.debug("Trying to reopen the channel with {}", deviceInfo.getDeviceId());
+                cleanUp();
+                openChannel();
+            }
+            if (subscriptionConnected) {
+                log.debug("Restarting subscription with {}", deviceInfo.getDeviceId());
+                subscriptionConnected = false;
+                startSubscription(notificationFilterSchema);
+            }
+        } catch (IOException e) {
+            log.error("Can't reopen connection for device {}", e.getMessage());
+            throw new NetconfException("Cannot re-open the connection with device" + deviceInfo, e);
+        }
+    }
+
+    private void cleanUp() {
+        //makes sure everything is at a clean state.
+        replies.clear();
+    }
+
+    @Override
+    public String requestSync(String request) throws NetconfException {
+        if (!request.contains(ENDPATTERN)) {
+            request = request + NEW_LINE + ENDPATTERN;
+        }
+        String reply = sendRequest(request);
+        checkReply(reply);
+        return reply;
+    }
+
+    @Override
+    @Deprecated
+    public CompletableFuture<String> request(String request) {
+        return streamHandler.sendMessage(request);
+    }
+
+    private CompletableFuture<String> request(String request, int messageId) {
+        return streamHandler.sendMessage(request, messageId);
+    }
+
+    private String sendRequest(String request) throws NetconfException {
+        return sendRequest(request, false);
+    }
+
+    private String sendRequest(String request, boolean isHello) throws NetconfException {
+        checkAndReestablish();
+        int messageId = -1;
+        if (!isHello) {
+            messageId = messageIdInteger.getAndIncrement();
+        }
+        request = formatRequestMessageId(request, messageId);
+        request = formatXmlHeader(request);
+        CompletableFuture<String> futureReply = request(request, messageId);
+        int replyTimeout = NetconfControllerImpl.netconfReplyTimeout;
+        String rp;
+        try {
+            rp = futureReply.get(replyTimeout, TimeUnit.SECONDS);
+            replies.remove(messageId);
+        } catch (InterruptedException | ExecutionException | TimeoutException e) {
+            throw new NetconfException("No matching reply for request " + request, e);
+        }
+        log.debug("Result {} from request {} to device {}", rp, request, deviceInfo);
+        return rp.trim();
+    }
+
+    private String formatRequestMessageId(String request, int messageId) {
+        if (request.contains(MESSAGE_ID_STRING)) {
+            //FIXME if application provides his own counting of messages this fails that count
+            request = request.replaceFirst(MESSAGE_ID_STRING + EQUAL + NUMBER_BETWEEN_QUOTES_MATCHER,
+                                           MESSAGE_ID_STRING + EQUAL + "\"" + messageId + "\"");
+        } else if (!request.contains(MESSAGE_ID_STRING) && !request.contains(HELLO)) {
+            //FIXME find out a better way to enforce the presence of message-id
+            request = request.replaceFirst(END_OF_RPC_OPEN_TAG, "\" " + MESSAGE_ID_STRING + EQUAL + "\""
+                    + messageId + "\"" + ">");
+        }
+        return request;
+    }
+
+    private String formatXmlHeader(String request) {
+        if (!request.contains(XML_HEADER)) {
+            //FIXME if application provieds his own XML header of different type there is a clash
+            request = XML_HEADER + "\n" + request;
+        }
+        return request;
+    }
+
+    @Override
+    public String doWrappedRpc(String request) throws NetconfException {
+        StringBuilder rpc = new StringBuilder(XML_HEADER);
+        rpc.append(RPC_OPEN);
+        rpc.append(MESSAGE_ID_STRING);
+        rpc.append(EQUAL);
+        rpc.append("\"");
+        rpc.append(messageIdInteger.get());
+        rpc.append("\"  ");
+        rpc.append(NETCONF_BASE_NAMESPACE).append(">\n");
+        rpc.append(request);
+        rpc.append(RPC_CLOSE).append(NEW_LINE);
+        rpc.append(ENDPATTERN);
+        String reply = sendRequest(rpc.toString());
+        checkReply(reply);
+        return reply;
+    }
+
+    @Override
+    public String get(String request) throws NetconfException {
+        return requestSync(request);
+    }
+
+    @Override
+    public String get(String filterSchema, String withDefaultsMode) throws NetconfException {
+        StringBuilder rpc = new StringBuilder(XML_HEADER);
+        rpc.append(RPC_OPEN);
+        rpc.append(MESSAGE_ID_STRING);
+        rpc.append(EQUAL);
+        rpc.append("\"");
+        rpc.append(messageIdInteger.get());
+        rpc.append("\"  ");
+        rpc.append(NETCONF_BASE_NAMESPACE).append(">\n");
+        rpc.append(GET_OPEN).append(NEW_LINE);
+        if (filterSchema != null) {
+            rpc.append(SUBTREE_FILTER_OPEN).append(NEW_LINE);
+            rpc.append(filterSchema).append(NEW_LINE);
+            rpc.append(SUBTREE_FILTER_CLOSE).append(NEW_LINE);
+        }
+        if (withDefaultsMode != null) {
+            rpc.append(WITH_DEFAULT_OPEN).append(NETCONF_WITH_DEFAULTS_NAMESPACE).append(">");
+            rpc.append(withDefaultsMode).append(WITH_DEFAULT_CLOSE).append(NEW_LINE);
+        }
+        rpc.append(GET_CLOSE).append(NEW_LINE);
+        rpc.append(RPC_CLOSE).append(NEW_LINE);
+        rpc.append(ENDPATTERN);
+        String reply = sendRequest(rpc.toString());
+        checkReply(reply);
+        return reply;
+    }
+
+    @Override
+    public String getConfig(TargetConfig netconfTargetConfig) throws NetconfException {
+        return getConfig(netconfTargetConfig, null);
+    }
+
+    @Override
+    public String getConfig(String netconfTargetConfig) throws NetconfException {
+        return getConfig(TargetConfig.toTargetConfig(netconfTargetConfig));
+    }
+
+    @Override
+    public String getConfig(String netconfTargetConfig, String configurationFilterSchema) throws NetconfException {
+        return getConfig(TargetConfig.toTargetConfig(netconfTargetConfig), configurationFilterSchema);
+    }
+
+    @Override
+    public String getConfig(TargetConfig netconfTargetConfig, String configurationSchema) throws NetconfException {
+        StringBuilder rpc = new StringBuilder(XML_HEADER);
+        rpc.append("<rpc ");
+        rpc.append(MESSAGE_ID_STRING);
+        rpc.append(EQUAL);
+        rpc.append("\"");
+        rpc.append(messageIdInteger.get());
+        rpc.append("\"  ");
+        rpc.append("xmlns=\"urn:ietf:params:xml:ns:netconf:base:1.0\">\n");
+        rpc.append("<get-config>\n");
+        rpc.append("<source>\n");
+        rpc.append("<").append(netconfTargetConfig).append("/>");
+        rpc.append("</source>");
+        if (configurationSchema != null) {
+            rpc.append("<filter type=\"subtree\">\n");
+            rpc.append(configurationSchema).append("\n");
+            rpc.append("</filter>\n");
+        }
+        rpc.append("</get-config>\n");
+        rpc.append("</rpc>\n");
+        rpc.append(ENDPATTERN);
+        String reply = sendRequest(rpc.toString());
+        return checkReply(reply) ? reply : "ERROR " + reply;
+    }
+
+    @Override
+    public boolean editConfig(String newConfiguration) throws NetconfException {
+        newConfiguration = newConfiguration + ENDPATTERN;
+        return checkReply(sendRequest(newConfiguration));
+    }
+
+    @Override
+    public boolean editConfig(String netconfTargetConfig, String mode, String newConfiguration)
+            throws NetconfException {
+        return editConfig(TargetConfig.toTargetConfig(netconfTargetConfig), mode, newConfiguration);
+    }
+
+    @Override
+    public boolean editConfig(TargetConfig netconfTargetConfig, String mode, String newConfiguration)
+            throws NetconfException {
+        newConfiguration = newConfiguration.trim();
+        StringBuilder rpc = new StringBuilder(XML_HEADER);
+        rpc.append(RPC_OPEN);
+        rpc.append(MESSAGE_ID_STRING);
+        rpc.append(EQUAL);
+        rpc.append("\"");
+        rpc.append(messageIdInteger.get());
+        rpc.append("\"  ");
+        rpc.append(NETCONF_BASE_NAMESPACE).append(">\n");
+        rpc.append(EDIT_CONFIG_OPEN).append("\n");
+        rpc.append(TARGET_OPEN);
+        rpc.append("<").append(netconfTargetConfig).append("/>");
+        rpc.append(TARGET_CLOSE).append("\n");
+        if (mode != null) {
+            rpc.append(DEFAULT_OPERATION_OPEN);
+            rpc.append(mode);
+            rpc.append(DEFAULT_OPERATION_CLOSE).append("\n");
+        }
+        rpc.append(CONFIG_OPEN).append("\n");
+        rpc.append(newConfiguration);
+        rpc.append(CONFIG_CLOSE).append("\n");
+        rpc.append(EDIT_CONFIG_CLOSE).append("\n");
+        rpc.append(RPC_CLOSE);
+        rpc.append(ENDPATTERN);
+        log.debug(rpc.toString());
+        String reply = sendRequest(rpc.toString());
+        return checkReply(reply);
+    }
+
+    @Override
+    public boolean copyConfig(String netconfTargetConfig, String newConfiguration) throws NetconfException {
+        return copyConfig(TargetConfig.toTargetConfig(netconfTargetConfig), newConfiguration);
+    }
+
+    @Override
+    public boolean copyConfig(TargetConfig netconfTargetConfig, String newConfiguration)
+            throws NetconfException {
+        newConfiguration = newConfiguration.trim();
+        if (!newConfiguration.startsWith("<config>")) {
+            newConfiguration = "<config>" + newConfiguration
+                    + "</config>";
+        }
+        StringBuilder rpc = new StringBuilder(XML_HEADER);
+        rpc.append(RPC_OPEN);
+        rpc.append(NETCONF_BASE_NAMESPACE).append(">\n");
+        rpc.append("<copy-config>");
+        rpc.append("<target>");
+        rpc.append("<").append(netconfTargetConfig).append("/>");
+        rpc.append("</target>");
+        rpc.append("<source>");
+        rpc.append(newConfiguration);
+        rpc.append("</source>");
+        rpc.append("</copy-config>");
+        rpc.append("</rpc>");
+        rpc.append(ENDPATTERN);
+        return checkReply(sendRequest(rpc.toString()));
+    }
+
+    @Override
+    public boolean deleteConfig(String netconfTargetConfig) throws NetconfException {
+        return deleteConfig(TargetConfig.toTargetConfig(netconfTargetConfig));
+    }
+
+    @Override
+    public boolean deleteConfig(TargetConfig netconfTargetConfig) throws NetconfException {
+        if (netconfTargetConfig.equals(TargetConfig.RUNNING)) {
+            log.warn("Target configuration for delete operation can't be \"running\"",
+                     netconfTargetConfig);
+            return false;
+        }
+        StringBuilder rpc = new StringBuilder(XML_HEADER);
+        rpc.append("<rpc>");
+        rpc.append("<delete-config>");
+        rpc.append("<target>");
+        rpc.append("<").append(netconfTargetConfig).append("/>");
+        rpc.append("</target>");
+        rpc.append("</delete-config>");
+        rpc.append("</rpc>");
+        rpc.append(ENDPATTERN);
+        return checkReply(sendRequest(rpc.toString()));
+    }
+
+    @Override
+    public boolean lock(String configType) throws NetconfException {
+        StringBuilder rpc = new StringBuilder(XML_HEADER);
+        rpc.append("<rpc xmlns=\"urn:ietf:params:xml:ns:netconf:base:1.0\">\n");
+        rpc.append("<lock>");
+        rpc.append("<target>");
+        rpc.append("<");
+        rpc.append(configType);
+        rpc.append("/>");
+        rpc.append("</target>");
+        rpc.append("</lock>");
+        rpc.append("</rpc>");
+        rpc.append(ENDPATTERN);
+        String lockReply = sendRequest(rpc.toString());
+        return checkReply(lockReply);
+    }
+
+    @Override
+    public boolean unlock(String configType) throws NetconfException {
+        StringBuilder rpc = new StringBuilder(XML_HEADER);
+        rpc.append("<rpc xmlns=\"urn:ietf:params:xml:ns:netconf:base:1.0\">\n");
+        rpc.append("<unlock>");
+        rpc.append("<target>");
+        rpc.append("<");
+        rpc.append(configType);
+        rpc.append("/>");
+        rpc.append("</target>");
+        rpc.append("</unlock>");
+        rpc.append("</rpc>");
+        rpc.append(ENDPATTERN);
+        String unlockReply = sendRequest(rpc.toString());
+        return checkReply(unlockReply);
+    }
+
+    @Override
+    public boolean lock() throws NetconfException {
+        return lock("running");
+    }
+
+    @Override
+    public boolean unlock() throws NetconfException {
+        return unlock("running");
+    }
+
+    @Override
+    public boolean close() throws NetconfException {
+        return close(false);
+    }
+
+    private boolean close(boolean force) throws NetconfException {
+        StringBuilder rpc = new StringBuilder();
+        rpc.append("<rpc xmlns=\"urn:ietf:params:xml:ns:netconf:base:1.0\">");
+        if (force) {
+            rpc.append("<kill-session/>");
+        } else {
+            rpc.append("<close-session/>");
+        }
+        rpc.append("</rpc>");
+        rpc.append(ENDPATTERN);
+        return checkReply(sendRequest(rpc.toString())) || close(true);
+    }
+
+    @Override
+    public String getSessionId() {
+        return sessionID;
+    }
+
+    @Override
+    public Set<String> getDeviceCapabilitiesSet() {
+        return Collections.unmodifiableSet(deviceCapabilities);
+    }
+
+    @Deprecated
+    @Override
+    public String getServerCapabilities() {
+        return serverHelloResponseOld;
+    }
+
+    @Deprecated
+    @Override
+    public void setDeviceCapabilities(List<String> capabilities) {
+        onosCapabilities = capabilities;
+    }
+
+    @Override
+    public void setOnosCapabilities(Iterable<String> capabilities) {
+        onosCapabilities = capabilities;
+    }
+
+
+    @Override
+    public void addDeviceOutputListener(NetconfDeviceOutputEventListener listener) {
+        streamHandler.addDeviceEventListener(listener);
+        primaryListeners.add(listener);
+    }
+
+    @Override
+    public void removeDeviceOutputListener(NetconfDeviceOutputEventListener listener) {
+        primaryListeners.remove(listener);
+        streamHandler.removeDeviceEventListener(listener);
+    }
+
+    private boolean checkReply(String reply) throws NetconfException {
+        if (reply != null) {
+            if (!reply.contains("<rpc-error>")) {
+                log.debug("Device {} sent reply {}", deviceInfo, reply);
+                return true;
+            } else if (reply.contains("<ok/>")
+                    || (reply.contains("<rpc-error>")
+                    && reply.contains("warning"))) {
+                log.debug("Device {} sent reply {}", deviceInfo, reply);
+                return true;
+            }
+        }
+        log.warn("Device {} has error in reply {}", deviceInfo, reply);
+        return false;
+    }
+
+    static class NotificationSession extends NetconfSessionMinaImpl {
+
+        private String notificationFilter;
+
+        NotificationSession(NetconfDeviceInfo deviceInfo)
+                throws NetconfException {
+            super(deviceInfo);
+        }
+
+        @Override
+        protected void startSubscriptionStream(String filterSchema)
+                throws NetconfException {
+
+            notificationFilter = filterSchema;
+            requestSync(createSubscriptionString(filterSchema));
+        }
+
+        @Override
+        public String toString() {
+            return MoreObjects.toStringHelper(getClass())
+                    .add("deviceInfo", deviceInfo)
+                    .add("sessionID", getSessionId())
+                    .add("notificationFilter", notificationFilter)
+                    .toString();
+        }
+    }
+
+    /**
+     * Listener attached to child session for notification streaming.
+     * <p>
+     * Forwards all notification event from child session to primary session
+     * listeners.
+     */
+    private final class NotificationForwarder
+            implements NetconfDeviceOutputEventListener {
+
+        @Override
+        public boolean isRelevant(NetconfDeviceOutputEvent event) {
+            return event.type() == Type.DEVICE_NOTIFICATION;
+        }
+
+        @Override
+        public void event(NetconfDeviceOutputEvent event) {
+            primaryListeners.forEach(lsnr -> {
+                if (lsnr.isRelevant(event)) {
+                    lsnr.event(event);
+                }
+            });
+        }
+    }
+
+    public class NetconfSessionDelegateImpl implements NetconfSessionDelegate {
+
+        @Override
+        public void notify(NetconfDeviceOutputEvent event) {
+            Optional<Integer> messageId = event.getMessageID();
+            log.debug("messageID {}, waiting replies messageIDs {}", messageId,
+                      replies.keySet());
+            if (!messageId.isPresent()) {
+                errorReplies.add(event.getMessagePayload());
+                log.error("Device {} sent error reply {}",
+                          event.getDeviceInfo(), event.getMessagePayload());
+                return;
+            }
+            CompletableFuture<String> completedReply =
+                    replies.get(messageId.get());
+            if (completedReply != null) {
+                completedReply.complete(event.getMessagePayload());
+            }
+        }
+    }
+
+    public static class MinaSshNetconfSessionFactory implements NetconfSessionFactory {
+
+        @Override
+        public NetconfSession createNetconfSession(NetconfDeviceInfo netconfDeviceInfo) throws NetconfException {
+            return new NetconfSessionMinaImpl(netconfDeviceInfo);
+        }
+    }
+}
diff --git a/protocols/netconf/ctl/src/main/java/org/onosproject/netconf/ctl/impl/NetconfStreamThread.java b/protocols/netconf/ctl/src/main/java/org/onosproject/netconf/ctl/impl/NetconfStreamThread.java
index 212a4e6..44206e3 100644
--- a/protocols/netconf/ctl/src/main/java/org/onosproject/netconf/ctl/impl/NetconfStreamThread.java
+++ b/protocols/netconf/ctl/src/main/java/org/onosproject/netconf/ctl/impl/NetconfStreamThread.java
@@ -203,7 +203,7 @@
                                      netconfDeviceInfo, deviceReply);
                             NetconfDeviceOutputEvent event = new NetconfDeviceOutputEvent(
                                     NetconfDeviceOutputEvent.Type.DEVICE_UNREGISTERED,
-                                    null, null, Optional.of(-1), netconfDeviceInfo);
+                                    null, null, Optional.of(-2), netconfDeviceInfo);
                             netconfDeviceEventListeners.forEach(
                                     listener -> listener.event(event));
                             this.interrupt();
@@ -256,7 +256,7 @@
             return Optional.of(messageId);
         }
         if (reply.contains(HELLO)) {
-            return Optional.of(0);
+            return Optional.of(-1);
         }
         return Optional.empty();
     }
diff --git a/protocols/netconf/ctl/src/test/java/org/onosproject/netconf/ctl/impl/NetconfControllerImplTest.java b/protocols/netconf/ctl/src/test/java/org/onosproject/netconf/ctl/impl/NetconfControllerImplTest.java
index 8287e6d..666b0ad 100644
--- a/protocols/netconf/ctl/src/test/java/org/onosproject/netconf/ctl/impl/NetconfControllerImplTest.java
+++ b/protocols/netconf/ctl/src/test/java/org/onosproject/netconf/ctl/impl/NetconfControllerImplTest.java
@@ -140,12 +140,13 @@
         assertEquals("Incorrect NetConf connect timeout, should be default",
                      5, ctrl.netconfConnectTimeout);
         assertEquals("Incorrect NetConf reply timeout, should be default",
-                5, ctrl.netconfReplyTimeout);
+                     5, ctrl.netconfReplyTimeout);
         ctrl.activate(null);
         assertEquals("Incorrect NetConf connect timeout, should be default",
                      5, ctrl.netconfConnectTimeout);
         assertEquals("Incorrect NetConf reply timeout, should be default",
-                5, ctrl.netconfReplyTimeout);    }
+                     5, ctrl.netconfReplyTimeout);
+    }
 
     /**
      * Test modification of component configuration.
@@ -153,14 +154,15 @@
     @Test
     public void testModified() {
         assertEquals("Incorrect NetConf connect timeout, should be default",
-                    5, ctrl.netconfConnectTimeout);
+                     5, ctrl.netconfConnectTimeout);
         assertEquals("Incorrect NetConf session timeout, should be default",
                      5, ctrl.netconfReplyTimeout);
         ctrl.modified(context);
         assertEquals("Incorrect NetConf connect timeout, should be default",
-                    2, ctrl.netconfConnectTimeout);
+                     2, ctrl.netconfConnectTimeout);
         assertEquals("Incorrect NetConf session timeout",
                      1, ctrl.netconfReplyTimeout);
+        assertEquals("ethz-ssh2", ctrl.sshLibrary);
     }
 
     /**
@@ -398,6 +400,8 @@
                 return "2";
             } else if (key.equals("netconfReplyTimeout")) {
                 return "1";
+            } else if (key.equals("sshLibrary")) {
+                return "ethz-ssh2";
             }
             return null;
         }
diff --git a/protocols/netconf/ctl/src/test/java/org/onosproject/netconf/ctl/impl/NetconfSessionImplTest.java b/protocols/netconf/ctl/src/test/java/org/onosproject/netconf/ctl/impl/NetconfSessionImplTest.java
index 27223b8..846642c 100644
--- a/protocols/netconf/ctl/src/test/java/org/onosproject/netconf/ctl/impl/NetconfSessionImplTest.java
+++ b/protocols/netconf/ctl/src/test/java/org/onosproject/netconf/ctl/impl/NetconfSessionImplTest.java
@@ -24,7 +24,7 @@
 import static org.onosproject.netconf.TargetConfig.RUNNING;
 import static org.onosproject.netconf.TargetConfig.CANDIDATE;
 
-import java.util.ArrayList;
+import java.io.File;
 import java.util.Arrays;
 import java.util.Collection;
 import java.util.List;
@@ -35,12 +35,10 @@
 import java.util.concurrent.FutureTask;
 import java.util.regex.Pattern;
 
-import org.apache.sshd.SshServer;
 import org.apache.sshd.common.NamedFactory;
 import org.apache.sshd.server.Command;
-import org.apache.sshd.server.PasswordAuthenticator;
-import org.apache.sshd.server.UserAuth;
-import org.apache.sshd.server.auth.UserAuthPassword;
+import org.apache.sshd.server.SshServer;
+import org.apache.sshd.server.auth.password.PasswordAuthenticator;
 import org.apache.sshd.server.keyprovider.SimpleGeneratorHostKeyProvider;
 import org.apache.sshd.server.session.ServerSession;
 import org.junit.AfterClass;
@@ -59,7 +57,7 @@
 
 /**
  * Unit tests for NetconfSession.
- *
+ * <p>
  * Sets up an SSH Server with Apache SSHD and connects to it using 2 clients
  * Truly verifies that the NETCONF flows are compliant with a NETCONF server.
  */
@@ -77,8 +75,8 @@
 
     private static final String SAMPLE_REQUEST =
             "<some-yang-element xmlns=\"some-namespace\">"
-            + "<some-child-element/>"
-            + "</some-yang-element>";
+                    + "<some-child-element/>"
+                    + "</some-yang-element>";
 
     private static final String EDIT_CONFIG_REQUEST =
             "<?xml version=\"1.0\" encoding=\"UTF-8\"?><rpc message-id=\"6\"  "
@@ -112,9 +110,6 @@
     @BeforeClass
     public static void setUp() throws Exception {
         sshServerNetconf = SshServer.setUpDefaultServer();
-        List<NamedFactory<UserAuth>> userAuthFactories = new ArrayList<>();
-        userAuthFactories.add(new UserAuthPassword.Factory());
-        sshServerNetconf.setUserAuthFactories(userAuthFactories);
         sshServerNetconf.setPasswordAuthenticator(
                 new PasswordAuthenticator() {
                     @Override
@@ -126,7 +121,9 @@
                     }
                 });
         sshServerNetconf.setPort(PORT_NUMBER);
-        sshServerNetconf.setKeyPairProvider(new SimpleGeneratorHostKeyProvider(TEST_SERFILE));
+        SimpleGeneratorHostKeyProvider provider = new SimpleGeneratorHostKeyProvider();
+        provider.setFile(new File(TEST_SERFILE));
+        sshServerNetconf.setKeyPairProvider(provider);
         sshServerNetconf.setSubsystemFactories(
                 Arrays.<NamedFactory<Command>>asList(new NetconfSshdTestSubsystem.Factory()));
         sshServerNetconf.open();
@@ -224,10 +221,10 @@
         assertNotNull("Incorrect sessionId", session1.getSessionId());
         try {
             assertTrue("NETCONF get-config running command failed. ",
-                    GET_REPLY_PATTERN.matcher(session1.getConfig(RUNNING, SAMPLE_REQUEST)).matches());
+                       GET_REPLY_PATTERN.matcher(session1.getConfig(RUNNING, SAMPLE_REQUEST)).matches());
 
             assertTrue("NETCONF get-config candidate command failed. ",
-                    GET_REPLY_PATTERN.matcher(session1.getConfig(CANDIDATE, SAMPLE_REQUEST)).matches());
+                       GET_REPLY_PATTERN.matcher(session1.getConfig(CANDIDATE, SAMPLE_REQUEST)).matches());
 
         } catch (NetconfException e) {
             e.printStackTrace();
@@ -242,7 +239,7 @@
         assertNotNull("Incorrect sessionId", session1.getSessionId());
         try {
             assertTrue("NETCONF get running command failed. ",
-                    GET_REPLY_PATTERN.matcher(session1.get(SAMPLE_REQUEST, null)).matches());
+                       GET_REPLY_PATTERN.matcher(session1.get(SAMPLE_REQUEST, null)).matches());
 
         } catch (NetconfException e) {
             e.printStackTrace();
diff --git a/protocols/netconf/ctl/src/test/java/org/onosproject/netconf/ctl/impl/NetconfSessionMinaImplTest.java b/protocols/netconf/ctl/src/test/java/org/onosproject/netconf/ctl/impl/NetconfSessionMinaImplTest.java
new file mode 100644
index 0000000..d8c1202
--- /dev/null
+++ b/protocols/netconf/ctl/src/test/java/org/onosproject/netconf/ctl/impl/NetconfSessionMinaImplTest.java
@@ -0,0 +1,472 @@
+/*
+ * Copyright 2017-present Open Networking Laboratory
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.onosproject.netconf.ctl.impl;
+
+import com.google.common.collect.ImmutableList;
+import org.apache.sshd.common.NamedFactory;
+import org.apache.sshd.server.Command;
+import org.apache.sshd.server.SshServer;
+import org.apache.sshd.server.auth.password.PasswordAuthenticator;
+import org.apache.sshd.server.keyprovider.SimpleGeneratorHostKeyProvider;
+import org.apache.sshd.server.session.ServerSession;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import org.onlab.junit.TestTools;
+import org.onlab.packet.Ip4Address;
+import org.onosproject.netconf.NetconfDeviceInfo;
+import org.onosproject.netconf.NetconfException;
+import org.onosproject.netconf.NetconfSession;
+import org.onosproject.netconf.TargetConfig;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.File;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.List;
+import java.util.Optional;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.FutureTask;
+import java.util.regex.Pattern;
+
+import static org.hamcrest.Matchers.containsInAnyOrder;
+import static org.junit.Assert.*;
+import static org.onosproject.netconf.TargetConfig.CANDIDATE;
+import static org.onosproject.netconf.TargetConfig.RUNNING;
+
+/**
+ * Unit tests for NetconfSession.
+ * <p>
+ * Sets up an SSH Server with Apache SSHD and connects to it using 2 clients
+ * Truly verifies that the NETCONF flows are compliant with a NETCONF server.
+ */
+public class NetconfSessionMinaImplTest {
+    private static final Logger log = LoggerFactory
+            .getLogger(NetconfStreamThread.class);
+
+    private static final int PORT_NUMBER = TestTools.findAvailablePort(50830);
+    private static final String TEST_USERNAME = "netconf";
+    private static final String TEST_PASSWORD = "netconf123";
+    private static final String TEST_HOSTNAME = "127.0.0.1";
+
+    private static final String TEST_SERFILE =
+            System.getProperty("java.io.tmpdir") + System.getProperty("file.separator") + "testkey.ser";
+
+    private static final String SAMPLE_REQUEST =
+            "<some-yang-element xmlns=\"some-namespace\">"
+                    + "<some-child-element/>"
+                    + "</some-yang-element>";
+
+    private static final String EDIT_CONFIG_REQUEST =
+            "<?xml version=\"1.0\" encoding=\"UTF-8\"?><rpc message-id=\"6\"  "
+                    + "xmlns=\"urn:ietf:params:xml:ns:netconf:base:1.0\">\n"
+                    + "<edit-config>\n"
+                    + "<target><running/></target>\n"
+                    + "<config xmlns:nc=\"urn:ietf:params:xml:ns:netconf:base:1.0\">\n"
+                    + "<some-yang-element xmlns=\"some-namespace\">"
+                    + "<some-child-element/></some-yang-element></config>\n"
+                    + "</edit-config>\n"
+                    + "</rpc>]]>]]>";
+
+    static final List<String> DEFAULT_CAPABILITIES = ImmutableList.<String>builder()
+            .add("urn:ietf:params:netconf:base:1.0")
+            .add("urn:ietf:params:netconf:base:1.1")
+            .add("urn:ietf:params:netconf:capability:writable-running:1.0")
+            .add("urn:ietf:params:netconf:capability:candidate:1.0")
+            .add("urn:ietf:params:netconf:capability:startup:1.0")
+            .add("urn:ietf:params:netconf:capability:rollback-on-error:1.0")
+            .add("urn:ietf:params:netconf:capability:interleave:1.0")
+            .add("urn:ietf:params:netconf:capability:notification:1.0")
+            .add("urn:ietf:params:netconf:capability:validate:1.0")
+            .add("urn:ietf:params:netconf:capability:validate:1.1")
+            .build();
+
+
+    private static NetconfSession session1;
+    private static NetconfSession session2;
+    private static SshServer sshServerNetconf;
+
+    @BeforeClass
+    public static void setUp() throws Exception {
+        sshServerNetconf = SshServer.setUpDefaultServer();
+        sshServerNetconf.setPasswordAuthenticator(
+                new PasswordAuthenticator() {
+                    @Override
+                    public boolean authenticate(
+                            String username,
+                            String password,
+                            ServerSession session) {
+                        return TEST_USERNAME.equals(username) && TEST_PASSWORD.equals(password);
+                    }
+                });
+        sshServerNetconf.setPort(PORT_NUMBER);
+        SimpleGeneratorHostKeyProvider provider = new SimpleGeneratorHostKeyProvider();
+        provider.setFile(new File(TEST_SERFILE));
+        sshServerNetconf.setKeyPairProvider(provider);
+        sshServerNetconf.setSubsystemFactories(
+                Arrays.<NamedFactory<Command>>asList(new NetconfSshdTestSubsystem.Factory()));
+        sshServerNetconf.open();
+        log.info("SSH Server opened on port {}", PORT_NUMBER);
+
+        NetconfDeviceInfo deviceInfo = new NetconfDeviceInfo(
+                TEST_USERNAME, TEST_PASSWORD, Ip4Address.valueOf(TEST_HOSTNAME), PORT_NUMBER);
+
+        session1 = new NetconfSessionMinaImpl(deviceInfo);
+        log.info("Started NETCONF Session {} with test SSHD server in Unit Test", session1.getSessionId());
+        assertTrue("Incorrect sessionId", !session1.getSessionId().equalsIgnoreCase("-1"));
+        assertTrue("Incorrect sessionId", !session1.getSessionId().equalsIgnoreCase("0"));
+        assertThat(session1.getDeviceCapabilitiesSet(), containsInAnyOrder(DEFAULT_CAPABILITIES.toArray()));
+
+        session2 = new NetconfSessionMinaImpl(deviceInfo);
+        log.info("Started NETCONF Session {} with test SSHD server in Unit Test", session2.getSessionId());
+        assertTrue("Incorrect sessionId", !session2.getSessionId().equalsIgnoreCase("-1"));
+        assertTrue("Incorrect sessionId", !session2.getSessionId().equalsIgnoreCase("0"));
+        assertThat(session2.getDeviceCapabilitiesSet(), containsInAnyOrder(DEFAULT_CAPABILITIES.toArray()));
+    }
+
+    @AfterClass
+    public static void tearDown() throws Exception {
+        if (session1 != null) {
+            session1.close();
+        }
+        if (session2 != null) {
+            session2.close();
+        }
+
+        sshServerNetconf.stop();
+    }
+
+    @Test
+    public void testEditConfigRequest() {
+        log.info("Starting edit-config async");
+        assertNotNull("Incorrect sessionId", session1.getSessionId());
+        try {
+            assertTrue("NETCONF edit-config command failed",
+                       session1.editConfig(TargetConfig.RUNNING.toString(),
+                                           null, SAMPLE_REQUEST));
+        } catch (NetconfException e) {
+            e.printStackTrace();
+            fail("NETCONF edit-config test failed: " + e.getMessage());
+        }
+        log.info("Finishing edit-config async");
+    }
+
+    @Test
+    public void testEditConfigRequestWithOnlyNewConfiguration() {
+        log.info("Starting edit-config async");
+        assertNotNull("Incorrect sessionId", session1.getSessionId());
+        try {
+            assertTrue("NETCONF edit-config command failed",
+                       session1.editConfig(EDIT_CONFIG_REQUEST));
+        } catch (NetconfException e) {
+            e.printStackTrace();
+            fail("NETCONF edit-config test failed: " + e.getMessage());
+        }
+        log.info("Finishing edit-config async");
+    }
+
+    @Test
+    public void testDeleteConfigRequestWithRunningTargetConfiguration() {
+        log.info("Starting delete-config async");
+        assertNotNull("Incorrect sessionId", session1.getSessionId());
+        try {
+            assertFalse("NETCONF delete-config command failed",
+                        session1.deleteConfig(TargetConfig.RUNNING));
+        } catch (NetconfException e) {
+            e.printStackTrace();
+            fail("NETCONF delete-config test failed: " + e.getMessage());
+        }
+        log.info("Finishing delete-config async");
+    }
+
+    @Test
+    public void testCopyConfigRequest() {
+        log.info("Starting copy-config async");
+        assertNotNull("Incorrect sessionId", session1.getSessionId());
+        try {
+            assertTrue("NETCONF copy-config command failed",
+                       session1.copyConfig(TargetConfig.RUNNING.toString(),
+                                           "candidate"));
+        } catch (NetconfException e) {
+            e.printStackTrace();
+            fail("NETCONF edit-config test failed: " + e.getMessage());
+        }
+        log.info("Finishing copy-config async");
+    }
+
+    @Test
+    public void testGetConfigRequest() {
+        log.info("Starting get-config async");
+        assertNotNull("Incorrect sessionId", session1.getSessionId());
+        try {
+            assertTrue("NETCONF get-config running command failed. ",
+                       GET_REPLY_PATTERN.matcher(session1.getConfig(RUNNING, SAMPLE_REQUEST)).matches());
+
+            assertTrue("NETCONF get-config candidate command failed. ",
+                       GET_REPLY_PATTERN.matcher(session1.getConfig(CANDIDATE, SAMPLE_REQUEST)).matches());
+
+        } catch (NetconfException e) {
+            e.printStackTrace();
+            fail("NETCONF get-config test failed: " + e.getMessage());
+        }
+        log.info("Finishing get-config async");
+    }
+
+    @Test
+    public void testGetRequest() {
+        log.info("Starting get async");
+        assertNotNull("Incorrect sessionId", session1.getSessionId());
+        try {
+            assertTrue("NETCONF get running command failed. ",
+                       GET_REPLY_PATTERN.matcher(session1.get(SAMPLE_REQUEST, null)).matches());
+
+        } catch (NetconfException e) {
+            e.printStackTrace();
+            fail("NETCONF get test failed: " + e.getMessage());
+        }
+        log.info("Finishing get async");
+    }
+
+    @Test
+    public void testLockRequest() {
+        log.info("Starting lock async");
+        assertNotNull("Incorrect sessionId", session1.getSessionId());
+        try {
+            assertTrue("NETCONF lock request failed", session1.lock());
+        } catch (NetconfException e) {
+            e.printStackTrace();
+            fail("NETCONF lock test failed: " + e.getMessage());
+        }
+        log.info("Finishing lock async");
+    }
+
+    @Test
+    public void testUnLockRequest() {
+        log.info("Starting unlock async");
+        assertNotNull("Incorrect sessionId", session1.getSessionId());
+        try {
+            assertTrue("NETCONF unlock request failed", session1.unlock());
+        } catch (NetconfException e) {
+            e.printStackTrace();
+            fail("NETCONF unlock test failed: " + e.getMessage());
+        }
+        log.info("Finishing unlock async");
+    }
+
+
+    @Test
+    public void testConcurrentSameSessionAccess() throws InterruptedException {
+        NCCopyConfigCallable testCopyConfig1 = new NCCopyConfigCallable(session1, RUNNING, "candidate");
+        NCCopyConfigCallable testCopyConfig2 = new NCCopyConfigCallable(session1, RUNNING, "startup");
+
+        FutureTask<Boolean> futureCopyConfig1 = new FutureTask<>(testCopyConfig1);
+        FutureTask<Boolean> futureCopyConfig2 = new FutureTask<>(testCopyConfig2);
+
+        ExecutorService executor = Executors.newFixedThreadPool(2);
+        log.info("Starting concurrent execution of copy-config through same session");
+        executor.execute(futureCopyConfig1);
+        executor.execute(futureCopyConfig2);
+
+        int count = 0;
+        while (count < 10) {
+            if (futureCopyConfig1.isDone() && futureCopyConfig2.isDone()) {
+                executor.shutdown();
+                log.info("Finished concurrent same session execution");
+                return;
+            }
+            Thread.sleep(100L);
+            count++;
+        }
+        fail("NETCONF test failed to complete.");
+    }
+
+    @Test
+    public void test2SessionAccess() throws InterruptedException {
+        NCCopyConfigCallable testCopySession1 = new NCCopyConfigCallable(session1, RUNNING, "candidate");
+        NCCopyConfigCallable testCopySession2 = new NCCopyConfigCallable(session2, RUNNING, "candidate");
+
+        FutureTask<Boolean> futureCopySession1 = new FutureTask<>(testCopySession1);
+        FutureTask<Boolean> futureCopySession2 = new FutureTask<>(testCopySession2);
+
+        ExecutorService executor = Executors.newFixedThreadPool(2);
+        log.info("Starting concurrent execution of copy-config through 2 different sessions");
+        executor.execute(futureCopySession1);
+        executor.execute(futureCopySession2);
+
+        int count = 0;
+        while (count < 10) {
+            if (futureCopySession1.isDone() && futureCopySession2.isDone()) {
+                executor.shutdown();
+                log.info("Finished concurrent 2 session execution");
+                return;
+            }
+            Thread.sleep(100L);
+            count++;
+        }
+        fail("NETCONF test failed to complete.");
+    }
+
+
+    public static String getTestHelloReply(Optional<Long> sessionId) {
+        return getTestHelloReply(DEFAULT_CAPABILITIES, sessionId);
+    }
+
+    public static String getTestHelloReply(Collection<String> capabilities, Optional<Long> sessionId) {
+        StringBuffer sb = new StringBuffer();
+
+        sb.append("<hello xmlns=\"urn:ietf:params:xml:ns:netconf:base:1.0\">");
+        sb.append("<capabilities>");
+        capabilities.forEach(capability -> {
+            sb.append("<capability>").append(capability).append("</capability>");
+        });
+        sb.append("</capabilities>");
+        if (sessionId.isPresent()) {
+            sb.append("<session-id>");
+            sb.append(sessionId.get().toString());
+            sb.append("</session-id>");
+        }
+        sb.append("</hello>");
+
+        return sb.toString();
+    }
+
+    public static String getOkReply(Optional<Integer> messageId) {
+        StringBuffer sb = new StringBuffer("<?xml version=\"1.0\" encoding=\"UTF-8\"?>\n");
+        sb.append("<rpc-reply xmlns=\"urn:ietf:params:xml:ns:netconf:base:1.0\" ");
+        if (messageId.isPresent()) {
+            sb.append("message-id=\"");
+            sb.append(String.valueOf(messageId.get()));
+            sb.append("\">");
+        }
+        sb.append("<ok/>");
+        sb.append("</rpc-reply>");
+        return sb.toString();
+    }
+
+    public static String getGetReply(Optional<Integer> messageId) {
+        StringBuffer sb = new StringBuffer("<?xml version=\"1.0\" encoding=\"UTF-8\"?>\n");
+        sb.append("<rpc-reply xmlns=\"urn:ietf:params:xml:ns:netconf:base:1.0\" ");
+        if (messageId.isPresent()) {
+            sb.append("message-id=\"");
+            sb.append(String.valueOf(messageId.get()));
+            sb.append("\">");
+        }
+        sb.append("<data>\n");
+        sb.append(SAMPLE_REQUEST);
+        sb.append("</data>\n");
+        sb.append("</rpc-reply>");
+        return sb.toString();
+    }
+
+    public static final Pattern HELLO_REQ_PATTERN =
+            Pattern.compile("(<\\?xml).*"
+                    + "(<hello xmlns=\"urn:ietf:params:xml:ns:netconf:base:1.0\">)\\R?"
+                    + "( *)(<capabilities>)\\R?"
+                    + "( *)(<capability>urn:ietf:params:netconf:base:1.0</capability>)\\R?"
+                    + "( *)(</capabilities>)\\R?"
+                    + "(</hello>)\\R? *",
+                    Pattern.DOTALL);
+
+    public static final Pattern EDIT_CONFIG_REQ_PATTERN =
+            Pattern.compile("(<\\?xml).*"
+                    + "(<rpc message-id=\")[0-9]*(\") *(xmlns=\"urn:ietf:params:xml:ns:netconf:base:1.0\">)\\R?"
+                    + "(<edit-config>)\\R?"
+                    + "(<target>\\R?((<" + TargetConfig.CANDIDATE.toString() + "/>)|"
+                                    + "(<" + TargetConfig.RUNNING.toString() + "/>)|"
+                                    + "(<" + TargetConfig.STARTUP.toString() + "/>))\\R?</target>)\\R?"
+                    + "(<config xmlns:nc=\"urn:ietf:params:xml:ns:netconf:base:1.0\">)\\R?"
+                    + ".*"
+                    + "(</config>)\\R?(</edit-config>)\\R?(</rpc>)\\R?", Pattern.DOTALL);
+
+
+    public static final Pattern LOCK_REQ_PATTERN =
+            Pattern.compile("(<\\?xml).*"
+                                    + "(<rpc xmlns=\"urn:ietf:params:xml:ns:netconf:base:1.0\" "
+                                    + "message-id=\")[0-9]*(\">)\\R?"
+                                    + "(<lock>)\\R?"
+                                    + "(<target>\\R?((<" + TargetConfig.CANDIDATE.toString() + "/>)|"
+                                    + "(<" + TargetConfig.RUNNING.toString() + "/>)|"
+                                    + "(<" + TargetConfig.STARTUP.toString() + "/>))\\R?</target>)\\R?"
+                                    + "(</lock>)\\R?(</rpc>)\\R?", Pattern.DOTALL);
+
+    public static final Pattern UNLOCK_REQ_PATTERN =
+            Pattern.compile("(<\\?xml).*"
+                                    + "(<rpc xmlns=\"urn:ietf:params:xml:ns:netconf:base:1.0\" "
+                                    + "message-id=\")[0-9]*(\">)\\R?"
+                                    + "(<unlock>)\\R?"
+                                    + "(<target>\\R?((<" + TargetConfig.CANDIDATE.toString() + "/>)|"
+                                    + "(<" + TargetConfig.RUNNING.toString() + "/>)|"
+                                    + "(<" + TargetConfig.STARTUP.toString() + "/>))\\R?</target>)\\R?"
+                                    + "(</unlock>)\\R?(</rpc>)\\R?", Pattern.DOTALL);
+
+    public static final Pattern COPY_CONFIG_REQ_PATTERN =
+            Pattern.compile("(<\\?xml).*"
+                    + "(<rpc xmlns=\"urn:ietf:params:xml:ns:netconf:base:1.0\" message-id=\")[0-9]*(\">)\\R?"
+                    + "(<copy-config>)\\R?"
+                    + "(<target>\\R?((<" + TargetConfig.CANDIDATE.toString() + "/>)|"
+                                    + "(<" + TargetConfig.RUNNING.toString() + "/>)|"
+                                    + "(<" + TargetConfig.STARTUP.toString() + "/>))\\R?</target>)\\R?"
+                                    + "(<source>)\\R?(<config>)(("
+                                    + TargetConfig.CANDIDATE.toString() + ")|("
+                                    + TargetConfig.RUNNING.toString() + ")|("
+                                    + TargetConfig.STARTUP.toString()
+                                    + "))(</config>)\\R?(</source>)\\R?"
+                                    + "(</copy-config>)\\R?(</rpc>)\\R?", Pattern.DOTALL);
+
+    public static final Pattern GET_CONFIG_REQ_PATTERN =
+            Pattern.compile("(<\\?xml).*"
+                    + "(<rpc message-id=\")[0-9]*(\"  xmlns=\"urn:ietf:params:xml:ns:netconf:base:1.0\">)\\R?"
+                    + "(<get-config>)\\R?" + "(<source>)\\R?((<"
+                                    + TargetConfig.CANDIDATE.toString()
+                                    + "/>)|(<" + TargetConfig.RUNNING.toString()
+                                    + "/>)|(<" + TargetConfig.STARTUP.toString()
+                                    + "/>))\\R?(</source>)\\R?"
+                    + "(<filter type=\"subtree\">).*(</filter>)\\R?"
+                    + "(</get-config>)\\R?(</rpc>)\\R?", Pattern.DOTALL);
+
+    public static final Pattern GET_REPLY_PATTERN =
+            Pattern.compile("(<\\?xml).*"
+                    + "(<rpc-reply xmlns=\"urn:ietf:params:xml:ns:netconf:base:1.0\" message-id=\")[0-9]*(\">)\\R?"
+                    + "(<data>).*(</data>)\\R?"
+                    + "(</rpc-reply>)\\R?", Pattern.DOTALL);
+
+    public static final Pattern GET_REQ_PATTERN =
+            Pattern.compile("(<\\?xml).*"
+                    + "(<rpc message-id=\")[0-9]*(\"  xmlns=\"urn:ietf:params:xml:ns:netconf:base:1.0\">)\\R?"
+                    + "(<get>)\\R?"
+                    + "(<filter type=\"subtree\">).*(</filter>)\\R?"
+                    + "(</get>)\\R?(</rpc>)\\R?", Pattern.DOTALL);
+
+    public class NCCopyConfigCallable implements Callable<Boolean> {
+        private NetconfSession session;
+        private TargetConfig target;
+        private String source;
+
+        public NCCopyConfigCallable(NetconfSession session, TargetConfig target, String source) {
+            this.session = session;
+            this.target = target;
+            this.source = source;
+        }
+
+        @Override
+        public Boolean call() throws Exception {
+            return session.copyConfig(target, source);
+        }
+    }
+}
diff --git a/protocols/netconf/ctl/src/test/java/org/onosproject/netconf/ctl/impl/NetconfSshdTestSubsystem.java b/protocols/netconf/ctl/src/test/java/org/onosproject/netconf/ctl/impl/NetconfSshdTestSubsystem.java
index af1134c..cc1cbf9 100644
--- a/protocols/netconf/ctl/src/test/java/org/onosproject/netconf/ctl/impl/NetconfSshdTestSubsystem.java
+++ b/protocols/netconf/ctl/src/test/java/org/onosproject/netconf/ctl/impl/NetconfSshdTestSubsystem.java
@@ -22,14 +22,15 @@
 import java.io.InputStreamReader;
 import java.io.OutputStream;
 import java.io.PrintWriter;
+import java.nio.Buffer;
+import java.nio.ByteBuffer;
 import java.util.Collection;
 import java.util.Optional;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Future;
 
 import org.apache.sshd.common.NamedFactory;
-import org.apache.sshd.common.util.Buffer;
-import org.apache.sshd.common.util.ThreadUtils;
+import org.apache.sshd.common.util.threads.ThreadUtils;
 import org.apache.sshd.server.Command;
 import org.apache.sshd.server.Environment;
 import org.apache.sshd.server.ExitCallback;
@@ -188,11 +189,12 @@
                         deviceRequest = deviceRequest.replace(END_PATTERN, "");
                         Optional<Integer> messageId = NetconfStreamThread.getMsgId(deviceRequest);
                         log.info("Client Request on session {}. MsgId {}: {}",
-                                session.getId(), messageId, deviceRequest);
+                                session.getSessionId(), messageId, deviceRequest);
                         synchronized (outputStream) {
                             if (NetconfSessionImplTest.HELLO_REQ_PATTERN.matcher(deviceRequest).matches()) {
                                 String helloReply =
-                                        NetconfSessionImplTest.getTestHelloReply(Optional.of(session.getId()));
+                                        NetconfSessionImplTest.getTestHelloReply(Optional.of(ByteBuffer.wrap(
+                                                session.getSessionId()).asLongBuffer().get()));
                                 outputStream.write(helloReply + END_PATTERN);
                                 outputStream.flush();
                             } else if (NetconfSessionImplTest.EDIT_CONFIG_REQ_PATTERN.matcher(deviceRequest).matches()
@@ -211,7 +213,8 @@
                                 outputStream.flush();
                             } else {
                                 log.error("Unexpected NETCONF message structure on session {} : {}",
-                                        session.getId(), deviceRequest);
+                                          ByteBuffer.wrap(
+                                                  session.getSessionId()).asLongBuffer().get(), deviceRequest);
                             }
                         }
                         deviceRequestBuilder.setLength(0);