blob: 5237f5041747a56b6ecfc5c00c01e4b9a50b03b0 [file] [log] [blame]
Henry Yue20926e2016-08-25 22:58:02 -04001/*
2 * Copyright 2016-present Open Networking Laboratory
3 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
16package org.onosproject.protocol.restconf.server.restconfmanager;
17
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.node.ArrayNode;
import com.fasterxml.jackson.databind.node.ObjectNode;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
import org.apache.felix.scr.annotations.Activate;
import org.apache.felix.scr.annotations.Component;
import org.apache.felix.scr.annotations.Deactivate;
import org.apache.felix.scr.annotations.Reference;
import org.apache.felix.scr.annotations.ReferenceCardinality;
import org.apache.felix.scr.annotations.Service;
import org.glassfish.jersey.server.ChunkedOutput;
import org.onosproject.event.ListenerTracker;
import org.onosproject.protocol.restconf.server.api.RestconfException;
import org.onosproject.protocol.restconf.server.api.RestconfService;
import org.onosproject.yms.ydt.YdtBuilder;
import org.onosproject.yms.ydt.YdtContext;
import org.onosproject.yms.ydt.YdtContextOperationType;
import org.onosproject.yms.ydt.YdtResponse;
import org.onosproject.yms.ydt.YmsOperationExecutionStatus;
import org.onosproject.yms.ydt.YmsOperationType;
import org.onosproject.yms.ymsm.YmsService;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.IOException;
import java.util.UUID;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;

import static org.onosproject.yms.ydt.YmsOperationType.QUERY_REQUEST;
import static org.onosproject.yms.ydt.YmsOperationType.EDIT_CONFIG_REQUEST;
import static org.onosproject.yms.ydt.YdtContextOperationType.NONE;
import static org.onosproject.yms.ydt.YdtContextOperationType.CREATE;
import static org.onosproject.yms.ydt.YdtContextOperationType.DELETE;
import static org.onosproject.yms.ydt.YdtContextOperationType.REPLACE;
import static org.onosproject.yms.ydt.YdtContextOperationType.MERGE;
import static org.onosproject.yms.ydt.YdtType.SINGLE_INSTANCE_LEAF_VALUE_NODE;
import static org.onosproject.yms.ydt.YmsOperationExecutionStatus.EXECUTION_SUCCESS;
import static org.onosproject.protocol.restconf.server.utils.parser.json.ParserUtils.convertYdtToJson;
import static org.onosproject.protocol.restconf.server.utils.parser.json.ParserUtils.convertUriToYdt;
import static org.onosproject.protocol.restconf.server.utils.parser.json.ParserUtils.convertJsonToYdt;
import static javax.ws.rs.core.Response.Status.INTERNAL_SERVER_ERROR;
import static java.util.concurrent.TimeUnit.SECONDS;
/*
 * Skeletal ONOS RESTCONF Server application. The RESTCONF Manager
 * implements the main logic of the RESTCONF Server.
 *
 * The design of the RESTCONF subsystem contains 2 major bundles:
 *
 * 1. RESTCONF Protocol Proxy (RPP). This bundle is implemented as a
 *    JAX-RS application. It acts as the front-end of the RESTCONF server.
 *    It intercepts/handles HTTP requests that are sent to the RESTCONF
 *    Root Path. It then calls the RESTCONF Manager to process the requests.
 *
 * 2. RESTCONF Manager. This bundle module is the back-end of the server.
 *    It provides the main logic of the RESTCONF server. It interacts with
 *    the YMS (YANG Management System) to run operations on the YANG data
 *    objects (i.e., data resources).
 */
