[ONOS-4849] NETCONF function for FUJITSU OLT #6

- Add alart command for FUJITSU OLT
   volt-notification-alertfilter <netconf:target>
   volt-notification-setalertfilter <netconf:target> <alert-severity>
   volt-notification-subscribe <netconf:target> {disable}
- Update fujitsu-drivers.xml and shell-config.xml in FUJITSU directory
- Apply Yuta's and Andrea's suggestion to startSubscription method and other methods which related to startSubscription method.
  -> Remove startSubscriptionConnection() and createSubscriptionString() method in 4th patch-set.
  -> Modify cosmetic issue related to Static-string.
  -> Update "No replay" to "No reply" in VoltGetPonLinksCommand.java

Change-Id: I2c8d5484ea0ff9f0b1b970fe8b183bec12193c46
diff --git a/drivers/fujitsu/src/main/java/org/onosproject/drivers/fujitsu/FujitsuVoltAlertConfig.java b/drivers/fujitsu/src/main/java/org/onosproject/drivers/fujitsu/FujitsuVoltAlertConfig.java
new file mode 100644
index 0000000..fbefb5a
--- /dev/null
+++ b/drivers/fujitsu/src/main/java/org/onosproject/drivers/fujitsu/FujitsuVoltAlertConfig.java
@@ -0,0 +1,162 @@
+/*
+ * Copyright 2016-present Open Networking Laboratory
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.onosproject.drivers.fujitsu;
+
+import org.onosproject.net.DeviceId;
+import org.onosproject.drivers.fujitsu.behaviour.VoltAlertConfig;
+import org.onosproject.net.driver.AbstractHandlerBehaviour;
+import org.onosproject.net.driver.DriverHandler;
+import org.onosproject.netconf.NetconfController;
+import org.onosproject.mastership.MastershipService;
+import org.slf4j.Logger;
+
+import java.io.IOException;
+import java.util.Set;
+
+import com.google.common.collect.ImmutableSet;
+import static com.google.common.base.Preconditions.checkNotNull;
+import static org.onosproject.drivers.fujitsu.FujitsuVoltXmlUtility.*;
+import static org.slf4j.LoggerFactory.getLogger;
+
+/**
+ * Implementation to get and set parameters available in vOLT
+ * through the Netconf protocol.
+ */
+public class FujitsuVoltAlertConfig extends AbstractHandlerBehaviour
+        implements VoltAlertConfig {
+
+    private final Logger log = getLogger(FujitsuVoltAlertConfig.class);
+    private static final String VOLT_ALERTS = "volt-alerts";
+    private static final String ALERT_FILTER = "alert-filter";
+    private static final String NOTIFY_ALERT = "notify-alert";
+    private final Set<String> severityLevels = ImmutableSet.of(
+            "none", "info", "minor", "major", "critical");
+    private static final String DISABLE = "disable";
+
+
+    @Override
+    public String getAlertFilter() {
+        DriverHandler handler = handler();
+        NetconfController controller = handler.get(NetconfController.class);
+        MastershipService mastershipService = handler.get(MastershipService.class);
+        DeviceId ncDeviceId = handler.data().deviceId();
+        checkNotNull(controller, "Netconf controller is null");
+        String reply = null;
+
+        if (!mastershipService.isLocalMaster(ncDeviceId)) {
+            log.warn("Not master for {} Use {} to execute command",
+                     ncDeviceId,
+                     mastershipService.getMasterFor(ncDeviceId));
+            return reply;
+        }
+
+        try {
+            StringBuilder request = new StringBuilder();
+            request.append(VOLT_NE_OPEN).append(VOLT_NE_NAMESPACE);
+            request.append(ANGLE_RIGHT).append(NEW_LINE);
+            request.append(buildStartTag(VOLT_ALERTS));
+            request.append(buildEmptyTag(ALERT_FILTER));
+            request.append(buildEndTag(VOLT_ALERTS));
+            request.append(VOLT_NE_CLOSE);
+
+            reply = controller.
+                    getDevicesMap().get(ncDeviceId).getSession().
+                    get(request.toString(), REPORT_ALL);
+        } catch (IOException e) {
+            log.error("Cannot communicate to device {} exception ", ncDeviceId, e);
+        }
+        return reply;
+    }
+
+    @Override
+    public void setAlertFilter(String severity) {
+        DriverHandler handler = handler();
+        NetconfController controller = handler.get(NetconfController.class);
+        MastershipService mastershipService = handler.get(MastershipService.class);
+        DeviceId ncDeviceId = handler.data().deviceId();
+        checkNotNull(controller, "Netconf controller is null");
+
+        if (!mastershipService.isLocalMaster(ncDeviceId)) {
+            log.warn("Not master for {} Use {} to execute command",
+                     ncDeviceId,
+                     mastershipService.getMasterFor(ncDeviceId));
+            return;
+        }
+
+        if (!severityLevels.contains(severity)) {
+            log.error("Invalid severity level: " + severity);
+            return;
+        }
+
+        try {
+            StringBuilder request = new StringBuilder();
+            request.append(VOLT_NE_OPEN).append(VOLT_NE_NAMESPACE);
+            request.append(ANGLE_RIGHT).append(NEW_LINE);
+            request.append(buildStartTag(VOLT_ALERTS));
+            request.append(buildStartTag(ALERT_FILTER, false));
+            request.append(severity);
+            request.append(buildEndTag(ALERT_FILTER));
+            request.append(buildEndTag(VOLT_ALERTS));
+            request.append(VOLT_NE_CLOSE);
+
+            controller.getDevicesMap().get(ncDeviceId).getSession().
+                    editConfig(RUNNING, null, request.toString());
+        } catch (IOException e) {
+            log.error("Cannot communicate to device {} exception ", ncDeviceId, e);
+        }
+    }
+
+    @Override
+    public void subscribe(String mode) {
+        DriverHandler handler = handler();
+        NetconfController controller = handler.get(NetconfController.class);
+        MastershipService mastershipService = handler.get(MastershipService.class);
+        DeviceId ncDeviceId = handler.data().deviceId();
+        checkNotNull(controller, "Netconf controller is null");
+
+        if (!mastershipService.isLocalMaster(ncDeviceId)) {
+            log.warn("Not master for {} Use {} to execute command",
+                     ncDeviceId,
+                     mastershipService.getMasterFor(ncDeviceId));
+            return;
+        }
+
+        if (mode != null) {
+            if (!DISABLE.equals(mode)) {
+                log.error("Invalid mode: " + mode);
+                return;
+            }
+        }
+
+        try {
+            if (mode != null) {
+                controller.getDevicesMap().get(ncDeviceId).getSession().
+                        endSubscription();
+            } else {
+                StringBuilder request = new StringBuilder();
+                request.append(ANGLE_LEFT).append(NOTIFY_ALERT).append(SPACE);
+                request.append(VOLT_NE_NAMESPACE).append(SLASH).append(ANGLE_RIGHT);
+
+                controller.getDevicesMap().get(ncDeviceId).getSession().
+                        startSubscription(request.toString());
+            }
+        } catch (IOException e) {
+            log.error("Cannot communicate to device {} exception ", ncDeviceId, e);
+        }
+    }
+
+}
diff --git a/drivers/fujitsu/src/main/java/org/onosproject/drivers/fujitsu/behaviour/VoltAlertConfig.java b/drivers/fujitsu/src/main/java/org/onosproject/drivers/fujitsu/behaviour/VoltAlertConfig.java
new file mode 100644
index 0000000..9385361
--- /dev/null
+++ b/drivers/fujitsu/src/main/java/org/onosproject/drivers/fujitsu/behaviour/VoltAlertConfig.java
@@ -0,0 +1,48 @@
+/*
+ * Copyright 2016-present Open Networking Laboratory
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.onosproject.drivers.fujitsu.behaviour;
+
+import com.google.common.annotations.Beta;
+import org.onosproject.net.driver.HandlerBehaviour;
+
+/**
+ * Device behaviour to obtain and set alert filter in vOLT.
+ * Device behaviour to subscribe to receive notifications from vOLT.
+ */
+@Beta
+public interface VoltAlertConfig extends HandlerBehaviour {
+
+    /**
+     * Get alert filter severity level.
+     *
+     * @return response string
+     */
+    String getAlertFilter();
+
+    /**
+     * Set alert filter severity level.
+     *
+     * @param severity input data in string
+     */
+    void setAlertFilter(String severity);
+
+    /**
+     * Subscribe to receive notifications or unsubscribe.
+     *
+     * @param mode disable subscription
+     */
+    void subscribe(String mode);
+}
diff --git a/drivers/fujitsu/src/main/java/org/onosproject/drivers/fujitsu/cli/VoltGetAlertFilterCommand.java b/drivers/fujitsu/src/main/java/org/onosproject/drivers/fujitsu/cli/VoltGetAlertFilterCommand.java
new file mode 100644
index 0000000..334c750
--- /dev/null
+++ b/drivers/fujitsu/src/main/java/org/onosproject/drivers/fujitsu/cli/VoltGetAlertFilterCommand.java
@@ -0,0 +1,52 @@
+/*
+ * Copyright 2016-present Open Networking Laboratory
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.onosproject.drivers.fujitsu.cli;
+
+import org.apache.karaf.shell.commands.Argument;
+import org.apache.karaf.shell.commands.Command;
+import org.onosproject.cli.AbstractShellCommand;
+import org.onosproject.net.DeviceId;
+import org.onosproject.drivers.fujitsu.behaviour.VoltAlertConfig;
+import org.onosproject.net.driver.DriverHandler;
+import org.onosproject.net.driver.DriverService;
+
+/**
+ * Gets alert filter severity level in vOLT.
+ */
+@Command(scope = "onos", name = "volt-notification-alertfilter",
+        description = "Gets alert filter severity level in vOLT")
+public class VoltGetAlertFilterCommand extends AbstractShellCommand {
+
+    @Argument(index = 0, name = "uri", description = "Device ID",
+            required = true, multiValued = false)
+    String uri = null;
+
+    private DeviceId deviceId;
+
+    @Override
+    protected void execute() {
+        DriverService service = get(DriverService.class);
+        deviceId = DeviceId.deviceId(uri);
+        DriverHandler h = service.createHandler(deviceId);
+        VoltAlertConfig voltNe = h.behaviour(VoltAlertConfig.class);
+        String reply = voltNe.getAlertFilter();
+        if (reply != null) {
+            print("%s", reply);
+        } else {
+            print("No reply from %s", deviceId.toString());
+        }
+    }
+}
diff --git a/drivers/fujitsu/src/main/java/org/onosproject/drivers/fujitsu/cli/VoltGetPonLinksCommand.java b/drivers/fujitsu/src/main/java/org/onosproject/drivers/fujitsu/cli/VoltGetPonLinksCommand.java
index 9129d94..1cb73ca 100644
--- a/drivers/fujitsu/src/main/java/org/onosproject/drivers/fujitsu/cli/VoltGetPonLinksCommand.java
+++ b/drivers/fujitsu/src/main/java/org/onosproject/drivers/fujitsu/cli/VoltGetPonLinksCommand.java
@@ -50,7 +50,7 @@
         if (reply != null) {
             print("%s", reply);
         } else {
-            print("No replay from %s", deviceId.toString());
+            print("No reply from %s", deviceId.toString());
         }
     }
 
