blob: 256b3a30ad45910cc29d8358626cef8d7dfc2ae0 [file] [log] [blame]
Jin Gan79f75372017-01-05 15:08:11 -08001/*
jingan7c5bf1f2017-02-09 02:58:09 -08002 * Copyright 2017-present Open Networking Laboratory
Jin Gan79f75372017-01-05 15:08:11 -08003 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
jingan7c5bf1f2017-02-09 02:58:09 -080016
Jin Gan79f75372017-01-05 15:08:11 -080017package org.onosproject.restconf.restconfmanager;
18
19import com.fasterxml.jackson.databind.node.ObjectNode;
20import com.google.common.util.concurrent.ThreadFactoryBuilder;
21import org.apache.felix.scr.annotations.Activate;
22import org.apache.felix.scr.annotations.Component;
23import org.apache.felix.scr.annotations.Deactivate;
jingan7c5bf1f2017-02-09 02:58:09 -080024import org.apache.felix.scr.annotations.Reference;
25import org.apache.felix.scr.annotations.ReferenceCardinality;
Jin Gan79f75372017-01-05 15:08:11 -080026import org.apache.felix.scr.annotations.Service;
27import org.glassfish.jersey.server.ChunkedOutput;
Henry Yu14af7782017-03-09 19:33:36 -050028import org.onosproject.config.DynamicConfigService;
29import org.onosproject.config.FailedException;
30import org.onosproject.config.Filter;
31import org.onosproject.restconf.api.RestconfException;
32import org.onosproject.restconf.api.RestconfService;
33import org.onosproject.yang.model.DataNode;
sonugupta-huaweif0af7aa2017-03-17 00:54:52 +053034import org.onosproject.yang.model.InnerNode;
35import org.onosproject.yang.model.KeyLeaf;
36import org.onosproject.yang.model.ListKey;
37import org.onosproject.yang.model.NodeKey;
Henry Yu14af7782017-03-09 19:33:36 -050038import org.onosproject.yang.model.ResourceData;
39import org.onosproject.yang.model.ResourceId;
sonugupta-huaweif0af7aa2017-03-17 00:54:52 +053040import org.onosproject.yang.model.SchemaId;
41import org.onosproject.yang.runtime.DefaultResourceData;
Jin Gan79f75372017-01-05 15:08:11 -080042import org.slf4j.Logger;
43import org.slf4j.LoggerFactory;
44
45import java.io.IOException;
jingan7c5bf1f2017-02-09 02:58:09 -080046import java.util.List;
Jin Gan79f75372017-01-05 15:08:11 -080047import java.util.concurrent.BlockingQueue;
48import java.util.concurrent.ConcurrentHashMap;
49import java.util.concurrent.ConcurrentMap;
50import java.util.concurrent.ExecutorService;
51import java.util.concurrent.Executors;
52import java.util.concurrent.LinkedBlockingQueue;
53import java.util.concurrent.ThreadPoolExecutor;
Henry Yu14af7782017-03-09 19:33:36 -050054
Jin Gan79f75372017-01-05 15:08:11 -080055import static java.util.concurrent.TimeUnit.SECONDS;
56import static javax.ws.rs.core.Response.Status.INTERNAL_SERVER_ERROR;
jingan7c5bf1f2017-02-09 02:58:09 -080057import static org.onosproject.restconf.utils.RestconfUtils.convertDataNodeToJson;
Henry Yu14af7782017-03-09 19:33:36 -050058import static org.onosproject.restconf.utils.RestconfUtils.convertJsonToDataNode;
59import static org.onosproject.restconf.utils.RestconfUtils.convertUriToRid;
sonugupta-huaweif0af7aa2017-03-17 00:54:52 +053060import static org.onosproject.yang.model.DataNode.Type.MULTI_INSTANCE_NODE;
61import static org.onosproject.yang.model.DataNode.Type.SINGLE_INSTANCE_LEAF_VALUE_NODE;
62import static org.onosproject.yang.model.DataNode.Type.SINGLE_INSTANCE_NODE;
jingan7c5bf1f2017-02-09 02:58:09 -080063
Jin Gan79f75372017-01-05 15:08:11 -080064/*
jingan7c5bf1f2017-02-09 02:58:09 -080065 * ONOS RESTCONF application. The RESTCONF Manager
66 * implements the main logic of the RESTCONF application.
Jin Gan79f75372017-01-05 15:08:11 -080067 *
68 * The design of the RESTCONF subsystem contains 2 major bundles:
jingan7c5bf1f2017-02-09 02:58:09 -080069 * This bundle module is the back-end of the server.
Jin Gan79f75372017-01-05 15:08:11 -080070 * It provides the main logic of the RESTCONF server. It interacts with
jingan7c5bf1f2017-02-09 02:58:09 -080071 * the Dynamic Config Service and yang runtime service to run operations
72 * on the YANG data objects (i.e., resource id, yang data node).
Jin Gan79f75372017-01-05 15:08:11 -080073 */
74
jingan8a773322017-03-21 16:12:48 -070075@Component(immediate = true)
Jin Gan79f75372017-01-05 15:08:11 -080076@Service
77public class RestconfManager implements RestconfService {
78
79 private static final String RESTCONF_ROOT = "/onos/restconf";
80 private static final int THREAD_TERMINATION_TIMEOUT = 10;
81
82 // Jersey's default chunk parser uses "\r\n" as the chunk separator.
83 private static final String EOL = "\r\n";
84
85 private final int maxNumOfWorkerThreads = 5;
86
87 private final Logger log = LoggerFactory.getLogger(getClass());
88
jingan7c5bf1f2017-02-09 02:58:09 -080089 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
Henry Yu14af7782017-03-09 19:33:36 -050090 protected DynamicConfigService dynamicConfigService;
Jin Gan79f75372017-01-05 15:08:11 -080091
92 private ConcurrentMap<String, BlockingQueue<ObjectNode>> eventQueueList =
93 new ConcurrentHashMap<>();
94
95 private ExecutorService workerThreadPool;
96
97 @Activate
98 protected void activate() {
99 workerThreadPool = Executors
100 .newFixedThreadPool(maxNumOfWorkerThreads,
101 new ThreadFactoryBuilder()
102 .setNameFormat("restconf-worker")
103 .build());
104 log.info("Started");
105 }
106
107 @Deactivate
108 protected void deactivate() {
109 shutdownAndAwaitTermination(workerThreadPool);
110 log.info("Stopped");
111 }
112
113 @Override
114 public ObjectNode runGetOperationOnDataResource(String uri)
115 throws RestconfException {
jingan7c5bf1f2017-02-09 02:58:09 -0800116 ResourceId rid = convertUriToRid(uri);
117 // TODO: define Filter (if there is any requirement).
118 Filter filter = new Filter();
119 DataNode dataNode;
120 try {
Henry Yu14af7782017-03-09 19:33:36 -0500121 dataNode = dynamicConfigService.readNode(rid, filter);
jingan7c5bf1f2017-02-09 02:58:09 -0800122 } catch (FailedException e) {
123 log.error("ERROR: DynamicConfigService: ", e);
124 throw new RestconfException("ERROR: DynamicConfigService",
125 INTERNAL_SERVER_ERROR);
126 }
127 ObjectNode rootNode = convertDataNodeToJson(rid, dataNode);
128 return rootNode;
Jin Gan79f75372017-01-05 15:08:11 -0800129 }
130
131 @Override
132 public void runPostOperationOnDataResource(String uri, ObjectNode rootNode)
133 throws RestconfException {
sonugupta-huaweif0af7aa2017-03-17 00:54:52 +0530134 ResourceData receivedData = convertJsonToDataNode(uri, rootNode);
135 ResourceData resourceData = getDataForStore(receivedData);
jingan7c5bf1f2017-02-09 02:58:09 -0800136 ResourceId rid = resourceData.resourceId();
137 List<DataNode> dataNodeList = resourceData.dataNodes();
138 // TODO: Error message needs to be fixed
139 if (dataNodeList.size() > 1) {
140 log.warn("ERROR: There are more than one Data Node can be proceed");
141 }
142 DataNode dataNode = dataNodeList.get(0);
143 try {
jingan364cec32017-03-10 12:29:11 -0800144 dynamicConfigService.createNodeRecursive(rid, dataNode);
jingan7c5bf1f2017-02-09 02:58:09 -0800145 } catch (FailedException e) {
146 log.error("ERROR: DynamicConfigService: ", e);
147 throw new RestconfException("ERROR: DynamicConfigService",
148 INTERNAL_SERVER_ERROR);
149 }
Jin Gan79f75372017-01-05 15:08:11 -0800150 }
151
152 @Override
153 public void runPutOperationOnDataResource(String uri, ObjectNode rootNode)
154 throws RestconfException {
jingan7c5bf1f2017-02-09 02:58:09 -0800155 runPostOperationOnDataResource(uri, rootNode);
Jin Gan79f75372017-01-05 15:08:11 -0800156 }
157
158 @Override
159 public void runDeleteOperationOnDataResource(String uri)
160 throws RestconfException {
jingan7c5bf1f2017-02-09 02:58:09 -0800161 ResourceId rid = convertUriToRid(uri);
162 try {
jingan364cec32017-03-10 12:29:11 -0800163 dynamicConfigService.deleteNodeRecursive(rid);
jingan7c5bf1f2017-02-09 02:58:09 -0800164 } catch (FailedException e) {
165 log.error("ERROR: DynamicConfigService: ", e);
166 throw new RestconfException("ERROR: DynamicConfigService",
167 INTERNAL_SERVER_ERROR);
168 }
Jin Gan79f75372017-01-05 15:08:11 -0800169 }
170
171 @Override
172 public void runPatchOperationOnDataResource(String uri, ObjectNode rootNode)
173 throws RestconfException {
174 }
175
176 @Override
177 public String getRestconfRootPath() {
178 return RESTCONF_ROOT;
179 }
180
181 /**
182 * Creates a worker thread to listen to events and write to chunkedOutput.
183 * The worker thread blocks if no events arrive.
184 *
185 * @param streamId the RESTCONF stream id to which the client subscribes
186 * @param output the string data stream
187 * @throws RestconfException if the worker thread fails to create
188 */
189 @Override
190 public void subscribeEventStream(String streamId,
191 ChunkedOutput<String> output)
192 throws RestconfException {
193 if (workerThreadPool instanceof ThreadPoolExecutor) {
194 if (((ThreadPoolExecutor) workerThreadPool).getActiveCount() >=
195 maxNumOfWorkerThreads) {
196 throw new RestconfException("no more work threads left to " +
197 "handle event subscription",
198 INTERNAL_SERVER_ERROR);
199 }
200 } else {
201 throw new RestconfException("Server ERROR: workerThreadPool NOT " +
202 "instanceof ThreadPoolExecutor",
203 INTERNAL_SERVER_ERROR);
Jin Gan79f75372017-01-05 15:08:11 -0800204 }
205
206 BlockingQueue<ObjectNode> eventQueue = new LinkedBlockingQueue<>();
207 workerThreadPool.submit(new EventConsumer(output, eventQueue));
208 }
209
sonugupta-huaweif0af7aa2017-03-17 00:54:52 +0530210 private ResourceData getDataForStore(ResourceData resourceData) {
211 List<DataNode> nodes = resourceData.dataNodes();
212 ResourceId rid = resourceData.resourceId();
213 DataNode.Builder dbr = null;
214 ResourceId parentId = null;
215 try {
216 NodeKey lastKey = rid.nodeKeys().get(rid.nodeKeys().size() - 1);
217 SchemaId sid = lastKey.schemaId();
218 if (lastKey instanceof ListKey) {
219 dbr = InnerNode.builder(
220 sid.name(), sid.namespace()).type(MULTI_INSTANCE_NODE);
221 for (KeyLeaf keyLeaf : ((ListKey) lastKey).keyLeafs()) {
222 Object val = keyLeaf.leafValue();
223 dbr = dbr.addKeyLeaf(keyLeaf.leafSchema().name(),
224 sid.namespace(), val);
225 dbr = dbr.createChildBuilder(keyLeaf.leafSchema().name(),
226 sid.namespace(), val)
227 .type(SINGLE_INSTANCE_LEAF_VALUE_NODE);
sonugupta-huawei6119ac72017-03-21 16:25:40 +0530228 //Exit for key leaf node
229 dbr = dbr.exitNode();
sonugupta-huaweif0af7aa2017-03-17 00:54:52 +0530230 }
231 } else {
232 dbr = InnerNode.builder(
233 sid.name(), sid.namespace()).type(SINGLE_INSTANCE_NODE);
234 }
235 if (nodes != null && !nodes.isEmpty()) {
236 // adding the parent node for given list of nodes
237 for (DataNode node : nodes) {
238 dbr = ((InnerNode.Builder) dbr).addNode(node);
239 }
240 }
241 parentId = rid.copyBuilder().removeLastKey().build();
242 } catch (CloneNotSupportedException e) {
243 e.printStackTrace();
244 }
245 ResourceData.Builder resData = DefaultResourceData.builder();
246 resData.addDataNode(dbr.build());
247 resData.resourceId(parentId);
248 return resData.build();
249 }
250
Jin Gan79f75372017-01-05 15:08:11 -0800251 /**
252 * Shutdown a pool cleanly if possible.
253 *
254 * @param pool an executorService
255 */
256 private void shutdownAndAwaitTermination(ExecutorService pool) {
257 pool.shutdown(); // Disable new tasks from being submitted
258 try {
259 // Wait a while for existing tasks to terminate
260 if (!pool.awaitTermination(THREAD_TERMINATION_TIMEOUT, SECONDS)) {
261 pool.shutdownNow(); // Cancel currently executing tasks
262 // Wait a while for tasks to respond to being cancelled
263 if (!pool.awaitTermination(THREAD_TERMINATION_TIMEOUT,
264 SECONDS)) {
265 log.error("Pool did not terminate");
266 }
267 }
268 } catch (Exception ie) {
269 // (Re-)Cancel if current thread also interrupted
270 pool.shutdownNow();
271 // Preserve interrupt status
272 Thread.currentThread().interrupt();
273 }
274 }
275
276 /**
277 * Implementation of a worker thread which reads data from a
278 * blocking queue and writes the data to a given chunk output stream.
279 * The thread is blocked when no data arrive to the queue and is
280 * terminated when the chunk output stream is closed (i.e., the
281 * HTTP-keep-alive session is closed).
282 */
283 private class EventConsumer implements Runnable {
284
285 private String queueId;
286 private final ChunkedOutput<String> output;
287 private final BlockingQueue<ObjectNode> bqueue;
288
289 public EventConsumer(ChunkedOutput<String> output,
290 BlockingQueue<ObjectNode> q) {
291 this.output = output;
292 this.bqueue = q;
293 }
294
295 @Override
296 public void run() {
297 try {
298 queueId = String.valueOf(Thread.currentThread().getId());
299 eventQueueList.put(queueId, bqueue);
300 log.debug("EventConsumer thread created: {}", queueId);
301
302 ObjectNode chunk;
303 while ((chunk = bqueue.take()) != null) {
304 output.write(chunk.toString().concat(EOL));
305 }
306 } catch (IOException e) {
307 log.debug("chunkedOuput is closed: {}", this.bqueue.toString());
308 /*
309 * Remove queue from the queue list, so that the event producer
310 * (i.e., listener) would stop working.
311 */
312 eventQueueList.remove(this.queueId);
313 } catch (InterruptedException e) {
314 log.error("ERROR: EventConsumer: bqueue.take() " +
315 "has been interrupted.");
316 log.debug("EventConsumer Exception:", e);
317 } finally {
318 try {
319 output.close();
320 log.debug("EventConsumer thread terminated: {}", queueId);
321 } catch (IOException e) {
322 log.error("ERROR: EventConsumer: ", e);
323 }
324 }
325 }
326 }
327}