Fixed some concurrency issues in NETCONF Session Added Unit Tests

Change-Id: I84fe0c17e3d757948a859f78d01fbb025397a44d
diff --git a/lib/BUCK b/lib/BUCK
index 69e6afd..64f1dff 100644
--- a/lib/BUCK
+++ b/lib/BUCK
@@ -1220,3 +1220,12 @@
   visibility = [ 'PUBLIC' ],
 )
 
+remote_jar (
+  name = 'sshd-core',
+  out = 'sshd-core-0.14.0.jar',
+  url = 'mvn:org.apache.sshd:sshd-core:jar:0.14.0',
+  sha1 = 'cb12fa1b1b07fb5ce3aa4f99b189743897bd4fca',
+  maven_coords = 'org.apache.sshd:sshd-core:0.14.0',
+  visibility = [ 'PUBLIC' ],
+)
+
diff --git a/lib/deps.json b/lib/deps.json
index f809e3f..d6312a5 100644
--- a/lib/deps.json
+++ b/lib/deps.json
@@ -214,6 +214,7 @@
       "uri": "mvn:org.projectfloodlight:openflowj:3.0.0-SNAPSHOT",
       "repo": "https://oss.sonatype.org/content/repositories/snapshots"
     },
-    "plexus-utils": "mvn:org.codehaus.plexus:plexus-utils:3.0.24"
+    "plexus-utils": "mvn:org.codehaus.plexus:plexus-utils:3.0.24",
+    "sshd-core": "mvn:org.apache.sshd:sshd-core:0.14.0"
   }
 }
diff --git a/protocols/netconf/api/src/main/java/org/onosproject/netconf/NetconfSession.java b/protocols/netconf/api/src/main/java/org/onosproject/netconf/NetconfSession.java
index 1d83fd6..172d53c 100644
--- a/protocols/netconf/api/src/main/java/org/onosproject/netconf/NetconfSession.java
+++ b/protocols/netconf/api/src/main/java/org/onosproject/netconf/NetconfSession.java
@@ -24,22 +24,27 @@
  * NETCONF session object that allows NETCONF operations on top with the physical
  * device on top of an SSH connection.
  */
