[ONOS-6236] Adding retry mechanism when NETCONF device sends socket closed signal
Change-Id: Ie48e94f92ce745f3f65a352d80b7a74c2eceba04
diff --git a/drivers/fujitsu/src/test/java/org/onosproject/drivers/fujitsu/FujitsuNetconfSessionMock.java b/drivers/fujitsu/src/test/java/org/onosproject/drivers/fujitsu/FujitsuNetconfSessionMock.java
index b3e5932..4fd63f8 100644
--- a/drivers/fujitsu/src/test/java/org/onosproject/drivers/fujitsu/FujitsuNetconfSessionMock.java
+++ b/drivers/fujitsu/src/test/java/org/onosproject/drivers/fujitsu/FujitsuNetconfSessionMock.java
@@ -211,6 +211,10 @@
}
@Override
+ public void checkAndReestablish() throws NetconfException {
+ }
+
+ @Override
public void addDeviceOutputListener(NetconfDeviceOutputEventListener listener) {
}
diff --git a/protocols/netconf/api/src/main/java/org/onosproject/netconf/NetconfDeviceOutputEvent.java b/protocols/netconf/api/src/main/java/org/onosproject/netconf/NetconfDeviceOutputEvent.java
index 66a3889..5233f08 100644
--- a/protocols/netconf/api/src/main/java/org/onosproject/netconf/NetconfDeviceOutputEvent.java
+++ b/protocols/netconf/api/src/main/java/org/onosproject/netconf/NetconfDeviceOutputEvent.java
@@ -23,7 +23,7 @@
import java.util.Optional;
/**
- * Describes network configuration event.
+ * Describes a NETCONF device related event.
*/
public final class NetconfDeviceOutputEvent extends
AbstractEvent<NetconfDeviceOutputEvent.Type, Object> {
@@ -33,7 +33,7 @@
private final NetconfDeviceInfo deviceInfo;
/**
- * Type of network configuration events.
+ * Type of device related events.
*/
public enum Type {
/**
@@ -56,6 +56,13 @@
*/
DEVICE_ERROR,
+ /**
+ * Signifies that the device has closed the session.
+ * ONOS will try to reopen it, if it fails again
+ * it will mark the device as unreachable.
+ */
+ SESSION_CLOSED,
+
}
/**
diff --git a/protocols/netconf/api/src/main/java/org/onosproject/netconf/NetconfSession.java b/protocols/netconf/api/src/main/java/org/onosproject/netconf/NetconfSession.java
index e2d435d..31481d2 100644
--- a/protocols/netconf/api/src/main/java/org/onosproject/netconf/NetconfSession.java
+++ b/protocols/netconf/api/src/main/java/org/onosproject/netconf/NetconfSession.java
@@ -17,6 +17,8 @@
package org.onosproject.netconf;
import com.google.common.annotations.Beta;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
import java.util.List;
import java.util.concurrent.CompletableFuture;
@@ -327,6 +329,19 @@
void setDeviceCapabilities(List<String> capabilities);
/**
+ * Checks the state of the underlying SSH session and connection
+ * and if necessary it reestablishes it.
+ * Should be implemented, providing a default here for retrocompatibility.
+ * @throws NetconfException when there is a problem in reestablishing
+ * the connection or the session to the device.
+ */
+
+ default void checkAndReestablish() throws NetconfException {
+ Logger log = LoggerFactory.getLogger(NetconfSession.class);
+ log.error("Not implemented/exposed by the underlying session implementation");
+ }
+
+ /**
* Remove a listener from the underlying stream handler implementation.
*
* @param listener event listener.
diff --git a/protocols/netconf/ctl/src/main/java/org/onosproject/netconf/ctl/NetconfControllerImpl.java b/protocols/netconf/ctl/src/main/java/org/onosproject/netconf/ctl/NetconfControllerImpl.java
index afe791c..59ed7d4 100644
--- a/protocols/netconf/ctl/src/main/java/org/onosproject/netconf/ctl/NetconfControllerImpl.java
+++ b/protocols/netconf/ctl/src/main/java/org/onosproject/netconf/ctl/NetconfControllerImpl.java
@@ -52,9 +52,12 @@
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CopyOnWriteArraySet;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
import static com.google.common.base.Strings.isNullOrEmpty;
import static org.onlab.util.Tools.get;
+import static org.onlab.util.Tools.groupedThreads;
/**
* The implementation of NetconfController.
@@ -93,6 +96,10 @@
protected Set<NetconfDeviceListener> netconfDeviceListeners = new CopyOnWriteArraySet<>();
protected NetconfDeviceFactory deviceFactory = new DefaultNetconfDeviceFactory();
+ protected final ExecutorService executor =
+ Executors.newCachedThreadPool(groupedThreads("onos/netconfdevicecontroller",
+ "connection-reopen-%d", log));
+
@Activate
public void activate(ComponentContext context) {
cfgService.registerProperties(getClass());
@@ -145,7 +152,7 @@
netconfReplyTimeout = newNetconfReplyTimeout;
netconfConnectTimeout = newNetconfConnectTimeout;
log.info("Settings: {} = {}, {} = {}",
- PROP_NETCONF_REPLY_TIMEOUT, netconfReplyTimeout, PROP_NETCONF_CONNECT_TIMEOUT, netconfConnectTimeout);
+ PROP_NETCONF_REPLY_TIMEOUT, netconfReplyTimeout, PROP_NETCONF_CONNECT_TIMEOUT, netconfConnectTimeout);
}
@Override
@@ -195,7 +202,7 @@
port = Integer.parseInt(info[2]);
} else {
ip = Arrays.asList(info).stream().filter(el -> !el.equals(info[0])
- && !el.equals(info[info.length - 1]))
+ && !el.equals(info[info.length - 1]))
.reduce((t, u) -> t + ":" + u)
.get();
log.debug("ip v6 {}", ip);
@@ -204,26 +211,26 @@
}
try {
DeviceKey deviceKey = deviceKeyService.getDeviceKey(
- DeviceKeyId.deviceKeyId(deviceId.toString()));
+ DeviceKeyId.deviceKeyId(deviceId.toString()));
NetconfDeviceInfo deviceInfo = null;
if (deviceKey.type() == DeviceKey.Type.USERNAME_PASSWORD) {
UsernamePassword usernamepasswd = deviceKey.asUsernamePassword();
deviceInfo = new NetconfDeviceInfo(usernamepasswd.username(),
- usernamepasswd.password(),
- IpAddress.valueOf(ip),
- port);
+ usernamepasswd.password(),
+ IpAddress.valueOf(ip),
+ port);
} else if (deviceKey.type() == DeviceKey.Type.SSL_KEY) {
String username = deviceKey.annotations().value(AnnotationKeys.USERNAME);
String password = deviceKey.annotations().value(AnnotationKeys.PASSWORD);
- String sshkey = deviceKey.annotations().value(AnnotationKeys.SSHKEY);
+ String sshkey = deviceKey.annotations().value(AnnotationKeys.SSHKEY);
deviceInfo = new NetconfDeviceInfo(username,
- password,
- IpAddress.valueOf(ip),
- port,
- sshkey);
+ password,
+ IpAddress.valueOf(ip),
+ port,
+ sshkey);
} else {
log.error("Unknown device key for device {}", deviceId);
}
@@ -291,7 +298,6 @@
}
-
//Device factory for the specific NetconfDeviceImpl
private class DefaultNetconfDeviceFactory implements NetconfDeviceFactory {
@@ -307,8 +313,23 @@
@Override
public void event(NetconfDeviceOutputEvent event) {
+ DeviceId did = event.getDeviceInfo().getDeviceId();
if (event.type().equals(NetconfDeviceOutputEvent.Type.DEVICE_UNREGISTERED)) {
- removeDevice(event.getDeviceInfo().getDeviceId());
+ removeDevice(did);
+ } else if (event.type().equals(NetconfDeviceOutputEvent.Type.SESSION_CLOSED)) {
+ log.info("Trying to reestablish connection with device {}", did);
+ executor.execute(() -> {
+ try {
+ netconfDeviceMap.get(did).getSession().checkAndReestablish();
+ log.info("Connection with device {} was reestablished", did);
+ } catch (NetconfException e) {
+ log.error("The SSH connection with device {} couldn't be " +
+ "reestablished due to {}. " +
+ "Marking the device as unreachable", e.getMessage());
+ log.debug("Complete exception: ", e);
+ removeDevice(did);
+ }
+ });
}
}
diff --git a/protocols/netconf/ctl/src/main/java/org/onosproject/netconf/ctl/NetconfDeviceOutputEventListenerImpl.java b/protocols/netconf/ctl/src/main/java/org/onosproject/netconf/ctl/NetconfDeviceOutputEventListenerImpl.java
index dc6683f..63a95ba 100644
--- a/protocols/netconf/ctl/src/main/java/org/onosproject/netconf/ctl/NetconfDeviceOutputEventListenerImpl.java
+++ b/protocols/netconf/ctl/src/main/java/org/onosproject/netconf/ctl/NetconfDeviceOutputEventListenerImpl.java
@@ -48,11 +48,13 @@
break;
case DEVICE_UNREGISTERED:
log.warn("Device {} has closed session", deviceInfo);
- //TODO tell onos about closed session
break;
case DEVICE_ERROR:
log.warn("Device {} has error: {}", deviceInfo, event.getMessagePayload());
break;
+ case SESSION_CLOSED:
+ log.warn("Device {} has closed Session: {}", deviceInfo, event.getMessagePayload());
+ break;
default:
log.warn("Wrong event type {} ", event.type());
}
diff --git a/protocols/netconf/ctl/src/main/java/org/onosproject/netconf/ctl/NetconfSessionImpl.java b/protocols/netconf/ctl/src/main/java/org/onosproject/netconf/ctl/NetconfSessionImpl.java
index 72ae513..07afac8 100644
--- a/protocols/netconf/ctl/src/main/java/org/onosproject/netconf/ctl/NetconfSessionImpl.java
+++ b/protocols/netconf/ctl/src/main/java/org/onosproject/netconf/ctl/NetconfSessionImpl.java
@@ -19,6 +19,7 @@
import com.google.common.annotations.Beta;
import ch.ethz.ssh2.Connection;
import ch.ethz.ssh2.Session;
+import ch.ethz.ssh2.channel.Channel;
import com.google.common.base.Preconditions;
import org.onosproject.netconf.TargetConfig;
import org.onosproject.netconf.NetconfDeviceInfo;
@@ -44,7 +45,6 @@
import java.util.regex.Pattern;
-
/**
* Implementation of a NETCONF session to talk to a device.
*/
@@ -99,6 +99,7 @@
private Map<Integer, CompletableFuture<String>> replies;
private List<String> errorReplies;
private boolean subscriptionConnected = false;
+ private String notificationFilterSchema = null;
public NetconfSessionImpl(NetconfDeviceInfo deviceInfo) throws NetconfException {
@@ -131,7 +132,7 @@
deviceInfo.password().equals("") ? null : deviceInfo.password());
} else if (deviceInfo.getKey() != null) {
log.debug("Authenticating with key to device {} with username {}",
- deviceInfo.getDeviceId(), deviceInfo.name());
+ deviceInfo.getDeviceId(), deviceInfo.name());
isAuthenticated = netconfConnection.authenticateWithPublicKey(
deviceInfo.name(), deviceInfo.getKey(),
deviceInfo.password().equals("") ? null : deviceInfo.password());
@@ -200,6 +201,7 @@
@Override
public void startSubscription(String filterSchema) throws NetconfException {
if (!subscriptionConnected) {
+ notificationFilterSchema = filterSchema;
startSubscriptionConnection(filterSchema);
}
streamHandler.setEnableNotifications(true);
@@ -255,18 +257,23 @@
}
- private void checkAndRestablishSession() throws NetconfException {
- if (sshSession.getState() != 2) {
+ public void checkAndReestablish() throws NetconfException {
+ if (sshSession.getState() != Channel.STATE_OPEN) {
try {
- log.debug("The session with {} was reopened", deviceInfo.getDeviceId());
+ log.debug("Trying to reopen the Sesion with {}", deviceInfo.getDeviceId());
startSshSession();
- } catch (IOException e) {
- log.debug("The connection with {} was reopened", deviceInfo.getDeviceId());
+ } catch (IOException | IllegalStateException e) {
+ log.debug("Trying to reopen the Connection with {}", deviceInfo.getDeviceId());
try {
connectionActive = false;
replies.clear();
messageIdInteger.set(0);
startConnection();
+ if (subscriptionConnected) {
+ log.debug("Restarting subscription with {}", deviceInfo.getDeviceId());
+ subscriptionConnected = false;
+ startSubscription(notificationFilterSchema);
+ }
} catch (IOException e2) {
log.error("No connection {} for device {}", netconfConnection, e.getMessage());
throw new NetconfException("Cannot re-open the connection with device" + deviceInfo, e);
@@ -296,7 +303,7 @@
}
private String sendRequest(String request) throws NetconfException {
- checkAndRestablishSession();
+ checkAndReestablish();
final int messageId = messageIdInteger.getAndIncrement();
request = formatRequestMessageId(request, messageId);
request = formatXmlHeader(request);
@@ -639,7 +646,7 @@
public class NetconfSessionDelegateImpl implements NetconfSessionDelegate {
@Override
- public void notify(NetconfDeviceOutputEvent event) {
+ public void notify(NetconfDeviceOutputEvent event) {
Optional<Integer> messageId = event.getMessageID();
log.debug("messageID {}, waiting replies messageIDs {}", messageId,
replies.keySet());
diff --git a/protocols/netconf/ctl/src/main/java/org/onosproject/netconf/ctl/NetconfStreamThread.java b/protocols/netconf/ctl/src/main/java/org/onosproject/netconf/ctl/NetconfStreamThread.java
index 81c6f6e..e78f9fa 100644
--- a/protocols/netconf/ctl/src/main/java/org/onosproject/netconf/ctl/NetconfStreamThread.java
+++ b/protocols/netconf/ctl/src/main/java/org/onosproject/netconf/ctl/NetconfStreamThread.java
@@ -185,7 +185,7 @@
log.debug("Netconf device {} sent error char in session," +
" will need to be reopend", netconfDeviceInfo);
NetconfDeviceOutputEvent event = new NetconfDeviceOutputEvent(
- NetconfDeviceOutputEvent.Type.DEVICE_UNREGISTERED,
+ NetconfDeviceOutputEvent.Type.SESSION_CLOSED,
null, null, Optional.of(-1), netconfDeviceInfo);
netconfDeviceEventListeners.forEach(
listener -> listener.event(event));
diff --git a/providers/netconf/device/src/main/java/org/onosproject/provider/netconf/device/impl/NetconfDeviceProvider.java b/providers/netconf/device/src/main/java/org/onosproject/provider/netconf/device/impl/NetconfDeviceProvider.java
index 55d6f0c..215f104 100644
--- a/providers/netconf/device/src/main/java/org/onosproject/provider/netconf/device/impl/NetconfDeviceProvider.java
+++ b/providers/netconf/device/src/main/java/org/onosproject/provider/netconf/device/impl/NetconfDeviceProvider.java
@@ -53,6 +53,7 @@
import org.onosproject.net.device.DeviceProviderRegistry;
import org.onosproject.net.device.DeviceProviderService;
import org.onosproject.net.device.DeviceService;
+import org.onosproject.net.device.PortStatistics;
import org.onosproject.net.device.PortStatisticsDiscovery;
import org.onosproject.net.key.DeviceKey;
import org.onosproject.net.key.DeviceKeyAdminService;
@@ -70,6 +71,7 @@
import java.net.URI;
import java.net.URISyntaxException;
import java.util.Arrays;
+import java.util.Collection;
import java.util.Dictionary;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
@@ -116,7 +118,6 @@
protected ComponentConfigService componentConfigService;
-
protected static final String APP_NAME = "org.onosproject.netconf";
private static final String SCHEME_NAME = "netconf";
private static final String DEVICE_PROVIDER_PACKAGE = "org.onosproject.netconf.provider.device";
@@ -427,8 +428,11 @@
private void updatePortStatistics(Device device) {
if (device.is(PortStatisticsDiscovery.class)) {
PortStatisticsDiscovery d = device.as(PortStatisticsDiscovery.class);
- providerService.updatePortStatistics(device.id(),
- d.discoverPortStatistics());
+ Collection<PortStatistics> portStatistics = d.discoverPortStatistics();
+ if (portStatistics != null) {
+ providerService.updatePortStatistics(device.id(),
+ portStatistics);
+ }
} else {
log.warn("No port statistics getter behaviour for device {}",
device.id());