netconf chunked framing v2

Change-Id: I93fad5c44315960ca6aebe5b0944947ac8bf6a51
diff --git a/protocols/netconf/ctl/src/main/java/org/onosproject/netconf/ctl/impl/NetconfSessionMinaImpl.java b/protocols/netconf/ctl/src/main/java/org/onosproject/netconf/ctl/impl/NetconfSessionMinaImpl.java
index 7ce42b4..0e2e3ea 100644
--- a/protocols/netconf/ctl/src/main/java/org/onosproject/netconf/ctl/impl/NetconfSessionMinaImpl.java
+++ b/protocols/netconf/ctl/src/main/java/org/onosproject/netconf/ctl/impl/NetconfSessionMinaImpl.java
@@ -19,6 +19,7 @@
 import com.google.common.annotations.Beta;
 import com.google.common.base.MoreObjects;
 import com.google.common.base.Objects;
+import com.google.common.collect.ImmutableList;
 import com.google.common.collect.ImmutableSet;
 import org.apache.sshd.client.SshClient;
 import org.apache.sshd.client.channel.ClientChannel;
@@ -38,6 +39,7 @@
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import java.io.IOException;
+import java.io.UnsupportedEncodingException;
 import java.nio.ByteBuffer;
 import java.nio.CharBuffer;
 import java.nio.charset.StandardCharsets;
@@ -47,14 +49,14 @@
 import java.security.PublicKey;
 import java.security.spec.InvalidKeySpecException;
 import java.security.spec.X509EncodedKeySpec;
-import java.util.ArrayList;
+import java.util.LinkedHashSet;
+import java.util.Map;
+import java.util.Set;
+import java.util.List;
 import java.util.Collection;
 import java.util.Collections;
-import java.util.LinkedHashSet;
-import java.util.List;
-import java.util.Map;
 import java.util.Optional;
-import java.util.Set;
+import java.util.ArrayList;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.CopyOnWriteArrayList;
@@ -114,12 +116,17 @@
     private static final Pattern SESSION_ID_REGEX_PATTERN = Pattern.compile(SESSION_ID_REGEX);
     private static final String RSA = "RSA";
     private static final String DSA = "DSA";
+    private static final String HASH = "#";
+    private static final String LF = "\n";
+    private static final String MSGLEN_REGEX_PATTERN = "\n#\\d+\n";
+    private static final String NETCONF_10_CAPABILITY = "urn:ietf:params:netconf:base:1.0";
+    private static final String NETCONF_11_CAPABILITY = "urn:ietf:params:netconf:base:1.1";
 
     private String sessionID;
     private final AtomicInteger messageIdInteger = new AtomicInteger(1);
     protected final NetconfDeviceInfo deviceInfo;
     private Iterable<String> onosCapabilities =
-            Collections.singletonList("urn:ietf:params:netconf:base:1.0");
+            ImmutableList.of(NETCONF_10_CAPABILITY, NETCONF_11_CAPABILITY);
 
     /* NOTE: the "serverHelloResponseOld" is deprecated in 1.10.0 and should eventually be removed */
     @Deprecated
@@ -149,6 +156,14 @@
         startConnection();
     }
 
+    public NetconfSessionMinaImpl(NetconfDeviceInfo deviceInfo, List<String> capabilities) throws NetconfException {
+        this.deviceInfo = deviceInfo;
+        replies = new ConcurrentHashMap<>();
+        errorReplies = new ArrayList<>();
+        setOnosCapabilities(capabilities); // NOTE(review): presumably overrides the default onosCapabilities - confirm
+        startConnection();
+    }
+
     private void startConnection() throws NetconfException {
         try {
             startClient();
@@ -172,8 +187,8 @@
     private void startSession() throws IOException {
         final ConnectFuture connectFuture;
         connectFuture = client.connect(deviceInfo.name(),
-                                       deviceInfo.ip().toString(),
-                                       deviceInfo.port())
+                deviceInfo.ip().toString(),
+                deviceInfo.port())
                 .verify(NetconfControllerImpl.netconfConnectTimeout, TimeUnit.SECONDS);
         session = connectFuture.getSession();
         //Using the device ssh key if possible
@@ -189,8 +204,8 @@
                     key = getPublicKey(byteKey, DSA);
                 } catch (NoSuchAlgorithmException | InvalidKeySpecException e1) {
                     throw new NetconfException("Failed to authenticate session with device " +
-                                                       deviceInfo + "check key to be the " +
-                                                       "proper DSA or RSA key", e1);
+                            deviceInfo + "check key to be the " +
+                            "proper DSA or RSA key", e1);
                 }
             }
             //privateKye can set tu null because is not used by the method.