diff --git a/drivers/fujitsu/src/main/java/org/onosproject/drivers/fujitsu/cli/VoltSetAlertFilterCommand.java b/drivers/fujitsu/src/main/java/org/onosproject/drivers/fujitsu/cli/VoltSetAlertFilterCommand.java
new file mode 100644
index 0000000..2719f31
--- /dev/null
+++ b/drivers/fujitsu/src/main/java/org/onosproject/drivers/fujitsu/cli/VoltSetAlertFilterCommand.java
@@ -0,0 +1,51 @@
+/*
+ * Copyright 2016-present Open Networking Laboratory
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.onosproject.drivers.fujitsu.cli;
+
+import org.apache.karaf.shell.commands.Argument;
+import org.apache.karaf.shell.commands.Command;
+import org.onosproject.cli.AbstractShellCommand;
+import org.onosproject.net.DeviceId;
+import org.onosproject.drivers.fujitsu.behaviour.VoltAlertConfig;
+import org.onosproject.net.driver.DriverHandler;
+import org.onosproject.net.driver.DriverService;
+
+/**
+ * Sets alert filter severity level in vOLT.
+ */
+@Command(scope = "onos", name = "volt-notification-setalertfilter",
+        description = "Sets alert filter severity level in vOLT")
+public class VoltSetAlertFilterCommand extends AbstractShellCommand {
+
+    @Argument(index = 0, name = "uri", description = "Device ID",
+            required = true, multiValued = false)
+    String uri = null;
+
+    @Argument(index = 1, name = "target", description = "Severity level",
+            required = true, multiValued = false)
+    String severity = null;
+
+    private DeviceId deviceId;
+
+    @Override
+    protected void execute() {
+        DriverService service = get(DriverService.class);
+        deviceId = DeviceId.deviceId(uri);
+        DriverHandler h = service.createHandler(deviceId);
+        VoltAlertConfig volt = h.behaviour(VoltAlertConfig.class);
+        volt.setAlertFilter(severity);
+    }
+}
diff --git a/drivers/fujitsu/src/main/java/org/onosproject/drivers/fujitsu/cli/VoltSubscribeCommand.java b/drivers/fujitsu/src/main/java/org/onosproject/drivers/fujitsu/cli/VoltSubscribeCommand.java
new file mode 100644
index 0000000..3ed1ada
--- /dev/null
+++ b/drivers/fujitsu/src/main/java/org/onosproject/drivers/fujitsu/cli/VoltSubscribeCommand.java
@@ -0,0 +1,51 @@
+/*
+ * Copyright 2016-present Open Networking Laboratory
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.onosproject.drivers.fujitsu.cli;
+
+import org.apache.karaf.shell.commands.Argument;
+import org.apache.karaf.shell.commands.Command;
+import org.onosproject.cli.AbstractShellCommand;
+import org.onosproject.net.DeviceId;
+import org.onosproject.drivers.fujitsu.behaviour.VoltAlertConfig;
+import org.onosproject.net.driver.DriverHandler;
+import org.onosproject.net.driver.DriverService;
+
+/**
+ * Subscribes to receive notifications of vOLT or unsubscribes.
+ */
+@Command(scope = "onos", name = "volt-notification-subscribe",
+        description = "Subscribes to receive notifications of vOLT")
+public class VoltSubscribeCommand extends AbstractShellCommand {
+
+    @Argument(index = 0, name = "uri", description = "Device ID",
+            required = true, multiValued = false)
+    String uri = null;
+
+    @Argument(index = 1, name = "mode", description = "Disable subscription",
+            required = false, multiValued = false)
+    String mode = null;
+
+    private DeviceId deviceId;
+
+    @Override
+    protected void execute() {
+        DriverService service = get(DriverService.class);
+        deviceId = DeviceId.deviceId(uri);
+        DriverHandler h = service.createHandler(deviceId);
+        VoltAlertConfig volt = h.behaviour(VoltAlertConfig.class);
+        volt.subscribe(mode);
+    }
+}
diff --git a/drivers/fujitsu/src/main/resources/OSGI-INF/blueprint/shell-config.xml b/drivers/fujitsu/src/main/resources/OSGI-INF/blueprint/shell-config.xml
index 1f72cc6..a361f7d 100644
--- a/drivers/fujitsu/src/main/resources/OSGI-INF/blueprint/shell-config.xml
+++ b/drivers/fujitsu/src/main/resources/OSGI-INF/blueprint/shell-config.xml
@@ -59,8 +59,25 @@
                 <ref component-id="deviceIdCompleter"/>
             </completers>
         </command>
