ONOS-3605 Create a threaded session input-stream mechanism and add a listener for events from the device

Change-Id: Ib323487f61d9e595f7ccdc1957a92e58b7002d2a
diff --git a/protocols/netconf/ctl/src/main/java/org/onosproject/netconf/ctl/NetconfSessionImpl.java b/protocols/netconf/ctl/src/main/java/org/onosproject/netconf/ctl/NetconfSessionImpl.java
index 795211e..7f544dc 100644
--- a/protocols/netconf/ctl/src/main/java/org/onosproject/netconf/ctl/NetconfSessionImpl.java
+++ b/protocols/netconf/ctl/src/main/java/org/onosproject/netconf/ctl/NetconfSessionImpl.java
@@ -18,56 +18,70 @@
 
 import ch.ethz.ssh2.Connection;
 import ch.ethz.ssh2.Session;
-import ch.ethz.ssh2.StreamGobbler;
 import com.google.common.base.Preconditions;
 import org.onosproject.netconf.NetconfDeviceInfo;
+import org.onosproject.netconf.NetconfDeviceOutputEvent;
+import org.onosproject.netconf.NetconfDeviceOutputEventListener;
+import org.onosproject.netconf.NetconfException;
 import org.onosproject.netconf.NetconfSession;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import java.io.BufferedReader;
 import java.io.IOException;
-import java.io.InputStreamReader;
 import java.io.PrintWriter;
-import java.io.StringWriter;
 import java.util.Collections;
 import java.util.List;