@@ -201,13 +216,13 @@
         session.auth().verify(NetconfControllerImpl.netconfConnectTimeout, TimeUnit.SECONDS);
         Set<ClientSession.ClientSessionEvent> event = session.waitFor(
                 ImmutableSet.of(ClientSession.ClientSessionEvent.WAIT_AUTH,
-                                ClientSession.ClientSessionEvent.CLOSED,
-                                ClientSession.ClientSessionEvent.AUTHED), 0);
+                        ClientSession.ClientSessionEvent.CLOSED,
+                        ClientSession.ClientSessionEvent.AUTHED), 0);
 
         if (!event.contains(ClientSession.ClientSessionEvent.AUTHED)) {
             log.debug("Session closed {} {}", event, session.isClosed());
             throw new NetconfException("Failed to authenticate session with device " +
-                                               deviceInfo + "check the user/pwd or key");
+                    deviceInfo + "check the user/pwd or key");
         }
         openChannel();
     }
@@ -227,11 +242,11 @@
         if (channelFuture.await(NetconfControllerImpl.netconfConnectTimeout, TimeUnit.SECONDS)) {
             if (channelFuture.isOpened()) {
                 streamHandler = new NetconfStreamThread(channel.getInvertedOut(), channel.getInvertedIn(),
-                                                        channel.getInvertedErr(), deviceInfo,
-                                                        new NetconfSessionDelegateImpl(), replies);
+                        channel.getInvertedErr(), deviceInfo,
+                        new NetconfSessionDelegateImpl(), replies);
             } else {
                 throw new NetconfException("Failed to open channel with device " +
-                                                   deviceInfo);
+                        deviceInfo);
             }
             sendHello();
         }
@@ -251,13 +266,13 @@
             // interleave supported and existing filter is NOT "no filtering"
             // and was requested with different filtering schema
             log.info("Cannot use existing session for subscription {} ({})",
-                     deviceInfo, filterSchema);
+                    deviceInfo, filterSchema);
             openNewSession = true;
         }
 
         if (openNewSession) {
             log.info("Creating notification session to {} with filter {}",
-                     deviceInfo, filterSchema);
+                    deviceInfo, filterSchema);
             NetconfSession child = new NotificationSession(deviceInfo);
 
             child.addDeviceOutputListener(new NotificationForwarder());
@@ -271,7 +286,7 @@
         String reply = sendRequest(createSubscriptionString(filterSchema));
         if (!checkReply(reply)) {
             throw new NetconfException("Subscription not successful with device "
-                                               + deviceInfo + " with reply " + reply);
+                    + deviceInfo + " with reply " + reply);
         }
         subscriptionConnected = true;
     }
@@ -335,7 +350,7 @@
             sessionID = sessionIDMatcher.group(1);
         } else {
             throw new NetconfException("Missing SessionID in server hello " +
-                                               "reponse.");
+                    "reponse.");
         }
 
     }
@@ -391,14 +406,50 @@
 
     @Override
     public String requestSync(String request) throws NetconfException {
-        if (!request.contains(ENDPATTERN)) {
-            request = request + NEW_LINE + ENDPATTERN;
-        }
         String reply = sendRequest(request);
         checkReply(reply);
         return reply;
     }
 
+
+    /**
+     * Applies the framing selected by the capabilities held in {@code deviceCapabilities}.
+     *
+     * @param message to format
+     * @return message in chunked framing when base:1.1 is present, else ending with the end pattern
+     */
+    private String formatNetconfMessage(String message) {
+        if (deviceCapabilities.contains(NETCONF_11_CAPABILITY)) {
+            return formatChunkedMessage(message);
+        }
+        // End-of-message framing: append the pattern only when it is absent.
+        if (message.contains(ENDPATTERN)) {
+            return message;
+        }
+        return message + NEW_LINE + ENDPATTERN;
+    }
+
+    /**
+     * Wraps a message in the chunked framing mechanism unless it is already framed.
+     *
+     * @param message to format; a trailing end-of-message pattern is stripped first
+     * @return formatted message
+     */
+    private String formatChunkedMessage(String message) {
+        if (message.endsWith(ENDPATTERN)) {
+            message = message.substring(0, message.length() - ENDPATTERN.length());
+        }
+        if (!message.startsWith(LF + HASH)) {
+            // The chunk size counts UTF-8 octets, not chars. getBytes(Charset) declares
+            // no checked exception, so there is nothing to catch or swallow here.
+            int chunkSize = message.getBytes(StandardCharsets.UTF_8).length;
+            message = LF + HASH + chunkSize + LF + message + LF + HASH + HASH + LF;
+        }
+        return message;
+    }
+
+
     @Override
     @Deprecated
     public CompletableFuture<String> request(String request) {
@@ -410,6 +461,7 @@
     }
 
     private String sendRequest(String request) throws NetconfException {
+        request = formatNetconfMessage(request); // chunked framing or end pattern, per deviceCapabilities
         return sendRequest(request, false);
     }
 
@@ -419,8 +471,8 @@
         if (!isHello) {
             messageId = messageIdInteger.getAndIncrement();
         }
-        request = formatRequestMessageId(request, messageId);
         request = formatXmlHeader(request);
