Bug fix for NETCONF controller - disconnect
Change-Id: I1f01f5b7e21e2e9c14358b4686077896ae4975e8
diff --git a/drivers/microsemi/src/main/java/org/onosproject/drivers/microsemi/EA1000FlowRuleProgrammable.java b/drivers/microsemi/src/main/java/org/onosproject/drivers/microsemi/EA1000FlowRuleProgrammable.java
index 4df22ec..233e537 100644
--- a/drivers/microsemi/src/main/java/org/onosproject/drivers/microsemi/EA1000FlowRuleProgrammable.java
+++ b/drivers/microsemi/src/main/java/org/onosproject/drivers/microsemi/EA1000FlowRuleProgrammable.java
@@ -185,6 +185,7 @@
} else {
log.error("Unexpected error on SA Filt getFlowEntries on {}",
handler().data().deviceId(), e);
+ return flowEntryCollection;
}
}
diff --git a/protocols/netconf/ctl/src/main/java/org/onosproject/netconf/ctl/impl/NetconfControllerImpl.java b/protocols/netconf/ctl/src/main/java/org/onosproject/netconf/ctl/impl/NetconfControllerImpl.java
index 428c9d2..2785fa3 100644
--- a/protocols/netconf/ctl/src/main/java/org/onosproject/netconf/ctl/impl/NetconfControllerImpl.java
+++ b/protocols/netconf/ctl/src/main/java/org/onosproject/netconf/ctl/impl/NetconfControllerImpl.java
@@ -80,10 +80,16 @@
label = "Time (in seconds) waiting for a NetConf reply")
protected static int netconfReplyTimeout = DEFAULT_REPLY_TIMEOUT_SECONDS;
+ private static final String PROP_NETCONF_IDLE_TIMEOUT = "netconfIdleTimeout";
+ private static final int DEFAULT_IDLE_TIMEOUT_SECONDS = 300;
+ @Property(name = PROP_NETCONF_IDLE_TIMEOUT, intValue = DEFAULT_IDLE_TIMEOUT_SECONDS,
+ label = "Time (in seconds) SSH session will close if no traffic seen")
+ protected static int netconfIdleTimeout = DEFAULT_IDLE_TIMEOUT_SECONDS;
+
private static final String SSH_LIBRARY = "sshLibrary";
private static final String APACHE_MINA = "apache_mina";
@Property(name = SSH_LIBRARY, value = APACHE_MINA,
- label = "Ssh Llbrary instead of Apache Mina (i.e. ethz-ssh2")
+ label = "Ssh Library instead of apache_mina (i.e. ethz-ssh2")
protected static String sshLibrary = APACHE_MINA;
@Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
@@ -142,6 +148,7 @@
int newNetconfReplyTimeout;
int newNetconfConnectTimeout;
+ int newNetconfIdleTimeout;
String newSshLibrary;
try {
String s = get(properties, PROP_NETCONF_REPLY_TIMEOUT);
@@ -152,6 +159,10 @@
newNetconfConnectTimeout = isNullOrEmpty(s) ?
netconfConnectTimeout : Integer.parseInt(s.trim());
+ s = get(properties, PROP_NETCONF_IDLE_TIMEOUT);
+ newNetconfIdleTimeout = isNullOrEmpty(s) ?
+ netconfIdleTimeout : Integer.parseInt(s.trim());
+
newSshLibrary = get(properties, SSH_LIBRARY);
} catch (NumberFormatException e) {
@@ -165,14 +176,19 @@
} else if (newNetconfReplyTimeout <= 0) {
log.warn("netconfReplyTimeout is invalid - 0 or less.");
return;
+ } else if (newNetconfIdleTimeout <= 0) {
+ log.warn("netconfIdleTimeout is invalid - 0 or less.");
+ return;
}
netconfReplyTimeout = newNetconfReplyTimeout;
netconfConnectTimeout = newNetconfConnectTimeout;
+ netconfIdleTimeout = newNetconfIdleTimeout;
sshLibrary = newSshLibrary;
log.info("Settings: {} = {}, {} = {}, {} = {}",
PROP_NETCONF_REPLY_TIMEOUT, netconfReplyTimeout,
PROP_NETCONF_CONNECT_TIMEOUT, netconfConnectTimeout,
+ PROP_NETCONF_IDLE_TIMEOUT, netconfIdleTimeout,
SSH_LIBRARY, sshLibrary);
}
diff --git a/protocols/netconf/ctl/src/main/java/org/onosproject/netconf/ctl/impl/NetconfSessionImpl.java b/protocols/netconf/ctl/src/main/java/org/onosproject/netconf/ctl/impl/NetconfSessionImpl.java
index 00fe4dc..8303868 100644
--- a/protocols/netconf/ctl/src/main/java/org/onosproject/netconf/ctl/impl/NetconfSessionImpl.java
+++ b/protocols/netconf/ctl/src/main/java/org/onosproject/netconf/ctl/impl/NetconfSessionImpl.java
@@ -385,8 +385,27 @@
try {
rp = futureReply.get(replyTimeout, TimeUnit.SECONDS);
replies.remove(messageId);
- } catch (InterruptedException | ExecutionException | TimeoutException e) {
- throw new NetconfException("No matching reply for request " + request, e);
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ throw new NetconfException("Interrupted waiting for reply for request" + request, e);
+ } catch (TimeoutException e) {
+ throw new NetconfException("Timed out waiting for reply for request " + request, e);
+ } catch (ExecutionException e) {
+ log.warn("Closing session {} for {} due to unexpected Error", sessionID, deviceInfo, e);
+
+ netconfConnection.close(); //Closes the socket which should interrupt NetconfStreamThread
+ sshSession.close();
+
+ NetconfDeviceOutputEvent event = new NetconfDeviceOutputEvent(
+ NetconfDeviceOutputEvent.Type.SESSION_CLOSED,
+ null, "Closed due to unexpected error " + e.getCause(),
+ Optional.of(-1), deviceInfo);
+ publishEvent(event);
+ replies.clear();
+ errorReplies.clear();
+
+ throw new NetconfException("Closing session " + sessionID + " for " + deviceInfo +
+ " for request " + request, e);
}
log.debug("Result {} from request {} to device {}", rp, request, deviceInfo);
return rp.trim();
@@ -724,6 +743,14 @@
return false;
}
+ protected void publishEvent(NetconfDeviceOutputEvent event) {
+ primaryListeners.forEach(lsnr -> {
+ if (lsnr.isRelevant(event)) {
+ lsnr.event(event);
+ }
+ });
+ }
+
static class NotificationSession extends NetconfSessionImpl {
private String notificationFilter;
@@ -767,11 +794,7 @@
@Override
public void event(NetconfDeviceOutputEvent event) {
- primaryListeners.forEach(lsnr -> {
- if (lsnr.isRelevant(event)) {
- lsnr.event(event);
- }
- });
+ publishEvent(event);
}
}
diff --git a/protocols/netconf/ctl/src/main/java/org/onosproject/netconf/ctl/impl/NetconfSessionMinaImpl.java b/protocols/netconf/ctl/src/main/java/org/onosproject/netconf/ctl/impl/NetconfSessionMinaImpl.java
index 94a14e5..6938661 100644
--- a/protocols/netconf/ctl/src/main/java/org/onosproject/netconf/ctl/impl/NetconfSessionMinaImpl.java
+++ b/protocols/netconf/ctl/src/main/java/org/onosproject/netconf/ctl/impl/NetconfSessionMinaImpl.java
@@ -25,6 +25,7 @@
import org.apache.sshd.client.future.ConnectFuture;
import org.apache.sshd.client.future.OpenFuture;
import org.apache.sshd.client.session.ClientSession;
+import org.apache.sshd.common.FactoryManager;
import org.apache.sshd.server.keyprovider.SimpleGeneratorHostKeyProvider;
import org.onosproject.netconf.DatastoreId;
import org.onosproject.netconf.NetconfDeviceInfo;
@@ -126,7 +127,7 @@
private final Set<String> deviceCapabilities = new LinkedHashSet<>();
private NetconfStreamHandler streamHandler;
private Map<Integer, CompletableFuture<String>> replies;
- private List<String> errorReplies;
+ private List<String> errorReplies; // Not sure why we need this?
private boolean subscriptionConnected = false;
private String notificationFilterSchema = null;
@@ -158,6 +159,11 @@
private void startClient() throws IOException {
client = SshClient.setUpDefaultClient();
+ int replyTimeoutSec = NetconfControllerImpl.netconfIdleTimeout;
+ client.getProperties().putIfAbsent(FactoryManager.IDLE_TIMEOUT,
+ TimeUnit.SECONDS.toMillis(replyTimeoutSec));
+ client.getProperties().putIfAbsent(FactoryManager.NIO2_READ_TIMEOUT,
+ TimeUnit.SECONDS.toMillis(replyTimeoutSec + 15L));
client.start();
client.setKeyPairProvider(new SimpleGeneratorHostKeyProvider());
startSession();
@@ -372,7 +378,7 @@
subscriptionConnected = false;
startSubscription(notificationFilterSchema);
}
- } catch (IOException e) {
+ } catch (IOException | IllegalStateException e) {
log.error("Can't reopen connection for device {}", e.getMessage());
throw new NetconfException("Cannot re-open the connection with device" + deviceInfo, e);
}
@@ -420,9 +426,31 @@
String rp;
try {
rp = futureReply.get(replyTimeout, TimeUnit.SECONDS);
- replies.remove(messageId);
- } catch (InterruptedException | ExecutionException | TimeoutException e) {
- throw new NetconfException("No matching reply for request " + request, e);
+ replies.remove(messageId); // Why here???
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ throw new NetconfException("Interrupted waiting for reply for request" + request, e);
+ } catch (TimeoutException e) {
+ throw new NetconfException("Timed out waiting for reply for request " + request, e);
+ } catch (ExecutionException e) {
+ log.warn("Closing session {} for {} due to unexpected Error", sessionID, deviceInfo, e);
+ try {
+ session.close();
+ channel.close(); //Closes the socket which should interrupt NetconfStreamThread
+ client.close();
+ } catch (IOException ioe) {
+ log.warn("Error closing session {} on {}", sessionID, deviceInfo, ioe);
+ }
+ NetconfDeviceOutputEvent event = new NetconfDeviceOutputEvent(
+ NetconfDeviceOutputEvent.Type.SESSION_CLOSED,
+ null, "Closed due to unexpected error " + e.getCause(),
+ Optional.of(-1), deviceInfo);
+ publishEvent(event);
+ errorReplies.clear(); // move to cleanUp()?
+ cleanUp();
+
+ throw new NetconfException("Closing session " + sessionID + " for " + deviceInfo +
+ " for request " + request, e);
}
log.debug("Result {} from request {} to device {}", rp, request, deviceInfo);
return rp.trim();
@@ -763,6 +791,14 @@
return false;
}
+ protected void publishEvent(NetconfDeviceOutputEvent event) {
+ primaryListeners.forEach(lsnr -> {
+ if (lsnr.isRelevant(event)) {
+ lsnr.event(event);
+ }
+ });
+ }
+
static class NotificationSession extends NetconfSessionMinaImpl {
private String notificationFilter;
@@ -806,11 +842,7 @@
@Override
public void event(NetconfDeviceOutputEvent event) {
- primaryListeners.forEach(lsnr -> {
- if (lsnr.isRelevant(event)) {
- lsnr.event(event);
- }
- });
+ publishEvent(event);
}
}
@@ -828,7 +860,7 @@
return;
}
CompletableFuture<String> completedReply =
- replies.get(messageId.get());
+ replies.get(messageId.get()); // remove(..)?
if (completedReply != null) {
completedReply.complete(event.getMessagePayload());
}
diff --git a/protocols/netconf/ctl/src/main/java/org/onosproject/netconf/ctl/impl/NetconfStreamThread.java b/protocols/netconf/ctl/src/main/java/org/onosproject/netconf/ctl/impl/NetconfStreamThread.java
index 44206e3..6b231b3 100644
--- a/protocols/netconf/ctl/src/main/java/org/onosproject/netconf/ctl/impl/NetconfStreamThread.java
+++ b/protocols/netconf/ctl/src/main/java/org/onosproject/netconf/ctl/impl/NetconfStreamThread.java
@@ -16,7 +16,6 @@
package org.onosproject.netconf.ctl.impl;
-import com.google.common.base.Preconditions;
import com.google.common.collect.Lists;
import org.onosproject.netconf.NetconfDeviceInfo;
import org.onosproject.netconf.NetconfDeviceOutputEvent;
@@ -30,7 +29,8 @@
import java.io.InputStream;
import java.io.InputStreamReader;
import java.io.OutputStream;
-import java.io.PrintWriter;
+import java.io.OutputStreamWriter;
+import java.nio.charset.StandardCharsets;
import java.util.List;
import java.util.Map;
import java.util.Optional;
@@ -55,7 +55,7 @@
private static final String MESSAGE_ID = "message-id=";
private static final Pattern MSGID_PATTERN = Pattern.compile(MESSAGE_ID + "\"(\\d+)\"");
- private PrintWriter outputStream;
+ private OutputStreamWriter outputStream;
private final InputStream err;
private final InputStream in;
private NetconfDeviceInfo netconfDeviceInfo;
@@ -72,7 +72,7 @@
Map<Integer, CompletableFuture<String>> replies) {
this.in = in;
this.err = err;
- outputStream = new PrintWriter(out);
+ outputStream = new OutputStreamWriter(out, StandardCharsets.UTF_8);
netconfDeviceInfo = deviceInfo;
state = NetconfMessageState.NO_MATCHING_PATTERN;
sessionDelegate = delegate;
@@ -94,8 +94,13 @@
replies.put(messageId, cf);
synchronized (outputStream) {
- outputStream.print(request);
- outputStream.flush();
+ try {
+ outputStream.write(request);
+ outputStream.flush();
+ } catch (IOException e) {
+ log.error("Writing to {} failed", netconfDeviceInfo, e);
+ cf.completeExceptionally(e);
+ }
}
return cf;
@@ -251,9 +256,11 @@
protected static Optional<Integer> getMsgId(String reply) {
Matcher matcher = MSGID_PATTERN.matcher(reply);
if (matcher.find()) {
- Integer messageId = Integer.parseInt(matcher.group(1));
- Preconditions.checkNotNull(messageId, "Error in retrieving the message id");
- return Optional.of(messageId);
+ try {
+ return Optional.of(Integer.valueOf(matcher.group(1)));
+ } catch (NumberFormatException e) {
+ log.warn("Failed to parse message-id from {}", matcher.group(), e);
+ }
}
if (reply.contains(HELLO)) {
return Optional.of(-1);