[ONOS-6760][ONOS-6761] RESTCONF changes and fixes
* bug fixes for POST and PUT
* added support for PATCH
* added support for RPC
Change-Id: Ib03dd1e92d4e7231851340063eb23e4b3ad582cf
diff --git a/apps/restconf/restconfmgr/src/main/java/org/onosproject/restconf/restconfmanager/RestconfManager.java b/apps/restconf/restconfmgr/src/main/java/org/onosproject/restconf/restconfmanager/RestconfManager.java
index 2d32379..ce01dcd 100644
--- a/apps/restconf/restconfmgr/src/main/java/org/onosproject/restconf/restconfmanager/RestconfManager.java
+++ b/apps/restconf/restconfmgr/src/main/java/org/onosproject/restconf/restconfmanager/RestconfManager.java
@@ -29,34 +29,35 @@
import org.onosproject.config.FailedException;
import org.onosproject.config.Filter;
import org.onosproject.restconf.api.RestconfException;
+import org.onosproject.restconf.api.RestconfRpcOutput;
import org.onosproject.restconf.api.RestconfService;
+import org.onosproject.restconf.utils.RestconfUtils;
import org.onosproject.yang.model.DataNode;
+import org.onosproject.yang.model.DefaultResourceData;
import org.onosproject.yang.model.InnerNode;
import org.onosproject.yang.model.KeyLeaf;
import org.onosproject.yang.model.ListKey;
import org.onosproject.yang.model.NodeKey;
import org.onosproject.yang.model.ResourceData;
import org.onosproject.yang.model.ResourceId;
+import org.onosproject.yang.model.RpcInput;
+import org.onosproject.yang.model.RpcOutput;
import org.onosproject.yang.model.SchemaId;
-import org.onosproject.yang.model.DefaultResourceData;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import java.io.IOException;
+import java.net.URI;
import java.util.List;
-import java.util.concurrent.BlockingQueue;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.ConcurrentMap;
+import java.util.Map;
+import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
-import java.util.concurrent.LinkedBlockingQueue;
-import java.util.concurrent.ThreadPoolExecutor;
-import static java.util.concurrent.TimeUnit.SECONDS;
import static javax.ws.rs.core.Response.Status.INTERNAL_SERVER_ERROR;
import static org.onosproject.restconf.utils.RestconfUtils.convertDataNodeToJson;
import static org.onosproject.restconf.utils.RestconfUtils.convertJsonToDataNode;
import static org.onosproject.restconf.utils.RestconfUtils.convertUriToRid;
+import static org.onosproject.restconf.utils.RestconfUtils.rmLastPathSegment;
import static org.onosproject.yang.model.DataNode.Type.MULTI_INSTANCE_NODE;
import static org.onosproject.yang.model.DataNode.Type.SINGLE_INSTANCE_LEAF_VALUE_NODE;
import static org.onosproject.yang.model.DataNode.Type.SINGLE_INSTANCE_NODE;
@@ -77,10 +78,6 @@
public class RestconfManager implements RestconfService {
private static final String RESTCONF_ROOT = "/onos/restconf";
- private static final int THREAD_TERMINATION_TIMEOUT = 10;
-
- // Jersey's default chunk parser uses "\r\n" as the chunk separator.
- private static final String EOL = "\r\n";
private final int maxNumOfWorkerThreads = 5;
@@ -89,9 +86,6 @@
@Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
protected DynamicConfigService dynamicConfigService;
- private ConcurrentMap<String, BlockingQueue<ObjectNode>> eventQueueList =
- new ConcurrentHashMap<>();
-
private ExecutorService workerThreadPool;
@Activate
@@ -106,18 +100,22 @@
@Deactivate
protected void deactivate() {
- shutdownAndAwaitTermination(workerThreadPool);
+ workerThreadPool.shutdownNow();
log.info("Stopped");
}
@Override
- public ObjectNode runGetOperationOnDataResource(String uri)
+ public ObjectNode runGetOperationOnDataResource(URI uri)
throws RestconfException {
ResourceId rid = convertUriToRid(uri);
// TODO: define Filter (if there is any requirement).
Filter filter = new Filter();
DataNode dataNode;
+
try {
+ if (!dynamicConfigService.nodeExist(rid)) {
+ return null;
+ }
dataNode = dynamicConfigService.readNode(rid, filter);
} catch (FailedException e) {
log.error("ERROR: DynamicConfigService: ", e);
@@ -129,17 +127,21 @@
}
@Override
- public void runPostOperationOnDataResource(String uri, ObjectNode rootNode)
+ public void runPostOperationOnDataResource(URI uri, ObjectNode rootNode)
throws RestconfException {
ResourceData receivedData = convertJsonToDataNode(uri, rootNode);
- ResourceData resourceData = getDataForStore(receivedData);
- ResourceId rid = resourceData.resourceId();
- List<DataNode> dataNodeList = resourceData.dataNodes();
- // TODO: Error message needs to be fixed
+ ResourceId rid = receivedData.resourceId();
+ List<DataNode> dataNodeList = receivedData.dataNodes();
if (dataNodeList.size() > 1) {
- log.warn("ERROR: There are more than one Data Node can be proceed");
+ log.warn("There are more than one Data Node can be proceed: {}", dataNodeList.size());
}
DataNode dataNode = dataNodeList.get(0);
+
+ if (rid == null) {
+ rid = ResourceId.builder().addBranchPointSchema("/", null).build();
+ dataNode = removeTopNode(dataNode);
+ }
+
try {
dynamicConfigService.createNode(rid, dataNode);
} catch (FailedException e) {
@@ -150,17 +152,28 @@
}
@Override
- public void runPutOperationOnDataResource(String uri, ObjectNode rootNode)
- throws RestconfException {
- runPostOperationOnDataResource(uri, rootNode);
- }
-
- @Override
- public void runDeleteOperationOnDataResource(String uri)
+ public void runPutOperationOnDataResource(URI uri, ObjectNode rootNode)
throws RestconfException {
ResourceId rid = convertUriToRid(uri);
+ ResourceData receivedData = convertJsonToDataNode(rmLastPathSegment(uri), rootNode);
+ ResourceId parentRid = receivedData.resourceId();
+ List<DataNode> dataNodeList = receivedData.dataNodes();
+ if (dataNodeList.size() > 1) {
+ log.warn("There are more than one Data Node can be proceed: {}", dataNodeList.size());
+ }
+ DataNode dataNode = dataNodeList.get(0);
+
try {
- dynamicConfigService.deleteNode(rid);
+ /*
+ * If the data node already exists, then replace it.
+ * Otherwise, create it.
+ */
+ if (dynamicConfigService.nodeExist(rid)) {
+ dynamicConfigService.replaceNode(parentRid, dataNode);
+ } else {
+ dynamicConfigService.createNode(parentRid, dataNode);
+ }
+
} catch (FailedException e) {
log.error("ERROR: DynamicConfigService: ", e);
throw new RestconfException("ERROR: DynamicConfigService",
@@ -169,8 +182,51 @@
}
@Override
- public void runPatchOperationOnDataResource(String uri, ObjectNode rootNode)
+ public void runDeleteOperationOnDataResource(URI uri)
throws RestconfException {
+     // Deletes the node addressed by the URI; when no node exists for it,
+     // the call returns without touching the store.
+     ResourceId targetId = convertUriToRid(uri);
+     try {
+         if (!dynamicConfigService.nodeExist(targetId)) {
+             return;
+         }
+         dynamicConfigService.deleteNode(targetId);
+     } catch (FailedException e) {
+         // Store failures are logged with their cause and surfaced as
+         // INTERNAL_SERVER_ERROR.
+         log.error("ERROR: DynamicConfigService: ", e);
+         throw new RestconfException("ERROR: DynamicConfigService", INTERNAL_SERVER_ERROR);
+     }
+ }
+
+ @Override
+ public void runPatchOperationOnDataResource(URI uri, ObjectNode rootNode)
+         throws RestconfException {
+     // The JSON body is converted against the URI with its last path
+     // segment removed.
+     ResourceData patchData = convertJsonToDataNode(rmLastPathSegment(uri), rootNode);
+     ResourceId targetId = patchData.resourceId();
+     List<DataNode> nodes = patchData.dataNodes();
+     int nodeCount = nodes.size();
+     if (nodeCount > 1) {
+         // Only the first node is applied below; extras are just reported.
+         log.warn("There are more than one Data Node can be proceed: {}", nodeCount);
+     }
+     DataNode patchNode = nodes.get(0);
+
+     if (targetId == null) {
+         // No resource id came back from the conversion: address the "/"
+         // branch point and unwrap a "/" top node if the data carries one.
+         targetId = ResourceId.builder().addBranchPointSchema("/", null).build();
+         patchNode = removeTopNode(patchNode);
+     }
+
+     try {
+         dynamicConfigService.updateNode(targetId, patchNode);
+     } catch (FailedException e) {
+         // Store failures are logged with their cause and surfaced as
+         // INTERNAL_SERVER_ERROR.
+         log.error("ERROR: DynamicConfigService: ", e);
+         throw new RestconfException("ERROR: DynamicConfigService", INTERNAL_SERVER_ERROR);
+     }
+ }
+
+ private DataNode removeTopNode(DataNode dataNode) {
+     // Anything other than an inner node whose schema name is "/" is
+     // handed back untouched.
+     if (!(dataNode instanceof InnerNode)) {
+         return dataNode;
+     }
+     if (!dataNode.key().schemaId().name().equals("/")) {
+         return dataNode;
+     }
+     // Unwrap: the first child in the child map's iteration order replaces
+     // the "/" node.
+     Map<NodeKey, DataNode> children = ((InnerNode) dataNode).childNodes();
+     return children.values().iterator().next();
}
@Override
@@ -178,33 +234,48 @@
return RESTCONF_ROOT;
}
- /**
- * Creates a worker thread to listen to events and write to chunkedOutput.
- * The worker thread blocks if no events arrive.
- *
- * @param streamId the RESTCONF stream id to which the client subscribes
- * @param output the string data stream
- * @throws RestconfException if the worker thread fails to create
- */
@Override
- public void subscribeEventStream(String streamId,
+ public void subscribeEventStream(String streamId, String clientIpAddr,
ChunkedOutput<String> output)
throws RestconfException {
- if (workerThreadPool instanceof ThreadPoolExecutor) {
- if (((ThreadPoolExecutor) workerThreadPool).getActiveCount() >=
- maxNumOfWorkerThreads) {
- throw new RestconfException("no more work threads left to " +
- "handle event subscription",
- INTERNAL_SERVER_ERROR);
- }
- } else {
- throw new RestconfException("Server ERROR: workerThreadPool NOT " +
- "instanceof ThreadPoolExecutor",
- INTERNAL_SERVER_ERROR);
+ //TODO: to be completed
+ }
+
+ /**
+  * Starts the RPC addressed by the URI and returns without waiting for it.
+  *
+  * @param uri             URI of the RPC resource
+  * @param input           JSON input of the RPC
+  * @param clientIpAddress IP address of the requesting client
+  * @return future that completes with the output produced by executeRpc
+  */
+ @Override
+ public CompletableFuture<RestconfRpcOutput> runRpc(URI uri,
+                                                    ObjectNode input,
+                                                    String clientIpAddress) {
+     // executeRpc blocks on the RPC future, so it runs on this component's
+     // own worker pool rather than on ForkJoinPool.commonPool(), which the
+     // one-argument supplyAsync would use.
+     // NOTE(review): assumes workerThreadPool is created in activate(),
+     // which is outside this hunk - confirm.
+     return CompletableFuture.supplyAsync(
+             () -> executeRpc(uri, input, clientIpAddress), workerThreadPool);
+ }
+
+ /**
+  * Invokes the RPC addressed by the URI and blocks until its output is
+  * available.
+  *
+  * @param uri             URI of the RPC resource
+  * @param input           JSON input of the RPC
+  * @param clientIpAddress IP address of the requesting client (not read by
+  *                        this method)
+  * @return the converted RPC output, or an INTERNAL_SERVER_ERROR output
+  *         carrying a reason when invoking, waiting for, or converting the
+  *         RPC output fails or is interrupted
+  */
+ private RestconfRpcOutput executeRpc(URI uri, ObjectNode input, String clientIpAddress) {
+     ResourceData rpcInputNode = convertJsonToDataNode(uri, input);
+     ResourceId resourceId = rpcInputNode.resourceId();
+     List<DataNode> inputDataNodeList = rpcInputNode.dataNodes();
+     // Only the first converted data node is passed on as the RPC input.
+     DataNode inputDataNode = inputDataNodeList.get(0);
+     RpcInput rpcInput = new RpcInput(inputDataNode);
+
+     RestconfRpcOutput restconfOutput = null;
+     try {
+         CompletableFuture<RpcOutput> rpcFuture =
+                 dynamicConfigService.invokeRpc(resourceId, rpcInput);
+         RpcOutput rpcOutput = rpcFuture.get();
+         restconfOutput = RestconfUtils.convertRpcOutput(resourceId, rpcOutput);
+     } catch (InterruptedException e) {
+         // The call that can be interrupted here is rpcFuture.get().
+         log.error("ERROR: rpcFuture.get() has been interrupted.");
+         log.debug("executeRpc Exception:", e);
+         // Catching InterruptedException clears the interrupt flag; set it
+         // again so the owner of this thread can still observe it.
+         Thread.currentThread().interrupt();
+         restconfOutput = new RestconfRpcOutput(INTERNAL_SERVER_ERROR, null);
+         restconfOutput.reason("RPC execution has been interrupted");
+     } catch (Exception e) {
+         // Any other failure is reported to the caller through the reason.
+         log.error("ERROR: executeRpc: {}", e.getMessage());
+         log.debug("executeRpc Exception:", e);
+         restconfOutput = new RestconfRpcOutput(INTERNAL_SERVER_ERROR, null);
+         restconfOutput.reason(e.getMessage());
}
- BlockingQueue<ObjectNode> eventQueue = new LinkedBlockingQueue<>();
- workerThreadPool.submit(new EventConsumer(output, eventQueue));
+     return restconfOutput;
}
private ResourceData getDataForStore(ResourceData resourceData) {
@@ -247,81 +318,4 @@
resData.resourceId(parentId);
return resData.build();
}
-
- /**
- * Shutdown a pool cleanly if possible.
- *
- * @param pool an executorService
- */
- private void shutdownAndAwaitTermination(ExecutorService pool) {
- pool.shutdown(); // Disable new tasks from being submitted
- try {
- // Wait a while for existing tasks to terminate
- if (!pool.awaitTermination(THREAD_TERMINATION_TIMEOUT, SECONDS)) {
- pool.shutdownNow(); // Cancel currently executing tasks
- // Wait a while for tasks to respond to being cancelled
- if (!pool.awaitTermination(THREAD_TERMINATION_TIMEOUT,
- SECONDS)) {
- log.error("Pool did not terminate");
- }
- }
- } catch (Exception ie) {
- // (Re-)Cancel if current thread also interrupted
- pool.shutdownNow();
- // Preserve interrupt status
- Thread.currentThread().interrupt();
- }
- }
-
- /**
- * Implementation of a worker thread which reads data from a
- * blocking queue and writes the data to a given chunk output stream.
- * The thread is blocked when no data arrive to the queue and is
- * terminated when the chunk output stream is closed (i.e., the
- * HTTP-keep-alive session is closed).
- */
- private class EventConsumer implements Runnable {
-
- private String queueId;
- private final ChunkedOutput<String> output;
- private final BlockingQueue<ObjectNode> bqueue;
-
- public EventConsumer(ChunkedOutput<String> output,
- BlockingQueue<ObjectNode> q) {
- this.output = output;
- this.bqueue = q;
- }
-
- @Override
- public void run() {
- try {
- queueId = String.valueOf(Thread.currentThread().getId());
- eventQueueList.put(queueId, bqueue);
- log.debug("EventConsumer thread created: {}", queueId);
-
- ObjectNode chunk;
- while ((chunk = bqueue.take()) != null) {
- output.write(chunk.toString().concat(EOL));
- }
- } catch (IOException e) {
- log.debug("chunkedOuput is closed: {}", this.bqueue.toString());
- /*
- * Remove queue from the queue list, so that the event producer
- * (i.e., listener) would stop working.
- */
- eventQueueList.remove(this.queueId);
- } catch (InterruptedException e) {
- log.error("ERROR: EventConsumer: bqueue.take() " +
- "has been interrupted.");
- log.debug("EventConsumer Exception:", e);
- } finally {
- try {
- output.close();
- log.debug("EventConsumer thread terminated: {}", queueId);
- } catch (IOException e) {
- log.error("ERROR: EventConsumer: ", e);
- }
- }
- }
- }
}