-// TODO change return type of methdos to <Capability, XMLdoc, string or yang obj>
+// TODO change return type of methods to <Capability, XMLdoc, string or yang obj>
 public interface NetconfSession {
 
     /**
      * Executes an asynchronous RPC to the server and obtains a future to be completed.
      *
+     * The caller must ensure that the message-id in any request is unique
+     * for the session
+     *
+     * @deprecated  - 1.10.0 do not remove needs reworking
      * @param request the XML containing the RPC for the server.
      * @return Server response or ERROR
      * @throws NetconfException when there is a problem in the communication process on
      * the underlying connection
      */
+    @Deprecated
     CompletableFuture<String> request(String request) throws NetconfException;
 
 
     /**
-     * Retrives the requested configuration, different from get-config.
+     * Retrieves the requested configuration, different from get-config.
      *
      * @param request the XML containing the request to the server.
      * @return device running configuration
@@ -49,7 +54,7 @@
     String get(String request) throws NetconfException;
 
     /**
-     * Retrives the requested data.
+     * Retrieves the requested data.
      *
      * @param filterSchema XML subtrees to include in the reply
      * @param withDefaultsMode with-defaults mode
diff --git a/protocols/netconf/ctl/BUCK b/protocols/netconf/ctl/BUCK
index 1503f32..8271185 100644
--- a/protocols/netconf/ctl/BUCK
+++ b/protocols/netconf/ctl/BUCK
@@ -8,6 +8,7 @@
     '//lib:TEST_ADAPTERS',
     '//utils/osgi:onlab-osgi-tests',
     '//core/api:onos-api-tests',
+    '//lib:sshd-core'
 ]
 
 osgi_jar_with_tests (
diff --git a/protocols/netconf/ctl/pom.xml b/protocols/netconf/ctl/pom.xml
index d1517a1..666c933 100644
--- a/protocols/netconf/ctl/pom.xml
+++ b/protocols/netconf/ctl/pom.xml
@@ -56,6 +56,12 @@
             <classifier>tests</classifier>
             <scope>test</scope>
         </dependency>
+        <dependency>
+            <groupId>org.apache.sshd</groupId>
+            <artifactId>sshd-core</artifactId>
+            <version>0.14.0</version>
+            <scope>test</scope>
+        </dependency>
     </dependencies>
 
     <build>
diff --git a/protocols/netconf/ctl/src/main/java/org/onosproject/netconf/ctl/NetconfSessionImpl.java b/protocols/netconf/ctl/src/main/java/org/onosproject/netconf/ctl/NetconfSessionImpl.java
index 26ded2f..07f7c98 100644
--- a/protocols/netconf/ctl/src/main/java/org/onosproject/netconf/ctl/NetconfSessionImpl.java
+++ b/protocols/netconf/ctl/src/main/java/org/onosproject/netconf/ctl/NetconfSessionImpl.java
@@ -31,15 +31,17 @@
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Collections;
-import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.Optional;
 import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.TimeoutException;
 import java.util.concurrent.atomic.AtomicInteger;
+import java.util.regex.Pattern;
+
 
 
 /**
@@ -82,6 +84,8 @@
     private static final String SUBSCRIPTION_SUBTREE_FILTER_OPEN =
             "<filter xmlns:base10=\"urn:ietf:params:xml:ns:netconf:base:1.0\" base10:type=\"subtree\">";
 
+    private static Pattern msgIdPattern = Pattern.compile("(message-id=\"[0-9]+\")");
+
     private final AtomicInteger messageIdInteger = new AtomicInteger(0);
     private Connection netconfConnection;
     private NetconfDeviceInfo deviceInfo;
@@ -101,7 +105,7 @@
         this.netconfConnection = null;
         this.sshSession = null;
         connectionActive = false;
-        replies = new HashMap<>();
+        replies = new ConcurrentHashMap<>();
         errorReplies = new ArrayList<>();
         startConnection();
     }
@@ -158,7 +162,8 @@
             sshSession.startSubSystem("netconf");
             streamHandler = new NetconfStreamThread(sshSession.getStdout(), sshSession.getStdin(),
                                                     sshSession.getStderr(), deviceInfo,
-                                                    new NetconfSessionDelegateImpl());
+                                                    new NetconfSessionDelegateImpl(),
+                                                    replies);
             this.addDeviceOutputListener(new NetconfDeviceOutputEventListenerImpl(deviceInfo));
             sendHello();
         } catch (IOException e) {
@@ -276,22 +281,26 @@
     }
 
     @Override
+    @Deprecated
     public CompletableFuture<String> request(String request) {
-        CompletableFuture<String> ftrep = streamHandler.sendMessage(request);
-        replies.put(messageIdInteger.get(), ftrep);
-        return ftrep;
+        return streamHandler.sendMessage(request);
+    }
+
+    private CompletableFuture<String> request(String request, int messageId) {
+        return streamHandler.sendMessage(request, messageId);
     }
 
     private String sendRequest(String request) throws NetconfException {
         checkAndRestablishSession();
-        request = formatRequestMessageId(request);
+        final int messageId = messageIdInteger.getAndIncrement();
+        request = formatRequestMessageId(request, messageId);
         request = formatXmlHeader(request);
-        CompletableFuture<String> futureReply = request(request);
-        messageIdInteger.incrementAndGet();
+        CompletableFuture<String> futureReply = request(request, messageId);
         int replyTimeout = NetconfControllerImpl.netconfReplyTimeout;
         String rp;
         try {
             rp = futureReply.get(replyTimeout, TimeUnit.SECONDS);
+            replies.remove(messageId);
         } catch (InterruptedException | ExecutionException | TimeoutException e) {
             throw new NetconfException("No matching reply for request " + request, e);
         }
@@ -299,15 +308,15 @@
         return rp.trim();
     }
 
-    private String formatRequestMessageId(String request) {
+    private String formatRequestMessageId(String request, int messageId) {
         if (request.contains(MESSAGE_ID_STRING)) {
-            //FIXME if application provieds his own counting of messages this fails that count
+            //FIXME if application provides his own counting of messages this fails that count
             request = request.replaceFirst(MESSAGE_ID_STRING + EQUAL + NUMBER_BETWEEN_QUOTES_MATCHER,
-                                           MESSAGE_ID_STRING + EQUAL + "\"" + messageIdInteger.get() + "\"");
+                                           MESSAGE_ID_STRING + EQUAL + "\"" + messageId + "\"");
         } else if (!request.contains(MESSAGE_ID_STRING) && !request.contains(HELLO)) {
             //FIXME find out a better way to enforce the presence of message-id
             request = request.replaceFirst(END_OF_RPC_OPEN_TAG, "\" " + MESSAGE_ID_STRING + EQUAL + "\""
-                    + messageIdInteger.get() + "\"" + ">");
+                    + messageId + "\"" + ">");
         }
         return request;
     }
diff --git a/protocols/netconf/ctl/src/main/java/org/onosproject/netconf/ctl/NetconfStreamHandler.java b/protocols/netconf/ctl/src/main/java/org/onosproject/netconf/ctl/NetconfStreamHandler.java
index 1ee7911..98ebf3f 100644
--- a/protocols/netconf/ctl/src/main/java/org/onosproject/netconf/ctl/NetconfStreamHandler.java
+++ b/protocols/netconf/ctl/src/main/java/org/onosproject/netconf/ctl/NetconfStreamHandler.java
@@ -29,12 +29,25 @@
     /**
      * Sends the request on the stream that is used to communicate to and from the device.
      *
+     * If this request does not contain a messageId then this will throw a NoSuchElementException
+     *
      * @param request request to send to the physical device
      * @return a CompletableFuture of type String that will contain the response for the request.
+     * @deprecated - use method with messageId parameter instead
      */
+    @Deprecated
     CompletableFuture<String> sendMessage(String request);
 
     /**
+     * Sends the request on the stream that is used to communicate to and from the device.
+     *
+     * @param request request to send to the physical device
+     * @param messageId The identifier of the message - should be unique for the session
+     * @return a CompletableFuture of type String that will contain the response for the request.
+     */
+    CompletableFuture<String> sendMessage(String request, int messageId);
+
+    /**
      * Adds a listener for netconf events on the handled stream.
      *
      * @param listener Netconf device event listener
diff --git a/protocols/netconf/ctl/src/main/java/org/onosproject/netconf/ctl/NetconfStreamThread.java b/protocols/netconf/ctl/src/main/java/org/onosproject/netconf/ctl/NetconfStreamThread.java
index 184a074..4d713f6 100644
--- a/protocols/netconf/ctl/src/main/java/org/onosproject/netconf/ctl/NetconfStreamThread.java
+++ b/protocols/netconf/ctl/src/main/java/org/onosproject/netconf/ctl/NetconfStreamThread.java
@@ -32,6 +32,7 @@
 import java.io.OutputStream;
 import java.io.PrintWriter;
 import java.util.List;
+import java.util.Map;
 import java.util.Optional;
 import java.util.concurrent.CompletableFuture;
 import java.util.regex.Matcher;
@@ -63,26 +64,41 @@
     private List<NetconfDeviceOutputEventListener> netconfDeviceEventListeners
             = Lists.newCopyOnWriteArrayList();
     private boolean enableNotifications = true;
+    private Map<Integer, CompletableFuture<String>> replies;
 
     public NetconfStreamThread(final InputStream in, final OutputStream out,
                                final InputStream err, NetconfDeviceInfo deviceInfo,
-                               NetconfSessionDelegate delegate) {
+                               NetconfSessionDelegate delegate,
+                               Map<Integer, CompletableFuture<String>> replies) {
         this.in = in;
         this.err = err;
         outputStream = new PrintWriter(out);
         netconfDeviceInfo = deviceInfo;
         state = NetconfMessageState.NO_MATCHING_PATTERN;
         sessionDelegate = delegate;
+        this.replies = replies;
         log.debug("Stream thread for device {} session started", deviceInfo);
         start();
     }
 
     @Override
     public CompletableFuture<String> sendMessage(String request) {
+        Optional<Integer> messageId = getMsgId(request);
+        return sendMessage(request, messageId.get());
+    }
+
+    @Override
+    public CompletableFuture<String> sendMessage(String request, int messageId) {
         log.debug("Sending message {} to device {}", request, netconfDeviceInfo);
-        outputStream.print(request);
-        outputStream.flush();
-        return new CompletableFuture<>();
+        CompletableFuture<String> cf = new CompletableFuture<>();
+        replies.put(messageId, cf);
+
+        synchronized (outputStream) {
+            outputStream.print(request);
+            outputStream.flush();
+        }
+
+        return cf;
     }
 
     public enum NetconfMessageState {
@@ -230,7 +246,7 @@
             }
     }
 
-    private static Optional<Integer> getMsgId(String reply) {
+    protected static Optional<Integer> getMsgId(String reply) {
         Matcher matcher = MSGID_PATTERN.matcher(reply);
         if (matcher.find()) {
             Integer messageId = Integer.parseInt(matcher.group(1));
diff --git a/protocols/netconf/ctl/src/test/java/org/onosproject/netconf/ctl/NetconfSessionImplTest.java b/protocols/netconf/ctl/src/test/java/org/onosproject/netconf/ctl/NetconfSessionImplTest.java
new file mode 100644
index 0000000..661c290
--- /dev/null
+++ b/protocols/netconf/ctl/src/test/java/org/onosproject/netconf/ctl/NetconfSessionImplTest.java
@@ -0,0 +1,355 @@
+/*
+ * Copyright 2017-present Open Networking Laboratory
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.onosproject.netconf.ctl;
+
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Optional;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.FutureTask;
+import java.util.regex.Pattern;
+
+import org.apache.sshd.SshServer;
+import org.apache.sshd.common.NamedFactory;
+import org.apache.sshd.server.Command;
+import org.apache.sshd.server.PasswordAuthenticator;
+import org.apache.sshd.server.UserAuth;
+import org.apache.sshd.server.auth.UserAuthPassword;
+import org.apache.sshd.server.keyprovider.SimpleGeneratorHostKeyProvider;
+import org.apache.sshd.server.session.ServerSession;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import org.onlab.junit.TestTools;
+import org.onlab.packet.Ip4Address;
+import org.onosproject.netconf.NetconfDeviceInfo;
+import org.onosproject.netconf.NetconfException;
+import org.onosproject.netconf.NetconfSession;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Unit tests for NetconfSession.
+ *
+ * Sets up an SSH Server with Apache SSHD and connects to it using 2 clients
+ * Truly verifies that the NETCONF flows are compliant with a NETCONF server.
+ */
+public class NetconfSessionImplTest {
+    private static final Logger log = LoggerFactory
+            .getLogger(NetconfStreamThread.class);
+
+    private static final int PORT_NUMBER = TestTools.findAvailablePort(50830);
+    private static final String TEST_USERNAME = "netconf";
+    private static final String TEST_PASSWORD = "netconf123";
+    private static final String TEST_HOSTNAME = "127.0.0.1";
+    private static final String TEST_SERFILE =
+            System.getProperty("java.io.tmpdir") + System.getProperty("file.separator") + "testkey.ser";
+
+    private static final String SAMPLE_REQUEST =
+            "<some-yang-element xmlns=\"some-namespace\">"
+            + "<some-child-element/>"
+            + "</some-yang-element>";
+
+    private static NetconfSession session1;
+    private static NetconfSession session2;
+    private static SshServer sshServerNetconf;
+
+    @BeforeClass
+    public static void setUp() throws Exception {
+        sshServerNetconf = SshServer.setUpDefaultServer();
+        List<NamedFactory<UserAuth>> userAuthFactories = new ArrayList<NamedFactory<UserAuth>>();
+        userAuthFactories.add(new UserAuthPassword.Factory());
+        sshServerNetconf.setUserAuthFactories(userAuthFactories);
+        sshServerNetconf.setPasswordAuthenticator(
+                new PasswordAuthenticator() {
+                    @Override
+                    public boolean authenticate(
+                            String username,
+                            String password,
+                            ServerSession session) {
+                        return TEST_USERNAME.equals(username) && TEST_PASSWORD.equals(password);
+                    }
+                });
+        sshServerNetconf.setPort(PORT_NUMBER);
+        sshServerNetconf.setKeyPairProvider(new SimpleGeneratorHostKeyProvider(TEST_SERFILE));
+        sshServerNetconf.setSubsystemFactories(
+                Arrays.<NamedFactory<Command>>asList(new NetconfSshdTestSubsystem.Factory()));
+        sshServerNetconf.open();
+        log.info("SSH Server opened on port {}", PORT_NUMBER);
+
+        NetconfDeviceInfo deviceInfo = new NetconfDeviceInfo(
+                TEST_USERNAME, TEST_PASSWORD, Ip4Address.valueOf(TEST_HOSTNAME), PORT_NUMBER);
+
+        session1 = new NetconfSessionImpl(deviceInfo);
+        log.info("Started NETCONF Session {} with test SSHD server in Unit Test", session1.getSessionId());
+        assertTrue("Incorrect sessionId", !session1.getSessionId().equalsIgnoreCase("-1"));
+        assertTrue("Incorrect sessionId", !session1.getSessionId().equalsIgnoreCase("0"));
+
+        session2 = new NetconfSessionImpl(deviceInfo);
+        log.info("Started NETCONF Session {} with test SSHD server in Unit Test", session2.getSessionId());
+        assertTrue("Incorrect sessionId", !session2.getSessionId().equalsIgnoreCase("-1"));
+        assertTrue("Incorrect sessionId", !session2.getSessionId().equalsIgnoreCase("0"));
+    }
+
+    @AfterClass
+    public static void tearDown() throws Exception {
+        if (session1 != null) {
+            session1.close();
+        }
+        if (session2 != null) {
+            session2.close();
+        }
+
+        sshServerNetconf.stop();
+    }
+
+    @Test
+    public void testEditConfigRequest() {
+        log.info("Starting edit-config async");
+        assertNotNull("Incorrect sessionId", session1.getSessionId());
+        try {
+            assertTrue("NETCONF edit-config command failed", session1.editConfig("running", null, SAMPLE_REQUEST));
+        } catch (NetconfException e) {
+            e.printStackTrace();
+            fail("NETCONF edit-config test failed: " + e.getMessage());
+        }
+        log.info("Finishing edit-config async");
+    }
+
+    @Test
+    public void testCopyConfigRequest() {
+        log.info("Starting copy-config async");
+        assertNotNull("Incorrect sessionId", session1.getSessionId());
+        try {
+            assertTrue("NETCONF edit-config command failed", session1.copyConfig("running", "candidate"));
+        } catch (NetconfException e) {
+            e.printStackTrace();
+            fail("NETCONF edit-config test failed: " + e.getMessage());
+        }
+        log.info("Finishing copy-config async");
+    }
+
+    @Test
+    public void testGetConfigRequest() {
+        log.info("Starting get-config async");
+        assertNotNull("Incorrect sessionId", session1.getSessionId());
+        try {
+            assertTrue("NETCONF get-config running command failed. ",
+                    GET_REPLY_PATTERN.matcher(session1.getConfig("running", SAMPLE_REQUEST)).matches());
+
+            assertTrue("NETCONF get-config candidate command failed. ",
+                    GET_REPLY_PATTERN.matcher(session1.getConfig("candidate", SAMPLE_REQUEST)).matches());
+
+        } catch (NetconfException e) {
+            e.printStackTrace();
+            fail("NETCONF get-config test failed: " + e.getMessage());
+        }
+        log.info("Finishing get-config async");
+    }
+
+    @Test
+    public void testGetRequest() {
+        log.info("Starting get async");
+        assertNotNull("Incorrect sessionId", session1.getSessionId());
+        try {
+            assertTrue("NETCONF get running command failed. ",
+                    GET_REPLY_PATTERN.matcher(session1.get(SAMPLE_REQUEST, null)).matches());
+
+        } catch (NetconfException e) {
+            e.printStackTrace();
+            fail("NETCONF get test failed: " + e.getMessage());
+        }
+        log.info("Finishing get async");
+    }
+
+    @Test
+    public void testConcurrentSameSessionAccess() throws InterruptedException {
+        NCCopyConfigCallable testCopyConfig1 = new NCCopyConfigCallable(session1, "running", "candidate");
+        NCCopyConfigCallable testCopyConfig2 = new NCCopyConfigCallable(session1, "candidate", "startup");
+
+        FutureTask<Boolean> futureCopyConfig1 = new FutureTask<Boolean>(testCopyConfig1);
+        FutureTask<Boolean> futureCopyConfig2 = new FutureTask<Boolean>(testCopyConfig2);
+
+        ExecutorService executor = Executors.newFixedThreadPool(2);
+        log.info("Starting concurrent execution of copy-config through same session");
+        executor.execute(futureCopyConfig1);
+        executor.execute(futureCopyConfig2);
+
+        int count = 0;
+        while (count < 10) {
+            if (futureCopyConfig1.isDone() && futureCopyConfig2.isDone()) {
+                executor.shutdown();
+                log.info("Finished concurrent same session execution");
+                return;
+            }
+            Thread.sleep(100L);
+            count++;
+        }
+        fail("NETCONF test failed to complete.");
+    }
+
+    @Test
+    public void test2SessionAccess() throws InterruptedException {
+        NCCopyConfigCallable testCopySession1 = new NCCopyConfigCallable(session1, "running", "candidate");
+        NCCopyConfigCallable testCopySession2 = new NCCopyConfigCallable(session2, "running", "candidate");
+
+        FutureTask<Boolean> futureCopySession1 = new FutureTask<Boolean>(testCopySession1);
+        FutureTask<Boolean> futureCopySession2 = new FutureTask<Boolean>(testCopySession2);
+
+        ExecutorService executor = Executors.newFixedThreadPool(2);
+        log.info("Starting concurrent execution of copy-config through 2 different sessions");
+        executor.execute(futureCopySession1);
+        executor.execute(futureCopySession2);
+
+        int count = 0;
+        while (count < 10) {
+            if (futureCopySession1.isDone() && futureCopySession2.isDone()) {
+                executor.shutdown();
+                log.info("Finished concurrent 2 session execution");
+                return;
+            }
+            Thread.sleep(100L);
+            count++;
+        }
+        fail("NETCONF test failed to complete.");
+    }
+
+
+    public static String getTestHelloReply(Optional<Long> sessionId) {
+        StringBuffer sb = new StringBuffer();
+
+        sb.append("<hello xmlns=\"urn:ietf:params:xml:ns:netconf:base:1.0\">");
+        sb.append("<capabilities>");
+        sb.append("<capability>urn:ietf:params:netconf:base:1.0</capability>");
+        sb.append("<capability>urn:ietf:params:netconf:base:1.1</capability>");
+        sb.append("<capability>urn:ietf:params:netconf:capability:writable-running:1.0</capability>");
+        sb.append("<capability>urn:ietf:params:netconf:capability:candidate:1.0</capability>");
+        sb.append("<capability>urn:ietf:params:netconf:capability:startup:1.0</capability>");
+        sb.append("<capability>urn:ietf:params:netconf:capability:rollback-on-error:1.0</capability>");
+        sb.append("<capability>urn:ietf:params:netconf:capability:interleave:1.0</capability>");
+        sb.append("<capability>urn:ietf:params:netconf:capability:notification:1.0</capability>");
+        sb.append("<capability>urn:ietf:params:netconf:capability:validate:1.0</capability>");
+        sb.append("<capability>urn:ietf:params:netconf:capability:validate:1.1</capability>");
+        sb.append("</capabilities>");
+        if (sessionId.isPresent()) {
+            sb.append("<session-id>");
+            sb.append(sessionId.get().toString());
+            sb.append("</session-id>");
+        }
+        sb.append("</hello>");
+
+        return sb.toString();
+    }
+
+    public static String getOkReply(Optional<Integer> messageId) {
+        StringBuffer sb = new StringBuffer("<?xml version=\"1.0\" encoding=\"UTF-8\"?>\n");
+        sb.append("<rpc-reply xmlns=\"urn:ietf:params:xml:ns:netconf:base:1.0\" ");
+        if (messageId.isPresent()) {
+            sb.append("message-id=\"");
+            sb.append(String.valueOf(messageId.get()));
+            sb.append("\">");
+        }
+        sb.append("<ok/>");
+        sb.append("</rpc-reply>");
+        return sb.toString();
+    }
+
+    public static String getGetReply(Optional<Integer> messageId) {
+        StringBuffer sb = new StringBuffer("<?xml version=\"1.0\" encoding=\"UTF-8\"?>\n");
+        sb.append("<rpc-reply xmlns=\"urn:ietf:params:xml:ns:netconf:base:1.0\" ");
+        if (messageId.isPresent()) {
+            sb.append("message-id=\"");
+            sb.append(String.valueOf(messageId.get()));
+            sb.append("\">");
+        }
+        sb.append("<data>\n");
+        sb.append(SAMPLE_REQUEST);
+        sb.append("</data>\n");
+        sb.append("</rpc-reply>");
+        return sb.toString();
+    }
+
+    public static final Pattern HELLO_REQ_PATTERN =
+            Pattern.compile("(<\\?xml).*"
+                    + "(<hello xmlns=\"urn:ietf:params:xml:ns:netconf:base:1.0\">)\\R?"
+                    + "( *)(<capabilities>)\\R?"
+                    + "( *)(<capability>urn:ietf:params:netconf:base:1.0</capability>)\\R?"
+                    + "( *)(</capabilities>)\\R?"
+                    + "(</hello>)\\R? *",
+                    Pattern.DOTALL);
+
+    public static final Pattern EDIT_CONFIG_REQ_PATTERN =
+            Pattern.compile("(<\\?xml).*"
+                    + "(<rpc message-id=\")[0-9]*(\") *(xmlns=\"urn:ietf:params:xml:ns:netconf:base:1.0\">)\\R?"
+                    + "(<edit-config>)\\R?"
+                    + "(<target>\\R?((<candidate/>)|(<running/>)|(<startup/>))\\R?</target>)\\R?"
+                    + "(<config xmlns:nc=\"urn:ietf:params:xml:ns:netconf:base:1.0\">)\\R?"
+                    + ".*"
+                    + "(</config>)\\R?(</edit-config>)\\R?(</rpc>)\\R?", Pattern.DOTALL);
+
+    public static final Pattern COPY_CONFIG_REQ_PATTERN =
+            Pattern.compile("(<\\?xml).*"
+                    + "(<rpc xmlns=\"urn:ietf:params:xml:ns:netconf:base:1.0\" message-id=\")[0-9]*(\">)\\R?"
+                    + "(<copy-config>)\\R?"
+                    + "(<target>\\R?((<candidate/>)|(<running/>)|(<startup/>))\\R?</target>)\\R?"
+                    + "(<source>)\\R?(<config>)((candidate)|(running)|(startup))(</config>)\\R?(</source>)\\R?"
+                    + "(</copy-config>)\\R?(</rpc>)\\R?", Pattern.DOTALL);
+
+    public static final Pattern GET_CONFIG_REQ_PATTERN =
+            Pattern.compile("(<\\?xml).*"
+                    + "(<rpc message-id=\")[0-9]*(\"  xmlns=\"urn:ietf:params:xml:ns:netconf:base:1.0\">)\\R?"
+                    + "(<get-config>)\\R?"
+                    + "(<source>)\\R?((<candidate/>)|(<running/>)|(<startup/>))\\R?(</source>)\\R?"
+                    + "(<filter type=\"subtree\">).*(</filter>)\\R?"
+                    + "(</get-config>)\\R?(</rpc>)\\R?", Pattern.DOTALL);
+
+    public static final Pattern GET_REPLY_PATTERN =
+            Pattern.compile("(<\\?xml).*"
+                    + "(<rpc-reply xmlns=\"urn:ietf:params:xml:ns:netconf:base:1.0\" message-id=\")[0-9]*(\">)\\R?"
+                    + "(<data>).*(</data>)\\R?"
+                    + "(</rpc-reply>)\\R?", Pattern.DOTALL);
+
+    public static final Pattern GET_REQ_PATTERN =
+            Pattern.compile("(<\\?xml).*"
+                    + "(<rpc message-id=\")[0-9]*(\"  xmlns=\"urn:ietf:params:xml:ns:netconf:base:1.0\">)\\R?"
+                    + "(<get>)\\R?"
+                    + "(<filter type=\"subtree\">).*(</filter>)\\R?"
+                    + "(</get>)\\R?(</rpc>)\\R?", Pattern.DOTALL);
+
+    public class NCCopyConfigCallable implements Callable<Boolean> {
+        private NetconfSession session;
+        private String target;
+        private String source;
+
+        public NCCopyConfigCallable(NetconfSession session, String target, String source) {
+            this.session = session;
+            this.target = target;
+            this.source = source;
+        }
+
+        @Override
+        public Boolean call() throws Exception {
+            return session.copyConfig(target, source);
+        }
+    }
+}
diff --git a/protocols/netconf/ctl/src/test/java/org/onosproject/netconf/ctl/NetconfSshdTestSubsystem.java b/protocols/netconf/ctl/src/test/java/org/onosproject/netconf/ctl/NetconfSshdTestSubsystem.java
new file mode 100644
index 0000000..b2f53f5
--- /dev/null
+++ b/protocols/netconf/ctl/src/test/java/org/onosproject/netconf/ctl/NetconfSshdTestSubsystem.java
@@ -0,0 +1,306 @@
+/*
+ * Copyright 2017-present Open Networking Laboratory
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.onosproject.netconf.ctl;
+
+import java.io.BufferedReader;
+import java.io.EOFException;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.InputStreamReader;
+import java.io.OutputStream;
+import java.io.PrintWriter;
+import java.util.Collection;
+import java.util.Optional;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Future;
+
+import org.apache.sshd.common.NamedFactory;
+import org.apache.sshd.common.util.Buffer;
+import org.apache.sshd.common.util.ThreadUtils;
+import org.apache.sshd.server.Command;
+import org.apache.sshd.server.Environment;
+import org.apache.sshd.server.ExitCallback;
+import org.apache.sshd.server.SessionAware;
+import org.apache.sshd.server.session.ServerSession;
+import org.onosproject.netconf.ctl.NetconfStreamThread.NetconfMessageState;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Mocks a NETCONF Device to test the NETCONF Southbound Interface etc.
+ *
+ * Implements the 'netconf' subsystem on Apache SSH (Mina).
+ * See SftpSubsystem for an example of another subsystem
+ */
+public class NetconfSshdTestSubsystem extends Thread implements Command, Runnable, SessionAware {
+
+    protected final Logger log = LoggerFactory.getLogger(getClass());
+
+    public static class Factory implements NamedFactory<Command> {
+
+        public static final String NAME = "netconf";
+
+        private final ExecutorService   executors;
+        private final boolean shutdownExecutor;
+
+        public Factory() {
+            this(null);
+        }
+
+        /**
+         * @param executorService The {@link ExecutorService} to be used by
+         *                        the {@link SftpSubsystem} command when starting execution. If
+         *                        {@code null} then a single-threaded ad-hoc service is used.
+         *                        <B>Note:</B> the service will <U>not</U> be shutdown when the
+         *                        subsystem is closed - unless it is the ad-hoc service, which will be
+         *                        shutdown regardless
+         * @see Factory(ExecutorService, boolean)}
+         */
+        public Factory(ExecutorService executorService) {
+            this(executorService, false);
+        }
+
+        /**
+         * @param executorService The {@link ExecutorService} to be used by
+         *                        the {@link SftpSubsystem} command when starting execution. If
+         *                        {@code null} then a single-threaded ad-hoc service is used.
+         * @param shutdownOnExit  If {@code true} the {@link ExecutorService#shutdownNow()}
+         *                        will be called when subsystem terminates - unless it is the ad-hoc
+         *                        service, which will be shutdown regardless
+         */
+        public Factory(ExecutorService executorService, boolean shutdownOnExit) {
+            executors = executorService;
+            shutdownExecutor = shutdownOnExit;
+        }
+
+        public ExecutorService getExecutorService() {
+            return executors;
+        }
+
+        public boolean isShutdownOnExit() {
+            return shutdownExecutor;
+        }
+
+        public Command create() {
+            return new NetconfSshdTestSubsystem(getExecutorService(), isShutdownOnExit());
+        }
+
+        public String getName() {
+            return NAME;
+        }
+    }
+
+    /**
+     * Properties key for the maximum of available open handles per session.
+     */
+    private static final String CLOSE_SESSION = "<close-session";
+    private static final String END_PATTERN = "]]>]]>";
+
+    private ExecutorService executors;
+    private boolean shutdownExecutor;
+    private ExitCallback callback;
+    private ServerSession session;
+    private InputStream in;
+    private OutputStream out;
+    private OutputStream err;
+    private Environment env;
+    private Future<?> pendingFuture;
+    private boolean closed = false;
+    private NetconfMessageState state;
+    private PrintWriter outputStream;
+
+    public NetconfSshdTestSubsystem() {
+        this(null);
+    }
+
+    /**
+     * @param executorService The {@link ExecutorService} to be used by
+     *                        the {@link SftpSubsystem} command when starting execution. If
+     *                        {@code null} then a single-threaded ad-hoc service is used.
+     *                        <b>Note:</b> the service will <U>not</U> be shutdown when the
+     *                        subsystem is closed - unless it is the ad-hoc service
+     * @see #SftpSubsystem(ExecutorService, boolean)
+     */
+    public NetconfSshdTestSubsystem(ExecutorService executorService) {
+        this(executorService, false);
+    }
+
+    /**
+     * @param executorService The {@link ExecutorService} to be used by
+     *                        the {@link SftpSubsystem} command when starting execution. If
+     *                        {@code null} then a single-threaded ad-hoc service is used.
+     * @param shutdownOnExit  If {@code true} the {@link ExecutorService#shutdownNow()}
+     *                        will be called when subsystem terminates - unless it is the ad-hoc
+     *                        service, which will be shutdown regardless
+     * @see ThreadUtils#newSingleThreadExecutor(String)
+     */
+    public NetconfSshdTestSubsystem(ExecutorService executorService, boolean shutdownOnExit) {
+        executors = executorService;
+        if (executorService == null) {
+            executors = ThreadUtils.newSingleThreadExecutor(getClass().getSimpleName());
+            shutdownExecutor = true;    // we always close the ad-hoc executor service
+        } else {
+            shutdownExecutor = shutdownOnExit;
+        }
+    }
+
+    @Override
+    public void setSession(ServerSession session) {
+        this.session = session;
+    }
+
+    @Override
+    public void run() {
+        BufferedReader bufferReader = new BufferedReader(new InputStreamReader(in));
+        boolean socketClosed = false;
+        try {
+            StringBuilder deviceRequestBuilder = new StringBuilder();
+            while (!socketClosed) {
+                int cInt = bufferReader.read();
+                if (cInt == -1) {
+                    log.info("Netconf client sent error");
+                    socketClosed = true;
+                }
+                char c = (char) cInt;
+                state = state.evaluateChar(c);
+                deviceRequestBuilder.append(c);
+                if (state == NetconfMessageState.END_PATTERN) {
+                    String deviceRequest = deviceRequestBuilder.toString();
+                    if (deviceRequest.equals(END_PATTERN)) {
+                        socketClosed = true;
+                        this.interrupt();
+                    } else {
+                        deviceRequest = deviceRequest.replace(END_PATTERN, "");
+                        Optional<Integer> messageId = NetconfStreamThread.getMsgId(deviceRequest);
+                        log.info("Client Request on session {}. MsgId {}: {}",
+                                session.getId(), messageId, deviceRequest);
+                        synchronized (outputStream) {
+                            if (NetconfSessionImplTest.HELLO_REQ_PATTERN.matcher(deviceRequest).matches()) {
+                                String helloReply =
+                                        NetconfSessionImplTest.getTestHelloReply(Optional.of(session.getId()));
+                                outputStream.write(helloReply + END_PATTERN);
+                                outputStream.flush();
+                            } else if (NetconfSessionImplTest.EDIT_CONFIG_REQ_PATTERN.matcher(deviceRequest).matches()
+                                 || NetconfSessionImplTest.COPY_CONFIG_REQ_PATTERN.matcher(deviceRequest).matches()) {
+                                outputStream.write(NetconfSessionImplTest.getOkReply(messageId) + END_PATTERN);
+                                outputStream.flush();
+                            } else if (NetconfSessionImplTest.GET_CONFIG_REQ_PATTERN.matcher(deviceRequest).matches()
+                                    || NetconfSessionImplTest.GET_REQ_PATTERN.matcher(deviceRequest).matches()) {
+                                outputStream.write(NetconfSessionImplTest.getGetReply(messageId) + END_PATTERN);
+                                outputStream.flush();
+                            } else if (deviceRequest.contains(CLOSE_SESSION)) {
+                                socketClosed = true;
+                                outputStream.write(NetconfSessionImplTest.getOkReply(messageId) + END_PATTERN);
+                                outputStream.flush();
+                            } else {
+                                log.error("Unexpected NETCONF message structure on session {} : {}",
+                                        session.getId(), deviceRequest);
+                            }
+                        }
+                        deviceRequestBuilder.setLength(0);
+                    }
+                }
+            }
+        } catch (Throwable t) {
+            if (!socketClosed && !(t instanceof EOFException)) { // Ignore
+                log.error("Exception caught in NETCONF Server subsystem", t.getMessage());
+            }
+        } finally {
+            try {
+                bufferReader.close();
+            } catch (IOException ioe) {
+                log.error("Could not close DataInputStream", ioe);
+            }
+
+            callback.onExit(0);
+        }
+    }
+
+    @Override
+    public void setInputStream(InputStream in) {
+        this.in = in;
+    }
+
+    @Override
+    public void setOutputStream(OutputStream out) {
+        this.out = out;
+    }
+
+    @Override
+    public void setErrorStream(OutputStream err) {
+        this.err = err;
+    }
+
+    @Override
+    public void setExitCallback(ExitCallback callback) {
+        this.callback = callback;
+    }
+
+    @Override
+    public void start(Environment env) throws IOException {
+        this.env = env;
+        state = NetconfMessageState.NO_MATCHING_PATTERN;
+        outputStream = new PrintWriter(out, false);
+        try {
+            pendingFuture = executors.submit(this);
+        } catch (RuntimeException e) {    // e.g., RejectedExecutionException
+            log.error("Failed (" + e.getClass().getSimpleName() + ") to start command: " + e.getMessage(), e);
+            throw new IOException(e);
+        }
+    }
+
+    @Override
+    public void interrupt() {
+        // if thread has not completed, cancel it
+        if ((pendingFuture != null) && (!pendingFuture.isDone())) {
+            boolean result = pendingFuture.cancel(true);
+            // TODO consider waiting some reasonable (?) amount of time for cancellation
+            if (log.isDebugEnabled()) {
+                log.debug("interrupt() - cancel pending future=" + result);
+            }
+        }
+
+        pendingFuture = null;
+
+        if ((executors != null) && shutdownExecutor) {
+            Collection<Runnable> runners = executors.shutdownNow();
+            if (log.isDebugEnabled()) {
+                log.debug("interrupt() - shutdown executor service - runners count=" +
+                        runners.size());
+            }
+        }
+
+        executors = null;
+
+        if (!closed) {
+            if (log.isDebugEnabled()) {
+                log.debug("interrupt() - mark as closed");
+            }
+
+            closed = true;
+        }
+        outputStream.close();
+    }
+
+    @Override
+    public void destroy() {
+        //Handled by interrupt
+    }
+
+    protected void process(Buffer buffer) throws IOException {
+        log.warn("Receieved buffer:" + buffer);
+    }
+}