NetconfAlarmProvider alerts the core about device notifications once a subscription is active.

Change-Id: I7561ba680eb8bac33a8543d6aa1bccf6732e95db
diff --git a/protocols/netconf/ctl/src/main/java/org/onosproject/netconf/ctl/NetconfSessionImpl.java b/protocols/netconf/ctl/src/main/java/org/onosproject/netconf/ctl/NetconfSessionImpl.java
index 487f4cc..1ee0f08 100644
--- a/protocols/netconf/ctl/src/main/java/org/onosproject/netconf/ctl/NetconfSessionImpl.java
+++ b/protocols/netconf/ctl/src/main/java/org/onosproject/netconf/ctl/NetconfSessionImpl.java
@@ -81,9 +81,10 @@
     private List<String> deviceCapabilities =
             Collections.singletonList("urn:ietf:params:netconf:base:1.0");
     private String serverCapabilities;
-    private NetconfStreamHandler t;
+    private NetconfStreamHandler streamHandler;
     private Map<Integer, CompletableFuture<String>> replies;
     private List<String> errorReplies;
+    private boolean subscriptionConnected = false;
 
 
     public NetconfSessionImpl(NetconfDeviceInfo deviceInfo) throws NetconfException {
@@ -136,9 +137,9 @@
         try {
             sshSession = netconfConnection.openSession();
             sshSession.startSubSystem("netconf");
-            t = new NetconfStreamThread(sshSession.getStdout(), sshSession.getStdin(),
-                                        sshSession.getStderr(), deviceInfo,
-                                        new NetconfSessionDelegateImpl());
+            streamHandler = new NetconfStreamThread(sshSession.getStdout(), sshSession.getStdin(),
+                                                    sshSession.getStderr(), deviceInfo,
+                                                    new NetconfSessionDelegateImpl());
             this.addDeviceOutputListener(new NetconfDeviceOutputEventListenerImpl(deviceInfo));
             sendHello();
         } catch (IOException e) {
@@ -148,6 +149,45 @@
         }
     }
 
+    private void startSubscriptionConnection() throws NetconfException {
+        if (!serverCapabilities.contains("interleave")) { // capability text comes from the hello reply
+            throw new NetconfException("Device " + deviceInfo + " does not support interleave");
+        }
+        String reply = sendRequest(createSubscriptionString());
+        if (!checkReply(reply)) {
+            throw new NetconfException("Subscription not successful with device "
+                                               + deviceInfo + " with reply " + reply);
+        }
+        subscriptionConnected = true; // remembered so startSubscription() does not subscribe twice
+    }
+
+    public void startSubscription() throws NetconfException {
+        if (!subscriptionConnected) { // first call: send create-subscription to the device
+            startSubscriptionConnection();
+        }
+        streamHandler.setEnableNotifications(true); // (re)enable delivery of notifications
+    }
+
+    private String createSubscriptionString() {
+        // Builds the <create-subscription> RPC text, terminated by ENDPATTERN.
+        return new StringBuilder()
+                .append("<rpc xmlns=\"urn:ietf:params:xml:ns:netconf:base:1.0\">\n")
+                .append("  <create-subscription\n")
+                .append("xmlns=\"urn:ietf:params:xml:ns:netconf:notification:1.0\">\n")
+                .append("  </create-subscription>\n")
+                .append("</rpc>\n")
+                .append(ENDPATTERN).toString();
+    }
+
+    @Override
+    public void endSubscription() throws NetconfException {
+        if (!subscriptionConnected) {
+            throw new NetconfException("Subscription does not exist.");
+        }
+        // Only mutes local delivery; subscriptionConnected is left unchanged.
+        streamHandler.setEnableNotifications(false);
+    }
+
     private void sendHello() throws NetconfException {
         serverCapabilities = sendRequest(createHelloString());
     }
@@ -197,7 +237,7 @@
 
     @Override
     public CompletableFuture<String> request(String request) {
-        CompletableFuture<String> ftrep = t.sendMessage(request);
+        CompletableFuture<String> ftrep = streamHandler.sendMessage(request);
         replies.put(messageIdInteger.get(), ftrep);
         return ftrep;
     }
@@ -382,31 +422,47 @@
     }
 
     @Override
