netconf chunked framing v2
Change-Id: I93fad5c44315960ca6aebe5b0944947ac8bf6a51
diff --git a/protocols/netconf/ctl/src/main/java/org/onosproject/netconf/ctl/impl/NetconfSessionMinaImpl.java b/protocols/netconf/ctl/src/main/java/org/onosproject/netconf/ctl/impl/NetconfSessionMinaImpl.java
index 7ce42b4..0e2e3ea 100644
--- a/protocols/netconf/ctl/src/main/java/org/onosproject/netconf/ctl/impl/NetconfSessionMinaImpl.java
+++ b/protocols/netconf/ctl/src/main/java/org/onosproject/netconf/ctl/impl/NetconfSessionMinaImpl.java
@@ -19,6 +19,7 @@
import com.google.common.annotations.Beta;
import com.google.common.base.MoreObjects;
import com.google.common.base.Objects;
+import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableSet;
import org.apache.sshd.client.SshClient;
import org.apache.sshd.client.channel.ClientChannel;
@@ -38,6 +39,7 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.IOException;
+import java.io.UnsupportedEncodingException;
import java.nio.ByteBuffer;
import java.nio.CharBuffer;
import java.nio.charset.StandardCharsets;
@@ -47,14 +49,14 @@
import java.security.PublicKey;
import java.security.spec.InvalidKeySpecException;
import java.security.spec.X509EncodedKeySpec;
-import java.util.ArrayList;
+import java.util.LinkedHashSet;
+import java.util.Map;
+import java.util.Set;
+import java.util.List;
import java.util.Collection;
import java.util.Collections;
-import java.util.LinkedHashSet;
-import java.util.List;
-import java.util.Map;
import java.util.Optional;
-import java.util.Set;
+import java.util.ArrayList;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CopyOnWriteArrayList;
@@ -114,12 +116,17 @@
private static final Pattern SESSION_ID_REGEX_PATTERN = Pattern.compile(SESSION_ID_REGEX);
private static final String RSA = "RSA";
private static final String DSA = "DSA";
+ private static final String HASH = "#";
+ private static final String LF = "\n";
+ private static final String MSGLEN_REGEX_PATTERN = "\n#\\d+\n";
+ private static final String NETCONF_10_CAPABILITY = "urn:ietf:params:netconf:base:1.0";
+ private static final String NETCONF_11_CAPABILITY = "urn:ietf:params:netconf:base:1.1";
private String sessionID;
private final AtomicInteger messageIdInteger = new AtomicInteger(1);
protected final NetconfDeviceInfo deviceInfo;
private Iterable<String> onosCapabilities =
- Collections.singletonList("urn:ietf:params:netconf:base:1.0");
+ ImmutableList.of(NETCONF_10_CAPABILITY, NETCONF_11_CAPABILITY);
/* NOTE: the "serverHelloResponseOld" is deprecated in 1.10.0 and should eventually be removed */
@Deprecated
@@ -149,6 +156,14 @@
startConnection();
}
+ public NetconfSessionMinaImpl(NetconfDeviceInfo deviceInfo, List<String> capabilities) throws NetconfException {
+ this.deviceInfo = deviceInfo;
+ replies = new ConcurrentHashMap<>(); // created before the connection is started below
+ errorReplies = new ArrayList<>();
+ setOnosCapabilities(capabilities); // presumably overrides the default onosCapabilities; setter not shown here
+ startConnection(); // kept last so every field above is set before connecting
+ }
+
private void startConnection() throws NetconfException {
try {
startClient();
@@ -172,8 +187,8 @@
private void startSession() throws IOException {
final ConnectFuture connectFuture;
connectFuture = client.connect(deviceInfo.name(),
- deviceInfo.ip().toString(),
- deviceInfo.port())
+ deviceInfo.ip().toString(),
+ deviceInfo.port())
.verify(NetconfControllerImpl.netconfConnectTimeout, TimeUnit.SECONDS);
session = connectFuture.getSession();
//Using the device ssh key if possible
@@ -189,8 +204,8 @@
key = getPublicKey(byteKey, DSA);
} catch (NoSuchAlgorithmException | InvalidKeySpecException e1) {
throw new NetconfException("Failed to authenticate session with device " +
- deviceInfo + "check key to be the " +
- "proper DSA or RSA key", e1);
+ deviceInfo + "check key to be the " +
+ "proper DSA or RSA key", e1);
}
}
//privateKye can set tu null because is not used by the method.
@@ -201,13 +216,13 @@
session.auth().verify(NetconfControllerImpl.netconfConnectTimeout, TimeUnit.SECONDS);
Set<ClientSession.ClientSessionEvent> event = session.waitFor(
ImmutableSet.of(ClientSession.ClientSessionEvent.WAIT_AUTH,
- ClientSession.ClientSessionEvent.CLOSED,
- ClientSession.ClientSessionEvent.AUTHED), 0);
+ ClientSession.ClientSessionEvent.CLOSED,
+ ClientSession.ClientSessionEvent.AUTHED), 0);
if (!event.contains(ClientSession.ClientSessionEvent.AUTHED)) {
log.debug("Session closed {} {}", event, session.isClosed());
throw new NetconfException("Failed to authenticate session with device " +
- deviceInfo + "check the user/pwd or key");
+ deviceInfo + "check the user/pwd or key");
}
openChannel();
}
@@ -227,11 +242,11 @@
if (channelFuture.await(NetconfControllerImpl.netconfConnectTimeout, TimeUnit.SECONDS)) {
if (channelFuture.isOpened()) {
streamHandler = new NetconfStreamThread(channel.getInvertedOut(), channel.getInvertedIn(),
- channel.getInvertedErr(), deviceInfo,
- new NetconfSessionDelegateImpl(), replies);
+ channel.getInvertedErr(), deviceInfo,
+ new NetconfSessionDelegateImpl(), replies);
} else {
throw new NetconfException("Failed to open channel with device " +
- deviceInfo);
+ deviceInfo);
}
sendHello();
}
@@ -251,13 +266,13 @@
// interleave supported and existing filter is NOT "no filtering"
// and was requested with different filtering schema
log.info("Cannot use existing session for subscription {} ({})",
- deviceInfo, filterSchema);
+ deviceInfo, filterSchema);
openNewSession = true;
}
if (openNewSession) {
log.info("Creating notification session to {} with filter {}",
- deviceInfo, filterSchema);
+ deviceInfo, filterSchema);
NetconfSession child = new NotificationSession(deviceInfo);
child.addDeviceOutputListener(new NotificationForwarder());
@@ -271,7 +286,7 @@
String reply = sendRequest(createSubscriptionString(filterSchema));
if (!checkReply(reply)) {
throw new NetconfException("Subscription not successful with device "
- + deviceInfo + " with reply " + reply);
+ + deviceInfo + " with reply " + reply);
}
subscriptionConnected = true;
}
@@ -335,7 +350,7 @@
sessionID = sessionIDMatcher.group(1);
} else {
throw new NetconfException("Missing SessionID in server hello " +
- "reponse.");
+ "reponse.");
}
}
@@ -391,14 +406,50 @@
@Override
public String requestSync(String request) throws NetconfException {
- if (!request.contains(ENDPATTERN)) {
- request = request + NEW_LINE + ENDPATTERN;
- }
String reply = sendRequest(request);
checkReply(reply);
return reply;
}
+
+    /**
+     * Frames a message: chunked if base:1.1 is in both the ONOS and the device capabilities, else end pattern.
+     *
+     * @param message to format
+     * @return formatted message
+     */
+    private String formatNetconfMessage(String message) {
+        boolean onosHasBase11 = false;
+        for (String capability : onosCapabilities) {
+            onosHasBase11 |= NETCONF_11_CAPABILITY.equals(capability);
+        }
+        if (onosHasBase11 && deviceCapabilities.contains(NETCONF_11_CAPABILITY)) {
+            return formatChunkedMessage(message);
+        }
+        return message.contains(ENDPATTERN) ? message : message + NEW_LINE + ENDPATTERN;
+    }
+
+    /**
+     * Wraps a message in a single chunk, {@code \n#<size>\n<message>\n##\n}, where size is the
+     * UTF-8 octet count; a trailing end pattern is dropped first and a message that already
+     * starts with a chunk header is returned as is.
+     *
+     * @param message to format
+     * @return formatted message
+     */
+    private String formatChunkedMessage(String message) {
+        if (message.endsWith(ENDPATTERN)) {
+            message = message.substring(0, message.length() - ENDPATTERN.length());
+        }
+        if (!message.startsWith(LF + HASH)) {
+            // StandardCharsets cannot fail, so a message is never left unframed by a swallowed exception.
+            int size = message.getBytes(StandardCharsets.UTF_8).length;
+            message = LF + HASH + size + LF + message + LF + HASH + HASH + LF;
+        }
+        return message;
+    }
+
+
@Override
@Deprecated
public CompletableFuture<String> request(String request) {
@@ -410,6 +461,7 @@
}
private String sendRequest(String request) throws NetconfException {
+ request = formatNetconfMessage(request); // frame the request (chunked or end pattern) before it is sent
return sendRequest(request, false);
}
@@ -419,8 +471,8 @@
if (!isHello) {
messageId = messageIdInteger.getAndIncrement();
}
- request = formatRequestMessageId(request, messageId);
request = formatXmlHeader(request);
+ request = formatRequestMessageId(request, messageId);
CompletableFuture<String> futureReply = request(request, messageId);
int replyTimeout = NetconfControllerImpl.netconfReplyTimeout;
String rp;
@@ -460,19 +512,42 @@
if (request.contains(MESSAGE_ID_STRING)) {
//FIXME if application provides his own counting of messages this fails that count
request = request.replaceFirst(MESSAGE_ID_STRING + EQUAL + NUMBER_BETWEEN_QUOTES_MATCHER,
- MESSAGE_ID_STRING + EQUAL + "\"" + messageId + "\"");
+ MESSAGE_ID_STRING + EQUAL + "\"" + messageId + "\"");
} else if (!request.contains(MESSAGE_ID_STRING) && !request.contains(HELLO)) {
//FIXME find out a better way to enforce the presence of message-id
request = request.replaceFirst(END_OF_RPC_OPEN_TAG, "\" " + MESSAGE_ID_STRING + EQUAL + "\""
+ messageId + "\"" + ">");
}
+ request = updateRequestLenght(request);
+ return request;
+ }
+
+    // Re-syncs the leading "\n#<len>\n" chunk header with the UTF-8 size of the first chunk.
+    private String updateRequestLenght(String request) {
+        String endOfChunks = LF + HASH + HASH + LF;
+        // Only chunk-framed requests carry a length header; anything else is returned untouched.
+        if (request.startsWith(LF + HASH) && request.contains(endOfChunks)) {
+            int headerEnd = request.indexOf(LF, 1);
+            int oldLen = Integer.parseInt(request.substring((LF + HASH).length(), headerEnd));
+            // Keep every byte after the header so the rewritten size matches what is actually sent.
+            String framedBody = request.substring(headerEnd + 1);
+            String firstBlock = framedBody.split(MSGLEN_REGEX_PATTERN)[0].split(endOfChunks)[0];
+            int newLen = firstBlock.getBytes(StandardCharsets.UTF_8).length;
+            if (oldLen != newLen) {
+                return LF + HASH + newLen + LF + framedBody;
+            }
+        }
return request;
}
private String formatXmlHeader(String request) {
if (!request.contains(XML_HEADER)) {
//FIXME if application provieds his own XML header of different type there is a clash
- request = XML_HEADER + "\n" + request;
+ if (request.startsWith(LF + HASH)) { // chunk-framed: insert the header just before the first '<'
+ request = request.split("<")[0] + XML_HEADER + request.substring(request.split("<")[0].length());
+ } else {
+ request = XML_HEADER + "\n" + request;
+ }
}
return request;
}
@@ -562,7 +637,9 @@
@Override
public boolean editConfig(String newConfiguration) throws NetconfException {
- newConfiguration = newConfiguration + ENDPATTERN;
+ if (!newConfiguration.endsWith(ENDPATTERN)) { // do not append the end pattern a second time
+ newConfiguration = newConfiguration + ENDPATTERN;
+ }
return checkReply(sendRequest(newConfiguration));
}
@@ -613,14 +690,14 @@
String newConfiguration)
throws NetconfException {
return bareCopyConfig(netconfTargetConfig.asXml(),
- normalizeCopyConfigParam(newConfiguration));
+ normalizeCopyConfigParam(newConfiguration));
}
@Override
public boolean copyConfig(String netconfTargetConfig,
String newConfiguration) throws NetconfException {
return bareCopyConfig(normalizeCopyConfigParam(netconfTargetConfig),
- normalizeCopyConfigParam(newConfiguration));
+ normalizeCopyConfigParam(newConfiguration));
}
/**
@@ -668,7 +745,7 @@
public boolean deleteConfig(DatastoreId netconfTargetConfig) throws NetconfException {
if (netconfTargetConfig.equals(DatastoreId.RUNNING)) {
log.warn("Target configuration for delete operation can't be \"running\"",
- netconfTargetConfig);
+ netconfTargetConfig);
return false;
}
StringBuilder rpc = new StringBuilder(XML_HEADER);
@@ -852,11 +929,11 @@
public void notify(NetconfDeviceOutputEvent event) {
Optional<Integer> messageId = event.getMessageID();
log.debug("messageID {}, waiting replies messageIDs {}", messageId,
- replies.keySet());
+ replies.keySet());
if (!messageId.isPresent()) {
errorReplies.add(event.getMessagePayload());
log.error("Device {} sent error reply {}",
- event.getDeviceInfo(), event.getMessagePayload());
+ event.getDeviceInfo(), event.getMessagePayload());
return;
}
CompletableFuture<String> completedReply =
@@ -874,4 +951,4 @@
return new NetconfSessionMinaImpl(netconfDeviceInfo);
}
}
-}
+}
\ No newline at end of file