ONOS-3839 Fixing errors in sending requests and hanging on future.join

Change-Id: I6da5bf1ff728efeb0d531cf7f04f6bf49f11a0a9
diff --git a/protocols/netconf/ctl/src/main/java/org/onosproject/netconf/ctl/NetconfSessionImpl.java b/protocols/netconf/ctl/src/main/java/org/onosproject/netconf/ctl/NetconfSessionImpl.java
index 9f72d83..9a950fb 100644
--- a/protocols/netconf/ctl/src/main/java/org/onosproject/netconf/ctl/NetconfSessionImpl.java
+++ b/protocols/netconf/ctl/src/main/java/org/onosproject/netconf/ctl/NetconfSessionImpl.java
@@ -28,12 +28,14 @@
 import org.slf4j.LoggerFactory;
 
 import java.io.IOException;
-import java.io.PrintWriter;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
 import java.util.concurrent.atomic.AtomicInteger;
 
 
@@ -48,17 +50,22 @@
 
     private static final int CONNECTION_TIMEOUT = 0;
     private static final String ENDPATTERN = "]]>]]>";
-    private static final AtomicInteger MESSAGE_ID_INTEGER = new AtomicInteger(0);
     private static final String MESSAGE_ID_STRING = "message-id";
     private static final String HELLO = "hello";
     private static final String NEW_LINE = "\n";
+    private static final int FUTURE_REPLY_TIMEOUT = 5000;
+    private static final String ERROR = "ERROR ";
+    private static final String END_OF_RPC_OPEN_TAG = "\">";
+    private static final String EQUAL = "=";
+    private static final String NUMBER_BETWEEN_QUOTES_MATCHER = "\"+([0-9]+)+\"";
+    private static final String XML_HEADER =
+            "<?xml version=\"1.0\" encoding=\"UTF-8\"?>";
 
-
+    private final AtomicInteger messageIdInteger = new AtomicInteger(0);
     private Connection netconfConnection;
     private NetconfDeviceInfo deviceInfo;
     private Session sshSession;
     private boolean connectionActive;
-    private PrintWriter out = null;
     private List<String> deviceCapabilities =
             Collections.singletonList("urn:ietf:params:netconf:base:1.0");
     private String serverCapabilities;
@@ -116,7 +123,6 @@
         try {
             sshSession = netconfConnection.openSession();
             sshSession.startSubSystem("netconf");
-            out = new PrintWriter(sshSession.getStdin());
             t = new NetconfStreamThread(sshSession.getStdout(), sshSession.getStdin(),
                                         sshSession.getStderr(), deviceInfo,
                                         new NetconfSessionDelegateImpl());
@@ -130,17 +136,20 @@
         }
     }
 
