Fixed some concurrency issues in NETCONF Session. Added unit tests.
Change-Id: I84fe0c17e3d757948a859f78d01fbb025397a44d
diff --git a/protocols/netconf/ctl/src/main/java/org/onosproject/netconf/ctl/NetconfSessionImpl.java b/protocols/netconf/ctl/src/main/java/org/onosproject/netconf/ctl/NetconfSessionImpl.java
index 26ded2f..07f7c98 100644
--- a/protocols/netconf/ctl/src/main/java/org/onosproject/netconf/ctl/NetconfSessionImpl.java
+++ b/protocols/netconf/ctl/src/main/java/org/onosproject/netconf/ctl/NetconfSessionImpl.java
@@ -31,15 +31,17 @@
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
-import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicInteger;
+import java.util.regex.Pattern;
+
/**
@@ -82,6 +84,8 @@
private static final String SUBSCRIPTION_SUBTREE_FILTER_OPEN =
"<filter xmlns:base10=\"urn:ietf:params:xml:ns:netconf:base:1.0\" base10:type=\"subtree\">";
+ private static Pattern msgIdPattern = Pattern.compile("(message-id=\"[0-9]+\")");
+
private final AtomicInteger messageIdInteger = new AtomicInteger(0);
private Connection netconfConnection;
private NetconfDeviceInfo deviceInfo;
@@ -101,7 +105,7 @@
this.netconfConnection = null;
this.sshSession = null;
connectionActive = false;
- replies = new HashMap<>();
+ replies = new ConcurrentHashMap<>();
errorReplies = new ArrayList<>();
startConnection();
}
@@ -158,7 +162,8 @@
sshSession.startSubSystem("netconf");
streamHandler = new NetconfStreamThread(sshSession.getStdout(), sshSession.getStdin(),
sshSession.getStderr(), deviceInfo,
- new NetconfSessionDelegateImpl());
+ new NetconfSessionDelegateImpl(),
+ replies);
this.addDeviceOutputListener(new NetconfDeviceOutputEventListenerImpl(deviceInfo));
sendHello();
} catch (IOException e) {
@@ -276,22 +281,26 @@
}
@Override
+ @Deprecated
public CompletableFuture<String> request(String request) {
- CompletableFuture<String> ftrep = streamHandler.sendMessage(request);
- replies.put(messageIdInteger.get(), ftrep);
- return ftrep;
+ return streamHandler.sendMessage(request);
+ }
+
+ private CompletableFuture<String> request(String request, int messageId) {
+ return streamHandler.sendMessage(request, messageId);
}
private String sendRequest(String request) throws NetconfException {
checkAndRestablishSession();
- request = formatRequestMessageId(request);
+ final int messageId = messageIdInteger.getAndIncrement();
+ request = formatRequestMessageId(request, messageId);
request = formatXmlHeader(request);
- CompletableFuture<String> futureReply = request(request);
- messageIdInteger.incrementAndGet();
+ CompletableFuture<String> futureReply = request(request, messageId);
int replyTimeout = NetconfControllerImpl.netconfReplyTimeout;
String rp;
try {
rp = futureReply.get(replyTimeout, TimeUnit.SECONDS);
+ replies.remove(messageId);
} catch (InterruptedException | ExecutionException | TimeoutException e) {
throw new NetconfException("No matching reply for request " + request, e);
}
@@ -299,15 +308,15 @@
return rp.trim();
}
- private String formatRequestMessageId(String request) {
+ private String formatRequestMessageId(String request, int messageId) {
if (request.contains(MESSAGE_ID_STRING)) {
- //FIXME if application provieds his own counting of messages this fails that count
+ //FIXME if application provides his own counting of messages this fails that count
request = request.replaceFirst(MESSAGE_ID_STRING + EQUAL + NUMBER_BETWEEN_QUOTES_MATCHER,
- MESSAGE_ID_STRING + EQUAL + "\"" + messageIdInteger.get() + "\"");
+ MESSAGE_ID_STRING + EQUAL + "\"" + messageId + "\"");
} else if (!request.contains(MESSAGE_ID_STRING) && !request.contains(HELLO)) {
//FIXME find out a better way to enforce the presence of message-id
request = request.replaceFirst(END_OF_RPC_OPEN_TAG, "\" " + MESSAGE_ID_STRING + EQUAL + "\""
- + messageIdInteger.get() + "\"" + ">");
+ + messageId + "\"" + ">");
}
return request;
}
diff --git a/protocols/netconf/ctl/src/main/java/org/onosproject/netconf/ctl/NetconfStreamHandler.java b/protocols/netconf/ctl/src/main/java/org/onosproject/netconf/ctl/NetconfStreamHandler.java
index 1ee7911..98ebf3f 100644
--- a/protocols/netconf/ctl/src/main/java/org/onosproject/netconf/ctl/NetconfStreamHandler.java
+++ b/protocols/netconf/ctl/src/main/java/org/onosproject/netconf/ctl/NetconfStreamHandler.java
@@ -29,12 +29,25 @@
/**
* Sends the request on the stream that is used to communicate to and from the device.
*
+ * If the request does not contain a message ID, this method throws a NoSuchElementException.
+ *
* @param request request to send to the physical device
* @return a CompletableFuture of type String that will contain the response for the request.
+ * @deprecated use {@link #sendMessage(String, int)} instead
*/
+ @Deprecated
CompletableFuture<String> sendMessage(String request);
/**
+ * Sends the request on the stream that is used to communicate to and from the device.
+ *
+ * @param request request to send to the physical device
+ * @param messageId The identifier of the message - should be unique for the session
+ * @return a CompletableFuture of type String that will contain the response for the request.
+ */
+ CompletableFuture<String> sendMessage(String request, int messageId);
+
+ /**
* Adds a listener for netconf events on the handled stream.
*
* @param listener Netconf device event listener
diff --git a/protocols/netconf/ctl/src/main/java/org/onosproject/netconf/ctl/NetconfStreamThread.java b/protocols/netconf/ctl/src/main/java/org/onosproject/netconf/ctl/NetconfStreamThread.java
index 184a074..4d713f6 100644
--- a/protocols/netconf/ctl/src/main/java/org/onosproject/netconf/ctl/NetconfStreamThread.java
+++ b/protocols/netconf/ctl/src/main/java/org/onosproject/netconf/ctl/NetconfStreamThread.java
@@ -32,6 +32,7 @@
import java.io.OutputStream;
import java.io.PrintWriter;
import java.util.List;
+import java.util.Map;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.regex.Matcher;
@@ -63,26 +64,41 @@
private List<NetconfDeviceOutputEventListener> netconfDeviceEventListeners
= Lists.newCopyOnWriteArrayList();
private boolean enableNotifications = true;
+ private Map<Integer, CompletableFuture<String>> replies;
public NetconfStreamThread(final InputStream in, final OutputStream out,
final InputStream err, NetconfDeviceInfo deviceInfo,
- NetconfSessionDelegate delegate) {
+ NetconfSessionDelegate delegate,
+ Map<Integer, CompletableFuture<String>> replies) {
this.in = in;
this.err = err;
outputStream = new PrintWriter(out);
netconfDeviceInfo = deviceInfo;
state = NetconfMessageState.NO_MATCHING_PATTERN;
sessionDelegate = delegate;
+ this.replies = replies;
log.debug("Stream thread for device {} session started", deviceInfo);
start();
}
@Override
public CompletableFuture<String> sendMessage(String request) {
+ Optional<Integer> messageId = getMsgId(request);
+ return sendMessage(request, messageId.get());
+ }
+
+ @Override
+ public CompletableFuture<String> sendMessage(String request, int messageId) {
log.debug("Sending message {} to device {}", request, netconfDeviceInfo);
- outputStream.print(request);
- outputStream.flush();
- return new CompletableFuture<>();
+ CompletableFuture<String> cf = new CompletableFuture<>();
+ replies.put(messageId, cf);
+
+ synchronized (outputStream) {
+ outputStream.print(request);
+ outputStream.flush();
+ }
+
+ return cf;
}
public enum NetconfMessageState {
@@ -230,7 +246,7 @@
}
}
- private static Optional<Integer> getMsgId(String reply) {
+ protected static Optional<Integer> getMsgId(String reply) {
Matcher matcher = MSGID_PATTERN.matcher(reply);
if (matcher.find()) {
Integer messageId = Integer.parseInt(matcher.group(1));