[ONOS-6760][ONOS-6761] RESTCONF changes and fixes

* bug fixes for POST and PUT
* added support for PATCH
* added support for RPC

Change-Id: Ib03dd1e92d4e7231851340063eb23e4b3ad582cf
diff --git a/apps/restconf/api/src/main/java/org/onosproject/restconf/api/RestconfRpcOutput.java b/apps/restconf/api/src/main/java/org/onosproject/restconf/api/RestconfRpcOutput.java
new file mode 100644
index 0000000..01cccf4
--- /dev/null
+++ b/apps/restconf/api/src/main/java/org/onosproject/restconf/api/RestconfRpcOutput.java
@@ -0,0 +1,113 @@
+/*
+ * Copyright 2017-present Open Networking Laboratory
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.onosproject.restconf.api;
+
+import com.fasterxml.jackson.databind.node.ObjectNode;
+
+import javax.ws.rs.core.Response;
+
+import static com.google.common.base.MoreObjects.toStringHelper;
+
+/**
+ * Output of a RESTCONF RPC.
+ */
+public class RestconfRpcOutput {
+    private Response.Status status;
+    private ObjectNode output;
+    private String reasonPhrase;
+
+
+    /**
+     * Default constructor.
+     */
+    public RestconfRpcOutput() {
+
+    }
+
+    /**
+     * Instantiates a new RPC output object.
+     *
+     * @param s RPC execution status
+     * @param d RPC output in JSON format
+     */
+    public RestconfRpcOutput(Response.Status s, ObjectNode d) {
+        this.status = s;
+        this.output = d;
+    }
+
+    /**
+     * Sets the RPC execution status.
+     *
+     * @param s status
+     */
+    public void status(Response.Status s) {
+        this.status = s;
+    }
+
+    /**
+     * Sets the failure reason message for the RPC execution.
+     *
+     * @param s failure reason
+     */
+    public void reason(String s) {
+        this.reasonPhrase = s;
+    }
+
+    /**
+     * Returns the failure reason.
+     *
+     * @return failure reason
+     */
+    public String reason() {
+        return this.reasonPhrase;
+    }
+
+    /**
+     * Sets the RPC output.
+     *
+     * @param d RPC output in JSON format
+     */
+    public void output(ObjectNode d) {
+        this.output = d;
+    }
+
+    /**
+     * Returns the RPC execution status.
+     *
+     * @return status
+     */
+    public Response.Status status() {
+        return this.status;
+    }
+
+    /**
+     * Returns the RPC output in JSON format.
+     *
+     * @return output
+     */
+    public ObjectNode output() {
+        return this.output;
+    }
+
+    @Override
+    public String toString() {
+        return toStringHelper(this)
+                .add("status", status)
+                .add("reason", reasonPhrase)
+                .add("output", output)
+                .toString();
+    }
+}
diff --git a/apps/restconf/api/src/main/java/org/onosproject/restconf/api/RestconfService.java b/apps/restconf/api/src/main/java/org/onosproject/restconf/api/RestconfService.java
index f44ed99..0cc00c6 100644
--- a/apps/restconf/api/src/main/java/org/onosproject/restconf/api/RestconfService.java
+++ b/apps/restconf/api/src/main/java/org/onosproject/restconf/api/RestconfService.java
@@ -18,6 +18,9 @@
 import com.fasterxml.jackson.databind.node.ObjectNode;
 import org.glassfish.jersey.server.ChunkedOutput;
 
+import java.net.URI;
+import java.util.concurrent.CompletableFuture;
+
 /**
  * Abstraction of RESTCONF Server functionality according to the
  * RESTCONF RFC (no official RFC number yet).
@@ -37,7 +40,7 @@
      * @return JSON representation of the data resource
      * @throws RestconfException if the GET operation cannot be fulfilled
      */
-    ObjectNode runGetOperationOnDataResource(String uri)
+    ObjectNode runGetOperationOnDataResource(URI uri)
             throws RestconfException;
 
     /**
@@ -53,7 +56,7 @@
      * @param rootNode JSON representation of the data resource
      * @throws RestconfException if the POST operation cannot be fulfilled
      */
-    void runPostOperationOnDataResource(String uri, ObjectNode rootNode)
+    void runPostOperationOnDataResource(URI uri, ObjectNode rootNode)
             throws RestconfException;
 
     /**
@@ -69,7 +72,7 @@
      * @param rootNode JSON representation of the data resource
      * @throws RestconfException if the PUT operation cannot be fulfilled
      */
-    void runPutOperationOnDataResource(String uri, ObjectNode rootNode)
+    void runPutOperationOnDataResource(URI uri, ObjectNode rootNode)
             throws RestconfException;
 
     /**
@@ -82,7 +85,7 @@
      * @param uri URI of the data resource to be deleted
      * @throws RestconfException if the DELETE operation cannot be fulfilled
      */
-    void runDeleteOperationOnDataResource(String uri) throws RestconfException;
+    void runDeleteOperationOnDataResource(URI uri) throws RestconfException;
 
     /**
      * Processes a PATCH operation on a data resource. The target data
@@ -97,7 +100,7 @@
      * @param rootNode JSON representation of the data resource
      * @throws RestconfException if the PATCH operation cannot be fulfilled
      */