-    public boolean lock() throws NetconfException {
+    public boolean lock(String configType) throws NetconfException {
         StringBuilder rpc = new StringBuilder(XML_HEADER);
-        rpc.append("<rpc>");
+        rpc.append("<rpc xmlns=\"urn:ietf:params:xml:ns:netconf:base:1.0\">\n");
         rpc.append("<lock>");
         rpc.append("<target>");
-        rpc.append("<candidate/>");
+        rpc.append("<");
+        rpc.append(configType); // NOTE(review): spliced in unescaped as the target element name
+        rpc.append("/>");
         rpc.append("</target>");
         rpc.append("</lock>");
         rpc.append("</rpc>");
         rpc.append(ENDPATTERN);
-        return checkReply(sendRequest(rpc.toString()));
+        String lockReply = sendRequest(rpc.toString());
+        return checkReply(lockReply);
     }
 
     @Override
-    public boolean unlock() throws NetconfException {
+    public boolean unlock(String configType) throws NetconfException {
         StringBuilder rpc = new StringBuilder(XML_HEADER);
-        rpc.append("<rpc>");
+        rpc.append("<rpc xmlns=\"urn:ietf:params:xml:ns:netconf:base:1.0\">\n");
         rpc.append("<unlock>");
         rpc.append("<target>");
-        rpc.append("<candidate/>");
+        rpc.append("<");
+        rpc.append(configType); // NOTE(review): spliced in unescaped as the target element name
+        rpc.append("/>");
         rpc.append("</target>");
         rpc.append("</unlock>");
         rpc.append("</rpc>");
         rpc.append(ENDPATTERN);
-        return checkReply(sendRequest(rpc.toString()));
+        String unlockReply = sendRequest(rpc.toString());
+        return checkReply(unlockReply);
+    }
+
+    @Override
+    public boolean lock() throws NetconfException {
+        return lock("running"); // targets <running/>; NOTE(review): was <candidate/> before this change
+    }
+
+    @Override
+    public boolean unlock() throws NetconfException {
+        return unlock("running"); // targets <running/>; NOTE(review): was <candidate/> before this change
     }
 
     @Override
@@ -454,12 +510,12 @@
 
     @Override
     public void addDeviceOutputListener(NetconfDeviceOutputEventListener listener) {
-        t.addDeviceEventListener(listener);
+        streamHandler.addDeviceEventListener(listener);
     }
 
     @Override
     public void removeDeviceOutputListener(NetconfDeviceOutputEventListener listener) {
-        t.removeDeviceEventListener(listener);
+        streamHandler.removeDeviceEventListener(listener);
     }
 
     private boolean checkReply(String reply) throws NetconfException {
@@ -481,6 +537,7 @@
         @Override
         public void notify(NetconfDeviceOutputEvent event)  {
             Optional<Integer> messageId = event.getMessageID();
+
             if (!messageId.isPresent()) {
                 errorReplies.add(event.getMessagePayload());
                 log.error("Device {} sent error reply {}",
diff --git a/protocols/netconf/ctl/src/main/java/org/onosproject/netconf/ctl/NetconfStreamHandler.java b/protocols/netconf/ctl/src/main/java/org/onosproject/netconf/ctl/NetconfStreamHandler.java
index 363657d..1ee7911 100644
--- a/protocols/netconf/ctl/src/main/java/org/onosproject/netconf/ctl/NetconfStreamHandler.java
+++ b/protocols/netconf/ctl/src/main/java/org/onosproject/netconf/ctl/NetconfStreamHandler.java
@@ -16,6 +16,7 @@
 
 package org.onosproject.netconf.ctl;
 
+import com.google.common.annotations.Beta;
 import org.onosproject.netconf.NetconfDeviceOutputEventListener;
 
 import java.util.concurrent.CompletableFuture;
@@ -46,4 +47,13 @@
      * @param listener Netconf device event listener
      */
     void removeDeviceEventListener(NetconfDeviceOutputEventListener listener);
+
+    /**
+     * Sets the flag that controls whether device notifications are acted upon.
+     *
+     * @param enableNotifications if true, allows action based off notifications;
+     *                            if false, stops action based off notifications
+     */
+    @Beta
+    void setEnableNotifications(boolean enableNotifications);
 }
diff --git a/protocols/netconf/ctl/src/main/java/org/onosproject/netconf/ctl/NetconfStreamThread.java b/protocols/netconf/ctl/src/main/java/org/onosproject/netconf/ctl/NetconfStreamThread.java
index 7a2a719..307c917 100644
--- a/protocols/netconf/ctl/src/main/java/org/onosproject/netconf/ctl/NetconfStreamThread.java
+++ b/protocols/netconf/ctl/src/main/java/org/onosproject/netconf/ctl/NetconfStreamThread.java
@@ -57,8 +57,9 @@
     private NetconfDeviceInfo netconfDeviceInfo;
     private NetconfSessionDelegate sessionDelegate;
     private NetconfMessageState state;
-    private  List<NetconfDeviceOutputEventListener> netconfDeviceEventListeners
-            = Lists.newArrayList();
+    private List<NetconfDeviceOutputEventListener> netconfDeviceEventListeners
+            = Lists.newCopyOnWriteArrayList();
+    private volatile boolean enableNotifications = true; // volatile: set via the setter, read while handling replies
 
     public NetconfStreamThread(final InputStream in, final OutputStream out,
                                final InputStream err, NetconfDeviceInfo deviceInfo,
@@ -195,12 +196,14 @@
                                 netconfDeviceEventListeners.forEach(
                                         listener -> listener.event(event));
                             } else if (deviceReply.contains(NOTIFICATION_LABEL)) {
-                                final String finalDeviceReply = deviceReply;
-                                netconfDeviceEventListeners.forEach(
-                                        listener -> listener.event(new NetconfDeviceOutputEvent(
-                                                NetconfDeviceOutputEvent.Type.DEVICE_NOTIFICATION,
-                                                null, finalDeviceReply, getMsgId(finalDeviceReply),
-                                                netconfDeviceInfo)));
+                                if (enableNotifications) {
+                                    final String finalDeviceReply = deviceReply;
+                                    netconfDeviceEventListeners.forEach(
+                                            listener -> listener.event(new NetconfDeviceOutputEvent(
+                                                    NetconfDeviceOutputEvent.Type.DEVICE_NOTIFICATION,
+                                                    null, finalDeviceReply, getMsgId(finalDeviceReply),
+                                                    netconfDeviceInfo)));
+                                }
                             } else {
                                 log.info("Error on replay from device {} ", deviceReply);
                             }
@@ -240,4 +243,8 @@
     public void removeDeviceEventListener(NetconfDeviceOutputEventListener listener) {
         netconfDeviceEventListeners.remove(listener);
     }
+
+    public void setEnableNotifications(boolean enableNotifications) {
+        this.enableNotifications = enableNotifications; // checked before notification events are dispatched
+    }
 }