Fixed some concurrency issues in NETCONF Session Added Unit Tests

Change-Id: I84fe0c17e3d757948a859f78d01fbb025397a44d
diff --git a/protocols/netconf/ctl/src/test/java/org/onosproject/netconf/ctl/NetconfSshdTestSubsystem.java b/protocols/netconf/ctl/src/test/java/org/onosproject/netconf/ctl/NetconfSshdTestSubsystem.java
new file mode 100644
index 0000000..b2f53f5
--- /dev/null
+++ b/protocols/netconf/ctl/src/test/java/org/onosproject/netconf/ctl/NetconfSshdTestSubsystem.java
@@ -0,0 +1,306 @@
+/*
+ * Copyright 2017-present Open Networking Laboratory
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.onosproject.netconf.ctl;
+
+import java.io.BufferedReader;
+import java.io.EOFException;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.InputStreamReader;
+import java.io.OutputStream;
+import java.io.PrintWriter;
+import java.util.Collection;
+import java.util.Optional;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Future;
+
+import org.apache.sshd.common.NamedFactory;
+import org.apache.sshd.common.util.Buffer;
+import org.apache.sshd.common.util.ThreadUtils;
+import org.apache.sshd.server.Command;
+import org.apache.sshd.server.Environment;
+import org.apache.sshd.server.ExitCallback;
+import org.apache.sshd.server.SessionAware;
+import org.apache.sshd.server.session.ServerSession;
+import org.onosproject.netconf.ctl.NetconfStreamThread.NetconfMessageState;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Mocks a NETCONF Device to test the NETCONF Southbound Interface etc.
+ *
+ * Implements the 'netconf' subsystem on Apache SSH (Mina).
+ * See SftpSubsystem for an example of another subsystem
+ */
+public class NetconfSshdTestSubsystem extends Thread implements Command, Runnable, SessionAware {
+
+    protected final Logger log = LoggerFactory.getLogger(getClass());
+
+    public static class Factory implements NamedFactory<Command> {
+
+        public static final String NAME = "netconf";
+
+        private final ExecutorService   executors;
+        private final boolean shutdownExecutor;
+
+        public Factory() {
+            this(null);
+        }
+
+        /**
+         * @param executorService The {@link ExecutorService} to be used by
+         *                        the {@link SftpSubsystem} command when starting execution. If
+         *                        {@code null} then a single-threaded ad-hoc service is used.
+         *                        <B>Note:</B> the service will <U>not</U> be shutdown when the
+         *                        subsystem is closed - unless it is the ad-hoc service, which will be
+         *                        shutdown regardless
+         * @see Factory(ExecutorService, boolean)}
+         */
+        public Factory(ExecutorService executorService) {
+            this(executorService, false);
+        }
+
+        /**
+         * @param executorService The {@link ExecutorService} to be used by
+         *                        the {@link SftpSubsystem} command when starting execution. If
+         *                        {@code null} then a single-threaded ad-hoc service is used.
+         * @param shutdownOnExit  If {@code true} the {@link ExecutorService#shutdownNow()}
+         *                        will be called when subsystem terminates - unless it is the ad-hoc
+         *                        service, which will be shutdown regardless
+         */
+        public Factory(ExecutorService executorService, boolean shutdownOnExit) {
+            executors = executorService;
+            shutdownExecutor = shutdownOnExit;
+        }
+
+        public ExecutorService getExecutorService() {
+            return executors;
+        }
+
+        public boolean isShutdownOnExit() {
+            return shutdownExecutor;
+        }
+
+        public Command create() {
+            return new NetconfSshdTestSubsystem(getExecutorService(), isShutdownOnExit());
+        }
+
+        public String getName() {
+            return NAME;
+        }
+    }
+
+    /**
+     * Properties key for the maximum of available open handles per session.
+     */
+    private static final String CLOSE_SESSION = "<close-session";
+    private static final String END_PATTERN = "]]>]]>";
+
+    private ExecutorService executors;
+    private boolean shutdownExecutor;
+    private ExitCallback callback;
+    private ServerSession session;
+    private InputStream in;
+    private OutputStream out;
+    private OutputStream err;
+    private Environment env;
+    private Future<?> pendingFuture;
+    private boolean closed = false;
+    private NetconfMessageState state;
+    private PrintWriter outputStream;
+
+    public NetconfSshdTestSubsystem() {
+        this(null);
+    }
+
+    /**
+     * @param executorService The {@link ExecutorService} to be used by
+     *                        the {@link SftpSubsystem} command when starting execution. If
+     *                        {@code null} then a single-threaded ad-hoc service is used.
+     *                        <b>Note:</b> the service will <U>not</U> be shutdown when the
+     *                        subsystem is closed - unless it is the ad-hoc service
+     * @see #SftpSubsystem(ExecutorService, boolean)
+     */
+    public NetconfSshdTestSubsystem(ExecutorService executorService) {
+        this(executorService, false);
+    }
+
+    /**
+     * @param executorService The {@link ExecutorService} to be used by
+     *                        the {@link SftpSubsystem} command when starting execution. If
+     *                        {@code null} then a single-threaded ad-hoc service is used.
+     * @param shutdownOnExit  If {@code true} the {@link ExecutorService#shutdownNow()}
+     *                        will be called when subsystem terminates - unless it is the ad-hoc
+     *                        service, which will be shutdown regardless
+     * @see ThreadUtils#newSingleThreadExecutor(String)
+     */
+    public NetconfSshdTestSubsystem(ExecutorService executorService, boolean shutdownOnExit) {
+        executors = executorService;
+        if (executorService == null) {
+            executors = ThreadUtils.newSingleThreadExecutor(getClass().getSimpleName());
+            shutdownExecutor = true;    // we always close the ad-hoc executor service
+        } else {
+            shutdownExecutor = shutdownOnExit;
+        }
+    }
+
+    @Override
+    public void setSession(ServerSession session) {
+        this.session = session;
+    }
+
+    @Override
+    public void run() {
+        BufferedReader bufferReader = new BufferedReader(new InputStreamReader(in));
+        boolean socketClosed = false;
+        try {
+            StringBuilder deviceRequestBuilder = new StringBuilder();
+            while (!socketClosed) {
+                int cInt = bufferReader.read();
+                if (cInt == -1) {
+                    log.info("Netconf client sent error");
+                    socketClosed = true;
+                }
+                char c = (char) cInt;
+                state = state.evaluateChar(c);
+                deviceRequestBuilder.append(c);
+                if (state == NetconfMessageState.END_PATTERN) {
+                    String deviceRequest = deviceRequestBuilder.toString();
+                    if (deviceRequest.equals(END_PATTERN)) {
+                        socketClosed = true;
+                        this.interrupt();
+                    } else {
+                        deviceRequest = deviceRequest.replace(END_PATTERN, "");
+                        Optional<Integer> messageId = NetconfStreamThread.getMsgId(deviceRequest);
+                        log.info("Client Request on session {}. MsgId {}: {}",
+                                session.getId(), messageId, deviceRequest);
+                        synchronized (outputStream) {
+                            if (NetconfSessionImplTest.HELLO_REQ_PATTERN.matcher(deviceRequest).matches()) {
+                                String helloReply =
+                                        NetconfSessionImplTest.getTestHelloReply(Optional.of(session.getId()));
+                                outputStream.write(helloReply + END_PATTERN);
+                                outputStream.flush();
+                            } else if (NetconfSessionImplTest.EDIT_CONFIG_REQ_PATTERN.matcher(deviceRequest).matches()
+                                 || NetconfSessionImplTest.COPY_CONFIG_REQ_PATTERN.matcher(deviceRequest).matches()) {
+                                outputStream.write(NetconfSessionImplTest.getOkReply(messageId) + END_PATTERN);
+                                outputStream.flush();
+                            } else if (NetconfSessionImplTest.GET_CONFIG_REQ_PATTERN.matcher(deviceRequest).matches()
+                                    || NetconfSessionImplTest.GET_REQ_PATTERN.matcher(deviceRequest).matches()) {
+                                outputStream.write(NetconfSessionImplTest.getGetReply(messageId) + END_PATTERN);
+                                outputStream.flush();
+                            } else if (deviceRequest.contains(CLOSE_SESSION)) {
+                                socketClosed = true;
+                                outputStream.write(NetconfSessionImplTest.getOkReply(messageId) + END_PATTERN);
+                                outputStream.flush();
+                            } else {
+                                log.error("Unexpected NETCONF message structure on session {} : {}",
+                                        session.getId(), deviceRequest);
+                            }
+                        }
+                        deviceRequestBuilder.setLength(0);
+                    }
+                }
+            }
+        } catch (Throwable t) {
+            if (!socketClosed && !(t instanceof EOFException)) { // Ignore
+                log.error("Exception caught in NETCONF Server subsystem", t.getMessage());
+            }
+        } finally {
+            try {
+                bufferReader.close();
+            } catch (IOException ioe) {
+                log.error("Could not close DataInputStream", ioe);
+            }
+
+            callback.onExit(0);
+        }
+    }
+
+    @Override
+    public void setInputStream(InputStream in) {
+        this.in = in;
+    }
+
+    @Override
+    public void setOutputStream(OutputStream out) {
+        this.out = out;
+    }
+
+    @Override
+    public void setErrorStream(OutputStream err) {
+        this.err = err;
+    }
+
+    @Override
+    public void setExitCallback(ExitCallback callback) {
+        this.callback = callback;
+    }
+
+    @Override
+    public void start(Environment env) throws IOException {
+        this.env = env;
+        state = NetconfMessageState.NO_MATCHING_PATTERN;
+        outputStream = new PrintWriter(out, false);
+        try {
+            pendingFuture = executors.submit(this);
+        } catch (RuntimeException e) {    // e.g., RejectedExecutionException
+            log.error("Failed (" + e.getClass().getSimpleName() + ") to start command: " + e.getMessage(), e);
+            throw new IOException(e);
+        }
+    }
+
+    @Override
+    public void interrupt() {
+        // if thread has not completed, cancel it
+        if ((pendingFuture != null) && (!pendingFuture.isDone())) {
+            boolean result = pendingFuture.cancel(true);
+            // TODO consider waiting some reasonable (?) amount of time for cancellation
+            if (log.isDebugEnabled()) {
+                log.debug("interrupt() - cancel pending future=" + result);
+            }
+        }
+
+        pendingFuture = null;
+
+        if ((executors != null) && shutdownExecutor) {
+            Collection<Runnable> runners = executors.shutdownNow();
+            if (log.isDebugEnabled()) {
+                log.debug("interrupt() - shutdown executor service - runners count=" +
+                        runners.size());
+            }
+        }
+
+        executors = null;
+
+        if (!closed) {
+            if (log.isDebugEnabled()) {
+                log.debug("interrupt() - mark as closed");
+            }
+
+            closed = true;
+        }
+        outputStream.close();
+    }
+
+    @Override
+    public void destroy() {
+        //Handled by interrupt
+    }
+
+    protected void process(Buffer buffer) throws IOException {
+        log.warn("Receieved buffer:" + buffer);
+    }
+}