-    void runPatchOperationOnDataResource(String uri, ObjectNode rootNode)
+    void runPatchOperationOnDataResource(URI uri, ObjectNode rootNode)
             throws RestconfException;
 
     /**
@@ -121,10 +124,26 @@
      * exception, so that the caller may return it to the RESTCONF client
      * to display.
      *
-     * @param streamId ID of the RESTCONF stream to subscribe
-     * @param output   A string data stream
+     * @param streamId     ID of the RESTCONF stream to subscribe
+     * @param clientIpAddr IP address of the RESTCONF client sending the request
+     * @param output       A string data stream
      * @throws RestconfException if the Event Stream cannot be subscribed
      */
-    void subscribeEventStream(String streamId, ChunkedOutput<String> output)
+    void subscribeEventStream(String streamId,
+                              String clientIpAddr,
+                              ChunkedOutput<String> output)
             throws RestconfException;
+
+    /**
+     * Handles a RESTCONF RPC request. This function executes the RPC in
+     * the target application's context and returns the results as a Future.
+     *
+     * @param uri          URI corresponding to the Name of the RPC
+     * @param rpcInput     Input parameters
+     * @param clientIpAddr IP address of the RESTCONF client calling the RPC
+     * @return RPC output
+     */
+    CompletableFuture<RestconfRpcOutput> runRpc(URI uri,
+                                                ObjectNode rpcInput,
+                                                String clientIpAddr);
 }
diff --git a/apps/restconf/restconfmgr/src/main/java/org/onosproject/restconf/restconfmanager/RestconfManager.java b/apps/restconf/restconfmgr/src/main/java/org/onosproject/restconf/restconfmanager/RestconfManager.java
index 2d32379..ce01dcd 100644
--- a/apps/restconf/restconfmgr/src/main/java/org/onosproject/restconf/restconfmanager/RestconfManager.java
+++ b/apps/restconf/restconfmgr/src/main/java/org/onosproject/restconf/restconfmanager/RestconfManager.java
@@ -29,34 +29,35 @@
 import org.onosproject.config.FailedException;
 import org.onosproject.config.Filter;
 import org.onosproject.restconf.api.RestconfException;
+import org.onosproject.restconf.api.RestconfRpcOutput;
 import org.onosproject.restconf.api.RestconfService;
+import org.onosproject.restconf.utils.RestconfUtils;
 import org.onosproject.yang.model.DataNode;
+import org.onosproject.yang.model.DefaultResourceData;
 import org.onosproject.yang.model.InnerNode;
 import org.onosproject.yang.model.KeyLeaf;
 import org.onosproject.yang.model.ListKey;
 import org.onosproject.yang.model.NodeKey;
 import org.onosproject.yang.model.ResourceData;
 import org.onosproject.yang.model.ResourceId;
+import org.onosproject.yang.model.RpcInput;
+import org.onosproject.yang.model.RpcOutput;
 import org.onosproject.yang.model.SchemaId;
-import org.onosproject.yang.model.DefaultResourceData;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import java.io.IOException;
+import java.net.URI;
 import java.util.List;
-import java.util.concurrent.BlockingQueue;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.ConcurrentMap;
+import java.util.Map;
+import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
-import java.util.concurrent.LinkedBlockingQueue;
-import java.util.concurrent.ThreadPoolExecutor;
 
-import static java.util.concurrent.TimeUnit.SECONDS;
 import static javax.ws.rs.core.Response.Status.INTERNAL_SERVER_ERROR;
 import static org.onosproject.restconf.utils.RestconfUtils.convertDataNodeToJson;
 import static org.onosproject.restconf.utils.RestconfUtils.convertJsonToDataNode;
 import static org.onosproject.restconf.utils.RestconfUtils.convertUriToRid;
+import static org.onosproject.restconf.utils.RestconfUtils.rmLastPathSegment;
 import static org.onosproject.yang.model.DataNode.Type.MULTI_INSTANCE_NODE;
 import static org.onosproject.yang.model.DataNode.Type.SINGLE_INSTANCE_LEAF_VALUE_NODE;
 import static org.onosproject.yang.model.DataNode.Type.SINGLE_INSTANCE_NODE;