82
83/**
84 * Implementation of the RestconfService interface. The class is designed
85 * as a Apache Flex component. Note that to avoid unnecessary
86 * activation, the @Component annotation's immediate parameter is set to false.
87 * So the component is not activated until a RESTCONF request is received by
88 * the RESTCONF Protocol Proxy (RPP) module, which consumes the service.
89 */
90@Component(immediate = false)
91@Service
92public class RestconfManager implements RestconfService {
93
94 private static final String RESTCONF_ROOT = "/onos/restconf";
95 private static final int THREAD_TERMINATION_TIMEOUT = 10;
96 private static final String EOL = String.format("%n");
97
98 private final int maxNumOfWorkerThreads = 5;
99
100 private final Logger log = LoggerFactory.getLogger(getClass());
101
chengfanc58d4be2016-09-20 10:33:12 +0800102 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
103 protected YmsService ymsService;
Henry Yue20926e2016-08-25 22:58:02 -0400104
105 private ListenerTracker listeners;
106
107 private ConcurrentMap<String, BlockingQueue<ObjectNode>> eventQueueList =
chengfanc58d4be2016-09-20 10:33:12 +0800108 new ConcurrentHashMap<>();
Henry Yue20926e2016-08-25 22:58:02 -0400109
110 private ExecutorService workerThreadPool;
111
112 @Activate
113 protected void activate() {
chengfanc58d4be2016-09-20 10:33:12 +0800114 workerThreadPool = Executors
115 .newFixedThreadPool(maxNumOfWorkerThreads,
116 new ThreadFactoryBuilder()
117 .setNameFormat("restconf-worker")
118 .build());
Henry Yue20926e2016-08-25 22:58:02 -0400119 listeners = new ListenerTracker();
120 //TODO: YMS notification
Henry Yue20926e2016-08-25 22:58:02 -0400121 log.info("Started");
122 }
123
124 @Deactivate
125 protected void deactivate() {
126 listeners.removeListeners();
127 shutdownAndAwaitTermination(workerThreadPool);
128 log.info("Stopped");
129 }
130
131 @Override
chengfanc58d4be2016-09-20 10:33:12 +0800132 public ObjectNode runGetOperationOnDataResource(String uri)
133 throws RestconfException {
134 YdtBuilder ydtBuilder = getYdtBuilder(QUERY_REQUEST);
135 //Convert the URI to ydtBuilder
136 convertUriToYdt(uri, ydtBuilder, NONE);
137 YdtResponse ydtResponse = ymsService.executeOperation(ydtBuilder);
138 YmsOperationExecutionStatus status = ydtResponse
139 .getYmsOperationResult();
140 if (status != EXECUTION_SUCCESS) {
141 throw new RestconfException("YMS GET operation failed",
142 INTERNAL_SERVER_ERROR);
143 }
144
145 YdtContext rootNode = ydtResponse.getRootNode();
146 YdtContext curNode = ydtBuilder.getCurNode();
147
148 ObjectNode result = convertYdtToJson(curNode.getName(), rootNode,
149 ymsService.getYdtWalker());
150 //if the query URI contain a key, something like list=key
151 //here should only get get child with the specific key
152 YdtContext child = curNode.getFirstChild();
153 if (child != null &&
154 child.getYdtType() == SINGLE_INSTANCE_LEAF_VALUE_NODE) {
155
156 ArrayNode jsonNode = (ArrayNode) result.get(curNode.getName());
157 for (JsonNode next : jsonNode) {
158 if (next.findValue(child.getName())
159 .asText().equals(child.getValue())) {
160 return (ObjectNode) next;
161 }
162 }
163 throw new RestconfException(String.format("No content for %s = %s",
164 child.getName(),
165 child.getValue()),
166 INTERNAL_SERVER_ERROR);
167 }
168 return result;
169 }
170
171 private YmsOperationExecutionStatus
172 invokeYmsOp(String uri, ObjectNode rootNode,
173 YdtContextOperationType opType) {
174 YdtBuilder ydtBuilder = getYdtBuilder(EDIT_CONFIG_REQUEST);
175 //Convert the URI to ydtBuilder
176 convertUriToYdt(uri, ydtBuilder, opType);
177
178 //set default operation type for the payload node
179 ydtBuilder.setDefaultEditOperationType(opType);
180 //convert the payload json body to ydt
181 convertJsonToYdt(rootNode, ydtBuilder);
182
183 return ymsService
184 .executeOperation(ydtBuilder)
185 .getYmsOperationResult();
Henry Yue20926e2016-08-25 22:58:02 -0400186 }
187
188 @Override
chengfanc58d4be2016-09-20 10:33:12 +0800189 public void runPostOperationOnDataResource(String uri, ObjectNode rootNode)
190 throws RestconfException {
191 YmsOperationExecutionStatus status =
192 invokeYmsOp(uri, rootNode, CREATE);
193
194 if (status != EXECUTION_SUCCESS) {
195 throw new RestconfException("YMS post operation failed.",
196 INTERNAL_SERVER_ERROR);
197 }
Henry Yue20926e2016-08-25 22:58:02 -0400198 }
199
200 @Override
chengfanc58d4be2016-09-20 10:33:12 +0800201 public void runPutOperationOnDataResource(String uri, ObjectNode rootNode)
202 throws RestconfException {
203 YmsOperationExecutionStatus status =
204 invokeYmsOp(uri, rootNode, REPLACE);
205
206 if (status != EXECUTION_SUCCESS) {
207 throw new RestconfException("YMS put operation failed.",
208 INTERNAL_SERVER_ERROR);
209 }
Henry Yue20926e2016-08-25 22:58:02 -0400210 }
211
Henry Yue20926e2016-08-25 22:58:02 -0400212 @Override
chengfanc58d4be2016-09-20 10:33:12 +0800213 public void runDeleteOperationOnDataResource(String uri)
214 throws RestconfException {
215 //Get a root ydtBuilder
216 YdtBuilder ydtBuilder = getYdtBuilder(EDIT_CONFIG_REQUEST);
217 //Convert the URI to ydtBuilder
218 convertUriToYdt(uri, ydtBuilder, DELETE);
219 //Execute the delete operation
220 YmsOperationExecutionStatus status = ymsService
221 .executeOperation(ydtBuilder)
222 .getYmsOperationResult();
223 if (status != EXECUTION_SUCCESS) {
224 throw new RestconfException("YMS delete operation failed.",
225 INTERNAL_SERVER_ERROR);
226 }
227 }
228
229 @Override
230 public void runPatchOperationOnDataResource(String uri, ObjectNode rootNode)
231 throws RestconfException {
232 YmsOperationExecutionStatus status = invokeYmsOp(uri, rootNode, MERGE);
233
234 if (status != EXECUTION_SUCCESS) {
235 throw new RestconfException("YMS patch operation failed.",
236 INTERNAL_SERVER_ERROR);
237 }
Henry Yue20926e2016-08-25 22:58:02 -0400238 }
239
240 @Override
241 public String getRestconfRootPath() {
chengfanc58d4be2016-09-20 10:33:12 +0800242 return RESTCONF_ROOT;
Henry Yue20926e2016-08-25 22:58:02 -0400243 }
244
245 /**
246 * Creates a worker thread to listen to events and write to chunkedOutput.
247 * The worker thread blocks if no events arrive.
248 *
249 * @param streamId ID of the RESTCONF stream to subscribe
250 * @param output A string data stream
251 * @throws RestconfException if the worker thread fails to create
252 */
253 @Override
chengfanc58d4be2016-09-20 10:33:12 +0800254 public void subscribeEventStream(String streamId,
255 ChunkedOutput<String> output)
256 throws RestconfException {
257 BlockingQueue<ObjectNode> eventQueue = new LinkedBlockingQueue<>();
Henry Yue20926e2016-08-25 22:58:02 -0400258 if (workerThreadPool instanceof ThreadPoolExecutor) {
chengfanc58d4be2016-09-20 10:33:12 +0800259 if (((ThreadPoolExecutor) workerThreadPool).getActiveCount() >=
260 maxNumOfWorkerThreads) {
261 throw new RestconfException("no more work threads left to " +
262 "handle event subscription",
263 INTERNAL_SERVER_ERROR);
Henry Yue20926e2016-08-25 22:58:02 -0400264 }
265 } else {
chengfanc58d4be2016-09-20 10:33:12 +0800266 throw new RestconfException("Server ERROR: workerThreadPool NOT " +
267 "instanceof ThreadPoolExecutor",
268 INTERNAL_SERVER_ERROR);
Henry Yue20926e2016-08-25 22:58:02 -0400269
270 }
271
272 workerThreadPool.submit(new EventConsumer(output, eventQueue));
273 }
274
275
276 /**
277 * Shutdown a pool cleanly if possible.
278 *
279 * @param pool an executorService
280 */
281 private void shutdownAndAwaitTermination(ExecutorService pool) {
282 pool.shutdown(); // Disable new tasks from being submitted
283 try {
284 // Wait a while for existing tasks to terminate
chengfanc58d4be2016-09-20 10:33:12 +0800285 if (!pool.awaitTermination(THREAD_TERMINATION_TIMEOUT, SECONDS)) {
Henry Yue20926e2016-08-25 22:58:02 -0400286 pool.shutdownNow(); // Cancel currently executing tasks
287 // Wait a while for tasks to respond to being cancelled
chengfanc58d4be2016-09-20 10:33:12 +0800288 if (!pool.awaitTermination(THREAD_TERMINATION_TIMEOUT,
289 SECONDS)) {
Henry Yue20926e2016-08-25 22:58:02 -0400290 log.error("Pool did not terminate");
291 }
292 }
293 } catch (Exception ie) {
294 // (Re-)Cancel if current thread also interrupted
295 pool.shutdownNow();
296 // Preserve interrupt status
297 Thread.currentThread().interrupt();
298 }
299 }
300
301 private class EventConsumer implements Runnable {
302
303 private final String queueId;
304 private final ChunkedOutput<String> output;
305 private final BlockingQueue<ObjectNode> bqueue;
306
chengfanc58d4be2016-09-20 10:33:12 +0800307 public EventConsumer(ChunkedOutput<String> output,
308 BlockingQueue<ObjectNode> q) {
Henry Yue20926e2016-08-25 22:58:02 -0400309 this.queueId = Thread.currentThread().getName();
310 this.output = output;
311 this.bqueue = q;
312 eventQueueList.put(queueId, bqueue);
313 }
314
315 @Override
316 public void run() {
317 try {
318 ObjectNode chunk;
319 while ((chunk = bqueue.take()) != null) {
320 output.write(chunk.toString().concat(EOL));
321 }
322 } catch (IOException e) {
323 log.debug("chunkedOuput is closed: {}", this.bqueue.toString());
324 /*
325 * Remove queue from the queue list, so that the event producer
326 * (i.e., listener) would stop working.
327 */
328 eventQueueList.remove(this.queueId);
329 } catch (InterruptedException e) {
chengfanc58d4be2016-09-20 10:33:12 +0800330 log.error("ERROR: EventConsumer: bqueue.take() " +
331 "has been interrupted.");
Henry Yue20926e2016-08-25 22:58:02 -0400332 log.debug("EventConsumer Exception:", e);
333 } finally {
334 try {
335 output.close();
336 log.debug("EventConsumer thread terminated: {}", queueId);
337 } catch (IOException e) {
338 log.error("ERROR: EventConsumer: ", e);
339 }
340 }
341 }
342
343 }
344
chengfanc58d4be2016-09-20 10:33:12 +0800345 private YdtBuilder getYdtBuilder(YmsOperationType ymsOperationType) {
346 return ymsService.getYdtBuilder(RESTCONF_ROOT, null, ymsOperationType);
347 }
348
Henry Yue20926e2016-08-25 22:58:02 -0400349 /**
350 * The listener class acts as the event producer for the event queues. The
351 * queues are created by the event consumer threads and are removed when the
352 * threads terminate.
353 */
354 //TODO: YMS notification
355 /*private class InternalYangNotificationListener implements YangNotificationListener {
356
357 @Override
358 public void event(YangNotificationEvent event) {
359 if (event.type() != YangNotificationEvent.Type.YANG_NOTIFICATION) {
360 // For now, we only handle YANG notification events.
361 return;
362 }
363
364 if (eventQueueList.isEmpty()) {
365 *//*
366 * There is no consumer waiting to consume, so don't have to
367 * produce this event.
368 *//*
369 return;
370 }
371
372 try {
373 *//*
374 * Put the event to every queue out there. Each queue is
375 * corresponding to an event stream session. The queue is
376 * removed when the session terminates.
377 *//*
378 for (Entry<String, BlockingQueue<ObjectNode>> entry : eventQueueList
379 .entrySet()) {
380 entry.getValue().put(event.subject().getData());
381 }
382 } catch (InterruptedException e) {
383 Log.error("ERROR", e);
384 throw new RestconfException("queue", Status.INTERNAL_SERVER_ERROR);
385 }
386 }
387
388 }*/
389}