+        <command>
+            <action class="org.onosproject.drivers.fujitsu.cli.VoltGetAlertFilterCommand"/>
+            <completers>
+                <ref component-id="deviceIdCompleter"/>
+            </completers>
+        </command>
+        <command>
+            <action class="org.onosproject.drivers.fujitsu.cli.VoltSetAlertFilterCommand"/>
+            <completers>
+                <ref component-id="deviceIdCompleter"/>
+            </completers>
+        </command>
+        <command>
+            <action class="org.onosproject.drivers.fujitsu.cli.VoltSubscribeCommand"/>
+            <completers>
+                <ref component-id="deviceIdCompleter"/>
+            </completers>
+        </command>
     </command-bundle>
 
     <bean id="deviceIdCompleter" class="org.onosproject.cli.net.DeviceIdCompleter"/>
-
 </blueprint>
diff --git a/drivers/fujitsu/src/main/resources/fujitsu-drivers.xml b/drivers/fujitsu/src/main/resources/fujitsu-drivers.xml
index 46cb697..3547394 100644
--- a/drivers/fujitsu/src/main/resources/fujitsu-drivers.xml
+++ b/drivers/fujitsu/src/main/resources/fujitsu-drivers.xml
@@ -30,5 +30,7 @@
                    impl="org.onosproject.drivers.fujitsu.FujitsuVoltOnuConfig"/>
         <behaviour api="org.onosproject.drivers.fujitsu.behaviour.VoltOnuOperConfig"
                    impl="org.onosproject.drivers.fujitsu.FujitsuVoltOnuOperConfig"/>
