[ONOS-6760][ONOS-6761] RESTCONF changes and fixes

* bug fixes for POST and PUT
* added support for PATCH
* added support for RPC

Change-Id: Ib03dd1e92d4e7231851340063eb23e4b3ad582cf
diff --git a/apps/restconf/restconfmgr/src/main/java/org/onosproject/restconf/restconfmanager/RestconfManager.java b/apps/restconf/restconfmgr/src/main/java/org/onosproject/restconf/restconfmanager/RestconfManager.java
index 2d32379..ce01dcd 100644
--- a/apps/restconf/restconfmgr/src/main/java/org/onosproject/restconf/restconfmanager/RestconfManager.java
+++ b/apps/restconf/restconfmgr/src/main/java/org/onosproject/restconf/restconfmanager/RestconfManager.java
@@ -29,34 +29,35 @@
 import org.onosproject.config.FailedException;
 import org.onosproject.config.Filter;
 import org.onosproject.restconf.api.RestconfException;
+import org.onosproject.restconf.api.RestconfRpcOutput;
 import org.onosproject.restconf.api.RestconfService;
+import org.onosproject.restconf.utils.RestconfUtils;
 import org.onosproject.yang.model.DataNode;
+import org.onosproject.yang.model.DefaultResourceData;
 import org.onosproject.yang.model.InnerNode;
 import org.onosproject.yang.model.KeyLeaf;
 import org.onosproject.yang.model.ListKey;
 import org.onosproject.yang.model.NodeKey;
 import org.onosproject.yang.model.ResourceData;
 import org.onosproject.yang.model.ResourceId;
+import org.onosproject.yang.model.RpcInput;
+import org.onosproject.yang.model.RpcOutput;
 import org.onosproject.yang.model.SchemaId;
-import org.onosproject.yang.model.DefaultResourceData;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import java.io.IOException;
+import java.net.URI;
 import java.util.List;
-import java.util.concurrent.BlockingQueue;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.ConcurrentMap;
+import java.util.Map;
+import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
-import java.util.concurrent.LinkedBlockingQueue;
-import java.util.concurrent.ThreadPoolExecutor;
 
-import static java.util.concurrent.TimeUnit.SECONDS;
 import static javax.ws.rs.core.Response.Status.INTERNAL_SERVER_ERROR;
 import static org.onosproject.restconf.utils.RestconfUtils.convertDataNodeToJson;
 import static org.onosproject.restconf.utils.RestconfUtils.convertJsonToDataNode;
 import static org.onosproject.restconf.utils.RestconfUtils.convertUriToRid;
+import static org.onosproject.restconf.utils.RestconfUtils.rmLastPathSegment;
 import static org.onosproject.yang.model.DataNode.Type.MULTI_INSTANCE_NODE;
 import static org.onosproject.yang.model.DataNode.Type.SINGLE_INSTANCE_LEAF_VALUE_NODE;
 import static org.onosproject.yang.model.DataNode.Type.SINGLE_INSTANCE_NODE;