@@ -77,10 +78,6 @@
 public class RestconfManager implements RestconfService {
 
     private static final String RESTCONF_ROOT = "/onos/restconf";
-    private static final int THREAD_TERMINATION_TIMEOUT = 10;
-
-    // Jersey's default chunk parser uses "\r\n" as the chunk separator.
-    private static final String EOL = "\r\n";
 
     private final int maxNumOfWorkerThreads = 5;
 
@@ -89,9 +86,6 @@
     @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
     protected DynamicConfigService dynamicConfigService;
 
-    private ConcurrentMap<String, BlockingQueue<ObjectNode>> eventQueueList =
-            new ConcurrentHashMap<>();
-
     private ExecutorService workerThreadPool;
 
     @Activate
@@ -106,18 +100,22 @@
 
     @Deactivate
     protected void deactivate() {
-        shutdownAndAwaitTermination(workerThreadPool);
+        workerThreadPool.shutdownNow();
         log.info("Stopped");
     }
 
     @Override
-    public ObjectNode runGetOperationOnDataResource(String uri)
+    public ObjectNode runGetOperationOnDataResource(URI uri)
             throws RestconfException {
         ResourceId rid = convertUriToRid(uri);
         // TODO: define Filter (if there is any requirement).
         Filter filter = new Filter();
         DataNode dataNode;
+
         try {
+            if (!dynamicConfigService.nodeExist(rid)) {
+                return null;
+            }
             dataNode = dynamicConfigService.readNode(rid, filter);
         } catch (FailedException e) {
             log.error("ERROR: DynamicConfigService: ", e);
@@ -129,17 +127,21 @@
     }
 
     @Override
-    public void runPostOperationOnDataResource(String uri, ObjectNode rootNode)
+    public void runPostOperationOnDataResource(URI uri, ObjectNode rootNode)
             throws RestconfException {
         ResourceData receivedData = convertJsonToDataNode(uri, rootNode);
-        ResourceData resourceData = getDataForStore(receivedData);
-        ResourceId rid = resourceData.resourceId();
-        List<DataNode> dataNodeList = resourceData.dataNodes();
-        // TODO: Error message needs to be fixed
+        ResourceId rid = receivedData.resourceId();
+        List<DataNode> dataNodeList = receivedData.dataNodes();
         if (dataNodeList.size() > 1) {
-            log.warn("ERROR: There are more than one Data Node can be proceed");
+            log.warn("There are more than one Data Node can be proceed: {}", dataNodeList.size());
         }
         DataNode dataNode = dataNodeList.get(0);
+
+        if (rid == null) {
+            rid = ResourceId.builder().addBranchPointSchema("/", null).build();
+            dataNode = removeTopNode(dataNode);
+        }
+
         try {
             dynamicConfigService.createNode(rid, dataNode);
         } catch (FailedException e) {
@@ -150,17 +152,28 @@
     }
 
     @Override
-    public void runPutOperationOnDataResource(String uri, ObjectNode rootNode)
-            throws RestconfException {
-        runPostOperationOnDataResource(uri, rootNode);
-    }
-
-    @Override
-    public void runDeleteOperationOnDataResource(String uri)
+    public void runPutOperationOnDataResource(URI uri, ObjectNode rootNode)
             throws RestconfException {
         ResourceId rid = convertUriToRid(uri);
+        ResourceData receivedData = convertJsonToDataNode(rmLastPathSegment(uri), rootNode);
+        ResourceId parentRid = receivedData.resourceId();
+        List<DataNode> dataNodeList = receivedData.dataNodes();
+        if (dataNodeList.size() > 1) {
+            log.warn("There are more than one Data Node can be proceed: {}", dataNodeList.size());
+        }
+        DataNode dataNode = dataNodeList.get(0);
+
         try {
-            dynamicConfigService.deleteNode(rid);
+            /*
+             * If the data node already exists, then replace it.
+             * Otherwise, create it.
+             */
+            if (dynamicConfigService.nodeExist(rid)) {
+                dynamicConfigService.replaceNode(parentRid, dataNode);
+            } else {
+                dynamicConfigService.createNode(parentRid, dataNode);
+            }
+
         } catch (FailedException e) {
             log.error("ERROR: DynamicConfigService: ", e);
             throw new RestconfException("ERROR: DynamicConfigService",
@@ -169,8 +182,51 @@
     }
 
     @Override
-    public void runPatchOperationOnDataResource(String uri, ObjectNode rootNode)
+    public void runDeleteOperationOnDataResource(URI uri)
             throws RestconfException {
+        ResourceId rid = convertUriToRid(uri);
+        try {
+            if (dynamicConfigService.nodeExist(rid)) {
+                dynamicConfigService.deleteNode(rid);
+            }
+        } catch (FailedException e) {
+            log.error("ERROR: DynamicConfigService: ", e);
+            throw new RestconfException("ERROR: DynamicConfigService",
+                                        INTERNAL_SERVER_ERROR);
+        }
+    }
+
+    @Override
+    public void runPatchOperationOnDataResource(URI uri, ObjectNode rootNode)
+            throws RestconfException {
+        ResourceData receivedData = convertJsonToDataNode(rmLastPathSegment(uri), rootNode);
+        ResourceId rid = receivedData.resourceId();
+        List<DataNode> dataNodeList = receivedData.dataNodes();
+        if (dataNodeList.size() > 1) {
+            log.warn("There are more than one Data Node can be proceed: {}", dataNodeList.size());
+        }
+        DataNode dataNode = dataNodeList.get(0);
+
+        if (rid == null) {
+            rid = ResourceId.builder().addBranchPointSchema("/", null).build();
+            dataNode = removeTopNode(dataNode);
+        }
+
+        try {
+            dynamicConfigService.updateNode(rid, dataNode);
+        } catch (FailedException e) {
+            log.error("ERROR: DynamicConfigService: ", e);
+            throw new RestconfException("ERROR: DynamicConfigService",
+                                        INTERNAL_SERVER_ERROR);
+        }
+    }
+
+    private DataNode removeTopNode(DataNode dataNode) {
+        if (dataNode instanceof InnerNode && dataNode.key().schemaId().name().equals("/")) {
+            Map.Entry<NodeKey, DataNode> entry = ((InnerNode) dataNode).childNodes().entrySet().iterator().next();
+            dataNode = entry.getValue();
+        }
+        return dataNode;
     }
 
     @Override
@@ -178,33 +234,48 @@
         return RESTCONF_ROOT;
     }
 
-    /**
-     * Creates a worker thread to listen to events and write to chunkedOutput.
-     * The worker thread blocks if no events arrive.
-     *
-     * @param streamId the RESTCONF stream id to which the client subscribes
-     * @param output   the string data stream
-     * @throws RestconfException if the worker thread fails to create
-     */
     @Override
-    public void subscribeEventStream(String streamId,
+    public void subscribeEventStream(String streamId, String clientIpAddr,
                                      ChunkedOutput<String> output)
             throws RestconfException {
-        if (workerThreadPool instanceof ThreadPoolExecutor) {
-            if (((ThreadPoolExecutor) workerThreadPool).getActiveCount() >=
-                    maxNumOfWorkerThreads) {
-                throw new RestconfException("no more work threads left to " +
-                                                    "handle event subscription",
-                                            INTERNAL_SERVER_ERROR);
-            }
-        } else {
-            throw new RestconfException("Server ERROR: workerThreadPool NOT " +
-                                                "instanceof ThreadPoolExecutor",
-                                        INTERNAL_SERVER_ERROR);
+        //TODO: to be completed
+    }
+
+    @Override
+    public CompletableFuture<RestconfRpcOutput> runRpc(URI uri,
+                                                       ObjectNode input,
+                                                       String clientIpAddress) {
+        CompletableFuture<RestconfRpcOutput> result =
+                CompletableFuture.supplyAsync(() -> executeRpc(uri, input, clientIpAddress));
+        return result;
+    }
+
+    private RestconfRpcOutput executeRpc(URI uri, ObjectNode input, String clientIpAddress) {
+        ResourceData rpcInputNode = convertJsonToDataNode(uri, input);
+        ResourceId resourceId = rpcInputNode.resourceId();
+        List<DataNode> inputDataNodeList = rpcInputNode.dataNodes();
+        DataNode inputDataNode = inputDataNodeList.get(0);
+        RpcInput rpcInput = new RpcInput(inputDataNode);
+
+        RestconfRpcOutput restconfOutput = null;
+        try {
+            CompletableFuture<RpcOutput> rpcFuture =
+                    dynamicConfigService.invokeRpc(resourceId, rpcInput);
+            RpcOutput rpcOutput = rpcFuture.get();
+            restconfOutput = RestconfUtils.convertRpcOutput(resourceId, rpcOutput);
+        } catch (InterruptedException e) {
+            log.error("ERROR: computeResultQ.take() has been interrupted.");
+            log.debug("executeRpc Exception:", e);
+            restconfOutput = new RestconfRpcOutput(INTERNAL_SERVER_ERROR, null);
+            restconfOutput.reason("RPC execution has been interrupted");
+        } catch (Exception e) {
+            log.error("ERROR: executeRpc: {}", e.getMessage());
+            log.debug("executeRpc Exception:", e);
+            restconfOutput = new RestconfRpcOutput(INTERNAL_SERVER_ERROR, null);
+            restconfOutput.reason(e.getMessage());
         }
 
-        BlockingQueue<ObjectNode> eventQueue = new LinkedBlockingQueue<>();
-        workerThreadPool.submit(new EventConsumer(output, eventQueue));
+        return restconfOutput;
     }
 
     private ResourceData getDataForStore(ResourceData resourceData) {
@@ -247,81 +318,4 @@
         resData.resourceId(parentId);
         return resData.build();
     }
-
-    /**
-     * Shutdown a pool cleanly if possible.
-     *
-     * @param pool an executorService
-     */
-    private void shutdownAndAwaitTermination(ExecutorService pool) {
-        pool.shutdown(); // Disable new tasks from being submitted
-        try {
-            // Wait a while for existing tasks to terminate
-            if (!pool.awaitTermination(THREAD_TERMINATION_TIMEOUT, SECONDS)) {
-                pool.shutdownNow(); // Cancel currently executing tasks
-                // Wait a while for tasks to respond to being cancelled
-                if (!pool.awaitTermination(THREAD_TERMINATION_TIMEOUT,
-                                           SECONDS)) {
-                    log.error("Pool did not terminate");
-                }
-            }
-        } catch (Exception ie) {
-            // (Re-)Cancel if current thread also interrupted
-            pool.shutdownNow();
-            // Preserve interrupt status
-            Thread.currentThread().interrupt();
-        }
-    }
-
-    /**
-     * Implementation of a worker thread which reads data from a
-     * blocking queue and writes the data to a given chunk output stream.
-     * The thread is blocked when no data arrive to the queue and is
-     * terminated when the chunk output stream is closed (i.e., the
-     * HTTP-keep-alive session is closed).
-     */
-    private class EventConsumer implements Runnable {
-
-        private String queueId;
-        private final ChunkedOutput<String> output;
-        private final BlockingQueue<ObjectNode> bqueue;
-
-        public EventConsumer(ChunkedOutput<String> output,
-                             BlockingQueue<ObjectNode> q) {
-            this.output = output;
-            this.bqueue = q;
-        }
-
-        @Override
-        public void run() {
-            try {
-                queueId = String.valueOf(Thread.currentThread().getId());
-                eventQueueList.put(queueId, bqueue);
-                log.debug("EventConsumer thread created: {}", queueId);
-
-                ObjectNode chunk;
-                while ((chunk = bqueue.take()) != null) {
-                    output.write(chunk.toString().concat(EOL));
-                }
-            } catch (IOException e) {
-                log.debug("chunkedOuput is closed: {}", this.bqueue.toString());
-                /*
-                 * Remove queue from the queue list, so that the event producer
-                 * (i.e., listener) would stop working.
-                 */
-                eventQueueList.remove(this.queueId);
-            } catch (InterruptedException e) {
-                log.error("ERROR: EventConsumer: bqueue.take() " +
-                                  "has been interrupted.");
-                log.debug("EventConsumer Exception:", e);
-            } finally {
-                try {
-                    output.close();
-                    log.debug("EventConsumer thread terminated: {}", queueId);
-                } catch (IOException e) {
-                    log.error("ERROR: EventConsumer: ", e);
-                }
-            }
-        }
-    }
 }
diff --git a/apps/restconf/utils/src/main/java/org/onosproject/restconf/utils/RestconfUtils.java b/apps/restconf/utils/src/main/java/org/onosproject/restconf/utils/RestconfUtils.java
index 1f12398..01f5597 100644
--- a/apps/restconf/utils/src/main/java/org/onosproject/restconf/utils/RestconfUtils.java
+++ b/apps/restconf/utils/src/main/java/org/onosproject/restconf/utils/RestconfUtils.java
@@ -21,23 +21,35 @@
 import org.apache.commons.io.IOUtils;
 import org.onlab.osgi.DefaultServiceDirectory;
 import org.onosproject.restconf.api.RestconfException;
+import org.onosproject.restconf.api.RestconfRpcOutput;
 import org.onosproject.restconf.utils.exceptions.RestconfUtilsException;
 import org.onosproject.yang.model.DataNode;
+import org.onosproject.yang.model.DefaultResourceData;
 import org.onosproject.yang.model.ResourceData;
 import org.onosproject.yang.model.ResourceId;
+import org.onosproject.yang.model.RpcOutput;
 import org.onosproject.yang.runtime.CompositeData;
 import org.onosproject.yang.runtime.CompositeStream;
 import org.onosproject.yang.runtime.DefaultCompositeData;
 import org.onosproject.yang.runtime.DefaultCompositeStream;
-import org.onosproject.yang.model.DefaultResourceData;
 import org.onosproject.yang.runtime.DefaultRuntimeContext;
 import org.onosproject.yang.runtime.RuntimeContext;
 import org.onosproject.yang.runtime.YangRuntimeService;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
+import javax.ws.rs.core.Response;
+import javax.ws.rs.core.UriBuilder;
 import java.io.IOException;
 import java.io.InputStream;
+import java.net.URI;
 
+import static javax.ws.rs.core.Response.Status.BAD_REQUEST;
+import static javax.ws.rs.core.Response.Status.EXPECTATION_FAILED;
 import static javax.ws.rs.core.Response.Status.INTERNAL_SERVER_ERROR;
+import static javax.ws.rs.core.Response.Status.NO_CONTENT;
+import static javax.ws.rs.core.Response.Status.OK;
+import static javax.ws.rs.core.Response.Status.REQUEST_TIMEOUT;
 
 /**
  * Utilities used by the RESTCONF app.
@@ -53,10 +65,13 @@
      * Data format required by YangRuntime Service.
      */
     private static final String JSON_FORMAT = "JSON";
+    private static final String SLASH = "/";
 
     private static final YangRuntimeService YANG_RUNTIME =
             DefaultServiceDirectory.getService(YangRuntimeService.class);
 
+    private static final Logger log = LoggerFactory.getLogger(RestconfUtils.class);
+
     /**
      * Converts an input stream to JSON objectNode.
      *
@@ -97,7 +112,7 @@
      * @param uri URI of the data resource
      * @return resource identifier
      */
-    public static ResourceId convertUriToRid(String uri) {
+    public static ResourceId convertUriToRid(URI uri) {
         ResourceData resourceData = convertJsonToDataNode(uri, null);
         return resourceData.resourceId();
     }
@@ -109,22 +124,44 @@
      * @param rootNode JSON representation of the data resource
      * @return represents type of node in data store
      */
-    public static ResourceData convertJsonToDataNode(String uri,
+    public static ResourceData convertJsonToDataNode(URI uri,
                                                      ObjectNode rootNode) {
         RuntimeContext.Builder runtimeContextBuilder = new DefaultRuntimeContext.Builder();
         runtimeContextBuilder.setDataFormat(JSON_FORMAT);
         RuntimeContext context = runtimeContextBuilder.build();
+        ResourceData resourceData = null;
         InputStream jsonData = null;
-        if (rootNode != null) {
-            jsonData = convertObjectNodeToInputStream(rootNode);
+        try {
+            if (rootNode != null) {
+                jsonData = convertObjectNodeToInputStream(rootNode);
+            }
+            String uriString = getRawUriPath(uri);
+
+            CompositeStream compositeStream = new DefaultCompositeStream(uriString, jsonData);
+            // CompositeStream --- YangRuntimeService ---> CompositeData.
+            CompositeData compositeData = YANG_RUNTIME.decode(compositeStream, context);
+            resourceData = compositeData.resourceData();
+        } catch (Exception ex) {
+            log.error("convertJsonToDataNode failure: {}", ex.getMessage());
+            log.debug("convertJsonToDataNode failure", ex);
         }
-        CompositeStream compositeStream = new DefaultCompositeStream(uri, jsonData);
-        // CompositeStream --- YangRuntimeService ---> CompositeData.
-        CompositeData compositeData = YANG_RUNTIME.decode(compositeStream, context);
-        ResourceData resourceData = compositeData.resourceData();
+        if (resourceData == null) {
+            throw new RestconfException("ERROR: JSON cannot be converted to DataNode",
+                                        INTERNAL_SERVER_ERROR);
+        }
         return resourceData;
     }
 
+    private static String getRawUriPath(URI uri) {
+        String path = uri.getRawPath();
+        if (path.equals("/restconf/data")) {
+            return null;
+        }
+
+
+        return path.replaceAll("^/restconf/data/", "").replaceAll("^/restconf/operations/", "");
+    }
+
     /**
      * Convert Resource Id and Data Node to Json ObjectNode.
      *
@@ -143,14 +180,84 @@
         DefaultCompositeData.Builder compositeDataBuilder = DefaultCompositeData.builder();
         compositeDataBuilder.resourceData(resourceData);
         CompositeData compositeData = compositeDataBuilder.build();
-        // CompositeData --- YangRuntimeService ---> CompositeStream.
-        CompositeStream compositeStream = YANG_RUNTIME.encode(compositeData, context);
-        InputStream inputStream = compositeStream.resourceData();
-        ObjectNode rootNode = convertInputStreamToObjectNode(inputStream);
+        ObjectNode rootNode = null;
+        try {
+            // CompositeData --- YangRuntimeService ---> CompositeStream.
+            CompositeStream compositeStream = YANG_RUNTIME.encode(compositeData, context);
+            InputStream inputStream = compositeStream.resourceData();
+            rootNode = convertInputStreamToObjectNode(inputStream);
+        } catch (Exception ex) {
+            log.error("convertInputStreamToObjectNode failure: {}", ex.getMessage());
+            log.debug("convertInputStreamToObjectNode failure", ex);
+        }
         if (rootNode == null) {
             throw new RestconfException("ERROR: InputStream can not be convert to ObjectNode",
                                         INTERNAL_SERVER_ERROR);
         }
         return rootNode;
     }
+
+    /**
+     * Removes the last path segment from the given URI. That is, returns
+     * the parent of the given URI.
+     *
+     * @param uri given URI
+     * @return parent URI
+     */
+    public static URI rmLastPathSegment(URI uri) {
+        if (uri == null) {
+            return null;
+        }
+
+        UriBuilder builder = UriBuilder.fromUri(uri);
+        String newPath = rmLastPathSegmentStr(uri.getRawPath());
+        builder.replacePath(newPath);
+
+        return builder.build();
+    }
+
+    private static String rmLastPathSegmentStr(String rawPath) {
+        if (rawPath == null) {
+            return null;
+        }
+        int pos = rawPath.lastIndexOf(SLASH);
+        if (pos <= 0) {
+            return null;
+        }
+
+        return rawPath.substring(0, pos);
+    }
+
+    /**
+     * Creates a RESTCONF RPC output object from a given YANG RPC output object.
+     *
+     * @param cmdId     resource ID of the RPC
+     * @param rpcOutput given RPC output in YANG format
+     * @return RPC output in RESTCONF format
+     */
+    public static RestconfRpcOutput convertRpcOutput(ResourceId cmdId, RpcOutput rpcOutput) {
+        RestconfRpcOutput restconfRpcOutput = new RestconfRpcOutput();
+
+        restconfRpcOutput.status(convertResponseStatus(rpcOutput.status()));
+        if (rpcOutput.data() != null) {
+            restconfRpcOutput.output(convertDataNodeToJson(cmdId, rpcOutput.data()));
+        }
+
+        return restconfRpcOutput;
+    }
+
+    private static Response.Status convertResponseStatus(RpcOutput.Status status) {
+        switch (status) {
+            case RPC_SUCCESS:
+                return OK;
+            case RPC_FAILURE:
+                return EXPECTATION_FAILED;
+            case RPC_NODATA:
+                return NO_CONTENT;
+            case RPC_TIMEOUT:
+                return REQUEST_TIMEOUT;
+            default:
+                return BAD_REQUEST;
+        }
+    }
 }
diff --git a/protocols/restconf/server/rpp/BUCK b/protocols/restconf/server/rpp/BUCK
index 6531f88..7eaf628 100644
--- a/protocols/restconf/server/rpp/BUCK
+++ b/protocols/restconf/server/rpp/BUCK
@@ -3,6 +3,7 @@
     '//lib:jersey-client',
     '//lib:jersey-server',
     '//lib:javax.ws.rs-api',
+    '//lib:servlet-api',
     '//utils/rest:onlab-rest',
     '//apps/restconf/api:onos-apps-restconf-api',
 ]
diff --git a/protocols/restconf/server/rpp/src/main/java/org/onosproject/protocol/restconf/server/rpp/RestconfWebResource.java b/protocols/restconf/server/rpp/src/main/java/org/onosproject/protocol/restconf/server/rpp/RestconfWebResource.java
index 4bd2168..1037567 100644
--- a/protocols/restconf/server/rpp/src/main/java/org/onosproject/protocol/restconf/server/rpp/RestconfWebResource.java
+++ b/protocols/restconf/server/rpp/src/main/java/org/onosproject/protocol/restconf/server/rpp/RestconfWebResource.java
@@ -22,9 +22,11 @@
 import org.onosproject.rest.AbstractWebResource;
 import org.onosproject.restconf.api.Patch;
 import org.onosproject.restconf.api.RestconfException;
+import org.onosproject.restconf.api.RestconfRpcOutput;
 import org.onosproject.restconf.api.RestconfService;
 import org.slf4j.Logger;
 
+import javax.servlet.http.HttpServletRequest;
 import javax.ws.rs.Consumes;
 import javax.ws.rs.DELETE;
 import javax.ws.rs.GET;
@@ -39,9 +41,13 @@
 import javax.ws.rs.core.UriInfo;
 import java.io.IOException;
 import java.io.InputStream;
+import java.net.URI;
+import java.util.concurrent.CompletableFuture;
 
 import static javax.ws.rs.core.Response.Status.BAD_REQUEST;
 import static javax.ws.rs.core.Response.Status.INTERNAL_SERVER_ERROR;
+import static javax.ws.rs.core.Response.Status.NOT_FOUND;
+import static javax.ws.rs.core.Response.Status.OK;
 import static org.slf4j.LoggerFactory.getLogger;
 
 
@@ -62,6 +68,8 @@
     @Context
     UriInfo uriInfo;
 
+    private static final String NOT_EXIST = "Requested data resource does not exist";
+
     private final RestconfService service = get(RestconfService.class);
     private final Logger log = getLogger(getClass());
 
@@ -75,14 +83,19 @@
      * @return HTTP response
      */
     @GET
+    @Consumes(MediaType.APPLICATION_JSON)
     @Produces(MediaType.APPLICATION_JSON)
     @Path("data/{identifier : .+}")
     public Response handleGetRequest(@PathParam("identifier") String uriString) {
-
         log.debug("handleGetRequest: {}", uriString);
 
+        URI uri = uriInfo.getRequestUri();
+
         try {
-            ObjectNode node = service.runGetOperationOnDataResource(uriString);
+            ObjectNode node = service.runGetOperationOnDataResource(uri);
+            if (node == null) {
+                return Response.status(NOT_FOUND).entity(NOT_EXIST).build();
+            }
             return ok(node).build();
         } catch (RestconfException e) {
             log.error("ERROR: handleGetRequest: {}", e.getMessage());
@@ -102,15 +115,18 @@
      * calls ChunkedOutput.close() to disconnect the session and terminates itself.
      *
      * @param streamId Event stream ID
+     * @param request  RESTCONF client information from which the client IP
+     *                 address is retrieved
      * @return A string data stream over HTTP keep-alive session
      */
     @GET
     @Produces(MediaType.APPLICATION_JSON)
     @Path("streams/{streamId}")
-    public ChunkedOutput<String> handleNotificationRegistration(@PathParam("streamId") String streamId) {
+    public ChunkedOutput<String> handleNotificationRegistration(@PathParam("streamId") String streamId,
+                                                                @Context HttpServletRequest request) {
         final ChunkedOutput<String> output = new ChunkedOutput<>(String.class);
         try {
-            service.subscribeEventStream(streamId, output);
+            service.subscribeEventStream(streamId, request.getRemoteAddr(), output);
         } catch (RestconfException e) {
             log.error("ERROR: handleNotificationRegistration: {}", e.getMessage());
             log.debug("Exception in handleNotificationRegistration:", e);
@@ -125,12 +141,31 @@
     }
 
     /**
+     * Handles a RESTCONF POST operation against the entire data store. If the
+     * operation is successful, HTTP status code "201 Created" is returned
+     * and there is no response message-body. If the data resource already
+     * exists, then the HTTP status code "409 Conflict" is returned.
+     *
+     * @param stream Input JSON object
+     * @return HTTP response
+     */
+    @POST
+    @Consumes(MediaType.APPLICATION_JSON)
+    @Produces(MediaType.APPLICATION_JSON)
+    @Path("data")
+    public Response handlePostDatastore(InputStream stream) {
+
+        log.debug("handlePostDatastore");
+        return handlePostRequest(null, stream);
+    }
+
+    /**
      * Handles a RESTCONF POST operation against a data resource. If the
      * operation is successful, HTTP status code "201 Created" is returned
      * and there is no response message-body. If the data resource already
      * exists, then the HTTP status code "409 Conflict" is returned.
      *
-     * @param uriString URI of the data resource
+     * @param uriString URI of the parent data resource
      * @param stream    Input JSON object
      * @return HTTP response
      */
@@ -140,13 +175,14 @@
     @Path("data/{identifier : .+}")
     public Response handlePostRequest(@PathParam("identifier") String uriString,
                                       InputStream stream) {
-
         log.debug("handlePostRequest: {}", uriString);
 
+        URI uri = uriInfo.getRequestUri();
+
         try {
             ObjectNode rootNode = (ObjectNode) mapper().readTree(stream);
 
-            service.runPostOperationOnDataResource(uriString, rootNode);
+            service.runPostOperationOnDataResource(uri, rootNode);
             return Response.created(uriInfo.getRequestUri()).build();
         } catch (JsonProcessingException e) {
             log.error("ERROR: handlePostRequest ", e);
@@ -162,6 +198,50 @@
     }
 
     /**
+     * Handles a RESTCONF RPC request. This function executes the RPC in
+     * the target application's context and returns the results as a Future.
+     *
+     * @param rpcName  Name of the RPC
+     * @param rpcInput Input parameters
+     * @param request  RESTCONF client information from which the client IP
+     *                 address is retrieved
+     * @return RPC output
+     */
+    @POST
+    @Consumes(MediaType.APPLICATION_JSON)
+    @Produces(MediaType.APPLICATION_JSON)
+    @Path("operations/{rpc : .+}")
+    public Response handleRpcRequest(@PathParam("rpc") String rpcName,
+                                     InputStream rpcInput,
+                                     @Context HttpServletRequest request) {
+        URI uri = uriInfo.getRequestUri();
+        try {
+            ObjectNode inputNode = (ObjectNode) mapper().readTree(rpcInput);
+            CompletableFuture<RestconfRpcOutput> rpcFuture = service.runRpc(uri,
+                                                                            inputNode,
+                                                                            request.getRemoteAddr());
+            RestconfRpcOutput restconfRpcOutput;
+            restconfRpcOutput = rpcFuture.get();
+            if (restconfRpcOutput.status() != OK) {
+                return Response.status(restconfRpcOutput.status())
+                        .entity(restconfRpcOutput.reason()).build();
+            }
+            ObjectNode node = restconfRpcOutput.output();
+            return ok(node).build();
+        } catch (JsonProcessingException e) {
+            log.error("ERROR:  handleRpcRequest", e);
+            return Response.status(BAD_REQUEST).build();
+        } catch (RestconfException e) {
+            log.error("ERROR: handleRpcRequest: {}", e.getMessage());
+            log.debug("Exception in handleRpcRequest:", e);
+            return e.getResponse();
+        } catch (Exception e) {
+            log.error("ERROR: handleRpcRequest ", e);
+            return Response.status(INTERNAL_SERVER_ERROR).build();
+        }
+    }
+
+    /**
      * Handles a RESTCONF PUT operation against a data resource. If a new
      * resource is successfully created, then the HTTP status code "201 Created"
      * is returned. If an existing resource is modified, then the HTTP
@@ -181,13 +261,14 @@
     @Path("data/{identifier : .+}")
     public Response handlePutRequest(@PathParam("identifier") String uriString,
                                      InputStream stream) {
-
         log.debug("handlePutRequest: {}", uriString);
 
+        URI uri = uriInfo.getRequestUri();
+
         try {
             ObjectNode rootNode = (ObjectNode) mapper().readTree(stream);
 
-            service.runPutOperationOnDataResource(uriString, rootNode);
+            service.runPutOperationOnDataResource(uri, rootNode);
             return Response.created(uriInfo.getRequestUri()).build();
         } catch (JsonProcessingException e) {
             log.error("ERROR: handlePutRequest ", e);
@@ -213,14 +294,16 @@
      * @return HTTP response
      */
     @DELETE
+    @Consumes(MediaType.APPLICATION_JSON)
     @Produces(MediaType.APPLICATION_JSON)
     @Path("data/{identifier : .+}")
     public Response handleDeleteRequest(@PathParam("identifier") String uriString) {
-
         log.debug("handleDeleteRequest: {}", uriString);
 
+        URI uri = uriInfo.getRequestUri();
+
         try {
-            service.runDeleteOperationOnDataResource(uriString);
+            service.runDeleteOperationOnDataResource(uri);
             return Response.ok().build();
         } catch (RestconfException e) {
             log.error("ERROR: handleDeleteRequest: {}", e.getMessage());
@@ -235,7 +318,7 @@
      * there is a message-body, and "204 No Content" is returned if no
      * response message-body is sent.
      *
-     * @param uriString URI of the data resource
+     * @param uriString URI of the parent data resource
      * @param stream    Input JSON object
      * @return HTTP response
      */
@@ -248,10 +331,12 @@
 
         log.debug("handlePatchRequest: {}", uriString);
 
+        URI uri = uriInfo.getRequestUri();
+
         try {
             ObjectNode rootNode = (ObjectNode) mapper().readTree(stream);
 
-            service.runPatchOperationOnDataResource(uriString, rootNode);
+            service.runPatchOperationOnDataResource(uri, rootNode);
             return Response.ok().build();
         } catch (JsonProcessingException e) {
             log.error("ERROR: handlePatchRequest ", e);
@@ -266,4 +351,22 @@
         }
     }
 
+
+    /**
+     * Handles a RESTCONF PATCH operation against the entire data store.
+     * If the PATCH request succeeds, a "200 OK" status-line is returned if
+     * there is a message-body, and "204 No Content" is returned if no
+     * response message-body is sent.
+     *
+     * @param stream Input JSON object
+     * @return HTTP response
+     */
+    @Patch
+    @Consumes(MediaType.APPLICATION_JSON)
+    @Produces(MediaType.APPLICATION_JSON)
+    @Path("data")
+    public Response handlePatchDatastore(InputStream stream) {
+        log.debug("handlePatchDatastore");
+        return handlePatchRequest(null, stream);
+    }
 }