NetconfAlarmProvider alerts core about notifications given subscription.
Change-Id: I7561ba680eb8bac33a8543d6aa1bccf6732e95db
diff --git a/protocols/netconf/api/src/main/java/org/onosproject/netconf/NetconfSession.java b/protocols/netconf/api/src/main/java/org/onosproject/netconf/NetconfSession.java
index 0668422..daab1b5 100644
--- a/protocols/netconf/api/src/main/java/org/onosproject/netconf/NetconfSession.java
+++ b/protocols/netconf/api/src/main/java/org/onosproject/netconf/NetconfSession.java
@@ -141,7 +141,41 @@
boolean deleteConfig(String targetConfiguration) throws NetconfException;
/**
- * Locks the candidate configuration.
+ * Starts subscription to the device's notifications.
+ *
+ * @throws NetconfException when there is a problem starting the subscription
+ */
+ void startSubscription() throws NetconfException;
+
+ /**
+ * Ends subscription to the device's notifications.
+ *
+ * @throws NetconfException when there is a problem ending the subscription
+ */
+ void endSubscription() throws NetconfException;
+
+ /**
+ * Locks the specified configuration.
+ *
+ * @param configType type of configuration to be locked
+ * @return true if successful.
+ * @throws NetconfException when there is a problem in the communication process on
+ * the underlying connection
+ */
+ boolean lock(String configType) throws NetconfException;
+
+ /**
+ * Unlocks the specified configuration.
+ *
+ * @param configType type of configuration to be locked
+ * @return true if successful.
+ * @throws NetconfException when there is a problem in the communication process on
+ * the underlying connection
+ */
+ boolean unlock(String configType) throws NetconfException;
+
+ /**
+ * Locks the running configuration.
*
* @return true if successful.
* @throws NetconfException when there is a problem in the communication process on
@@ -150,7 +184,7 @@
boolean lock() throws NetconfException;
/**
- * Unlocks the candidate configuration.
+ * Unlocks the running configuration.
*
* @return true if successful.
* @throws NetconfException when there is a problem in the communication process on
diff --git a/protocols/netconf/ctl/BUCK b/protocols/netconf/ctl/BUCK
index 209e29d..1503f32 100644
--- a/protocols/netconf/ctl/BUCK
+++ b/protocols/netconf/ctl/BUCK
@@ -7,6 +7,7 @@
TEST_DEPS = [
'//lib:TEST_ADAPTERS',
'//utils/osgi:onlab-osgi-tests',
+ '//core/api:onos-api-tests',
]
osgi_jar_with_tests (
diff --git a/protocols/netconf/ctl/src/main/java/org/onosproject/netconf/ctl/NetconfSessionImpl.java b/protocols/netconf/ctl/src/main/java/org/onosproject/netconf/ctl/NetconfSessionImpl.java
index 487f4cc..1ee0f08 100644
--- a/protocols/netconf/ctl/src/main/java/org/onosproject/netconf/ctl/NetconfSessionImpl.java
+++ b/protocols/netconf/ctl/src/main/java/org/onosproject/netconf/ctl/NetconfSessionImpl.java
@@ -81,9 +81,10 @@
private List<String> deviceCapabilities =
Collections.singletonList("urn:ietf:params:netconf:base:1.0");
private String serverCapabilities;
- private NetconfStreamHandler t;
+ private NetconfStreamHandler streamHandler;
private Map<Integer, CompletableFuture<String>> replies;
private List<String> errorReplies;
+ private boolean subscriptionConnected = false;
public NetconfSessionImpl(NetconfDeviceInfo deviceInfo) throws NetconfException {
@@ -136,9 +137,9 @@
try {
sshSession = netconfConnection.openSession();
sshSession.startSubSystem("netconf");
- t = new NetconfStreamThread(sshSession.getStdout(), sshSession.getStdin(),
- sshSession.getStderr(), deviceInfo,
- new NetconfSessionDelegateImpl());
+ streamHandler = new NetconfStreamThread(sshSession.getStdout(), sshSession.getStdin(),
+ sshSession.getStderr(), deviceInfo,
+ new NetconfSessionDelegateImpl());
this.addDeviceOutputListener(new NetconfDeviceOutputEventListenerImpl(deviceInfo));
sendHello();
} catch (IOException e) {
@@ -148,6 +149,45 @@
}
}
+ private void startSubscriptionConnection() throws NetconfException {
+ if (!serverCapabilities.contains("interleave")) {
+ throw new NetconfException("Device" + deviceInfo + "does not support interleave");
+ }
+ String reply = sendRequest(createSubscriptionString());
+ if (!checkReply(reply)) {
+ throw new NetconfException("Subscription not successful with device "
+ + deviceInfo + " with reply " + reply);
+ }
+ subscriptionConnected = true;
+ }
+
+ public void startSubscription() throws NetconfException {
+ if (!subscriptionConnected) {
+ startSubscriptionConnection();
+ }
+ streamHandler.setEnableNotifications(true);
+ }
+
+ private String createSubscriptionString() {
+ StringBuilder subscriptionbuffer = new StringBuilder();
+ subscriptionbuffer.append("<rpc xmlns=\"urn:ietf:params:xml:ns:netconf:base:1.0\">\n");
+ subscriptionbuffer.append(" <create-subscription\n");
+ subscriptionbuffer.append("xmlns=\"urn:ietf:params:xml:ns:netconf:notification:1.0\">\n");
+ subscriptionbuffer.append(" </create-subscription>\n");
+ subscriptionbuffer.append("</rpc>\n");
+ subscriptionbuffer.append(ENDPATTERN);
+ return subscriptionbuffer.toString();
+ }
+
+ @Override
+ public void endSubscription() throws NetconfException {
+ if (subscriptionConnected) {
+ streamHandler.setEnableNotifications(false);
+ } else {
+ throw new NetconfException("Subscription does not exist.");
+ }
+ }
+
private void sendHello() throws NetconfException {
serverCapabilities = sendRequest(createHelloString());
}
@@ -197,7 +237,7 @@
@Override
public CompletableFuture<String> request(String request) {
- CompletableFuture<String> ftrep = t.sendMessage(request);
+ CompletableFuture<String> ftrep = streamHandler.sendMessage(request);
replies.put(messageIdInteger.get(), ftrep);
return ftrep;
}
@@ -382,31 +422,47 @@
}
@Override
- public boolean lock() throws NetconfException {
+ public boolean lock(String configType) throws NetconfException {
StringBuilder rpc = new StringBuilder(XML_HEADER);
- rpc.append("<rpc>");
+ rpc.append("<rpc xmlns=\"urn:ietf:params:xml:ns:netconf:base:1.0\">\n");
rpc.append("<lock>");
rpc.append("<target>");
- rpc.append("<candidate/>");
+ rpc.append("<");
+ rpc.append(configType);
+ rpc.append("/>");
rpc.append("</target>");
rpc.append("</lock>");
rpc.append("</rpc>");
rpc.append(ENDPATTERN);
- return checkReply(sendRequest(rpc.toString()));
+ String lockReply = sendRequest(rpc.toString());
+ return checkReply(lockReply);
}
@Override
- public boolean unlock() throws NetconfException {
+ public boolean unlock(String configType) throws NetconfException {
StringBuilder rpc = new StringBuilder(XML_HEADER);
- rpc.append("<rpc>");
+ rpc.append("<rpc xmlns=\"urn:ietf:params:xml:ns:netconf:base:1.0\">\n");
rpc.append("<unlock>");
rpc.append("<target>");
- rpc.append("<candidate/>");
+ rpc.append("<");
+ rpc.append(configType);
+ rpc.append("/>");
rpc.append("</target>");
rpc.append("</unlock>");
rpc.append("</rpc>");
rpc.append(ENDPATTERN);
- return checkReply(sendRequest(rpc.toString()));
+ String unlockReply = sendRequest(rpc.toString());
+ return checkReply(unlockReply);
+ }
+
+ @Override
+ public boolean lock() throws NetconfException {
+ return lock("running");
+ }
+
+ @Override
+ public boolean unlock() throws NetconfException {
+ return unlock("running");
}
@Override
@@ -454,12 +510,12 @@
@Override
public void addDeviceOutputListener(NetconfDeviceOutputEventListener listener) {
- t.addDeviceEventListener(listener);
+ streamHandler.addDeviceEventListener(listener);
}
@Override
public void removeDeviceOutputListener(NetconfDeviceOutputEventListener listener) {
- t.removeDeviceEventListener(listener);
+ streamHandler.removeDeviceEventListener(listener);
}
private boolean checkReply(String reply) throws NetconfException {
@@ -481,6 +537,7 @@
@Override
public void notify(NetconfDeviceOutputEvent event) {
Optional<Integer> messageId = event.getMessageID();
+
if (!messageId.isPresent()) {
errorReplies.add(event.getMessagePayload());
log.error("Device {} sent error reply {}",
diff --git a/protocols/netconf/ctl/src/main/java/org/onosproject/netconf/ctl/NetconfStreamHandler.java b/protocols/netconf/ctl/src/main/java/org/onosproject/netconf/ctl/NetconfStreamHandler.java
index 363657d..1ee7911 100644
--- a/protocols/netconf/ctl/src/main/java/org/onosproject/netconf/ctl/NetconfStreamHandler.java
+++ b/protocols/netconf/ctl/src/main/java/org/onosproject/netconf/ctl/NetconfStreamHandler.java
@@ -16,6 +16,7 @@
package org.onosproject.netconf.ctl;
+import com.google.common.annotations.Beta;
import org.onosproject.netconf.NetconfDeviceOutputEventListener;
import java.util.concurrent.CompletableFuture;
@@ -46,4 +47,13 @@
* @param listener Netconf device event listener
*/
void removeDeviceEventListener(NetconfDeviceOutputEventListener listener);
+
+ @Beta
+ /**
+ * Sets instance variable that when true allows receipt of notifications.
+ *
+ * @param enableNotifications if true, allows action based off notifications
+ * else, stops action based off notifications
+ */
+ void setEnableNotifications(boolean enableNotifications);
}
diff --git a/protocols/netconf/ctl/src/main/java/org/onosproject/netconf/ctl/NetconfStreamThread.java b/protocols/netconf/ctl/src/main/java/org/onosproject/netconf/ctl/NetconfStreamThread.java
index 7a2a719..307c917 100644
--- a/protocols/netconf/ctl/src/main/java/org/onosproject/netconf/ctl/NetconfStreamThread.java
+++ b/protocols/netconf/ctl/src/main/java/org/onosproject/netconf/ctl/NetconfStreamThread.java
@@ -57,8 +57,9 @@
private NetconfDeviceInfo netconfDeviceInfo;
private NetconfSessionDelegate sessionDelegate;
private NetconfMessageState state;
- private List<NetconfDeviceOutputEventListener> netconfDeviceEventListeners
- = Lists.newArrayList();
+ private List<NetconfDeviceOutputEventListener> netconfDeviceEventListeners
+ = Lists.newCopyOnWriteArrayList();
+ private boolean enableNotifications = true;
public NetconfStreamThread(final InputStream in, final OutputStream out,
final InputStream err, NetconfDeviceInfo deviceInfo,
@@ -195,12 +196,14 @@
netconfDeviceEventListeners.forEach(
listener -> listener.event(event));
} else if (deviceReply.contains(NOTIFICATION_LABEL)) {
- final String finalDeviceReply = deviceReply;
- netconfDeviceEventListeners.forEach(
- listener -> listener.event(new NetconfDeviceOutputEvent(
- NetconfDeviceOutputEvent.Type.DEVICE_NOTIFICATION,
- null, finalDeviceReply, getMsgId(finalDeviceReply),
- netconfDeviceInfo)));
+ if (enableNotifications) {
+ final String finalDeviceReply = deviceReply;
+ netconfDeviceEventListeners.forEach(
+ listener -> listener.event(new NetconfDeviceOutputEvent(
+ NetconfDeviceOutputEvent.Type.DEVICE_NOTIFICATION,
+ null, finalDeviceReply, getMsgId(finalDeviceReply),
+ netconfDeviceInfo)));
+ }
} else {
log.info("Error on replay from device {} ", deviceReply);
}
@@ -240,4 +243,8 @@
public void removeDeviceEventListener(NetconfDeviceOutputEventListener listener) {
netconfDeviceEventListeners.remove(listener);
}
+
+ public void setEnableNotifications(boolean enableNotifications) {
+ this.enableNotifications = enableNotifications;
+ }
}