@@ -77,10 +78,6 @@
 public class RestconfManager implements RestconfService {
 
     private static final String RESTCONF_ROOT = "/onos/restconf";
-    private static final int THREAD_TERMINATION_TIMEOUT = 10;
-
-    // Jersey's default chunk parser uses "\r\n" as the chunk separator.
-    private static final String EOL = "\r\n";
 
     private final int maxNumOfWorkerThreads = 5;
 
@@ -89,9 +86,6 @@
     @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
     protected DynamicConfigService dynamicConfigService;
 
-    private ConcurrentMap<String, BlockingQueue<ObjectNode>> eventQueueList =
-            new ConcurrentHashMap<>();
-
     private ExecutorService workerThreadPool;
 
     @Activate
@@ -106,18 +100,22 @@
 
     @Deactivate
     protected void deactivate() {
-        shutdownAndAwaitTermination(workerThreadPool);
+        workerThreadPool.shutdownNow();
         log.info("Stopped");
     }
 
     @Override
-    public ObjectNode runGetOperationOnDataResource(String uri)
+    public ObjectNode runGetOperationOnDataResource(URI uri)
             throws RestconfException {
         ResourceId rid = convertUriToRid(uri);
         // TODO: define Filter (if there is any requirement).
         Filter filter = new Filter();
         DataNode dataNode;
+
         try {
+            if (!dynamicConfigService.nodeExist(rid)) {
+                return null;
+            }
             dataNode = dynamicConfigService.readNode(rid, filter);
         } catch (FailedException e) {
             log.error("ERROR: DynamicConfigService: ", e);
@@ -129,17 +127,21 @@
     }
 
     @Override
-    public void runPostOperationOnDataResource(String uri, ObjectNode rootNode)
+    public void runPostOperationOnDataResource(URI uri, ObjectNode rootNode)
             throws RestconfException {
         ResourceData receivedData = convertJsonToDataNode(uri, rootNode);
-        ResourceData resourceData = getDataForStore(receivedData);
-        ResourceId rid = resourceData.resourceId();
-        List<DataNode> dataNodeList = resourceData.dataNodes();
-        // TODO: Error message needs to be fixed
+        ResourceId rid = receivedData.resourceId();
+        List<DataNode> dataNodeList = receivedData.dataNodes();
         if (dataNodeList.size() > 1) {
-            log.warn("ERROR: There are more than one Data Node can be proceed");
+            log.warn("There are more than one Data Node can be proceed: {}", dataNodeList.size());
         }
         DataNode dataNode = dataNodeList.get(0);
+
+        if (rid == null) {
+            rid = ResourceId.builder().addBranchPointSchema("/", null).build();
+            dataNode = removeTopNode(dataNode);
+        }
+
         try {
             dynamicConfigService.createNode(rid, dataNode);
         } catch (FailedException e) {
@@ -150,17 +152,28 @@
     }
 
     @Override
-    public void runPutOperationOnDataResource(String uri, ObjectNode rootNode)
-            throws RestconfException {
-        runPostOperationOnDataResource(uri, rootNode);
-    }
-
-    @Override
-    public void runDeleteOperationOnDataResource(String uri)
+    public void runPutOperationOnDataResource(URI uri, ObjectNode rootNode)
             throws RestconfException {
         ResourceId rid = convertUriToRid(uri);
+        ResourceData receivedData = convertJsonToDataNode(rmLastPathSegment(uri), rootNode);
+        ResourceId parentRid = receivedData.resourceId();
+        List<DataNode> dataNodeList = receivedData.dataNodes();
+        if (dataNodeList.size() > 1) {
+            log.warn("There are more than one Data Node can be proceed: {}", dataNodeList.size());
+        }
+        DataNode dataNode = dataNodeList.get(0);
+
         try {
-            dynamicConfigService.deleteNode(rid);
+            /*
+             * If the data node already exists, then replace it.
+             * Otherwise, create it.
+             */
+            if (dynamicConfigService.nodeExist(rid)) {
+                dynamicConfigService.replaceNode(parentRid, dataNode);
+            } else {
+                dynamicConfigService.createNode(parentRid, dataNode);
+            }
+
         } catch (FailedException e) {
             log.error("ERROR: DynamicConfigService: ", e);
             throw new RestconfException("ERROR: DynamicConfigService",
@@ -169,8 +182,51 @@
     }
 
     @Override
-    public void runPatchOperationOnDataResource(String uri, ObjectNode rootNode)
+    public void runDeleteOperationOnDataResource(URI uri)
             throws RestconfException {
+        ResourceId rid = convertUriToRid(uri);
+        try {
+            if (dynamicConfigService.nodeExist(rid)) {
+                dynamicConfigService.deleteNode(rid);
+            }
+        } catch (FailedException e) {
+            log.error("ERROR: DynamicConfigService: ", e);
+            throw new RestconfException("ERROR: DynamicConfigService",
+                                        INTERNAL_SERVER_ERROR);
+        }
+    }
+
+    @Override
+    public void runPatchOperationOnDataResource(URI uri, ObjectNode rootNode)
+            throws RestconfException {
+        ResourceData receivedData = convertJsonToDataNode(rmLastPathSegment(uri), rootNode);
+        ResourceId rid = receivedData.resourceId();
+        List<DataNode> dataNodeList = receivedData.dataNodes();
+        if (dataNodeList.size() > 1) {
+            log.warn("There are more than one Data Node can be proceed: {}", dataNodeList.size());
+        }
+        DataNode dataNode = dataNodeList.get(0);
+
+        if (rid == null) {
+            rid = ResourceId.builder().addBranchPointSchema("/", null).build();
+            dataNode = removeTopNode(dataNode);
+        }
+
+        try {
+            dynamicConfigService.updateNode(rid, dataNode);
+        } catch (FailedException e) {
+            log.error("ERROR: DynamicConfigService: ", e);
+            throw new RestconfException("ERROR: DynamicConfigService",
+                                        INTERNAL_SERVER_ERROR);
+        }
+    }
+
+    private DataNode removeTopNode(DataNode dataNode) {
+        if (dataNode instanceof InnerNode && dataNode.key().schemaId().name().equals("/")) {
+            Map.Entry<NodeKey, DataNode> entry = ((InnerNode) dataNode).childNodes().entrySet().iterator().next();
+            dataNode = entry.getValue();
+        }
+        return dataNode;
     }
 
     @Override
@@ -178,33 +234,48 @@
         return RESTCONF_ROOT;
     }
 
-    /**
-     * Creates a worker thread to listen to events and write to chunkedOutput.
-     * The worker thread blocks if no events arrive.
-     *
-     * @param streamId the RESTCONF stream id to which the client subscribes
-     * @param output   the string data stream
-     * @throws RestconfException if the worker thread fails to create
-     */
     @Override
-    public void subscribeEventStream(String streamId,
+    public void subscribeEventStream(String streamId, String clientIpAddr,
                                      ChunkedOutput<String> output)
             throws RestconfException {
-        if (workerThreadPool instanceof ThreadPoolExecutor) {
-            if (((ThreadPoolExecutor) workerThreadPool).getActiveCount() >=
-                    maxNumOfWorkerThreads) {
-                throw new RestconfException("no more work threads left to " +
-                                                    "handle event subscription",
-                                            INTERNAL_SERVER_ERROR);
-            }
-        } else {
-            throw new RestconfException("Server ERROR: workerThreadPool NOT " +
-                                                "instanceof ThreadPoolExecutor",
-                                        INTERNAL_SERVER_ERROR);
+        //TODO: to be completed
+    }
+
+    @Override
+    public CompletableFuture<RestconfRpcOutput> runRpc(URI uri,
+                                                       ObjectNode input,
+                                                       String clientIpAddress) {
+        CompletableFuture<RestconfRpcOutput> result =
+                CompletableFuture.supplyAsync(() -> executeRpc(uri, input, clientIpAddress));
+        return result;
+    }
+
+    private RestconfRpcOutput executeRpc(URI uri, ObjectNode input, String clientIpAddress) {
+        ResourceData rpcInputNode = convertJsonToDataNode(uri, input);
+        ResourceId resourceId = rpcInputNode.resourceId();
+        List<DataNode> inputDataNodeList = rpcInputNode.dataNodes();
+        DataNode inputDataNode = inputDataNodeList.get(0);
+        RpcInput rpcInput = new RpcInput(inputDataNode);
+
+        RestconfRpcOutput restconfOutput = null;
+        try {
+            CompletableFuture<RpcOutput> rpcFuture =
+                    dynamicConfigService.invokeRpc(resourceId, rpcInput);
+            RpcOutput rpcOutput = rpcFuture.get();
+            restconfOutput = RestconfUtils.convertRpcOutput(resourceId, rpcOutput);
+        } catch (InterruptedException e) {
+            log.error("ERROR: computeResultQ.take() has been interrupted.");
+            log.debug("executeRpc Exception:", e);
+            restconfOutput = new RestconfRpcOutput(INTERNAL_SERVER_ERROR, null);
+            restconfOutput.reason("RPC execution has been interrupted");
+        } catch (Exception e) {
+            log.error("ERROR: executeRpc: {}", e.getMessage());
+            log.debug("executeRpc Exception:", e);
+            restconfOutput = new RestconfRpcOutput(INTERNAL_SERVER_ERROR, null);
+            restconfOutput.reason(e.getMessage());
         }
 
-        BlockingQueue<ObjectNode> eventQueue = new LinkedBlockingQueue<>();
-        workerThreadPool.submit(new EventConsumer(output, eventQueue));
+        return restconfOutput;
     }
 
     private ResourceData getDataForStore(ResourceData resourceData) {
@@ -247,81 +318,4 @@
         resData.resourceId(parentId);
         return resData.build();
     }
-
-    /**
-     * Shutdown a pool cleanly if possible.
-     *
-     * @param pool an executorService
-     */
-    private void shutdownAndAwaitTermination(ExecutorService pool) {
-        pool.shutdown(); // Disable new tasks from being submitted
-        try {
-            // Wait a while for existing tasks to terminate
-            if (!pool.awaitTermination(THREAD_TERMINATION_TIMEOUT, SECONDS)) {
-                pool.shutdownNow(); // Cancel currently executing tasks
-                // Wait a while for tasks to respond to being cancelled
-                if (!pool.awaitTermination(THREAD_TERMINATION_TIMEOUT,
-                                           SECONDS)) {
-                    log.error("Pool did not terminate");
-                }
-            }
-        } catch (Exception ie) {
-            // (Re-)Cancel if current thread also interrupted
-            pool.shutdownNow();
-            // Preserve interrupt status
-            Thread.currentThread().interrupt();
-        }
-    }
-
-    /**
-     * Implementation of a worker thread which reads data from a
-     * blocking queue and writes the data to a given chunk output stream.
-     * The thread is blocked when no data arrive to the queue and is
-     * terminated when the chunk output stream is closed (i.e., the
-     * HTTP-keep-alive session is closed).
-     */
-    private class EventConsumer implements Runnable {
-
-        private String queueId;
-        private final ChunkedOutput<String> output;
-        private final BlockingQueue<ObjectNode> bqueue;
-
-        public EventConsumer(ChunkedOutput<String> output,
-                             BlockingQueue<ObjectNode> q) {
-            this.output = output;
-            this.bqueue = q;
-        }
-
-        @Override
-        public void run() {
-            try {
-                queueId = String.valueOf(Thread.currentThread().getId());
-                eventQueueList.put(queueId, bqueue);
-                log.debug("EventConsumer thread created: {}", queueId);
-
-                ObjectNode chunk;
-                while ((chunk = bqueue.take()) != null) {
-                    output.write(chunk.toString().concat(EOL));
-                }
-            } catch (IOException e) {
-                log.debug("chunkedOuput is closed: {}", this.bqueue.toString());
-                /*
-                 * Remove queue from the queue list, so that the event producer
-                 * (i.e., listener) would stop working.
-                 */
-                eventQueueList.remove(this.queueId);
-            } catch (InterruptedException e) {
-                log.error("ERROR: EventConsumer: bqueue.take() " +
-                                  "has been interrupted.");
-                log.debug("EventConsumer Exception:", e);
-            } finally {
-                try {
-                    output.close();
-                    log.debug("EventConsumer thread terminated: {}", queueId);
-                } catch (IOException e) {
-                    log.error("ERROR: EventConsumer: ", e);
-                }
-            }
-        }
-    }
 }