ONOS-3839 Fixing errors in sending requests and hanging on future.join
Change-Id: I6da5bf1ff728efeb0d531cf7f04f6bf49f11a0a9
diff --git a/protocols/netconf/ctl/src/main/java/org/onosproject/netconf/ctl/NetconfSessionImpl.java b/protocols/netconf/ctl/src/main/java/org/onosproject/netconf/ctl/NetconfSessionImpl.java
index 9f72d83..9a950fb 100644
--- a/protocols/netconf/ctl/src/main/java/org/onosproject/netconf/ctl/NetconfSessionImpl.java
+++ b/protocols/netconf/ctl/src/main/java/org/onosproject/netconf/ctl/NetconfSessionImpl.java
@@ -28,12 +28,14 @@
import org.slf4j.LoggerFactory;
import java.io.IOException;
-import java.io.PrintWriter;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicInteger;
@@ -48,17 +50,22 @@
private static final int CONNECTION_TIMEOUT = 0;
private static final String ENDPATTERN = "]]>]]>";
- private static final AtomicInteger MESSAGE_ID_INTEGER = new AtomicInteger(0);
private static final String MESSAGE_ID_STRING = "message-id";
private static final String HELLO = "hello";
private static final String NEW_LINE = "\n";
+ // Upper bound, in milliseconds, on how long a synchronous request waits for its reply.
+ private static final int FUTURE_REPLY_TIMEOUT = 5000;
+ // Prefix put in front of a reply that did not pass checkReply.
+ private static final String ERROR = "ERROR ";
+ // Used as the regex locating the first "> sequence, where a message-id attribute is inserted.
+ private static final String END_OF_RPC_OPEN_TAG = "\">";
+ private static final String EQUAL = "=";
+ // Regex for a quoted run of digits. The digit group is deliberately NOT wrapped in a
+ // second quantifier: "([0-9]+)+" accepts exactly the same strings but can backtrack
+ // exponentially when a long digit run is not followed by a closing quote.
+ private static final String NUMBER_BETWEEN_QUOTES_MATCHER = "\"+([0-9]+)\"";
+ private static final String XML_HEADER =
+     "<?xml version=\"1.0\" encoding=\"UTF-8\"?>";
-
+ // Per-session counter supplying the message-id written into outgoing requests.
+ private final AtomicInteger messageIdInteger = new AtomicInteger(0);
private Connection netconfConnection;
private NetconfDeviceInfo deviceInfo;
private Session sshSession;
private boolean connectionActive;
- private PrintWriter out = null;
private List<String> deviceCapabilities =
Collections.singletonList("urn:ietf:params:netconf:base:1.0");
private String serverCapabilities;
@@ -116,7 +123,6 @@
try {
sshSession = netconfConnection.openSession();
sshSession.startSubSystem("netconf");
- out = new PrintWriter(sshSession.getStdin());
t = new NetconfStreamThread(sshSession.getStdout(), sshSession.getStdin(),
sshSession.getStderr(), deviceInfo,
new NetconfSessionDelegateImpl());
@@ -130,17 +136,20 @@
}
}
- private void sendHello() throws IOException {
+ private void sendHello() throws NetconfException { // sends the hello message and keeps the reply in serverCapabilities
serverCapabilities = sendRequest(createHelloString());
}
private String createHelloString() {
StringBuilder hellobuffer = new StringBuilder();
- hellobuffer.append("<?xml version=\"1.0\" encoding=\"UTF-8\"?>\n");
+ hellobuffer.append(XML_HEADER);
+ hellobuffer.append("\n");
hellobuffer.append("<hello xmlns=\"urn:ietf:params:xml:ns:netconf:base:1.0\">\n");
hellobuffer.append(" <capabilities>\n");
deviceCapabilities.forEach(
- cap -> hellobuffer.append(" <capability>" + cap + "</capability>\n"));
+ cap -> hellobuffer.append(" <capability>")
+ .append(cap)
+ .append("</capability>\n"));
hellobuffer.append(" </capabilities>\n");
hellobuffer.append("</hello>\n");
hellobuffer.append(ENDPATTERN);
@@ -166,31 +175,62 @@
@Override
public String requestSync(String request) throws NetconfException {
- String reply = sendRequest(request + NEW_LINE + ENDPATTERN);
- return checkReply(reply) ? reply : "ERROR " + reply;
+ if (!request.contains(ENDPATTERN)) { // append the end-of-message marker only when the caller has not already included it
+ request = request + NEW_LINE + ENDPATTERN;
+ }
+ String reply = sendRequest(request); // sends the request and waits for the reply
+ return checkReply(reply) ? reply : ERROR + reply; // a reply rejected by checkReply is returned with the ERROR prefix instead of throwing
}
@Override
public CompletableFuture<String> request(String request) {
CompletableFuture<String> ftrep = t.sendMessage(request);
- replies.put(MESSAGE_ID_INTEGER.get(), ftrep);
+ replies.put(messageIdInteger.get(), ftrep); // keyed by the current counter value; NOTE(review): the counter is not advanced here, so repeated direct calls reuse the same key
return ftrep;
}
private String sendRequest(String request) throws NetconfException {
checkAndRestablishSession();
- //FIXME find out a better way to enforce the presence of message-id
- if (!request.contains(MESSAGE_ID_STRING) && !request.contains(HELLO)) {
- request = request.replaceFirst("\">", "\" message-id=\""
- + MESSAGE_ID_INTEGER.get() + "\"" + ">");
- }
+ request = formatRequestMessageId(request);
+ request = formatXmlHeader(request);
CompletableFuture<String> futureReply = request(request);
- MESSAGE_ID_INTEGER.incrementAndGet();
- String rp = futureReply.join();
- log.debug("Reply from device {}", rp);
+ String rp;
+ try {
+ rp = futureReply.get(FUTURE_REPLY_TIMEOUT, TimeUnit.MILLISECONDS);
+ } catch (InterruptedException | ExecutionException | TimeoutException e) {
+ //replies.remove(messageIdInteger.get());
+ throw new NetconfException("Can't get the reply for request" + request, e);
+ }
+// String rp = Tools.futureGetOrElse(futureReply, FUTURE_REPLY_TIMEOUT, TimeUnit.MILLISECONDS,
+// "Error in completing the request with message-id " +
+// messageIdInteger.get() +
+// ": future timed out.");
+ messageIdInteger.incrementAndGet();
+ log.debug("Result {} from request {} to device {}", rp, request, deviceInfo);
return rp;
}
+ private String formatRequestMessageId(String request) {
+ if (request.contains(MESSAGE_ID_STRING)) {
+ //FIXME if application provieds his own counting of messages this fails that count
+ request = request.replaceFirst(MESSAGE_ID_STRING + EQUAL + NUMBER_BETWEEN_QUOTES_MATCHER,
+ MESSAGE_ID_STRING + EQUAL + "\"" + messageIdInteger.get() + "\"");
+ } else if (!request.contains(MESSAGE_ID_STRING) && !request.contains(HELLO)) {
+ //FIXME find out a better way to enforce the presence of message-id
+ request = request.replaceFirst(END_OF_RPC_OPEN_TAG, "\" " + MESSAGE_ID_STRING + EQUAL + "\""
+ + messageIdInteger.get() + "\"" + ">");
+ }
+ return request;
+ }
+
+ /**
+  * Ensures the request starts with the standard XML header.
+  *
+  * @param request the request to check
+  * @return the request unchanged when it already contains XML_HEADER,
+  *         otherwise XML_HEADER, a newline, and the request
+  */
+ private String formatXmlHeader(String request) {
+     if (request.contains(XML_HEADER)) {
+         return request;
+     }
+     //FIXME if the application provides its own XML header of a different type there is a clash
+     return XML_HEADER + "\n" + request;
+ }
+
@Override
public String get(String request) throws NetconfException {
return requestSync(request);
@@ -203,16 +243,21 @@
@Override
public String getConfig(String targetConfiguration, String configurationSchema) throws NetconfException {
- StringBuilder rpc = new StringBuilder("<?xml version=\"1.0\" encoding=\"UTF-8\"?>");
- rpc.append("<rpc message-id=\"" + MESSAGE_ID_INTEGER.get() + "\" "
- + "xmlns=\"urn:ietf:params:xml:ns:netconf:base:1.0\">\n");
+ StringBuilder rpc = new StringBuilder(XML_HEADER);
+ rpc.append("<rpc ");
+ rpc.append(MESSAGE_ID_STRING);
+ rpc.append(EQUAL);
+ rpc.append("\"");
+ rpc.append(messageIdInteger.get());
+ rpc.append("\" ");
+ rpc.append("xmlns=\"urn:ietf:params:xml:ns:netconf:base:1.0\">\n");
rpc.append("<get-config>\n");
rpc.append("<source>\n");
- rpc.append("<" + targetConfiguration + "/>");
+ rpc.append("<").append(targetConfiguration).append("/>");
rpc.append("</source>");
if (configurationSchema != null) {
rpc.append("<filter type=\"subtree\">\n");
- rpc.append(configurationSchema + "\n");
+ rpc.append(configurationSchema).append("\n");
rpc.append("</filter>\n");
}
rpc.append("</get-config>\n");
@@ -232,12 +277,17 @@
public boolean editConfig(String targetConfiguration, String mode, String newConfiguration)
throws NetconfException {
newConfiguration = newConfiguration.trim();
- StringBuilder rpc = new StringBuilder("<?xml version=\"1.0\" encoding=\"UTF-8\"?>");
- rpc.append("<rpc message-id=\"" + MESSAGE_ID_INTEGER.get() + "\" "
- + "xmlns=\"urn:ietf:params:xml:ns:netconf:base:1.0\">\n");
+ StringBuilder rpc = new StringBuilder(XML_HEADER);
+ rpc.append("<rpc ");
+ rpc.append(MESSAGE_ID_STRING);
+ rpc.append(EQUAL);
+ rpc.append("\"");
+ rpc.append(messageIdInteger.get());
+ rpc.append("\" ");
+ rpc.append("xmlns=\"urn:ietf:params:xml:ns:netconf:base:1.0\">\n");
rpc.append("<edit-config>");
rpc.append("<target>");
- rpc.append("<" + targetConfiguration + "/>");
+ rpc.append("<").append(targetConfiguration).append("/>");
rpc.append("</target>");
rpc.append("<default-operation>");
rpc.append(mode);
@@ -259,15 +309,14 @@
newConfiguration = "<configuration>" + newConfiguration
+ "</configuration>";
}
- StringBuilder rpc = new StringBuilder("<?xml version=\"1.0\" " +
- "encoding=\"UTF-8\"?>");
+ StringBuilder rpc = new StringBuilder(XML_HEADER);
rpc.append("<rpc>");
rpc.append("<copy-config>");
rpc.append("<target>");
- rpc.append("<" + targetConfiguration + "/>");
+ rpc.append("<").append(targetConfiguration).append("/>");
rpc.append("</target>");
rpc.append("<source>");
- rpc.append("<" + newConfiguration + "/>");
+ rpc.append("<").append(newConfiguration).append("/>");
rpc.append("</source>");
rpc.append("</copy-config>");
rpc.append("</rpc>");
@@ -282,12 +331,11 @@
targetConfiguration);
return false;
}
- StringBuilder rpc = new StringBuilder("<?xml version=\"1.0\" " +
- "encoding=\"UTF-8\"?>");
+ StringBuilder rpc = new StringBuilder(XML_HEADER);
rpc.append("<rpc>");
rpc.append("<delete-config>");
rpc.append("<target>");
- rpc.append("<" + targetConfiguration + "/>");
+ rpc.append("<").append(targetConfiguration).append("/>");
rpc.append("</target>");
rpc.append("</delete-config>");
rpc.append("</rpc>");
@@ -297,8 +345,7 @@
@Override
public boolean lock() throws NetconfException {
- StringBuilder rpc = new StringBuilder("<?xml version=\"1.0\" " +
- "encoding=\"UTF-8\"?>");
+ StringBuilder rpc = new StringBuilder(XML_HEADER);
rpc.append("<rpc>");
rpc.append("<lock>");
rpc.append("<target>");
@@ -312,8 +359,7 @@
@Override
public boolean unlock() throws NetconfException {
- StringBuilder rpc = new StringBuilder("<?xml version=\"1.0\" " +
- "encoding=\"UTF-8\"?>");
+ StringBuilder rpc = new StringBuilder(XML_HEADER);
rpc.append("<rpc>");
rpc.append("<unlock>");
rpc.append("<target>");