[ONOS-5675][ONOS-5676] RESTCONF JIRA fixes

This submission contains the fixes for the following 3 problems:

1. [ONOS-5675] RESTCONF clients may stop receiving notifications when multiple client
               sessions are opened simultaneously
2. [ONOS-5676] RESTCONF client hangs on event notification subscription
3. Bug fix for empty leaf nodes in the JSON builder and the JSON-to-YDT listener

Change-Id: Icf6848d18e0599842d4889d5aa5443c58f23aeeb
diff --git a/protocols/restconf/client/ctl/src/main/java/org/onosproject/protocol/restconf/ctl/RestConfSBControllerImpl.java b/protocols/restconf/client/ctl/src/main/java/org/onosproject/protocol/restconf/ctl/RestConfSBControllerImpl.java
index 5363d89..ce6335b 100644
--- a/protocols/restconf/client/ctl/src/main/java/org/onosproject/protocol/restconf/ctl/RestConfSBControllerImpl.java
+++ b/protocols/restconf/client/ctl/src/main/java/org/onosproject/protocol/restconf/ctl/RestConfSBControllerImpl.java
@@ -60,7 +60,7 @@
     private static final String ROOT_RESOURCE = "/onos/restconf";
 
     private static final String RESOURCE_PATH_PREFIX = "/data/";
-    private static final String NOTIFICATION_PATH_PREFIX = "/data/";
+    private static final String NOTIFICATION_PATH_PREFIX = "/streams/";
 
     private Map<DeviceId, RestConfNotificationEventListener>
                                             restconfNotificationListenerMap = new ConcurrentHashMap<>();
diff --git a/protocols/restconf/server/restconfmgr/src/main/java/org/onosproject/protocol/restconf/server/restconfmanager/RestconfManager.java b/protocols/restconf/server/restconfmgr/src/main/java/org/onosproject/protocol/restconf/server/restconfmanager/RestconfManager.java
index 95be9d1..5d398c3 100644
--- a/protocols/restconf/server/restconfmgr/src/main/java/org/onosproject/protocol/restconf/server/restconfmanager/RestconfManager.java
+++ b/protocols/restconf/server/restconfmgr/src/main/java/org/onosproject/protocol/restconf/server/restconfmanager/RestconfManager.java
@@ -99,7 +99,9 @@
 
     private static final String RESTCONF_ROOT = "/onos/restconf";
     private static final int THREAD_TERMINATION_TIMEOUT = 10;
-    private static final String EOL = String.format("%n");
+
+    // Jersey's default chunk parser uses "\r\n" as the chunk separator.
+    private static final String EOL = "\r\n";
 
     private final int maxNumOfWorkerThreads = 5;
 
@@ -173,7 +175,7 @@
             throw new RestconfException(String.format("No content for %s = %s",
                                                       getJsonNameFromYdtNode(child),
                                                       child.getValue()),
-                                                      INTERNAL_SERVER_ERROR);
+                                        INTERNAL_SERVER_ERROR);
         }
         return result;
     }
@@ -263,15 +265,14 @@
      * Creates a worker thread to listen to events and write to chunkedOutput.
      * The worker thread blocks if no events arrive.
      *
