async netconf RPC

- part of ONOS-7020

Change-Id: I27baf72dec06a2613bd4ae634f891c2420201900
diff --git a/protocols/netconf/api/src/main/java/org/onosproject/netconf/NetconfSession.java b/protocols/netconf/api/src/main/java/org/onosproject/netconf/NetconfSession.java
index 4d8cfa1..9481b8d 100644
--- a/protocols/netconf/api/src/main/java/org/onosproject/netconf/NetconfSession.java
+++ b/protocols/netconf/api/src/main/java/org/onosproject/netconf/NetconfSession.java
@@ -45,6 +45,20 @@
     @Deprecated
     CompletableFuture<String> request(String request) throws NetconfException;
 
+    /**
+     * Executes an asynchronous RPC request to the server and obtains a future
+     * for it's response.
+     *
+     * @param request the XML containing the RPC request for the server.
+     * @return Server response or ERROR
+     * @throws NetconfException when there is a problem in the communication process on
+     * the underlying connection
+     * @throws NetconfTransportException on secure transport-layer error
+     */
+    default CompletableFuture<String> rpc(String request) throws NetconfException {
+        return request(request);
+    }
+
 
     /**
      * Retrieves the requested configuration, different from get-config.
diff --git a/protocols/netconf/api/src/main/java/org/onosproject/netconf/NetconfTransportException.java b/protocols/netconf/api/src/main/java/org/onosproject/netconf/NetconfTransportException.java
new file mode 100644
index 0000000..4c610fd
--- /dev/null
+++ b/protocols/netconf/api/src/main/java/org/onosproject/netconf/NetconfTransportException.java
@@ -0,0 +1,50 @@
+/*
+ * Copyright 2017-present Open Networking Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.onosproject.netconf;
+
+/**
+ * Exception triggered from NETCONF secure transport layer or below.
+ */
+public class NetconfTransportException extends RuntimeException {
+
+    private static final long serialVersionUID = 5788096975954688094L;
+
+    public NetconfTransportException() {
+    }
+
+    /**
+     * @param message describing the error
+     */
+    public NetconfTransportException(String message) {
+        super(message);
+    }
+
+    /**
+     * @param cause of this exception
+     */
+    public NetconfTransportException(Throwable cause) {
+        super(cause);
+    }
+
+    /**
+     * @param message describing the error
+     * @param cause of this exception
+     */
+    public NetconfTransportException(String message, Throwable cause) {
+        super(message, cause);
+    }
+
+}
diff --git a/protocols/netconf/ctl/src/main/java/org/onosproject/netconf/ctl/impl/NetconfSessionMinaImpl.java b/protocols/netconf/ctl/src/main/java/org/onosproject/netconf/ctl/impl/NetconfSessionMinaImpl.java
index 99f79cd..06fd53a 100644
--- a/protocols/netconf/ctl/src/main/java/org/onosproject/netconf/ctl/impl/NetconfSessionMinaImpl.java
+++ b/protocols/netconf/ctl/src/main/java/org/onosproject/netconf/ctl/impl/NetconfSessionMinaImpl.java
@@ -32,6 +32,7 @@
 import org.bouncycastle.openssl.PEMParser;
 import org.bouncycastle.openssl.PEMKeyPair;
 import org.bouncycastle.openssl.jcajce.JcaPEMKeyConverter;
+import org.onlab.util.SharedExecutors;
 import org.onosproject.netconf.DatastoreId;
 import org.onosproject.netconf.NetconfDeviceInfo;
 import org.onosproject.netconf.NetconfDeviceOutputEvent;
@@ -40,11 +41,10 @@
 import org.onosproject.netconf.NetconfException;
 import org.onosproject.netconf.NetconfSession;
 import org.onosproject.netconf.NetconfSessionFactory;
+import org.onosproject.netconf.NetconfTransportException;
 import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-
 import static java.nio.charset.StandardCharsets.UTF_8;
+import static org.slf4j.LoggerFactory.getLogger;
 
 import java.io.CharArrayReader;
 import java.io.IOException;
@@ -77,8 +77,7 @@
  */
 public class NetconfSessionMinaImpl implements NetconfSession {
 
-    private static final Logger log = LoggerFactory
-            .getLogger(NetconfSessionMinaImpl.class);
+    private static final Logger log = getLogger(NetconfSessionMinaImpl.class);
 
     /**
      * NC 1.0, RFC4742 EOM sequence.
@@ -167,6 +166,8 @@
         replies = new ConcurrentHashMap<>();
         errorReplies = new ArrayList<>();
 
+        // FIXME should not immediately start session on construction
+        // setOnosCapabilities() is useless due to this behavior
         startConnection();
     }
 
@@ -175,6 +176,8 @@
         replies = new ConcurrentHashMap<>();
         errorReplies = new ArrayList<>();
         setOnosCapabilities(capabilities);
+        // FIXME should not immediately start session on construction
+        // setOnosCapabilities() is useless due to this behavior
         startConnection();
     }
 
@@ -206,6 +209,8 @@
         startSession();
     }
 
+    // FIXME blocking
+    @Deprecated
     private void startSession() throws IOException {
         final ConnectFuture connectFuture;
         connectFuture = client.connect(deviceInfo.name(),
@@ -251,6 +256,8 @@
         return kf.generatePublic(spec);
     }
 
+    // FIXME blocking
+    @Deprecated
     private void openChannel() throws IOException {
         channel = session.createSubsystemChannel("netconf");
         OpenFuture channelFuture = channel.open();
@@ -465,13 +472,53 @@
         return message;
     }
 
-
     @Override
     @Deprecated
     public CompletableFuture<String> request(String request) {
         return streamHandler.sendMessage(request);
     }
 
+    /**
+     * {@inheritDoc}
+     * <p>
+     * FIXME Note: as of 1.12.0
+     * {@code request} must not include message-id, this method will assign
+     * and insert message-id on it's own.
+     * Will require ONOS-7019 to remove this limitation.
+     */
+    @Override
+    public CompletableFuture<String> rpc(String request) {
+
+        String rpc = request;
+        //  - assign message-id
+        int msgId = messageIdInteger.incrementAndGet();
+        //  - re-write request to insert message-id
+        // FIXME avoid using formatRequestMessageId
+        rpc = formatRequestMessageId(rpc, msgId);
+        //  - ensure it contains XML header
+        rpc = formatXmlHeader(rpc);
+        //  - use chunked framing if talking to NC 1.1 device
+        // FIXME avoid using formatNetconfMessage
+        rpc = formatNetconfMessage(rpc);
+
+        // TODO session liveness check & recovery
+
+        log.debug("Sending {} to {}", rpc, this.deviceInfo.getDeviceId());
+        return streamHandler.sendMessage(rpc, msgId)
+                    .handleAsync((reply, t) -> {
+                        if (t != null) {
+                            // secure transport-layer error
+                            // cannot use NetconfException, which is
+                            // checked Exception.
+                            throw new NetconfTransportException(t);
+                        } else {
+                            // FIXME avoid using checkReply, error handling is weird
+                            checkReply(reply);
+                            return reply;
+                        }
+                    }, SharedExecutors.getPoolThreadExecutor());
+    }
+
     @Override
     public int timeoutConnectSec() {
         return connectTimeout;
@@ -885,6 +932,7 @@
             } else if (reply.contains("<ok/>")
                     || (reply.contains("<rpc-error>")
                     && reply.contains("warning"))) {
+                // FIXME rpc-error with a warning is considered same as Ok??
                 log.debug("Device {} sent reply {}", deviceInfo, reply);
                 return true;
             }