ONOS-3839 Fixing errors in sending requests and hanging on future.join
Change-Id: I6da5bf1ff728efeb0d531cf7f04f6bf49f11a0a9
diff --git a/protocols/netconf/ctl/src/main/java/org/onosproject/netconf/ctl/NetconfStreamThread.java b/protocols/netconf/ctl/src/main/java/org/onosproject/netconf/ctl/NetconfStreamThread.java
index 6169c3d..9376667 100644
--- a/protocols/netconf/ctl/src/main/java/org/onosproject/netconf/ctl/NetconfStreamThread.java
+++ b/protocols/netconf/ctl/src/main/java/org/onosproject/netconf/ctl/NetconfStreamThread.java
@@ -49,17 +49,20 @@
private static final String RPC_ERROR = "rpc-error";
private static final String NOTIFICATION_LABEL = "<notification>";
- private static PrintWriter outputStream;
- private static NetconfDeviceInfo netconfDeviceInfo;
- private static NetconfSessionDelegate sessionDelegate;
- private static NetconfMessageState state;
- private static List<NetconfDeviceOutputEventListener> netconfDeviceEventListeners
+ private PrintWriter outputStream;
+ private final InputStream err;
+ private final InputStream in;
+ private NetconfDeviceInfo netconfDeviceInfo;
+ private NetconfSessionDelegate sessionDelegate;
+ private NetconfMessageState state;
+ private List<NetconfDeviceOutputEventListener> netconfDeviceEventListeners
= Lists.newArrayList();
public NetconfStreamThread(final InputStream in, final OutputStream out,
final InputStream err, NetconfDeviceInfo deviceInfo,
NetconfSessionDelegate delegate) {
- super(handler(in, err));
+ this.in = in;
+ this.err = err;
outputStream = new PrintWriter(out);
netconfDeviceInfo = deviceInfo;
state = NetconfMessageState.NO_MATCHING_PATTERN;
@@ -70,6 +73,7 @@
@Override
public CompletableFuture<String> sendMessage(String request) {
+ log.debug("Sending message {} to device {}", request, netconfDeviceInfo);
outputStream.print(request);
outputStream.flush();
return new CompletableFuture<>();
@@ -147,9 +151,8 @@
abstract NetconfMessageState evaluateChar(char c);
}
- private static Runnable handler(final InputStream in, final InputStream err) {
+ public void run() {
BufferedReader bufferReader = new BufferedReader(new InputStreamReader(in));
- return () -> {
try {
boolean socketClosed = false;
StringBuilder deviceReplyBuilder = new StringBuilder();
@@ -157,6 +160,7 @@
int cInt = bufferReader.read();
if (cInt == -1) {
socketClosed = true;
+ log.debug("char {} " + bufferReader.read());
NetconfDeviceOutputEvent event = new NetconfDeviceOutputEvent(
NetconfDeviceOutputEvent.Type.DEVICE_UNREGISTERED,
null, null, -1, netconfDeviceInfo);
@@ -167,27 +171,36 @@
state = state.evaluateChar(c);
deviceReplyBuilder.append(c);
if (state == NetconfMessageState.END_PATTERN) {
- String deviceReply = deviceReplyBuilder.toString()
- .replace(END_PATTERN, "");
- if (deviceReply.contains(RPC_REPLY) ||
- deviceReply.contains(RPC_ERROR) ||
- deviceReply.contains(HELLO)) {
+ String deviceReply = deviceReplyBuilder.toString();
+ if (deviceReply.equals(END_PATTERN)) {
NetconfDeviceOutputEvent event = new NetconfDeviceOutputEvent(
- NetconfDeviceOutputEvent.Type.DEVICE_REPLY,
- null, deviceReply, getMsgId(deviceReply), netconfDeviceInfo);
- sessionDelegate.notify(event);
+ NetconfDeviceOutputEvent.Type.DEVICE_UNREGISTERED,
+ null, null, -1, netconfDeviceInfo);
netconfDeviceEventListeners.forEach(
listener -> listener.event(event));
- } else if (deviceReply.contains(NOTIFICATION_LABEL)) {
- final String finalDeviceReply = deviceReply;
- netconfDeviceEventListeners.forEach(
- listener -> listener.event(new NetconfDeviceOutputEvent(
- NetconfDeviceOutputEvent.Type.DEVICE_NOTIFICATION,
- null, finalDeviceReply, getMsgId(finalDeviceReply), netconfDeviceInfo)));
} else {
- log.info("Error on replay from device {} ", deviceReply);
+ deviceReply = deviceReply.replace(END_PATTERN, "");
+ if (deviceReply.contains(RPC_REPLY) ||
+ deviceReply.contains(RPC_ERROR) ||
+ deviceReply.contains(HELLO)) {
+ NetconfDeviceOutputEvent event = new NetconfDeviceOutputEvent(
+ NetconfDeviceOutputEvent.Type.DEVICE_REPLY,
+ null, deviceReply, getMsgId(deviceReply), netconfDeviceInfo);
+ sessionDelegate.notify(event);
+ netconfDeviceEventListeners.forEach(
+ listener -> listener.event(event));
+ } else if (deviceReply.contains(NOTIFICATION_LABEL)) {
+ final String finalDeviceReply = deviceReply;
+ netconfDeviceEventListeners.forEach(
+ listener -> listener.event(new NetconfDeviceOutputEvent(
+ NetconfDeviceOutputEvent.Type.DEVICE_NOTIFICATION,
+ null, finalDeviceReply, getMsgId(finalDeviceReply),
+ netconfDeviceInfo)));
+ } else {
+ log.info("Error on replay from device {} ", deviceReply);
+ }
+ deviceReplyBuilder.setLength(0);
}
- deviceReplyBuilder.setLength(0);
}
}
} catch (IOException e) {
@@ -196,7 +209,6 @@
netconfDeviceInfo, e));
//TODO should we send a socket closed message to listeners ?
}
- };
}
private static int getMsgId(String reply) {