-     * @param streamId ID of the RESTCONF stream to subscribe
-     * @param output   A string data stream
+     * @param streamId the RESTCONF stream id to which the client subscribes
+     * @param output   the string data stream
      * @throws RestconfException if the worker thread fails to create
      */
     @Override
     public void subscribeEventStream(String streamId,
                                      ChunkedOutput<String> output)
             throws RestconfException {
-        BlockingQueue<ObjectNode> eventQueue = new LinkedBlockingQueue<>();
         if (workerThreadPool instanceof ThreadPoolExecutor) {
             if (((ThreadPoolExecutor) workerThreadPool).getActiveCount() >=
                     maxNumOfWorkerThreads) {
@@ -286,10 +287,10 @@
 
         }
 
+        BlockingQueue<ObjectNode> eventQueue = new LinkedBlockingQueue<>();
         workerThreadPool.submit(new EventConsumer(output, eventQueue));
     }
 
-
     /**
      * Shutdown a pool cleanly if possible.
      *
@@ -315,23 +316,32 @@
         }
     }
 
+    /**
+     * Implementation of a worker thread which reads data from a
+     * blocking queue and writes the data to a given chunk output stream.
+     * The thread blocks when no data arrives in the queue and is
+     * terminated when the chunk output stream is closed (i.e., the
+     * HTTP-keep-alive session is closed).
+     */
     private class EventConsumer implements Runnable {
 
-        private final String queueId;
+        private String queueId;
         private final ChunkedOutput<String> output;
         private final BlockingQueue<ObjectNode> bqueue;
 
         public EventConsumer(ChunkedOutput<String> output,
                              BlockingQueue<ObjectNode> q) {
-            this.queueId = Thread.currentThread().getName();
             this.output = output;
             this.bqueue = q;
-            eventQueueList.put(queueId, bqueue);
         }
 
         @Override
         public void run() {
             try {
+                queueId = String.valueOf(Thread.currentThread().getId());
+                eventQueueList.put(queueId, bqueue);
+                log.debug("EventConsumer thread created: {}", queueId);
+
                 ObjectNode chunk;
                 while ((chunk = bqueue.take()) != null) {
                     output.write(chunk.toString().concat(EOL));
@@ -381,10 +391,15 @@
                  * There is no consumer waiting to consume, so don't have to
                  * produce this event.
                  */
+                log.debug("Q list is empty");
                 return;
             }
 
             try {
+                YdtContext ydtNode = event.subject().getNotificationRootContext();
+                ObjectNode jsonNode = convertYdtToJson(getJsonNameFromYdtNode(ydtNode),
+                                                       ydtNode,
+                                                       ymsService.getYdtWalker());
                 /*
                  * Put the event to every queue out there. Each queue is
                  * corresponding to an event stream session. The queue is
@@ -392,10 +407,6 @@
                  */
                 for (Entry<String, BlockingQueue<ObjectNode>> entry : eventQueueList
                         .entrySet()) {
-                    YdtContext ydtNode = event.subject().getNotificationRootContext();
-                    ObjectNode jsonNode = convertYdtToJson(getJsonNameFromYdtNode(ydtNode),
-                                                           ydtNode,
-                                                           ymsService.getYdtWalker());
                     entry.getValue().put(jsonNode);
                 }
             } catch (InterruptedException e) {
diff --git a/protocols/restconf/server/utils/src/main/java/org/onosproject/protocol/restconf/server/utils/parser/json/DefaultJsonBuilder.java b/protocols/restconf/server/utils/src/main/java/org/onosproject/protocol/restconf/server/utils/parser/json/DefaultJsonBuilder.java
index b5f2911..654fff9 100644
--- a/protocols/restconf/server/utils/src/main/java/org/onosproject/protocol/restconf/server/utils/parser/json/DefaultJsonBuilder.java
+++ b/protocols/restconf/server/utils/src/main/java/org/onosproject/protocol/restconf/server/utils/parser/json/DefaultJsonBuilder.java
@@ -90,9 +90,20 @@
             return;
         }
         appendField(fieldName);
+
+        // If the value is null, then it's an empty leaf node
+        if (value == null) {
+            treeString.append(QUOTE)
+                    .append(QUOTE + COMMA);
+            return;
+        }
+
+        // If the value is empty, then it's a non-leaf node
         if (value.isEmpty()) {
             return;
         }
+
+        // It's a non-empty leaf node
         treeString.append(QUOTE)
                 .append(value)
                 .append(QUOTE + COMMA);
@@ -163,8 +174,8 @@
     private void appendField(String fieldName) {
         if (!isNullOrEmpty(fieldName)) {
             treeString.append(QUOTE)
-            .append(fieldName)
-            .append(QUOTE + COLON);
+                    .append(fieldName)
+                    .append(QUOTE + COLON);
         }
     }
 
diff --git a/protocols/restconf/server/utils/src/main/java/org/onosproject/protocol/restconf/server/utils/parser/json/JsonToYdtListener.java b/protocols/restconf/server/utils/src/main/java/org/onosproject/protocol/restconf/server/utils/parser/json/JsonToYdtListener.java
index 546bed7..cf4e690 100755
--- a/protocols/restconf/server/utils/src/main/java/org/onosproject/protocol/restconf/server/utils/parser/json/JsonToYdtListener.java
+++ b/protocols/restconf/server/utils/src/main/java/org/onosproject/protocol/restconf/server/utils/parser/json/JsonToYdtListener.java
@@ -48,6 +48,7 @@
     private static final int INPUT_FIELD_LENGTH = 2;
     private static final String E_UNSUP_TYPE = "Unsupported node type %s " +
             "field name is %s fieldName";
+    private static final String EMPTY_STRING = "null";
 
     private Logger log = getLogger(getClass());
 
@@ -140,7 +141,8 @@
     }
 
     private void processLeafNode(NormalizedYangNode node, String value) {
-        ydtBuilder.addLeaf(node.getName(), node.getNamespace(), value);
+        String leafValue = value.equalsIgnoreCase(EMPTY_STRING) ? null : value;
+        ydtBuilder.addLeaf(node.getName(), node.getNamespace(), leafValue);
     }
 
     private void processArrayNode(NormalizedYangNode normalizedNode,