+        <behaviour api="org.onosproject.drivers.fujitsu.behaviour.VoltAlertConfig"
+                   impl="org.onosproject.drivers.fujitsu.FujitsuVoltAlertConfig"/>
     </driver>
-</drivers>
\ No newline at end of file
+</drivers>
diff --git a/protocols/netconf/api/src/main/java/org/onosproject/netconf/NetconfSession.java b/protocols/netconf/api/src/main/java/org/onosproject/netconf/NetconfSession.java
index 3e35b5a..1d83fd6 100644
--- a/protocols/netconf/api/src/main/java/org/onosproject/netconf/NetconfSession.java
+++ b/protocols/netconf/api/src/main/java/org/onosproject/netconf/NetconfSession.java
@@ -16,6 +16,7 @@
 
 package org.onosproject.netconf;
 
+import com.google.common.annotations.Beta;
 import java.util.List;
 import java.util.concurrent.CompletableFuture;
 
@@ -60,7 +61,7 @@
             throws NetconfException;
 
     /**
-     * Executes an RPC to the server and wrap the request in RPC header.
+     * Executes an synchronous RPC to the server and wrap the request in RPC header.
      *
      * @param request the XML containing the request to the server.
      * @return Server response or ERROR
@@ -158,6 +159,15 @@
     void startSubscription() throws NetconfException;
 
     /**
+     * Starts subscription to the device's notifications.
+     *
+     * @param filterSchema XML subtrees to indicate specific notification
+     * @throws NetconfException when there is a problem starting the subscription
+     */
+    @Beta
+    void startSubscription(String filterSchema) throws NetconfException;
+
+    /**
      * Ends subscription to the device's notifications.
      *
      * @throws NetconfException when there is a problem ending the subscription
diff --git a/protocols/netconf/ctl/src/main/java/org/onosproject/netconf/ctl/NetconfSessionImpl.java b/protocols/netconf/ctl/src/main/java/org/onosproject/netconf/ctl/NetconfSessionImpl.java
index e359641..fde6bc2 100644
--- a/protocols/netconf/ctl/src/main/java/org/onosproject/netconf/ctl/NetconfSessionImpl.java
+++ b/protocols/netconf/ctl/src/main/java/org/onosproject/netconf/ctl/NetconfSessionImpl.java
@@ -16,6 +16,7 @@
 
 package org.onosproject.netconf.ctl;
 
+import com.google.common.annotations.Beta;
 import ch.ethz.ssh2.Connection;
 import ch.ethz.ssh2.Session;
 import com.google.common.base.Preconditions;
@@ -66,8 +67,8 @@
     private static final String WITH_DEFAULT_CLOSE = "</with-defaults>";
     private static final String DEFAULT_OPERATION_OPEN = "<default-operation>";
     private static final String DEFAULT_OPERATION_CLOSE = "</default-operation>";
-    private static final String FILTER_OPEN = "<filter type=\"subtree\">";
-    private static final String FILTER_CLOSE = "</filter>";
+    private static final String SUBTREE_FILTER_OPEN = "<filter type=\"subtree\">";
+    private static final String SUBTREE_FILTER_CLOSE = "</filter>";
     private static final String EDIT_CONFIG_OPEN = "<edit-config>";
     private static final String EDIT_CONFIG_CLOSE = "</edit-config>";
     private static final String TARGET_OPEN = "<target>";
@@ -80,6 +81,8 @@
             "xmlns=\"urn:ietf:params:xml:ns:netconf:base:1.0\"";
     private static final String NETCONF_WITH_DEFAULTS_NAMESPACE =
             "xmlns=\"urn:ietf:params:xml:ns:yang:ietf-netconf-with-defaults\"";
+    private static final String SUBSCRIPTION_SUBTREE_FILTER_OPEN =
+            "<filter xmlns:base10=\"urn:ietf:params:xml:ns:netconf:base:1.0\" base10:type=\"subtree\">";
 
     private final AtomicInteger messageIdInteger = new AtomicInteger(0);
     private Connection netconfConnection;
@@ -157,11 +160,13 @@
         }
     }
 
-    private void startSubscriptionConnection() throws NetconfException {
+
+    @Beta
+    private void startSubscriptionConnection(String filterSchema) throws NetconfException {
         if (!serverCapabilities.contains("interleave")) {
             throw new NetconfException("Device" + deviceInfo + "does not support interleave");
         }
-        String reply = sendRequest(createSubscriptionString());
+        String reply = sendRequest(createSubscriptionString(filterSchema));
         if (!checkReply(reply)) {
             throw new NetconfException("Subscription not successful with device "
                                                + deviceInfo + " with reply " + reply);
@@ -169,18 +174,37 @@
         subscriptionConnected = true;
     }
 
+    @Override
     public void startSubscription() throws NetconfException {
         if (!subscriptionConnected) {
-            startSubscriptionConnection();
+            startSubscriptionConnection(null);
         }
         streamHandler.setEnableNotifications(true);
     }
 
-    private String createSubscriptionString() {
+    @Beta
+    @Override
+    public void startSubscription(String filterSchema) throws NetconfException {
+        if (!subscriptionConnected) {
+            startSubscriptionConnection(filterSchema);
+        }
+        streamHandler.setEnableNotifications(true);
+    }
+
+    @Beta
+    private String createSubscriptionString(String filterSchema) {
         StringBuilder subscriptionbuffer = new StringBuilder();
         subscriptionbuffer.append("<rpc xmlns=\"urn:ietf:params:xml:ns:netconf:base:1.0\">\n");
         subscriptionbuffer.append("  <create-subscription\n");
         subscriptionbuffer.append("xmlns=\"urn:ietf:params:xml:ns:netconf:notification:1.0\">\n");
+        // FIXME Only subtree filtering supported at the moment.
+        if (filterSchema != null) {
+            subscriptionbuffer.append("    ");
+            subscriptionbuffer.append(SUBSCRIPTION_SUBTREE_FILTER_OPEN).append(NEW_LINE);
+            subscriptionbuffer.append(filterSchema).append(NEW_LINE);
+            subscriptionbuffer.append("    ");
+            subscriptionbuffer.append(SUBTREE_FILTER_CLOSE).append(NEW_LINE);
+        }
         subscriptionbuffer.append("  </create-subscription>\n");
         subscriptionbuffer.append("</rpc>\n");
         subscriptionbuffer.append(ENDPATTERN);
@@ -323,9 +347,9 @@
         rpc.append(NETCONF_BASE_NAMESPACE).append(">\n");
         rpc.append(GET_OPEN).append(NEW_LINE);
         if (filterSchema != null) {
-            rpc.append(FILTER_OPEN).append(NEW_LINE);
+            rpc.append(SUBTREE_FILTER_OPEN).append(NEW_LINE);
             rpc.append(filterSchema).append(NEW_LINE);
-            rpc.append(FILTER_CLOSE).append(NEW_LINE);
+            rpc.append(SUBTREE_FILTER_CLOSE).append(NEW_LINE);
         }
         if (withDefaultsMode != null) {
             rpc.append(WITH_DEFAULT_OPEN).append(NETCONF_WITH_DEFAULTS_NAMESPACE).append(">");
@@ -577,7 +601,9 @@
             }
             CompletableFuture<String> completedReply =
                     replies.get(messageId.get());
-            completedReply.complete(event.getMessagePayload());
+            if (completedReply != null) {
+                completedReply.complete(event.getMessagePayload());
+            }
         }
     }
 }