blob: 90549934fe7a17759cc3ae9e793f226d1ed09789 [file] [log] [blame]
Jin Gan79f75372017-01-05 15:08:11 -08001/*
jingan7c5bf1f2017-02-09 02:58:09 -08002 * Copyright 2017-present Open Networking Laboratory
Jin Gan79f75372017-01-05 15:08:11 -08003 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
jingan7c5bf1f2017-02-09 02:58:09 -080016
Jin Gan79f75372017-01-05 15:08:11 -080017package org.onosproject.restconf.restconfmanager;
18
19import com.fasterxml.jackson.databind.node.ObjectNode;
20import com.google.common.util.concurrent.ThreadFactoryBuilder;
21import org.apache.felix.scr.annotations.Activate;
22import org.apache.felix.scr.annotations.Component;
23import org.apache.felix.scr.annotations.Deactivate;
jingan7c5bf1f2017-02-09 02:58:09 -080024import org.apache.felix.scr.annotations.Reference;
25import org.apache.felix.scr.annotations.ReferenceCardinality;
Jin Gan79f75372017-01-05 15:08:11 -080026import org.apache.felix.scr.annotations.Service;
27import org.glassfish.jersey.server.ChunkedOutput;
Henry Yu14af7782017-03-09 19:33:36 -050028import org.onosproject.config.DynamicConfigService;
29import org.onosproject.config.FailedException;
30import org.onosproject.config.Filter;
31import org.onosproject.restconf.api.RestconfException;
32import org.onosproject.restconf.api.RestconfService;
33import org.onosproject.yang.model.DataNode;
sonugupta-huaweif0af7aa2017-03-17 00:54:52 +053034import org.onosproject.yang.model.InnerNode;
35import org.onosproject.yang.model.KeyLeaf;
36import org.onosproject.yang.model.ListKey;
37import org.onosproject.yang.model.NodeKey;
Henry Yu14af7782017-03-09 19:33:36 -050038import org.onosproject.yang.model.ResourceData;
39import org.onosproject.yang.model.ResourceId;
sonugupta-huaweif0af7aa2017-03-17 00:54:52 +053040import org.onosproject.yang.model.SchemaId;
41import org.onosproject.yang.runtime.DefaultResourceData;
Jin Gan79f75372017-01-05 15:08:11 -080042import org.slf4j.Logger;
43import org.slf4j.LoggerFactory;
44
45import java.io.IOException;
jingan7c5bf1f2017-02-09 02:58:09 -080046import java.util.List;
Jin Gan79f75372017-01-05 15:08:11 -080047import java.util.concurrent.BlockingQueue;
48import java.util.concurrent.ConcurrentHashMap;
49import java.util.concurrent.ConcurrentMap;
50import java.util.concurrent.ExecutorService;
51import java.util.concurrent.Executors;
52import java.util.concurrent.LinkedBlockingQueue;
53import java.util.concurrent.ThreadPoolExecutor;
Henry Yu14af7782017-03-09 19:33:36 -050054
Jin Gan79f75372017-01-05 15:08:11 -080055import static java.util.concurrent.TimeUnit.SECONDS;
56import static javax.ws.rs.core.Response.Status.INTERNAL_SERVER_ERROR;
jingan7c5bf1f2017-02-09 02:58:09 -080057import static org.onosproject.restconf.utils.RestconfUtils.convertDataNodeToJson;
Henry Yu14af7782017-03-09 19:33:36 -050058import static org.onosproject.restconf.utils.RestconfUtils.convertJsonToDataNode;
59import static org.onosproject.restconf.utils.RestconfUtils.convertUriToRid;
sonugupta-huaweif0af7aa2017-03-17 00:54:52 +053060import static org.onosproject.yang.model.DataNode.Type.MULTI_INSTANCE_NODE;
61import static org.onosproject.yang.model.DataNode.Type.SINGLE_INSTANCE_LEAF_VALUE_NODE;
62import static org.onosproject.yang.model.DataNode.Type.SINGLE_INSTANCE_NODE;
jingan7c5bf1f2017-02-09 02:58:09 -080063
/*
 * ONOS RESTCONF application. The RESTCONF Manager
 * implements the main logic of the RESTCONF application.
 *
 * The design of the RESTCONF subsystem contains 2 major bundles: the
 * RESTCONF Protocol Proxy (RPP), which receives the RESTCONF requests
 * and consumes the service provided here, and this bundle, the
 * RESTCONF Manager.
 * This bundle module is the back-end of the server.
 * It provides the main logic of the RESTCONF server. It interacts with
 * the Dynamic Config Service and yang runtime service to run operations
 * on the YANG data objects (i.e., resource id, yang data node).
 */
