NetconfAlarmProvider alerts core about notifications given subscription.
Change-Id: I7561ba680eb8bac33a8543d6aa1bccf6732e95db
diff --git a/protocols/netconf/ctl/src/main/java/org/onosproject/netconf/ctl/NetconfSessionImpl.java b/protocols/netconf/ctl/src/main/java/org/onosproject/netconf/ctl/NetconfSessionImpl.java
index 487f4cc..1ee0f08 100644
--- a/protocols/netconf/ctl/src/main/java/org/onosproject/netconf/ctl/NetconfSessionImpl.java
+++ b/protocols/netconf/ctl/src/main/java/org/onosproject/netconf/ctl/NetconfSessionImpl.java
@@ -81,9 +81,10 @@
private List<String> deviceCapabilities =
Collections.singletonList("urn:ietf:params:netconf:base:1.0");
private String serverCapabilities;
- private NetconfStreamHandler t;
+ private NetconfStreamHandler streamHandler;
private Map<Integer, CompletableFuture<String>> replies;
private List<String> errorReplies;
+ private boolean subscriptionConnected = false;
public NetconfSessionImpl(NetconfDeviceInfo deviceInfo) throws NetconfException {
@@ -136,9 +137,9 @@
try {
sshSession = netconfConnection.openSession();
sshSession.startSubSystem("netconf");
- t = new NetconfStreamThread(sshSession.getStdout(), sshSession.getStdin(),
- sshSession.getStderr(), deviceInfo,
- new NetconfSessionDelegateImpl());
+ streamHandler = new NetconfStreamThread(sshSession.getStdout(), sshSession.getStdin(),
+ sshSession.getStderr(), deviceInfo,
+ new NetconfSessionDelegateImpl());
this.addDeviceOutputListener(new NetconfDeviceOutputEventListenerImpl(deviceInfo));
sendHello();
} catch (IOException e) {
@@ -148,6 +149,45 @@
}
}
+ private void startSubscriptionConnection() throws NetconfException {
+ if (!serverCapabilities.contains("interleave")) {
+ throw new NetconfException("Device" + deviceInfo + "does not support interleave");
+ }
+ String reply = sendRequest(createSubscriptionString());
+ if (!checkReply(reply)) {
+ throw new NetconfException("Subscription not successful with device "
+ + deviceInfo + " with reply " + reply);
+ }
+ subscriptionConnected = true;
+ }
+
+ public void startSubscription() throws NetconfException {
+ if (!subscriptionConnected) {
+ startSubscriptionConnection();
+ }
+ streamHandler.setEnableNotifications(true);
+ }
+
+ private String createSubscriptionString() {
+ StringBuilder subscriptionbuffer = new StringBuilder();
+ subscriptionbuffer.append("<rpc xmlns=\"urn:ietf:params:xml:ns:netconf:base:1.0\">\n");
+ subscriptionbuffer.append(" <create-subscription\n");
+ subscriptionbuffer.append("xmlns=\"urn:ietf:params:xml:ns:netconf:notification:1.0\">\n");
+ subscriptionbuffer.append(" </create-subscription>\n");
+ subscriptionbuffer.append("</rpc>\n");
+ subscriptionbuffer.append(ENDPATTERN);
+ return subscriptionbuffer.toString();
+ }
+
+ @Override
+ public void endSubscription() throws NetconfException {
+ if (subscriptionConnected) {
+ streamHandler.setEnableNotifications(false);
+ } else {
+ throw new NetconfException("Subscription does not exist.");
+ }
+ }
+
private void sendHello() throws NetconfException {
serverCapabilities = sendRequest(createHelloString());
}
@@ -197,7 +237,7 @@
@Override
public CompletableFuture<String> request(String request) {
- CompletableFuture<String> ftrep = t.sendMessage(request);
+ CompletableFuture<String> ftrep = streamHandler.sendMessage(request);
replies.put(messageIdInteger.get(), ftrep);
return ftrep;
}
@@ -382,31 +422,47 @@
}
@Override
- public boolean lock() throws NetconfException {
+ public boolean lock(String configType) throws NetconfException {
StringBuilder rpc = new StringBuilder(XML_HEADER);
- rpc.append("<rpc>");
+ rpc.append("<rpc xmlns=\"urn:ietf:params:xml:ns:netconf:base:1.0\">\n");
rpc.append("<lock>");
rpc.append("<target>");
- rpc.append("<candidate/>");
+ rpc.append("<");
+ rpc.append(configType);
+ rpc.append("/>");
rpc.append("</target>");
rpc.append("</lock>");
rpc.append("</rpc>");
rpc.append(ENDPATTERN);
- return checkReply(sendRequest(rpc.toString()));
+ String lockReply = sendRequest(rpc.toString());
+ return checkReply(lockReply);
}
@Override
- public boolean unlock() throws NetconfException {
+ public boolean unlock(String configType) throws NetconfException {
StringBuilder rpc = new StringBuilder(XML_HEADER);
- rpc.append("<rpc>");
+ rpc.append("<rpc xmlns=\"urn:ietf:params:xml:ns:netconf:base:1.0\">\n");
rpc.append("<unlock>");
rpc.append("<target>");
- rpc.append("<candidate/>");
+ rpc.append("<");
+ rpc.append(configType);
+ rpc.append("/>");
rpc.append("</target>");
rpc.append("</unlock>");
rpc.append("</rpc>");
rpc.append(ENDPATTERN);
- return checkReply(sendRequest(rpc.toString()));
+ String unlockReply = sendRequest(rpc.toString());
+ return checkReply(unlockReply);
+ }
+
+ @Override
+ public boolean lock() throws NetconfException {
+ return lock("running");
+ }
+
+ @Override
+ public boolean unlock() throws NetconfException {
+ return unlock("running");
}
@Override
@@ -454,12 +510,12 @@
@Override
public void addDeviceOutputListener(NetconfDeviceOutputEventListener listener) {
- t.addDeviceEventListener(listener);
+ streamHandler.addDeviceEventListener(listener);
}
@Override
public void removeDeviceOutputListener(NetconfDeviceOutputEventListener listener) {
- t.removeDeviceEventListener(listener);
+ streamHandler.removeDeviceEventListener(listener);
}
private boolean checkReply(String reply) throws NetconfException {
@@ -481,6 +537,7 @@
@Override
public void notify(NetconfDeviceOutputEvent event) {
Optional<Integer> messageId = event.getMessageID();
+
if (!messageId.isPresent()) {
errorReplies.add(event.getMessagePayload());
log.error("Device {} sent error reply {}",