+import java.util.Map;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.atomic.AtomicInteger;
+
 
 /**
  * Implementation of a NETCONF session to talk to a device.
  */
 public class NetconfSessionImpl implements NetconfSession {
 
-    public static final Logger log = LoggerFactory
+    private static final Logger log = LoggerFactory
             .getLogger(NetconfSessionImpl.class);
+
+
     private static final int CONNECTION_TIMEOUT = 0;
+    private static final String ENDPATTERN = "]]>]]>";
+    private static final AtomicInteger MESSAGE_ID_INTEGER = new AtomicInteger(0);
+    private static final String MESSAGE_ID_STRING = "message-id";
+    private static final String HELLO = "hello";
+    private static final String NEW_LINE = "\n";
 
 
     private Connection netconfConnection;
     private NetconfDeviceInfo deviceInfo;
     private Session sshSession;
     private boolean connectionActive;
-    private BufferedReader bufferReader = null;
     private PrintWriter out = null;
-    private int messageID = 0;
-    //TODO inject these capabilites from yang model provided by app
     private List<String> deviceCapabilities =
             Collections.singletonList("urn:ietf:params:netconf:base:1.0");
     private String serverCapabilities;
-    private String endpattern = "]]>]]>";
+    private NetconfStreamHandler t;
+    private Map<Integer, CompletableFuture<String>> replies;
 
 
-    public NetconfSessionImpl(NetconfDeviceInfo deviceInfo) throws IOException {
+    public NetconfSessionImpl(NetconfDeviceInfo deviceInfo) throws NetconfException {
         this.deviceInfo = deviceInfo;
         connectionActive = false;
+        replies = new ConcurrentHashMap<>(); // touched by request() and by the stream delegate callback
         startConnection();
     }
 
 
-    private void startConnection() throws IOException {
+    private void startConnection() throws NetconfException {
         if (!connectionActive) {
             netconfConnection = new Connection(deviceInfo.ip().toString(), deviceInfo.port());
-            netconfConnection.connect(null, CONNECTION_TIMEOUT, 0);
+            try {
+                netconfConnection.connect(null, CONNECTION_TIMEOUT, 5000);
+            } catch (IOException e) {
+                throw new NetconfException("Cannot open a connection with device " + deviceInfo, e);
+            }
             boolean isAuthenticated;
             try {
                 if (deviceInfo.getKeyFile() != null) {
@@ -75,39 +89,49 @@
                             deviceInfo.name(), deviceInfo.getKeyFile(),
                             deviceInfo.password());
                 } else {
-                    log.info("authenticate with username {} and password {}",
-                             deviceInfo.name(), deviceInfo.password());
+                    log.debug("Authenticating to device {} with username {}",
+                              deviceInfo.getDeviceId(), deviceInfo.name());
                     isAuthenticated = netconfConnection.authenticateWithPassword(
                             deviceInfo.name(), deviceInfo.password());
                 }
             } catch (IOException e) {
-                throw new IOException("Authentication connection failed:" +
-                                              e.getMessage());
+                log.error("Authentication connection to device " +
+                                  deviceInfo.getDeviceId() + " failed:" +
+                                  e.getMessage());
+                throw new NetconfException("Authentication connection to device " +
+                                                   deviceInfo.getDeviceId() + " failed", e);
             }
 
             connectionActive = true;
             Preconditions.checkArgument(isAuthenticated,
-                                        "Authentication password and username failed");
+                                        "Authentication to device %s with username " +
+                                                "%s Failed", // Guava templates expand %s, not {}
+                                        deviceInfo.getDeviceId(),
+                                        deviceInfo.name());
             startSshSession();
         }
     }
 
-    private void startSshSession() throws IOException {
+    private void startSshSession() throws NetconfException {
         try {
             sshSession = netconfConnection.openSession();
             sshSession.startSubSystem("netconf");
-            bufferReader = new BufferedReader(new InputStreamReader(new StreamGobbler(
-                    sshSession.getStdout())));
             out = new PrintWriter(sshSession.getStdin());
+            t = new NetconfStreamThread(sshSession.getStdout(), sshSession.getStdin(),
+                                        sshSession.getStderr(), deviceInfo,
+                                        new NetconfSessionDelegateImpl());
+            this.addDeviceOutputListener(new NetconfDeviceOutputEventListenerImpl(deviceInfo));
             sendHello();
         } catch (IOException e) {
-            throw new IOException("Failed to create ch.ethz.ssh2.Session session:" +
-                                          e.getMessage());
+            log.error("Failed to create ch.ethz.ssh2.Session session:" +
+                              e.getMessage());
+            throw new NetconfException("Failed to create ch.ethz.ssh2.Session session with device " +
+                                               deviceInfo, e);
         }
     }
 
     private void sendHello() throws IOException {
-        serverCapabilities = doRequest(createHelloString());
+        serverCapabilities = sendRequest(createHelloString());
     }
 
     private String createHelloString() {
@@ -119,58 +143,68 @@
                 cap -> hellobuffer.append("    <capability>" + cap + "</capability>\n"));
         hellobuffer.append("  </capabilities>\n");
         hellobuffer.append("</hello>\n");
-        hellobuffer.append(endpattern);
+        hellobuffer.append(ENDPATTERN);
         return hellobuffer.toString();
 
     }
 
-    @Override
-    public String doRPC(String request) throws IOException {
-        String reply = doRequest(request + "\n" + endpattern);
-        return checkReply(reply) ? reply : "ERROR " + reply;
-    }
-
-    private String doRequest(String request) throws IOException {
-        //log.info("sshState " + sshSession.getState() + "request" + request);
-        checkAndRestablishSession();
-        //log.info("sshState after" + sshSession.getState());
-        out.print(request);
-        out.flush();
-        messageID++;
-        return readOne();
-    }
-
-    private void checkAndRestablishSession() throws IOException {
+    private void checkAndRestablishSession() throws NetconfException {
         if (sshSession.getState() != 2) {
             try {
                 startSshSession();
             } catch (IOException e) {
-                log.info("the connection had to be reopened");
+                log.debug("The connection with {} had to be reopened", deviceInfo.getDeviceId());
                 try {
                     startConnection();
                 } catch (IOException e2) {
                     log.error("No connection {} for device, exception {}", netconfConnection, e2);
-                    throw new IOException(e.getMessage());
-                    //TODO remove device from ONOS
+                    throw new NetconfException("Cannot re-open the connection with device" + deviceInfo, e);
                 }
             }
         }
     }
 
     @Override
-    public String get(String request) throws IOException {
-        return doRPC(request);
+    public String requestSync(String request) throws NetconfException {
+        String reply = sendRequest(request + NEW_LINE + ENDPATTERN);
+        return checkReply(reply) ? reply : "ERROR " + reply;
     }
 
     @Override
-    public String getConfig(String targetConfiguration) throws IOException {
+    public CompletableFuture<String> request(String request) {
+        CompletableFuture<String> ftrep = t.sendMessage(request);
+        replies.put(MESSAGE_ID_INTEGER.get(), ftrep);
+        return ftrep;
+    }
+
+    private String sendRequest(String request) throws NetconfException {
+        checkAndRestablishSession();
+        //FIXME find out a better way to enforce the presence of message-id
+        if (!request.contains(MESSAGE_ID_STRING) && !request.contains(HELLO)) {
+            request = request.replaceFirst("\">", "\" message-id=\""
+                    + MESSAGE_ID_INTEGER.get() + "\"" + ">");
+        }
+        CompletableFuture<String> futureReply = request(request);
+        MESSAGE_ID_INTEGER.incrementAndGet();
+        String rp = futureReply.join();
+        log.debug("Reply from device {}", rp);
+        return rp;
+    }
+
+    @Override
+    public String get(String request) throws NetconfException {
+        return requestSync(request);
+    }
+
+    @Override
+    public String getConfig(String targetConfiguration) throws NetconfException {
         return getConfig(targetConfiguration, null);
     }
 
     @Override
-    public String getConfig(String targetConfiguration, String configurationSchema) throws IOException {
+    public String getConfig(String targetConfiguration, String configurationSchema) throws NetconfException {
         StringBuilder rpc = new StringBuilder("<?xml version=\"1.0\" encoding=\"UTF-8\"?>");
-        rpc.append("<rpc message-id=\"" + messageID + "\"  "
+        rpc.append("<rpc message-id=\"" + MESSAGE_ID_INTEGER.get() + "\"  "
                            + "xmlns=\"urn:ietf:params:xml:ns:netconf:base:1.0\">\n");
         rpc.append("<get-config>\n");
         rpc.append("<source>\n");
@@ -183,23 +217,23 @@
         }
         rpc.append("</get-config>\n");
         rpc.append("</rpc>\n");
-        rpc.append(endpattern);
-        String reply = doRequest(rpc.toString());
+        rpc.append(ENDPATTERN);
+        String reply = sendRequest(rpc.toString());
         return checkReply(reply) ? reply : "ERROR " + reply;
     }
 
     @Override
-    public boolean editConfig(String newConfiguration) throws IOException {
-        newConfiguration = newConfiguration + endpattern;
-        return checkReply(doRequest(newConfiguration));
+    public boolean editConfig(String newConfiguration) throws NetconfException {
+        newConfiguration = newConfiguration + ENDPATTERN;
+        return checkReply(sendRequest(newConfiguration));
     }
 
     @Override
     public boolean editConfig(String targetConfiguration, String mode, String newConfiguration)
-            throws IOException {
+            throws NetconfException {
         newConfiguration = newConfiguration.trim();
         StringBuilder rpc = new StringBuilder("<?xml version=\"1.0\" encoding=\"UTF-8\"?>");
-        rpc.append("<rpc message-id=\"" + messageID + "\"  "
+        rpc.append("<rpc message-id=\"" + MESSAGE_ID_INTEGER.get() + "\"  "
                            + "xmlns=\"urn:ietf:params:xml:ns:netconf:base:1.0\">\n");
         rpc.append("<edit-config>");
         rpc.append("<target>");
@@ -213,13 +247,13 @@
         rpc.append("</config>");
         rpc.append("</edit-config>");
         rpc.append("</rpc>");
-        rpc.append(endpattern);
-        return checkReply(doRequest(rpc.toString()));
+        rpc.append(ENDPATTERN);
+        return checkReply(sendRequest(rpc.toString()));
     }
 
     @Override
     public boolean copyConfig(String targetConfiguration, String newConfiguration)
-            throws IOException {
+            throws NetconfException {
         newConfiguration = newConfiguration.trim();
         if (!newConfiguration.startsWith("<configuration>")) {
             newConfiguration = "<configuration>" + newConfiguration
@@ -237,12 +271,12 @@
         rpc.append("</source>");
         rpc.append("</copy-config>");
         rpc.append("</rpc>");
-        rpc.append(endpattern);
-        return checkReply(doRequest(rpc.toString()));
+        rpc.append(ENDPATTERN);
+        return checkReply(sendRequest(rpc.toString()));
     }
 
     @Override
-    public boolean deleteConfig(String targetConfiguration) throws IOException {
+    public boolean deleteConfig(String targetConfiguration) throws NetconfException {
         if (targetConfiguration.equals("running")) {
             log.warn("Target configuration for delete operation can't be \"running\"",
                      targetConfiguration);
@@ -257,12 +291,12 @@
         rpc.append("</target>");
         rpc.append("</delete-config>");
         rpc.append("</rpc>");
-        rpc.append(endpattern);
-        return checkReply(doRequest(rpc.toString()));
+        rpc.append(ENDPATTERN);
+        return checkReply(sendRequest(rpc.toString()));
     }
 
     @Override
-    public boolean lock() throws IOException {
+    public boolean lock() throws NetconfException {
         StringBuilder rpc = new StringBuilder("<?xml version=\"1.0\" " +
                                                       "encoding=\"UTF-8\"?>");
         rpc.append("<rpc>");
@@ -272,12 +306,12 @@
         rpc.append("</target>");
         rpc.append("</lock>");
         rpc.append("</rpc>");
-        rpc.append(endpattern);
-        return checkReply(doRequest(rpc.toString()));
+        rpc.append(ENDPATTERN);
+        return checkReply(sendRequest(rpc.toString()));
     }
 
     @Override
-    public boolean unlock() throws IOException {
+    public boolean unlock() throws NetconfException {
         StringBuilder rpc = new StringBuilder("<?xml version=\"1.0\" " +
                                                       "encoding=\"UTF-8\"?>");
         rpc.append("<rpc>");
@@ -287,16 +321,16 @@
         rpc.append("</target>");
         rpc.append("</unlock>");
         rpc.append("</rpc>");
-        rpc.append(endpattern);
-        return checkReply(doRequest(rpc.toString()));
+        rpc.append(ENDPATTERN);
+        return checkReply(sendRequest(rpc.toString()));
     }
 
     @Override
-    public boolean close() throws IOException {
+    public boolean close() throws NetconfException {
         return close(false);
     }
 
-    private boolean close(boolean force) throws IOException {
+    private boolean close(boolean force) throws NetconfException {
         StringBuilder rpc = new StringBuilder();
         rpc.append("<rpc>");
         if (force) {
@@ -306,8 +340,8 @@
         }
         rpc.append("<close-configuration/>");
         rpc.append("</rpc>");
-        rpc.append(endpattern);
-        return checkReply(doRequest(rpc.toString())) || close(true);
+        rpc.append(ENDPATTERN);
+        return checkReply(sendRequest(rpc.toString())) || close(true);
     }
 
     @Override
@@ -335,7 +369,17 @@
         deviceCapabilities = capabilities;
     }
 
-    private boolean checkReply(String reply) {
+    @Override
+    public void addDeviceOutputListener(NetconfDeviceOutputEventListener listener) {
+        t.addDeviceEventListener(listener);
+    }
+
+    @Override
+    public void removeDeviceOutputListener(NetconfDeviceOutputEventListener listener) {
+        t.removeDeviceEventListener(listener);
+    }
+
+    private boolean checkReply(String reply) throws NetconfException {
         if (reply != null) {
             if (!reply.contains("<rpc-error>")) {
                 return true;
@@ -345,36 +389,21 @@
                 return true;
             }
         }
-        log.warn("Error in reply {}", reply);
+        log.warn("Device {} has error in reply {}", deviceInfo, reply);
         return false;
     }
 
-    private String readOne() throws IOException {
-        //TODO try a simple string
-        final StringWriter reply = new StringWriter();
-        while (true) {
-            int charRead = bufferReader.read();
-            if (charRead == -1) {
-                throw new IOException("Session closed");
-            }
+    public class NetconfSessionDelegateImpl implements NetconfSessionDelegate {
 
-            for (int i = 0; i < endpattern.length(); i++) {
-                if (charRead == endpattern.charAt(i)) {
-                    if (i < endpattern.length() - 1) {
-                        charRead = bufferReader.read();
-                    } else {
-                        return reply.getBuffer().toString();
-                    }
-                } else {
-                    String s = endpattern.substring(0, i);
-                    for (int j = 0; i < s.length(); j++) {
-                        reply.write(s.charAt(j));
-                    }
-                    reply.write(charRead);
-                    break;
-                }
-            }
+        @Override
+        public void notify(NetconfDeviceOutputEvent event) {
+            // Unsolicited device output has no pending future; remove() also drops completed entries.
+            CompletableFuture<String> completedReply = replies.remove(event.getMessageID());
+            if (completedReply != null) {
+                completedReply.complete(event.getMessagePayload());
+            }
         }
     }
 
+
 }