74
75/**
76 * Implementation of the RestconfService interface. The class is designed
77 * as a Apache Flex component. Note that to avoid unnecessary
78 * activation, the @Component annotation's immediate parameter is set to false.
79 * So the component is not activated until a RESTCONF request is received by
80 * the RESTCONF Protocol Proxy (RPP) module, which consumes the service.
81 */
82@Component(immediate = false)
83@Service
84public class RestconfManager implements RestconfService {
85
86 private static final String RESTCONF_ROOT = "/onos/restconf";
87 private static final int THREAD_TERMINATION_TIMEOUT = 10;
88
89 // Jersey's default chunk parser uses "\r\n" as the chunk separator.
90 private static final String EOL = "\r\n";
91
92 private final int maxNumOfWorkerThreads = 5;
93
94 private final Logger log = LoggerFactory.getLogger(getClass());
95
jingan7c5bf1f2017-02-09 02:58:09 -080096 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
Henry Yu14af7782017-03-09 19:33:36 -050097 protected DynamicConfigService dynamicConfigService;
Jin Gan79f75372017-01-05 15:08:11 -080098
99 private ConcurrentMap<String, BlockingQueue<ObjectNode>> eventQueueList =
100 new ConcurrentHashMap<>();
101
102 private ExecutorService workerThreadPool;
103
104 @Activate
105 protected void activate() {
106 workerThreadPool = Executors
107 .newFixedThreadPool(maxNumOfWorkerThreads,
108 new ThreadFactoryBuilder()
109 .setNameFormat("restconf-worker")
110 .build());
111 log.info("Started");
112 }
113
114 @Deactivate
115 protected void deactivate() {
116 shutdownAndAwaitTermination(workerThreadPool);
117 log.info("Stopped");
118 }
119
120 @Override
121 public ObjectNode runGetOperationOnDataResource(String uri)
122 throws RestconfException {
jingan7c5bf1f2017-02-09 02:58:09 -0800123 ResourceId rid = convertUriToRid(uri);
124 // TODO: define Filter (if there is any requirement).
125 Filter filter = new Filter();
126 DataNode dataNode;
127 try {
Henry Yu14af7782017-03-09 19:33:36 -0500128 dataNode = dynamicConfigService.readNode(rid, filter);
jingan7c5bf1f2017-02-09 02:58:09 -0800129 } catch (FailedException e) {
130 log.error("ERROR: DynamicConfigService: ", e);
131 throw new RestconfException("ERROR: DynamicConfigService",
132 INTERNAL_SERVER_ERROR);
133 }
134 ObjectNode rootNode = convertDataNodeToJson(rid, dataNode);
135 return rootNode;
Jin Gan79f75372017-01-05 15:08:11 -0800136 }
137
138 @Override
139 public void runPostOperationOnDataResource(String uri, ObjectNode rootNode)
140 throws RestconfException {
sonugupta-huaweif0af7aa2017-03-17 00:54:52 +0530141 ResourceData receivedData = convertJsonToDataNode(uri, rootNode);
142 ResourceData resourceData = getDataForStore(receivedData);
jingan7c5bf1f2017-02-09 02:58:09 -0800143 ResourceId rid = resourceData.resourceId();
144 List<DataNode> dataNodeList = resourceData.dataNodes();
145 // TODO: Error message needs to be fixed
146 if (dataNodeList.size() > 1) {
147 log.warn("ERROR: There are more than one Data Node can be proceed");
148 }
149 DataNode dataNode = dataNodeList.get(0);
150 try {
jingan364cec32017-03-10 12:29:11 -0800151 dynamicConfigService.createNodeRecursive(rid, dataNode);
jingan7c5bf1f2017-02-09 02:58:09 -0800152 } catch (FailedException e) {
153 log.error("ERROR: DynamicConfigService: ", e);
154 throw new RestconfException("ERROR: DynamicConfigService",
155 INTERNAL_SERVER_ERROR);
156 }
Jin Gan79f75372017-01-05 15:08:11 -0800157 }
158
159 @Override
160 public void runPutOperationOnDataResource(String uri, ObjectNode rootNode)
161 throws RestconfException {
jingan7c5bf1f2017-02-09 02:58:09 -0800162 runPostOperationOnDataResource(uri, rootNode);
Jin Gan79f75372017-01-05 15:08:11 -0800163 }
164
165 @Override
166 public void runDeleteOperationOnDataResource(String uri)
167 throws RestconfException {
jingan7c5bf1f2017-02-09 02:58:09 -0800168 ResourceId rid = convertUriToRid(uri);
169 try {
jingan364cec32017-03-10 12:29:11 -0800170 dynamicConfigService.deleteNodeRecursive(rid);
jingan7c5bf1f2017-02-09 02:58:09 -0800171 } catch (FailedException e) {
172 log.error("ERROR: DynamicConfigService: ", e);
173 throw new RestconfException("ERROR: DynamicConfigService",
174 INTERNAL_SERVER_ERROR);
175 }
Jin Gan79f75372017-01-05 15:08:11 -0800176 }
177
178 @Override
179 public void runPatchOperationOnDataResource(String uri, ObjectNode rootNode)
180 throws RestconfException {
181 }
182
183 @Override
184 public String getRestconfRootPath() {
185 return RESTCONF_ROOT;
186 }
187
188 /**
189 * Creates a worker thread to listen to events and write to chunkedOutput.
190 * The worker thread blocks if no events arrive.
191 *
192 * @param streamId the RESTCONF stream id to which the client subscribes
193 * @param output the string data stream
194 * @throws RestconfException if the worker thread fails to create
195 */
196 @Override
197 public void subscribeEventStream(String streamId,
198 ChunkedOutput<String> output)
199 throws RestconfException {
200 if (workerThreadPool instanceof ThreadPoolExecutor) {
201 if (((ThreadPoolExecutor) workerThreadPool).getActiveCount() >=
202 maxNumOfWorkerThreads) {
203 throw new RestconfException("no more work threads left to " +
204 "handle event subscription",
205 INTERNAL_SERVER_ERROR);
206 }
207 } else {
208 throw new RestconfException("Server ERROR: workerThreadPool NOT " +
209 "instanceof ThreadPoolExecutor",
210 INTERNAL_SERVER_ERROR);
Jin Gan79f75372017-01-05 15:08:11 -0800211 }
212
213 BlockingQueue<ObjectNode> eventQueue = new LinkedBlockingQueue<>();
214 workerThreadPool.submit(new EventConsumer(output, eventQueue));
215 }
216
sonugupta-huaweif0af7aa2017-03-17 00:54:52 +0530217 private ResourceData getDataForStore(ResourceData resourceData) {
218 List<DataNode> nodes = resourceData.dataNodes();
219 ResourceId rid = resourceData.resourceId();
220 DataNode.Builder dbr = null;
221 ResourceId parentId = null;
222 try {
223 NodeKey lastKey = rid.nodeKeys().get(rid.nodeKeys().size() - 1);
224 SchemaId sid = lastKey.schemaId();
225 if (lastKey instanceof ListKey) {
226 dbr = InnerNode.builder(
227 sid.name(), sid.namespace()).type(MULTI_INSTANCE_NODE);
228 for (KeyLeaf keyLeaf : ((ListKey) lastKey).keyLeafs()) {
229 Object val = keyLeaf.leafValue();
230 dbr = dbr.addKeyLeaf(keyLeaf.leafSchema().name(),
231 sid.namespace(), val);
232 dbr = dbr.createChildBuilder(keyLeaf.leafSchema().name(),
233 sid.namespace(), val)
234 .type(SINGLE_INSTANCE_LEAF_VALUE_NODE);
sonugupta-huawei6119ac72017-03-21 16:25:40 +0530235 //Exit for key leaf node
236 dbr = dbr.exitNode();
sonugupta-huaweif0af7aa2017-03-17 00:54:52 +0530237 }
238 } else {
239 dbr = InnerNode.builder(
240 sid.name(), sid.namespace()).type(SINGLE_INSTANCE_NODE);
241 }
242 if (nodes != null && !nodes.isEmpty()) {
243 // adding the parent node for given list of nodes
244 for (DataNode node : nodes) {
245 dbr = ((InnerNode.Builder) dbr).addNode(node);
246 }
247 }
248 parentId = rid.copyBuilder().removeLastKey().build();
249 } catch (CloneNotSupportedException e) {
250 e.printStackTrace();
251 }
252 ResourceData.Builder resData = DefaultResourceData.builder();
253 resData.addDataNode(dbr.build());
254 resData.resourceId(parentId);
255 return resData.build();
256 }
257
Jin Gan79f75372017-01-05 15:08:11 -0800258 /**
259 * Shutdown a pool cleanly if possible.
260 *
261 * @param pool an executorService
262 */
263 private void shutdownAndAwaitTermination(ExecutorService pool) {
264 pool.shutdown(); // Disable new tasks from being submitted
265 try {
266 // Wait a while for existing tasks to terminate
267 if (!pool.awaitTermination(THREAD_TERMINATION_TIMEOUT, SECONDS)) {
268 pool.shutdownNow(); // Cancel currently executing tasks
269 // Wait a while for tasks to respond to being cancelled
270 if (!pool.awaitTermination(THREAD_TERMINATION_TIMEOUT,
271 SECONDS)) {
272 log.error("Pool did not terminate");
273 }
274 }
275 } catch (Exception ie) {
276 // (Re-)Cancel if current thread also interrupted
277 pool.shutdownNow();
278 // Preserve interrupt status
279 Thread.currentThread().interrupt();
280 }
281 }
282
283 /**
284 * Implementation of a worker thread which reads data from a
285 * blocking queue and writes the data to a given chunk output stream.
286 * The thread is blocked when no data arrive to the queue and is
287 * terminated when the chunk output stream is closed (i.e., the
288 * HTTP-keep-alive session is closed).
289 */
290 private class EventConsumer implements Runnable {
291
292 private String queueId;
293 private final ChunkedOutput<String> output;
294 private final BlockingQueue<ObjectNode> bqueue;
295
296 public EventConsumer(ChunkedOutput<String> output,
297 BlockingQueue<ObjectNode> q) {
298 this.output = output;
299 this.bqueue = q;
300 }
301
302 @Override
303 public void run() {
304 try {
305 queueId = String.valueOf(Thread.currentThread().getId());
306 eventQueueList.put(queueId, bqueue);
307 log.debug("EventConsumer thread created: {}", queueId);
308
309 ObjectNode chunk;
310 while ((chunk = bqueue.take()) != null) {
311 output.write(chunk.toString().concat(EOL));
312 }
313 } catch (IOException e) {
314 log.debug("chunkedOuput is closed: {}", this.bqueue.toString());
315 /*
316 * Remove queue from the queue list, so that the event producer
317 * (i.e., listener) would stop working.
318 */
319 eventQueueList.remove(this.queueId);
320 } catch (InterruptedException e) {
321 log.error("ERROR: EventConsumer: bqueue.take() " +
322 "has been interrupted.");
323 log.debug("EventConsumer Exception:", e);
324 } finally {
325 try {
326 output.close();
327 log.debug("EventConsumer thread terminated: {}", queueId);
328 } catch (IOException e) {
329 log.error("ERROR: EventConsumer: ", e);
330 }
331 }
332 }
333 }
334}