[ONOS-5675][ONOS-5676] RESTCONF JIRA fixes
This submission contains the fixes for the following 3 problems:
1. [ONOS-5675] RESTCONF clients may stop receiving notifications when multiple client
sessions are opened simultaneously
2. [ONOS-5676] RESTCONF client hangs on event notification subscription
3. Bug fix for the handling of empty leaf nodes (leaves whose value is null)
Change-Id: Icf6848d18e0599842d4889d5aa5443c58f23aeeb
diff --git a/protocols/restconf/client/ctl/src/main/java/org/onosproject/protocol/restconf/ctl/RestConfSBControllerImpl.java b/protocols/restconf/client/ctl/src/main/java/org/onosproject/protocol/restconf/ctl/RestConfSBControllerImpl.java
index 5363d89..ce6335b 100644
--- a/protocols/restconf/client/ctl/src/main/java/org/onosproject/protocol/restconf/ctl/RestConfSBControllerImpl.java
+++ b/protocols/restconf/client/ctl/src/main/java/org/onosproject/protocol/restconf/ctl/RestConfSBControllerImpl.java
@@ -60,7 +60,7 @@
private static final String ROOT_RESOURCE = "/onos/restconf";
private static final String RESOURCE_PATH_PREFIX = "/data/";
- private static final String NOTIFICATION_PATH_PREFIX = "/data/";
+ private static final String NOTIFICATION_PATH_PREFIX = "/streams/";
private Map<DeviceId, RestConfNotificationEventListener>
restconfNotificationListenerMap = new ConcurrentHashMap<>();
diff --git a/protocols/restconf/server/restconfmgr/src/main/java/org/onosproject/protocol/restconf/server/restconfmanager/RestconfManager.java b/protocols/restconf/server/restconfmgr/src/main/java/org/onosproject/protocol/restconf/server/restconfmanager/RestconfManager.java
index 95be9d1..5d398c3 100644
--- a/protocols/restconf/server/restconfmgr/src/main/java/org/onosproject/protocol/restconf/server/restconfmanager/RestconfManager.java
+++ b/protocols/restconf/server/restconfmgr/src/main/java/org/onosproject/protocol/restconf/server/restconfmanager/RestconfManager.java
@@ -99,7 +99,9 @@
private static final String RESTCONF_ROOT = "/onos/restconf";
private static final int THREAD_TERMINATION_TIMEOUT = 10;
- private static final String EOL = String.format("%n");
+
+ // Jersey's default chunk parser uses "\r\n" as the chunk separator.
+ private static final String EOL = "\r\n";
private final int maxNumOfWorkerThreads = 5;
@@ -173,7 +175,7 @@
throw new RestconfException(String.format("No content for %s = %s",
getJsonNameFromYdtNode(child),
child.getValue()),
- INTERNAL_SERVER_ERROR);
+ INTERNAL_SERVER_ERROR);
}
return result;
}
@@ -263,15 +265,14 @@
* Creates a worker thread to listen to events and write to chunkedOutput.
* The worker thread blocks if no events arrive.
*
- * @param streamId ID of the RESTCONF stream to subscribe
- * @param output A string data stream
+ * @param streamId the RESTCONF stream id to which the client subscribes
+ * @param output the string data stream
* @throws RestconfException if the worker thread fails to create
*/
@Override
public void subscribeEventStream(String streamId,
ChunkedOutput<String> output)
throws RestconfException {
- BlockingQueue<ObjectNode> eventQueue = new LinkedBlockingQueue<>();
if (workerThreadPool instanceof ThreadPoolExecutor) {
if (((ThreadPoolExecutor) workerThreadPool).getActiveCount() >=
maxNumOfWorkerThreads) {
@@ -286,10 +287,10 @@
}
+ BlockingQueue<ObjectNode> eventQueue = new LinkedBlockingQueue<>();
workerThreadPool.submit(new EventConsumer(output, eventQueue));
}
-
/**
* Shutdown a pool cleanly if possible.
*
@@ -315,23 +316,32 @@
}
}
+ /**
+ * Implementation of a worker thread which reads data from a
+ * blocking queue and writes the data to a given chunk output stream.
+ * The thread blocks while no data arrives on the queue and is
+ * terminated when the chunk output stream is closed (i.e., the
+ * HTTP-keep-alive session is closed).
+ */
private class EventConsumer implements Runnable {
- private final String queueId;
+ private String queueId;
private final ChunkedOutput<String> output;
private final BlockingQueue<ObjectNode> bqueue;
public EventConsumer(ChunkedOutput<String> output,
BlockingQueue<ObjectNode> q) {
- this.queueId = Thread.currentThread().getName();
this.output = output;
this.bqueue = q;
- eventQueueList.put(queueId, bqueue);
}
@Override
public void run() {
try {
+ queueId = String.valueOf(Thread.currentThread().getId());
+ eventQueueList.put(queueId, bqueue);
+ log.debug("EventConsumer thread created: {}", queueId);
+
ObjectNode chunk;
while ((chunk = bqueue.take()) != null) {
output.write(chunk.toString().concat(EOL));
@@ -381,10 +391,15 @@
* There is no consumer waiting to consume, so don't have to
* produce this event.
*/
+ log.debug("Q list is empty");
return;
}
try {
+ YdtContext ydtNode = event.subject().getNotificationRootContext();
+ ObjectNode jsonNode = convertYdtToJson(getJsonNameFromYdtNode(ydtNode),
+ ydtNode,
+ ymsService.getYdtWalker());
/*
* Put the event to every queue out there. Each queue is
* corresponding to an event stream session. The queue is
@@ -392,10 +407,6 @@
*/
for (Entry<String, BlockingQueue<ObjectNode>> entry : eventQueueList
.entrySet()) {
- YdtContext ydtNode = event.subject().getNotificationRootContext();
- ObjectNode jsonNode = convertYdtToJson(getJsonNameFromYdtNode(ydtNode),
- ydtNode,
- ymsService.getYdtWalker());
entry.getValue().put(jsonNode);
}
} catch (InterruptedException e) {
diff --git a/protocols/restconf/server/utils/src/main/java/org/onosproject/protocol/restconf/server/utils/parser/json/DefaultJsonBuilder.java b/protocols/restconf/server/utils/src/main/java/org/onosproject/protocol/restconf/server/utils/parser/json/DefaultJsonBuilder.java
index b5f2911..654fff9 100644
--- a/protocols/restconf/server/utils/src/main/java/org/onosproject/protocol/restconf/server/utils/parser/json/DefaultJsonBuilder.java
+++ b/protocols/restconf/server/utils/src/main/java/org/onosproject/protocol/restconf/server/utils/parser/json/DefaultJsonBuilder.java
@@ -90,9 +90,20 @@
return;
}
appendField(fieldName);
+
+ // If the value is null, then it's an empty leaf node
+ if (value == null) {
+ treeString.append(QUOTE)
+ .append(QUOTE + COMMA);
+ return;
+ }
+
+ // If the value is empty, then it's a non-leaf node
if (value.isEmpty()) {
return;
}
+
+ // It's a non-empty leaf node
treeString.append(QUOTE)
.append(value)
.append(QUOTE + COMMA);
@@ -163,8 +174,8 @@
private void appendField(String fieldName) {
if (!isNullOrEmpty(fieldName)) {
treeString.append(QUOTE)
- .append(fieldName)
- .append(QUOTE + COLON);
+ .append(fieldName)
+ .append(QUOTE + COLON);
}
}
diff --git a/protocols/restconf/server/utils/src/main/java/org/onosproject/protocol/restconf/server/utils/parser/json/JsonToYdtListener.java b/protocols/restconf/server/utils/src/main/java/org/onosproject/protocol/restconf/server/utils/parser/json/JsonToYdtListener.java
index 546bed7..cf4e690 100755
--- a/protocols/restconf/server/utils/src/main/java/org/onosproject/protocol/restconf/server/utils/parser/json/JsonToYdtListener.java
+++ b/protocols/restconf/server/utils/src/main/java/org/onosproject/protocol/restconf/server/utils/parser/json/JsonToYdtListener.java
@@ -48,6 +48,7 @@
private static final int INPUT_FIELD_LENGTH = 2;
private static final String E_UNSUP_TYPE = "Unsupported node type %s " +
"field name is %s fieldName";
+ private static final String EMPTY_STRING = "null";
private Logger log = getLogger(getClass());
@@ -140,7 +141,8 @@
}
private void processLeafNode(NormalizedYangNode node, String value) {
- ydtBuilder.addLeaf(node.getName(), node.getNamespace(), value);
+ String leafValue = value.equalsIgnoreCase(EMPTY_STRING) ? null : value;
+ ydtBuilder.addLeaf(node.getName(), node.getNamespace(), leafValue);
}
private void processArrayNode(NormalizedYangNode normalizedNode,