-    private void sendHello() throws IOException {
+    private void sendHello() throws NetconfException {
         serverCapabilities = sendRequest(createHelloString());
     }
 
     private String createHelloString() {
         StringBuilder hellobuffer = new StringBuilder();
-        hellobuffer.append("<?xml version=\"1.0\" encoding=\"UTF-8\"?>\n");
+        hellobuffer.append(XML_HEADER);
+        hellobuffer.append("\n");
         hellobuffer.append("<hello xmlns=\"urn:ietf:params:xml:ns:netconf:base:1.0\">\n");
         hellobuffer.append("  <capabilities>\n");
         deviceCapabilities.forEach(
-                cap -> hellobuffer.append("    <capability>" + cap + "</capability>\n"));
+                cap -> hellobuffer.append("    <capability>")
+                        .append(cap)
+                        .append("</capability>\n"));
         hellobuffer.append("  </capabilities>\n");
         hellobuffer.append("</hello>\n");
         hellobuffer.append(ENDPATTERN);
@@ -166,31 +175,62 @@
 
     @Override
     public String requestSync(String request) throws NetconfException {
-        String reply = sendRequest(request + NEW_LINE + ENDPATTERN);
-        return checkReply(reply) ? reply : "ERROR " + reply;
+        if (!request.contains(ENDPATTERN)) {
+            request = request + NEW_LINE + ENDPATTERN;
+        }
+        String reply = sendRequest(request);
+        return checkReply(reply) ? reply : ERROR + reply;
     }
 
     @Override
     public CompletableFuture<String> request(String request) {
         CompletableFuture<String> ftrep = t.sendMessage(request);
-        replies.put(MESSAGE_ID_INTEGER.get(), ftrep);
+        replies.put(messageIdInteger.get(), ftrep);
         return ftrep;
     }
 
     private String sendRequest(String request) throws NetconfException {
         checkAndRestablishSession();
-        //FIXME find out a better way to enforce the presence of message-id
-        if (!request.contains(MESSAGE_ID_STRING) && !request.contains(HELLO)) {
-            request = request.replaceFirst("\">", "\" message-id=\""
-                    + MESSAGE_ID_INTEGER.get() + "\"" + ">");
-        }
+        request = formatRequestMessageId(request);
+        request = formatXmlHeader(request);
         CompletableFuture<String> futureReply = request(request);
-        MESSAGE_ID_INTEGER.incrementAndGet();
-        String rp = futureReply.join();
-        log.debug("Reply from device {}", rp);
+        String rp;
+        try {
+            rp = futureReply.get(FUTURE_REPLY_TIMEOUT, TimeUnit.MILLISECONDS);
+        } catch (InterruptedException | ExecutionException | TimeoutException e) {
+            //replies.remove(messageIdInteger.get());
+            throw new NetconfException("Can't get the reply for request" + request, e);
+        }
+//        String rp = Tools.futureGetOrElse(futureReply, FUTURE_REPLY_TIMEOUT, TimeUnit.MILLISECONDS,
+//                                          "Error in completing the request with message-id " +
+//                                                  messageIdInteger.get() +
+//                                                  ": future timed out.");
+        messageIdInteger.incrementAndGet();
+        log.debug("Result {} from request {} to device {}", rp, request, deviceInfo);
         return rp;
     }
 
+    private String formatRequestMessageId(String request) {
+        if (request.contains(MESSAGE_ID_STRING)) {
+            //FIXME if application provieds his own counting of messages this fails that count
+            request = request.replaceFirst(MESSAGE_ID_STRING + EQUAL + NUMBER_BETWEEN_QUOTES_MATCHER,
+                                           MESSAGE_ID_STRING + EQUAL + "\"" + messageIdInteger.get() + "\"");
+        } else if (!request.contains(MESSAGE_ID_STRING) && !request.contains(HELLO)) {
+            //FIXME find out a better way to enforce the presence of message-id
+            request = request.replaceFirst(END_OF_RPC_OPEN_TAG, "\" " + MESSAGE_ID_STRING + EQUAL + "\""
+                    + messageIdInteger.get() + "\"" + ">");
+        }
+        return request;
+    }
+
+    private String formatXmlHeader(String request) {
+        if (!request.contains(XML_HEADER)) {
+            //FIXME if application provieds his own XML header of different type there is a clash
+            request = XML_HEADER + "\n" + request;
+        }
+        return request;
+    }
+
     @Override
     public String get(String request) throws NetconfException {
         return requestSync(request);
@@ -203,16 +243,21 @@
 
     @Override
     public String getConfig(String targetConfiguration, String configurationSchema) throws NetconfException {
-        StringBuilder rpc = new StringBuilder("<?xml version=\"1.0\" encoding=\"UTF-8\"?>");
-        rpc.append("<rpc message-id=\"" + MESSAGE_ID_INTEGER.get() + "\"  "
-                           + "xmlns=\"urn:ietf:params:xml:ns:netconf:base:1.0\">\n");
+        StringBuilder rpc = new StringBuilder(XML_HEADER);
+        rpc.append("<rpc ");
+        rpc.append(MESSAGE_ID_STRING);
+        rpc.append(EQUAL);
+        rpc.append("\"");
+        rpc.append(messageIdInteger.get());
+        rpc.append("\"  ");
+        rpc.append("xmlns=\"urn:ietf:params:xml:ns:netconf:base:1.0\">\n");
         rpc.append("<get-config>\n");
         rpc.append("<source>\n");
-        rpc.append("<" + targetConfiguration + "/>");
+        rpc.append("<").append(targetConfiguration).append("/>");
         rpc.append("</source>");
         if (configurationSchema != null) {
             rpc.append("<filter type=\"subtree\">\n");
-            rpc.append(configurationSchema + "\n");
+            rpc.append(configurationSchema).append("\n");
             rpc.append("</filter>\n");
         }
         rpc.append("</get-config>\n");
@@ -232,12 +277,17 @@
     public boolean editConfig(String targetConfiguration, String mode, String newConfiguration)
             throws NetconfException {
         newConfiguration = newConfiguration.trim();
-        StringBuilder rpc = new StringBuilder("<?xml version=\"1.0\" encoding=\"UTF-8\"?>");
-        rpc.append("<rpc message-id=\"" + MESSAGE_ID_INTEGER.get() + "\"  "
-                           + "xmlns=\"urn:ietf:params:xml:ns:netconf:base:1.0\">\n");
+        StringBuilder rpc = new StringBuilder(XML_HEADER);
+        rpc.append("<rpc ");
+        rpc.append(MESSAGE_ID_STRING);
+        rpc.append(EQUAL);
+        rpc.append("\"");
+        rpc.append(messageIdInteger.get());
+        rpc.append("\"  ");
+        rpc.append("xmlns=\"urn:ietf:params:xml:ns:netconf:base:1.0\">\n");
         rpc.append("<edit-config>");
         rpc.append("<target>");
-        rpc.append("<" + targetConfiguration + "/>");
+        rpc.append("<").append(targetConfiguration).append("/>");
         rpc.append("</target>");
         rpc.append("<default-operation>");
         rpc.append(mode);
@@ -259,15 +309,14 @@
             newConfiguration = "<configuration>" + newConfiguration
                     + "</configuration>";
         }
-        StringBuilder rpc = new StringBuilder("<?xml version=\"1.0\" " +
-                                                      "encoding=\"UTF-8\"?>");
+        StringBuilder rpc = new StringBuilder(XML_HEADER);
         rpc.append("<rpc>");
         rpc.append("<copy-config>");
         rpc.append("<target>");
-        rpc.append("<" + targetConfiguration + "/>");
+        rpc.append("<").append(targetConfiguration).append("/>");
         rpc.append("</target>");
         rpc.append("<source>");
-        rpc.append("<" + newConfiguration + "/>");
+        rpc.append("<").append(newConfiguration).append("/>");
         rpc.append("</source>");
         rpc.append("</copy-config>");
         rpc.append("</rpc>");
@@ -282,12 +331,11 @@
                      targetConfiguration);
             return false;
         }
-        StringBuilder rpc = new StringBuilder("<?xml version=\"1.0\" " +
-                                                      "encoding=\"UTF-8\"?>");
+        StringBuilder rpc = new StringBuilder(XML_HEADER);
         rpc.append("<rpc>");
         rpc.append("<delete-config>");
         rpc.append("<target>");
-        rpc.append("<" + targetConfiguration + "/>");
+        rpc.append("<").append(targetConfiguration).append("/>");
         rpc.append("</target>");
         rpc.append("</delete-config>");
         rpc.append("</rpc>");
@@ -297,8 +345,7 @@
 
     @Override
     public boolean lock() throws NetconfException {
-        StringBuilder rpc = new StringBuilder("<?xml version=\"1.0\" " +
-                                                      "encoding=\"UTF-8\"?>");
+        StringBuilder rpc = new StringBuilder(XML_HEADER);
         rpc.append("<rpc>");
         rpc.append("<lock>");
         rpc.append("<target>");
@@ -312,8 +359,7 @@
 
     @Override
     public boolean unlock() throws NetconfException {
-        StringBuilder rpc = new StringBuilder("<?xml version=\"1.0\" " +
-                                                      "encoding=\"UTF-8\"?>");
+        StringBuilder rpc = new StringBuilder(XML_HEADER);
         rpc.append("<rpc>");
         rpc.append("<unlock>");
         rpc.append("<target>");