async netconf RPC
- part of ONOS-7020
Change-Id: I27baf72dec06a2613bd4ae634f891c2420201900
diff --git a/protocols/netconf/ctl/src/main/java/org/onosproject/netconf/ctl/impl/NetconfSessionMinaImpl.java b/protocols/netconf/ctl/src/main/java/org/onosproject/netconf/ctl/impl/NetconfSessionMinaImpl.java
index 99f79cd..06fd53a 100644
--- a/protocols/netconf/ctl/src/main/java/org/onosproject/netconf/ctl/impl/NetconfSessionMinaImpl.java
+++ b/protocols/netconf/ctl/src/main/java/org/onosproject/netconf/ctl/impl/NetconfSessionMinaImpl.java
@@ -32,6 +32,7 @@
import org.bouncycastle.openssl.PEMParser;
import org.bouncycastle.openssl.PEMKeyPair;
import org.bouncycastle.openssl.jcajce.JcaPEMKeyConverter;
+import org.onlab.util.SharedExecutors;
import org.onosproject.netconf.DatastoreId;
import org.onosproject.netconf.NetconfDeviceInfo;
import org.onosproject.netconf.NetconfDeviceOutputEvent;
@@ -40,11 +41,10 @@
import org.onosproject.netconf.NetconfException;
import org.onosproject.netconf.NetconfSession;
import org.onosproject.netconf.NetconfSessionFactory;
+import org.onosproject.netconf.NetconfTransportException;
import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-
import static java.nio.charset.StandardCharsets.UTF_8;
+import static org.slf4j.LoggerFactory.getLogger;
import java.io.CharArrayReader;
import java.io.IOException;
@@ -77,8 +77,7 @@
*/
public class NetconfSessionMinaImpl implements NetconfSession {
- private static final Logger log = LoggerFactory
- .getLogger(NetconfSessionMinaImpl.class);
+ private static final Logger log = getLogger(NetconfSessionMinaImpl.class);
/**
* NC 1.0, RFC4742 EOM sequence.
@@ -167,6 +166,8 @@
replies = new ConcurrentHashMap<>();
errorReplies = new ArrayList<>();
+ // FIXME should not immediately start session on construction
+ // setOnosCapabilities() is useless due to this behavior
startConnection();
}
@@ -175,6 +176,8 @@
replies = new ConcurrentHashMap<>();
errorReplies = new ArrayList<>();
setOnosCapabilities(capabilities);
+ // FIXME should not immediately start session on construction
+ // setOnosCapabilities() is useless due to this behavior
startConnection();
}
@@ -206,6 +209,8 @@
startSession();
}
+ // FIXME blocking
+ @Deprecated
private void startSession() throws IOException {
final ConnectFuture connectFuture;
connectFuture = client.connect(deviceInfo.name(),
@@ -251,6 +256,8 @@
return kf.generatePublic(spec);
}
+ // FIXME blocking
+ @Deprecated
private void openChannel() throws IOException {
channel = session.createSubsystemChannel("netconf");
OpenFuture channelFuture = channel.open();
@@ -465,13 +472,53 @@
return message;
}
-
@Override
@Deprecated
public CompletableFuture<String> request(String request) {
return streamHandler.sendMessage(request);
}
+ /**
+ * {@inheritDoc}
+ * <p>
+ * FIXME Note: as of 1.12.0
+ * {@code request} must not include message-id; this method will assign
+ * and insert the message-id on its own.
+ * Will require ONOS-7019 to remove this limitation.
+ */
+ @Override
+ public CompletableFuture<String> rpc(String request) {
+
+ String rpc = request;
+ // - assign message-id
+ int msgId = messageIdInteger.incrementAndGet();
+ // - re-write request to insert message-id
+ // FIXME avoid using formatRequestMessageId
+ rpc = formatRequestMessageId(rpc, msgId);
+ // - ensure it contains XML header
+ rpc = formatXmlHeader(rpc);
+ // - use chunked framing if talking to NC 1.1 device
+ // FIXME avoid using formatNetconfMessage
+ rpc = formatNetconfMessage(rpc);
+
+ // TODO session liveness check & recovery
+
+ log.debug("Sending {} to {}", rpc, this.deviceInfo.getDeviceId());
+ return streamHandler.sendMessage(rpc, msgId)
+ .handleAsync((reply, t) -> {
+ if (t != null) {
+ // secure transport-layer error
+ // cannot throw NetconfException here, which is
+ // a checked exception.
+ throw new NetconfTransportException(t);
+ } else {
+ // FIXME avoid using checkReply, error handling is weird
+ checkReply(reply);
+ return reply;
+ }
+ }, SharedExecutors.getPoolThreadExecutor());
+ }
+
@Override
public int timeoutConnectSec() {
return connectTimeout;
@@ -885,6 +932,7 @@
} else if (reply.contains("<ok/>")
|| (reply.contains("<rpc-error>")
&& reply.contains("warning"))) {
+ // FIXME rpc-error with a warning is considered the same as Ok??
log.debug("Device {} sent reply {}", deviceInfo, reply);
return true;
}