Fixed some concurrency issues in NETCONF Session Added Unit Tests
Change-Id: I84fe0c17e3d757948a859f78d01fbb025397a44d
diff --git a/protocols/netconf/ctl/src/main/java/org/onosproject/netconf/ctl/NetconfStreamThread.java b/protocols/netconf/ctl/src/main/java/org/onosproject/netconf/ctl/NetconfStreamThread.java
index 184a074..4d713f6 100644
--- a/protocols/netconf/ctl/src/main/java/org/onosproject/netconf/ctl/NetconfStreamThread.java
+++ b/protocols/netconf/ctl/src/main/java/org/onosproject/netconf/ctl/NetconfStreamThread.java
@@ -32,6 +32,7 @@
import java.io.OutputStream;
import java.io.PrintWriter;
import java.util.List;
+import java.util.Map;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.regex.Matcher;
@@ -63,26 +64,41 @@
private List<NetconfDeviceOutputEventListener> netconfDeviceEventListeners
= Lists.newCopyOnWriteArrayList();
private boolean enableNotifications = true;
+ private Map<Integer, CompletableFuture<String>> replies;
public NetconfStreamThread(final InputStream in, final OutputStream out,
final InputStream err, NetconfDeviceInfo deviceInfo,
- NetconfSessionDelegate delegate) {
+ NetconfSessionDelegate delegate,
+ Map<Integer, CompletableFuture<String>> replies) {
this.in = in;
this.err = err;
outputStream = new PrintWriter(out);
netconfDeviceInfo = deviceInfo;
state = NetconfMessageState.NO_MATCHING_PATTERN;
sessionDelegate = delegate;
+ this.replies = replies;
log.debug("Stream thread for device {} session started", deviceInfo);
start();
}
@Override
public CompletableFuture<String> sendMessage(String request) {
+ Optional<Integer> messageId = getMsgId(request);
+ return sendMessage(request, messageId.get());
+ }
+
+ @Override
+ public CompletableFuture<String> sendMessage(String request, int messageId) {
log.debug("Sending message {} to device {}", request, netconfDeviceInfo);
- outputStream.print(request);
- outputStream.flush();
- return new CompletableFuture<>();
+ CompletableFuture<String> cf = new CompletableFuture<>();
+ replies.put(messageId, cf);
+
+ synchronized (outputStream) {
+ outputStream.print(request);
+ outputStream.flush();
+ }
+
+ return cf;
}
public enum NetconfMessageState {
@@ -230,7 +246,7 @@
}
}
- private static Optional<Integer> getMsgId(String reply) {
+ protected static Optional<Integer> getMsgId(String reply) {
Matcher matcher = MSGID_PATTERN.matcher(reply);
if (matcher.find()) {
Integer messageId = Integer.parseInt(matcher.group(1));