+        request = formatRequestMessageId(request, messageId);
         CompletableFuture<String> futureReply = request(request, messageId);
         int replyTimeout = NetconfControllerImpl.netconfReplyTimeout;
         String rp;
@@ -460,19 +512,42 @@
         if (request.contains(MESSAGE_ID_STRING)) {
             //FIXME if application provides his own counting of messages this fails that count
             request = request.replaceFirst(MESSAGE_ID_STRING + EQUAL + NUMBER_BETWEEN_QUOTES_MATCHER,
-                                           MESSAGE_ID_STRING + EQUAL + "\"" + messageId + "\"");
+                    MESSAGE_ID_STRING + EQUAL + "\"" + messageId + "\"");
         } else if (!request.contains(MESSAGE_ID_STRING) && !request.contains(HELLO)) {
             //FIXME find out a better way to enforce the presence of message-id
             request = request.replaceFirst(END_OF_RPC_OPEN_TAG, "\" " + MESSAGE_ID_STRING + EQUAL + "\""
                     + messageId + "\"" + ">");
         }
+        request = updateRequestLenght(request);
+        return request;
+    }
+
+    /**
+     * Rewrites the chunk-size header so it matches the payload's UTF-8 byte length.
+     *
+     * @param request outgoing message; returned untouched unless it contains the end-of-chunks marker
+     * @return the request with a corrected size header, or the original request
+     */
+    private String updateRequestLenght(String request) {
+        if (request.contains(LF + HASH + HASH + LF)) {
+            int oldLen = Integer.parseInt(request.split(HASH)[1].split(LF)[0]);
+            String firstBlock = request.split(MSGLEN_REGEX_PATTERN)[1].split(LF + HASH + HASH + LF)[0];
+            int newLen = firstBlock.getBytes(StandardCharsets.UTF_8).length;
+            if (oldLen != newLen) {
+                return LF + HASH + newLen + LF + request.substring(request.indexOf('<'));
+            }
+        }
         return request;
     }
 
     private String formatXmlHeader(String request) {
         if (!request.contains(XML_HEADER)) {
             //FIXME if application provieds his own XML header of different type there is a clash
-            request = XML_HEADER + "\n" + request;
+            // A chunk-size header must stay first, so the XML header goes right before the first '<'.
+            String chunkHeader = request.startsWith(LF + HASH) ? request.split("<")[0] : "";
+            String separator = chunkHeader.isEmpty() ? "\n" : "";
+            String body = request.substring(chunkHeader.length());
+            request = chunkHeader + XML_HEADER + separator + body;
         }
         return request;
     }
@@ -562,7 +637,9 @@
 
     @Override
     public boolean editConfig(String newConfiguration) throws NetconfException {
-        newConfiguration = newConfiguration + ENDPATTERN;
+        // Terminate the payload with the end pattern unless the caller already did.
+        boolean terminated = newConfiguration.endsWith(ENDPATTERN);
+        newConfiguration = terminated ? newConfiguration : newConfiguration + ENDPATTERN;
         return checkReply(sendRequest(newConfiguration));
     }
 
@@ -613,14 +690,14 @@
                               String newConfiguration)
             throws NetconfException {
         return bareCopyConfig(netconfTargetConfig.asXml(),
-                              normalizeCopyConfigParam(newConfiguration));
+                normalizeCopyConfigParam(newConfiguration));
     }
 
     @Override
     public boolean copyConfig(String netconfTargetConfig,
                               String newConfiguration) throws NetconfException {
         return bareCopyConfig(normalizeCopyConfigParam(netconfTargetConfig),
-                              normalizeCopyConfigParam(newConfiguration));
+                normalizeCopyConfigParam(newConfiguration));
     }
 
     /**
@@ -668,7 +745,7 @@
     public boolean deleteConfig(DatastoreId netconfTargetConfig) throws NetconfException {
         if (netconfTargetConfig.equals(DatastoreId.RUNNING)) {
             log.warn("Target configuration for delete operation can't be \"running\"",
-                     netconfTargetConfig);
+                    netconfTargetConfig);
             return false;
         }
         StringBuilder rpc = new StringBuilder(XML_HEADER);
@@ -852,11 +929,11 @@
         public void notify(NetconfDeviceOutputEvent event) {
             Optional<Integer> messageId = event.getMessageID();
             log.debug("messageID {}, waiting replies messageIDs {}", messageId,
-                      replies.keySet());
+                    replies.keySet());
             if (!messageId.isPresent()) {
                 errorReplies.add(event.getMessagePayload());
                 log.error("Device {} sent error reply {}",
-                          event.getDeviceInfo(), event.getMessagePayload());
+                        event.getDeviceInfo(), event.getMessagePayload());
                 return;
             }
             CompletableFuture<String> completedReply =
@@ -874,4 +951,4 @@
             return new NetconfSessionMinaImpl(netconfDeviceInfo);
         }
     }
-}
+}
\ No newline at end of file