async netconf RPC

- part of ONOS-7020

Change-Id: I27baf72dec06a2613bd4ae634f891c2420201900
diff --git a/protocols/netconf/ctl/src/main/java/org/onosproject/netconf/ctl/impl/NetconfSessionMinaImpl.java b/protocols/netconf/ctl/src/main/java/org/onosproject/netconf/ctl/impl/NetconfSessionMinaImpl.java
index 99f79cd..06fd53a 100644
--- a/protocols/netconf/ctl/src/main/java/org/onosproject/netconf/ctl/impl/NetconfSessionMinaImpl.java
+++ b/protocols/netconf/ctl/src/main/java/org/onosproject/netconf/ctl/impl/NetconfSessionMinaImpl.java
@@ -32,6 +32,7 @@
 import org.bouncycastle.openssl.PEMParser;
 import org.bouncycastle.openssl.PEMKeyPair;
 import org.bouncycastle.openssl.jcajce.JcaPEMKeyConverter;
+import org.onlab.util.SharedExecutors;
 import org.onosproject.netconf.DatastoreId;
 import org.onosproject.netconf.NetconfDeviceInfo;
 import org.onosproject.netconf.NetconfDeviceOutputEvent;
@@ -40,11 +41,10 @@
 import org.onosproject.netconf.NetconfException;
 import org.onosproject.netconf.NetconfSession;
 import org.onosproject.netconf.NetconfSessionFactory;
+import org.onosproject.netconf.NetconfTransportException;
 import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-
 import static java.nio.charset.StandardCharsets.UTF_8;
+import static org.slf4j.LoggerFactory.getLogger;
 
 import java.io.CharArrayReader;
 import java.io.IOException;
@@ -77,8 +77,7 @@
  */
 public class NetconfSessionMinaImpl implements NetconfSession {
 
-    private static final Logger log = LoggerFactory
-            .getLogger(NetconfSessionMinaImpl.class);
+    private static final Logger log = getLogger(NetconfSessionMinaImpl.class);
 
     /**
      * NC 1.0, RFC4742 EOM sequence.
@@ -167,6 +166,8 @@
         replies = new ConcurrentHashMap<>();
         errorReplies = new ArrayList<>();
 
+        // FIXME should not immediately start session on construction
+        // setOnosCapabilities() is useless due to this behavior
         startConnection();
     }
 
@@ -175,6 +176,8 @@
         replies = new ConcurrentHashMap<>();
         errorReplies = new ArrayList<>();
         setOnosCapabilities(capabilities);
+        // FIXME should not immediately start session on construction
+        // setOnosCapabilities() is useless due to this behavior
         startConnection();
     }
 
@@ -206,6 +209,8 @@
         startSession();
     }
 
+    // FIXME blocking
+    @Deprecated
     private void startSession() throws IOException {
         final ConnectFuture connectFuture;
         connectFuture = client.connect(deviceInfo.name(),
@@ -251,6 +256,8 @@
         return kf.generatePublic(spec);
     }
 
+    // FIXME blocking
+    @Deprecated
     private void openChannel() throws IOException {
         channel = session.createSubsystemChannel("netconf");
         OpenFuture channelFuture = channel.open();
@@ -465,13 +472,53 @@
         return message;
     }
 
-
     @Override
     @Deprecated
     public CompletableFuture<String> request(String request) {
         return streamHandler.sendMessage(request);
     }
 
+    /**
+     * {@inheritDoc}
+     * <p>
+     * FIXME Note: as of 1.12.0,
+     * {@code request} must not include a message-id; this method assigns
+     * and inserts the message-id on its own.
+     * Removing this limitation requires ONOS-7019.
+     */
+    @Override
+    public CompletableFuture<String> rpc(String request) {
+
+        String rpc = request;
+        //  - assign the next message-id from the session counter
+        int msgId = messageIdInteger.incrementAndGet();
+        //  - re-write the request to insert that message-id
+        // FIXME avoid using formatRequestMessageId
+        rpc = formatRequestMessageId(rpc, msgId);
+        //  - ensure it contains the XML header
+        rpc = formatXmlHeader(rpc);
+        //  - use chunked framing if talking to an NC 1.1 device
+        // FIXME avoid using formatNetconfMessage
+        rpc = formatNetconfMessage(rpc);
+
+        // TODO session liveness check & recovery
+
+        log.debug("Sending {} to {}", rpc, this.deviceInfo.getDeviceId());
+        return streamHandler.sendMessage(rpc, msgId)
+                    .handleAsync((reply, t) -> {
+                        if (t != null) {
+                            // sendMessage completed exceptionally: rethrow,
+                            // wrapped in NetconfTransportException, because
+                            // NetconfException is a checked exception.
+                            throw new NetconfTransportException(t);
+                        } else {
+                            // FIXME avoid using checkReply; its result is ignored here
+                            checkReply(reply);
+                            return reply;
+                        }
+                    }, SharedExecutors.getPoolThreadExecutor());
+    }
+
     @Override
     public int timeoutConnectSec() {
         return connectTimeout;
@@ -885,6 +932,7 @@
             } else if (reply.contains("<ok/>")
                     || (reply.contains("<rpc-error>")
                     && reply.contains("warning"))) {
+                // FIXME rpc-error with a warning is considered same as Ok??
                 log.debug("Device {} sent reply {}", deviceInfo, reply);
                 return true;
             }