NetconfAlarmProvider alerts the core about notifications, given a subscription.

Change-Id: I7561ba680eb8bac33a8543d6aa1bccf6732e95db
diff --git a/protocols/netconf/ctl/src/main/java/org/onosproject/netconf/ctl/NetconfSessionImpl.java b/protocols/netconf/ctl/src/main/java/org/onosproject/netconf/ctl/NetconfSessionImpl.java
index 487f4cc..1ee0f08 100644
--- a/protocols/netconf/ctl/src/main/java/org/onosproject/netconf/ctl/NetconfSessionImpl.java
+++ b/protocols/netconf/ctl/src/main/java/org/onosproject/netconf/ctl/NetconfSessionImpl.java
@@ -81,9 +81,10 @@
     private List<String> deviceCapabilities =
             Collections.singletonList("urn:ietf:params:netconf:base:1.0");
     private String serverCapabilities;
-    private NetconfStreamHandler t;
+    private NetconfStreamHandler streamHandler;
     private Map<Integer, CompletableFuture<String>> replies;
     private List<String> errorReplies;
+    private boolean subscriptionConnected = false; // set to true after a successful create-subscription reply
 
 
     public NetconfSessionImpl(NetconfDeviceInfo deviceInfo) throws NetconfException {
@@ -136,9 +137,9 @@
         try {
             sshSession = netconfConnection.openSession();
             sshSession.startSubSystem("netconf");
-            t = new NetconfStreamThread(sshSession.getStdout(), sshSession.getStdin(),
-                                        sshSession.getStderr(), deviceInfo,
-                                        new NetconfSessionDelegateImpl());
+            streamHandler = new NetconfStreamThread(sshSession.getStdout(), sshSession.getStdin(),
+                                                    sshSession.getStderr(), deviceInfo,
+                                                    new NetconfSessionDelegateImpl());
             this.addDeviceOutputListener(new NetconfDeviceOutputEventListenerImpl(deviceInfo));
             sendHello();
         } catch (IOException e) {
@@ -148,6 +149,45 @@
         }
     }
 
+    private void startSubscriptionConnection() throws NetconfException { // sends the create-subscription RPC
+        if (!serverCapabilities.contains("interleave")) { // substring match on the raw hello reply
+            throw new NetconfException("Device " + deviceInfo + " does not support interleave");
+        }
+        String reply = sendRequest(createSubscriptionString());
+        if (!checkReply(reply)) {
+            throw new NetconfException("Subscription not successful with device "
+                                               + deviceInfo + " with reply " + reply);
+        }
+        subscriptionConnected = true; // only reached when checkReply accepted the reply
+    }
+
+    public void startSubscription() throws NetconfException { // NOTE(review): no @Override; confirm
+        if (!subscriptionConnected) { // the create-subscription RPC is skipped once it has succeeded
+            startSubscriptionConnection();
+        }
+        streamHandler.setEnableNotifications(true); // (re-)enable notifications on the stream handler
+    }
+
+    private String createSubscriptionString() {
+        // Bare <create-subscription> element: no child elements are added to it.
+        StringBuilder rpc = new StringBuilder();
+        rpc.append("<rpc xmlns=\"urn:ietf:params:xml:ns:netconf:base:1.0\">\n");
+        rpc.append("  <create-subscription\n");
+        rpc.append("xmlns=\"urn:ietf:params:xml:ns:netconf:notification:1.0\">\n");
+        rpc.append("  </create-subscription>\n");
+        rpc.append("</rpc>\n");
+        return rpc.append(ENDPATTERN).toString();
+    }
+
+    @Override
+    public void endSubscription() throws NetconfException {
+        // Nothing to end unless an earlier create-subscription succeeded.
+        if (!subscriptionConnected) {
+            throw new NetconfException("Subscription does not exist.");
+        }
+        streamHandler.setEnableNotifications(false);
+    }
+
     private void sendHello() throws NetconfException {
         serverCapabilities = sendRequest(createHelloString());
     }
@@ -197,7 +237,7 @@
 
     @Override
     public CompletableFuture<String> request(String request) {
-        CompletableFuture<String> ftrep = t.sendMessage(request);
+        CompletableFuture<String> ftrep = streamHandler.sendMessage(request); // future is tracked in replies below
         replies.put(messageIdInteger.get(), ftrep);
         return ftrep;
     }
@@ -382,31 +422,47 @@
     }
 
     @Override
-    public boolean lock() throws NetconfException {
+    public boolean lock(String configType) throws NetconfException { // configType becomes the <target> child
         StringBuilder rpc = new StringBuilder(XML_HEADER);
-        rpc.append("<rpc>");
+        rpc.append("<rpc xmlns=\"urn:ietf:params:xml:ns:netconf:base:1.0\">\n");
         rpc.append("<lock>");
         rpc.append("<target>");
-        rpc.append("<candidate/>");
+        rpc.append("<");
+        rpc.append(configType); // inserted verbatim as an element name; not validated or escaped here
+        rpc.append("/>");
         rpc.append("</target>");
         rpc.append("</lock>");
         rpc.append("</rpc>");
         rpc.append(ENDPATTERN);
-        return checkReply(sendRequest(rpc.toString()));
+        String lockReply = sendRequest(rpc.toString());
+        return checkReply(lockReply); // outcome of checkReply on the reply to the lock RPC
     }
 
     @Override
-    public boolean unlock() throws NetconfException {
+    public boolean unlock(String configType) throws NetconfException { // configType becomes the <target> child
         StringBuilder rpc = new StringBuilder(XML_HEADER);
-        rpc.append("<rpc>");
+        rpc.append("<rpc xmlns=\"urn:ietf:params:xml:ns:netconf:base:1.0\">\n");
         rpc.append("<unlock>");
         rpc.append("<target>");
-        rpc.append("<candidate/>");
+        rpc.append("<");
+        rpc.append(configType); // inserted verbatim as an element name; not validated or escaped here
+        rpc.append("/>");
         rpc.append("</target>");
         rpc.append("</unlock>");
         rpc.append("</rpc>");
         rpc.append(ENDPATTERN);
-        return checkReply(sendRequest(rpc.toString()));
+        String unlockReply = sendRequest(rpc.toString());
+        return checkReply(unlockReply); // outcome of checkReply on the reply to the unlock RPC
+    }
+
+    @Override
+    public boolean lock() throws NetconfException {
+        return lock("running"); // NOTE(review): removed lock() sent <candidate/>; confirm change to running
+    }
+
+    @Override
+    public boolean unlock() throws NetconfException {
+        return unlock("running"); // NOTE(review): removed unlock() sent <candidate/>; confirm change to running
+    }
 
     @Override
@@ -454,12 +510,12 @@
 
     @Override
     public void addDeviceOutputListener(NetconfDeviceOutputEventListener listener) {
-        t.addDeviceEventListener(listener);
+        streamHandler.addDeviceEventListener(listener); // registration is delegated to the stream handler
     }
 
     @Override
     public void removeDeviceOutputListener(NetconfDeviceOutputEventListener listener) {
-        t.removeDeviceEventListener(listener);
+        streamHandler.removeDeviceEventListener(listener); // removal is delegated to the stream handler
     }
 
     private boolean checkReply(String reply) throws NetconfException {
@@ -481,6 +537,7 @@
         @Override
         public void notify(NetconfDeviceOutputEvent event)  {
             Optional<Integer> messageId = event.getMessageID();
+
             if (!messageId.isPresent()) {
                 errorReplies.add(event.getMessagePayload());
                 log.error("Device {} sent error reply {}",