[ONOS-3918] Handling of NETCONF <rpc-error> and no message-id

Change-Id: I8b9396a727fb54b5b84d02f258c14cfccad5bb99
diff --git a/protocols/netconf/api/src/main/java/org/onosproject/netconf/NetconfDeviceOutputEvent.java b/protocols/netconf/api/src/main/java/org/onosproject/netconf/NetconfDeviceOutputEvent.java
index f6ba4ab..956f063 100644
--- a/protocols/netconf/api/src/main/java/org/onosproject/netconf/NetconfDeviceOutputEvent.java
+++ b/protocols/netconf/api/src/main/java/org/onosproject/netconf/NetconfDeviceOutputEvent.java
@@ -18,6 +18,8 @@
 
 import org.onosproject.event.AbstractEvent;
 
+import java.util.Optional;
+
 /**
  * Describes network configuration event.
  */
@@ -25,7 +27,7 @@
         AbstractEvent<NetconfDeviceOutputEvent.Type, Object> {
 
     private final String messagePayload;
-    private final int messageID;
+    private final Optional<Integer> messageID;
     private final NetconfDeviceInfo deviceInfo;
 
     /**
@@ -64,7 +66,8 @@
      * @param msgID             id of the message related to the event
      * @param netconfDeviceInfo device of event
      */
-    public NetconfDeviceOutputEvent(Type type, Object subject, String payload, int msgID,
+    public NetconfDeviceOutputEvent(Type type, Object subject, String payload,
+                                    Optional<Integer> msgID,
                                     NetconfDeviceInfo netconfDeviceInfo) {
         super(type, subject);
         messagePayload = payload;
@@ -83,8 +86,10 @@
      * @param msgID             id of the message related to the event
      * @param time              occurrence time
      */
-    public NetconfDeviceOutputEvent(Type type, Object subject, String payload, int msgID,
-                                    NetconfDeviceInfo netconfDeviceInfo, long time) {
+    public NetconfDeviceOutputEvent(Type type, Object subject, String payload,
+                                    Optional<Integer> msgID,
+                                    NetconfDeviceInfo netconfDeviceInfo,
+                                    long time) {
         super(type, subject, time);
         messagePayload = payload;
         deviceInfo = netconfDeviceInfo;
@@ -111,7 +116,7 @@
      * Reply messageId.
      * @return messageId
      */
-    public Integer getMessageID() {
+    public Optional<Integer> getMessageID() {
         return messageID;
     }
 }
diff --git a/protocols/netconf/ctl/src/main/java/org/onosproject/netconf/ctl/NetconfSessionImpl.java b/protocols/netconf/ctl/src/main/java/org/onosproject/netconf/ctl/NetconfSessionImpl.java
index 9a950fb..6634cf6 100644
--- a/protocols/netconf/ctl/src/main/java/org/onosproject/netconf/ctl/NetconfSessionImpl.java
+++ b/protocols/netconf/ctl/src/main/java/org/onosproject/netconf/ctl/NetconfSessionImpl.java
@@ -28,10 +28,12 @@
 import org.slf4j.LoggerFactory;
 
 import java.io.IOException;
+import java.util.ArrayList;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
+import java.util.Optional;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.TimeUnit;
@@ -71,12 +73,14 @@
     private String serverCapabilities;
     private NetconfStreamHandler t;
     private Map<Integer, CompletableFuture<String>> replies;
+    private List<String> errorReplies;
 
 
     public NetconfSessionImpl(NetconfDeviceInfo deviceInfo) throws NetconfException {
         this.deviceInfo = deviceInfo;
         connectionActive = false;
         replies = new HashMap<>();
+        errorReplies = new ArrayList<>();
         startConnection();
     }
 
@@ -194,18 +198,13 @@
         request = formatRequestMessageId(request);
         request = formatXmlHeader(request);
         CompletableFuture<String> futureReply = request(request);
+        messageIdInteger.incrementAndGet();
         String rp;
         try {
             rp = futureReply.get(FUTURE_REPLY_TIMEOUT, TimeUnit.MILLISECONDS);
         } catch (InterruptedException | ExecutionException | TimeoutException e) {
-            //replies.remove(messageIdInteger.get());
-            throw new NetconfException("Can't get the reply for request" + request, e);
+            throw new NetconfException("No matching reply for request " + request, e);
         }
-//        String rp = Tools.futureGetOrElse(futureReply, FUTURE_REPLY_TIMEOUT, TimeUnit.MILLISECONDS,
-//                                          "Error in completing the request with message-id " +
-//                                                  messageIdInteger.get() +
-//                                                  ": future timed out.");
-        messageIdInteger.incrementAndGet();
         log.debug("Result {} from request {} to device {}", rp, request, deviceInfo);
         return rp;
     }
@@ -442,8 +441,16 @@
     public class NetconfSessionDelegateImpl implements NetconfSessionDelegate {
 
         @Override
-        public void notify(NetconfDeviceOutputEvent event) {
-            CompletableFuture<String> completedReply = replies.get(event.getMessageID());
+        public void notify(NetconfDeviceOutputEvent event)  {
+            Optional<Integer> messageId = event.getMessageID();
+            if (!messageId.isPresent()) {
+                errorReplies.add(event.getMessagePayload());
+                log.error("Device " + event.getDeviceInfo() +
+                          " sent error reply " + event.getMessagePayload());
+                return;
+            }
+            CompletableFuture<String> completedReply =
+                    replies.get(messageId.get());
             completedReply.complete(event.getMessagePayload());
         }
     }
diff --git a/protocols/netconf/ctl/src/main/java/org/onosproject/netconf/ctl/NetconfStreamThread.java b/protocols/netconf/ctl/src/main/java/org/onosproject/netconf/ctl/NetconfStreamThread.java
index 9376667..7df7849 100644
--- a/protocols/netconf/ctl/src/main/java/org/onosproject/netconf/ctl/NetconfStreamThread.java
+++ b/protocols/netconf/ctl/src/main/java/org/onosproject/netconf/ctl/NetconfStreamThread.java
@@ -32,6 +32,7 @@
 import java.io.OutputStream;
 import java.io.PrintWriter;
 import java.util.List;
+import java.util.Optional;
 import java.util.concurrent.CompletableFuture;
 
 /**
@@ -48,6 +49,7 @@
     private static final String RPC_REPLY = "rpc-reply";
     private static final String RPC_ERROR = "rpc-error";
     private static final String NOTIFICATION_LABEL = "<notification>";
+    private static final String MESSAGE_ID = "message-id=";
 
     private PrintWriter outputStream;
     private final InputStream err;
@@ -163,7 +165,7 @@
                         log.debug("char {} " + bufferReader.read());
                         NetconfDeviceOutputEvent event = new NetconfDeviceOutputEvent(
                                 NetconfDeviceOutputEvent.Type.DEVICE_UNREGISTERED,
-                                null, null, -1, netconfDeviceInfo);
+                                null, null, Optional.of(-1), netconfDeviceInfo);
                         netconfDeviceEventListeners.forEach(
                                 listener -> listener.event(event));
                     }
@@ -175,7 +177,7 @@
                         if (deviceReply.equals(END_PATTERN)) {
                             NetconfDeviceOutputEvent event = new NetconfDeviceOutputEvent(
                                     NetconfDeviceOutputEvent.Type.DEVICE_UNREGISTERED,
-                                    null, null, -1, netconfDeviceInfo);
+                                    null, null, Optional.of(-1), netconfDeviceInfo);
                             netconfDeviceEventListeners.forEach(
                                     listener -> listener.event(event));
                         } else {
@@ -211,18 +213,20 @@
             }
     }
 
-    private static int getMsgId(String reply) {
-        if (!reply.contains(HELLO)) {
-            String[] outer = reply.split("message-id=");
-            Preconditions.checkArgument(outer.length != 1,
-                                        "Error in retrieving the message id");
-            String messageID = outer[1].substring(0, 3).replace("\"", "");
-            Preconditions.checkNotNull(Integer.parseInt(messageID),
-                                       "Error in retrieving the message id");
-            return Integer.parseInt(messageID);
-        } else {
-            return 0;
+    private static Optional<Integer> getMsgId(String reply) {
+        if (reply.contains(HELLO)) {
+            return Optional.of(0);
         }
+        if (reply.contains(RPC_ERROR) && !reply.contains(MESSAGE_ID)) {
+            return Optional.empty();
+        }
+        String[] outer = reply.split(MESSAGE_ID);
+        Preconditions.checkArgument(outer.length != 1,
+                                    "Error in retrieving the message id");
+        String messageID = outer[1].substring(0, 3).replace("\"", "");
+        Preconditions.checkNotNull(Integer.parseInt(messageID),
+                                   "Error in retrieving the message id");
+        return Optional.of(Integer.parseInt(messageID));
     }
 
     public void addDeviceEventListener(NetconfDeviceOutputEventListener listener) {