blob: e041f1ca53d2e78c9d7ec4a79c84146f36acae30 [file] [log] [blame]
Henry Yue20926e2016-08-25 22:58:02 -04001/*
2 * Copyright 2016-present Open Networking Laboratory
3 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
16package org.onosproject.protocol.restconf.server.restconfmanager;
17
chengfanc58d4be2016-09-20 10:33:12 +080018import com.fasterxml.jackson.databind.JsonNode;
19import com.fasterxml.jackson.databind.node.ArrayNode;
Henry Yue20926e2016-08-25 22:58:02 -040020import com.fasterxml.jackson.databind.node.ObjectNode;
21import com.google.common.util.concurrent.ThreadFactoryBuilder;
22import org.apache.felix.scr.annotations.Activate;
23import org.apache.felix.scr.annotations.Component;
24import org.apache.felix.scr.annotations.Deactivate;
chengfanc58d4be2016-09-20 10:33:12 +080025import org.apache.felix.scr.annotations.Reference;
26import org.apache.felix.scr.annotations.ReferenceCardinality;
Henry Yue20926e2016-08-25 22:58:02 -040027import org.apache.felix.scr.annotations.Service;
28import org.glassfish.jersey.server.ChunkedOutput;
29import org.onosproject.event.ListenerTracker;
30import org.onosproject.protocol.restconf.server.api.RestconfException;
31import org.onosproject.protocol.restconf.server.api.RestconfService;
chengfanc58d4be2016-09-20 10:33:12 +080032import org.onosproject.yms.ydt.YdtBuilder;
33import org.onosproject.yms.ydt.YdtContext;
34import org.onosproject.yms.ydt.YdtContextOperationType;
35import org.onosproject.yms.ydt.YdtResponse;
36import org.onosproject.yms.ydt.YmsOperationExecutionStatus;
37import org.onosproject.yms.ydt.YmsOperationType;
38import org.onosproject.yms.ymsm.YmsService;
Henry Yu528007c2016-11-01 15:49:59 -040039import org.onosproject.yms.ynh.YangNotificationEvent;
40import org.onosproject.yms.ynh.YangNotificationListener;
41import org.onosproject.yms.ynh.YangNotificationService;
Henry Yue20926e2016-08-25 22:58:02 -040042import org.slf4j.Logger;
43import org.slf4j.LoggerFactory;
44
Henry Yue20926e2016-08-25 22:58:02 -040045import java.io.IOException;
Henry Yu528007c2016-11-01 15:49:59 -040046import java.util.Map.Entry;
Henry Yue20926e2016-08-25 22:58:02 -040047import java.util.concurrent.BlockingQueue;
48import java.util.concurrent.ConcurrentHashMap;
49import java.util.concurrent.ConcurrentMap;
50import java.util.concurrent.ExecutorService;
51import java.util.concurrent.Executors;
52import java.util.concurrent.LinkedBlockingQueue;
53import java.util.concurrent.ThreadPoolExecutor;
Henry Yue20926e2016-08-25 22:58:02 -040054
Henry Yu528007c2016-11-01 15:49:59 -040055import static java.util.concurrent.TimeUnit.SECONDS;
56import static javax.ws.rs.core.Response.Status.INTERNAL_SERVER_ERROR;
57import static org.onosproject.protocol.restconf.server.utils.parser.json.ParserUtils.convertJsonToYdt;
58import static org.onosproject.protocol.restconf.server.utils.parser.json.ParserUtils.convertUriToYdt;
59import static org.onosproject.protocol.restconf.server.utils.parser.json.ParserUtils.convertYdtToJson;
60import static org.onosproject.protocol.restconf.server.utils.parser.json.ParserUtils.getJsonNameFromYdtNode;
chengfanc58d4be2016-09-20 10:33:12 +080061import static org.onosproject.yms.ydt.YdtContextOperationType.CREATE;
62import static org.onosproject.yms.ydt.YdtContextOperationType.DELETE;
chengfanc58d4be2016-09-20 10:33:12 +080063import static org.onosproject.yms.ydt.YdtContextOperationType.MERGE;
Henry Yu528007c2016-11-01 15:49:59 -040064import static org.onosproject.yms.ydt.YdtContextOperationType.NONE;
65import static org.onosproject.yms.ydt.YdtContextOperationType.REPLACE;
chengfanc58d4be2016-09-20 10:33:12 +080066import static org.onosproject.yms.ydt.YdtType.SINGLE_INSTANCE_LEAF_VALUE_NODE;
Henry Yu528007c2016-11-01 15:49:59 -040067import static org.onosproject.yms.ydt.YmsOperationExecutionStatus.EXECUTION_EXCEPTION;
chengfanc58d4be2016-09-20 10:33:12 +080068import static org.onosproject.yms.ydt.YmsOperationExecutionStatus.EXECUTION_SUCCESS;
Henry Yu528007c2016-11-01 15:49:59 -040069import static org.onosproject.yms.ydt.YmsOperationType.EDIT_CONFIG_REQUEST;
70import static org.onosproject.yms.ydt.YmsOperationType.QUERY_REQUEST;
71
Henry Yue20926e2016-08-25 22:58:02 -040072/*
73 * Skeletal ONOS RESTCONF Server application. The RESTCONF Manager
74 * implements the main logic of the RESTCONF Server.
75 *
76 * The design of the RESTCONF subsystem contains 2 major bundles:
77 *
chengfanc58d4be2016-09-20 10:33:12 +080078 * 1. RESTCONF Protocol Proxy (RPP). This bundle is implemented as a
79 * JAX-RS application. It acts as the frond-end of the RESTCONF server.
80 * It intercepts/handles HTTP requests that are sent to the RESTCONF
81 * Root Path. It then calls the RESTCONF Manager to process the requests.
Henry Yue20926e2016-08-25 22:58:02 -040082 *
chengfanc58d4be2016-09-20 10:33:12 +080083 * 2. RESTCONF Manager. This bundle module is the back-end of the server.
84 * It provides the main logic of the RESTCONF server. It interacts with
85 * the YMS (YANG Management System) to run operations on the YANG data
86 * objects (i.e., data resources).
Henry Yuab25a712017-02-18 13:15:25 -050087 *
88 * NOTE: This implementation will be obsolete and will be replaced by the
89 * RESTCONF application.
Henry Yue20926e2016-08-25 22:58:02 -040090 */
91
92/**
93 * Implementation of the RestconfService interface. The class is designed
94 * as a Apache Flex component. Note that to avoid unnecessary
95 * activation, the @Component annotation's immediate parameter is set to false.
96 * So the component is not activated until a RESTCONF request is received by
97 * the RESTCONF Protocol Proxy (RPP) module, which consumes the service.
98 */
99@Component(immediate = false)
100@Service
101public class RestconfManager implements RestconfService {
102
103 private static final String RESTCONF_ROOT = "/onos/restconf";
104 private static final int THREAD_TERMINATION_TIMEOUT = 10;
Henry Yudc747af2016-11-16 13:29:54 -0500105
106 // Jersey's default chunk parser uses "\r\n" as the chunk separator.
107 private static final String EOL = "\r\n";
Henry Yue20926e2016-08-25 22:58:02 -0400108
109 private final int maxNumOfWorkerThreads = 5;
110
111 private final Logger log = LoggerFactory.getLogger(getClass());
112
chengfanc58d4be2016-09-20 10:33:12 +0800113 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
114 protected YmsService ymsService;
Henry Yue20926e2016-08-25 22:58:02 -0400115
Henry Yu528007c2016-11-01 15:49:59 -0400116 protected YangNotificationService ymsNotificationService;
117
Henry Yue20926e2016-08-25 22:58:02 -0400118 private ListenerTracker listeners;
119
120 private ConcurrentMap<String, BlockingQueue<ObjectNode>> eventQueueList =
chengfanc58d4be2016-09-20 10:33:12 +0800121 new ConcurrentHashMap<>();
Henry Yue20926e2016-08-25 22:58:02 -0400122
123 private ExecutorService workerThreadPool;
124
125 @Activate
126 protected void activate() {
chengfanc58d4be2016-09-20 10:33:12 +0800127 workerThreadPool = Executors
128 .newFixedThreadPool(maxNumOfWorkerThreads,
129 new ThreadFactoryBuilder()
130 .setNameFormat("restconf-worker")
131 .build());
Henry Yu528007c2016-11-01 15:49:59 -0400132 ymsNotificationService = ymsService.getYangNotificationService();
Henry Yue20926e2016-08-25 22:58:02 -0400133 listeners = new ListenerTracker();
Henry Yu528007c2016-11-01 15:49:59 -0400134 listeners.addListener(ymsNotificationService, new InternalYangNotificationListener());
Henry Yue20926e2016-08-25 22:58:02 -0400135 log.info("Started");
136 }
137
138 @Deactivate
139 protected void deactivate() {
140 listeners.removeListeners();
141 shutdownAndAwaitTermination(workerThreadPool);
142 log.info("Stopped");
143 }
144
145 @Override
chengfanc58d4be2016-09-20 10:33:12 +0800146 public ObjectNode runGetOperationOnDataResource(String uri)
147 throws RestconfException {
148 YdtBuilder ydtBuilder = getYdtBuilder(QUERY_REQUEST);
149 //Convert the URI to ydtBuilder
150 convertUriToYdt(uri, ydtBuilder, NONE);
151 YdtResponse ydtResponse = ymsService.executeOperation(ydtBuilder);
152 YmsOperationExecutionStatus status = ydtResponse
153 .getYmsOperationResult();
154 if (status != EXECUTION_SUCCESS) {
155 throw new RestconfException("YMS GET operation failed",
156 INTERNAL_SERVER_ERROR);
157 }
158
159 YdtContext rootNode = ydtResponse.getRootNode();
160 YdtContext curNode = ydtBuilder.getCurNode();
161
Henry Yu528007c2016-11-01 15:49:59 -0400162 ObjectNode result = convertYdtToJson(getJsonNameFromYdtNode(curNode),
163 rootNode,
chengfanc58d4be2016-09-20 10:33:12 +0800164 ymsService.getYdtWalker());
165 //if the query URI contain a key, something like list=key
166 //here should only get get child with the specific key
167 YdtContext child = curNode.getFirstChild();
168 if (child != null &&
169 child.getYdtType() == SINGLE_INSTANCE_LEAF_VALUE_NODE) {
170
Henry Yu528007c2016-11-01 15:49:59 -0400171 ArrayNode jsonNode = (ArrayNode) result.get(getJsonNameFromYdtNode(curNode));
chengfanc58d4be2016-09-20 10:33:12 +0800172 for (JsonNode next : jsonNode) {
Henry Yu528007c2016-11-01 15:49:59 -0400173 if (next.findValue(getJsonNameFromYdtNode(child))
chengfanc58d4be2016-09-20 10:33:12 +0800174 .asText().equals(child.getValue())) {
175 return (ObjectNode) next;
176 }
177 }
178 throw new RestconfException(String.format("No content for %s = %s",
Henry Yu528007c2016-11-01 15:49:59 -0400179 getJsonNameFromYdtNode(child),
chengfanc58d4be2016-09-20 10:33:12 +0800180 child.getValue()),
Henry Yudc747af2016-11-16 13:29:54 -0500181 INTERNAL_SERVER_ERROR);
chengfanc58d4be2016-09-20 10:33:12 +0800182 }
183 return result;
184 }
185
186 private YmsOperationExecutionStatus
187 invokeYmsOp(String uri, ObjectNode rootNode,
188 YdtContextOperationType opType) {
189 YdtBuilder ydtBuilder = getYdtBuilder(EDIT_CONFIG_REQUEST);
190 //Convert the URI to ydtBuilder
191 convertUriToYdt(uri, ydtBuilder, opType);
192
193 //set default operation type for the payload node
194 ydtBuilder.setDefaultEditOperationType(opType);
195 //convert the payload json body to ydt
196 convertJsonToYdt(rootNode, ydtBuilder);
197
Henry Yu528007c2016-11-01 15:49:59 -0400198 YmsOperationExecutionStatus status = EXECUTION_EXCEPTION;
199
200 try {
201 status = ymsService.executeOperation(ydtBuilder).getYmsOperationResult();
202 } catch (Exception e) {
203 log.error("YMS operation failed: {}", e.getMessage());
204 log.debug("Exception in invokeYmsOp: ", e);
205 }
206
207 return status;
Henry Yue20926e2016-08-25 22:58:02 -0400208 }
209
210 @Override
chengfanc58d4be2016-09-20 10:33:12 +0800211 public void runPostOperationOnDataResource(String uri, ObjectNode rootNode)
212 throws RestconfException {
213 YmsOperationExecutionStatus status =
214 invokeYmsOp(uri, rootNode, CREATE);
215
216 if (status != EXECUTION_SUCCESS) {
217 throw new RestconfException("YMS post operation failed.",
218 INTERNAL_SERVER_ERROR);
219 }
Henry Yue20926e2016-08-25 22:58:02 -0400220 }
221
222 @Override
chengfanc58d4be2016-09-20 10:33:12 +0800223 public void runPutOperationOnDataResource(String uri, ObjectNode rootNode)
224 throws RestconfException {
225 YmsOperationExecutionStatus status =
226 invokeYmsOp(uri, rootNode, REPLACE);
227
228 if (status != EXECUTION_SUCCESS) {
229 throw new RestconfException("YMS put operation failed.",
230 INTERNAL_SERVER_ERROR);
231 }
Henry Yue20926e2016-08-25 22:58:02 -0400232 }
233
Henry Yue20926e2016-08-25 22:58:02 -0400234 @Override
chengfanc58d4be2016-09-20 10:33:12 +0800235 public void runDeleteOperationOnDataResource(String uri)
236 throws RestconfException {
237 //Get a root ydtBuilder
238 YdtBuilder ydtBuilder = getYdtBuilder(EDIT_CONFIG_REQUEST);
239 //Convert the URI to ydtBuilder
240 convertUriToYdt(uri, ydtBuilder, DELETE);
241 //Execute the delete operation
242 YmsOperationExecutionStatus status = ymsService
243 .executeOperation(ydtBuilder)
244 .getYmsOperationResult();
245 if (status != EXECUTION_SUCCESS) {
246 throw new RestconfException("YMS delete operation failed.",
247 INTERNAL_SERVER_ERROR);
248 }
249 }
250
251 @Override
252 public void runPatchOperationOnDataResource(String uri, ObjectNode rootNode)
253 throws RestconfException {
254 YmsOperationExecutionStatus status = invokeYmsOp(uri, rootNode, MERGE);
255
256 if (status != EXECUTION_SUCCESS) {
257 throw new RestconfException("YMS patch operation failed.",
258 INTERNAL_SERVER_ERROR);
259 }
Henry Yue20926e2016-08-25 22:58:02 -0400260 }
261
262 @Override
263 public String getRestconfRootPath() {
chengfanc58d4be2016-09-20 10:33:12 +0800264 return RESTCONF_ROOT;
Henry Yue20926e2016-08-25 22:58:02 -0400265 }
266
267 /**
268 * Creates a worker thread to listen to events and write to chunkedOutput.
269 * The worker thread blocks if no events arrive.
270 *
Henry Yudc747af2016-11-16 13:29:54 -0500271 * @param streamId the RESTCONF stream id to which the client subscribes
272 * @param output the string data stream
Henry Yue20926e2016-08-25 22:58:02 -0400273 * @throws RestconfException if the worker thread fails to create
274 */
275 @Override
chengfanc58d4be2016-09-20 10:33:12 +0800276 public void subscribeEventStream(String streamId,
277 ChunkedOutput<String> output)
278 throws RestconfException {
Henry Yue20926e2016-08-25 22:58:02 -0400279 if (workerThreadPool instanceof ThreadPoolExecutor) {
chengfanc58d4be2016-09-20 10:33:12 +0800280 if (((ThreadPoolExecutor) workerThreadPool).getActiveCount() >=
281 maxNumOfWorkerThreads) {
282 throw new RestconfException("no more work threads left to " +
283 "handle event subscription",
284 INTERNAL_SERVER_ERROR);
Henry Yue20926e2016-08-25 22:58:02 -0400285 }
286 } else {
chengfanc58d4be2016-09-20 10:33:12 +0800287 throw new RestconfException("Server ERROR: workerThreadPool NOT " +
288 "instanceof ThreadPoolExecutor",
289 INTERNAL_SERVER_ERROR);
Henry Yue20926e2016-08-25 22:58:02 -0400290
291 }
292
Henry Yudc747af2016-11-16 13:29:54 -0500293 BlockingQueue<ObjectNode> eventQueue = new LinkedBlockingQueue<>();
Henry Yue20926e2016-08-25 22:58:02 -0400294 workerThreadPool.submit(new EventConsumer(output, eventQueue));
295 }
296
Henry Yue20926e2016-08-25 22:58:02 -0400297 /**
298 * Shutdown a pool cleanly if possible.
299 *
300 * @param pool an executorService
301 */
302 private void shutdownAndAwaitTermination(ExecutorService pool) {
303 pool.shutdown(); // Disable new tasks from being submitted
304 try {
305 // Wait a while for existing tasks to terminate
chengfanc58d4be2016-09-20 10:33:12 +0800306 if (!pool.awaitTermination(THREAD_TERMINATION_TIMEOUT, SECONDS)) {
Henry Yue20926e2016-08-25 22:58:02 -0400307 pool.shutdownNow(); // Cancel currently executing tasks
308 // Wait a while for tasks to respond to being cancelled
chengfanc58d4be2016-09-20 10:33:12 +0800309 if (!pool.awaitTermination(THREAD_TERMINATION_TIMEOUT,
310 SECONDS)) {
Henry Yue20926e2016-08-25 22:58:02 -0400311 log.error("Pool did not terminate");
312 }
313 }
314 } catch (Exception ie) {
315 // (Re-)Cancel if current thread also interrupted
316 pool.shutdownNow();
317 // Preserve interrupt status
318 Thread.currentThread().interrupt();
319 }
320 }
321
Henry Yudc747af2016-11-16 13:29:54 -0500322 /**
323 * Implementation of a worker thread which reads data from a
324 * blocking queue and writes the data to a given chunk output stream.
325 * The thread is blocked when no data arrive to the queue and is
326 * terminated when the chunk output stream is closed (i.e., the
327 * HTTP-keep-alive session is closed).
328 */
Henry Yue20926e2016-08-25 22:58:02 -0400329 private class EventConsumer implements Runnable {
330
Henry Yudc747af2016-11-16 13:29:54 -0500331 private String queueId;
Henry Yue20926e2016-08-25 22:58:02 -0400332 private final ChunkedOutput<String> output;
333 private final BlockingQueue<ObjectNode> bqueue;
334
chengfanc58d4be2016-09-20 10:33:12 +0800335 public EventConsumer(ChunkedOutput<String> output,
336 BlockingQueue<ObjectNode> q) {
Henry Yue20926e2016-08-25 22:58:02 -0400337 this.output = output;
338 this.bqueue = q;
Henry Yue20926e2016-08-25 22:58:02 -0400339 }
340
341 @Override
342 public void run() {
343 try {
Henry Yudc747af2016-11-16 13:29:54 -0500344 queueId = String.valueOf(Thread.currentThread().getId());
345 eventQueueList.put(queueId, bqueue);
346 log.debug("EventConsumer thread created: {}", queueId);
347
Henry Yue20926e2016-08-25 22:58:02 -0400348 ObjectNode chunk;
349 while ((chunk = bqueue.take()) != null) {
350 output.write(chunk.toString().concat(EOL));
351 }
352 } catch (IOException e) {
353 log.debug("chunkedOuput is closed: {}", this.bqueue.toString());
354 /*
355 * Remove queue from the queue list, so that the event producer
356 * (i.e., listener) would stop working.
357 */
358 eventQueueList.remove(this.queueId);
359 } catch (InterruptedException e) {
chengfanc58d4be2016-09-20 10:33:12 +0800360 log.error("ERROR: EventConsumer: bqueue.take() " +
361 "has been interrupted.");
Henry Yue20926e2016-08-25 22:58:02 -0400362 log.debug("EventConsumer Exception:", e);
363 } finally {
364 try {
365 output.close();
366 log.debug("EventConsumer thread terminated: {}", queueId);
367 } catch (IOException e) {
368 log.error("ERROR: EventConsumer: ", e);
369 }
370 }
371 }
Henry Yue20926e2016-08-25 22:58:02 -0400372 }
373
chengfanc58d4be2016-09-20 10:33:12 +0800374 private YdtBuilder getYdtBuilder(YmsOperationType ymsOperationType) {
375 return ymsService.getYdtBuilder(RESTCONF_ROOT, null, ymsOperationType);
376 }
377
Henry Yue20926e2016-08-25 22:58:02 -0400378 /**
379 * The listener class acts as the event producer for the event queues. The
380 * queues are created by the event consumer threads and are removed when the
381 * threads terminate.
382 */
Henry Yu528007c2016-11-01 15:49:59 -0400383 private class InternalYangNotificationListener implements YangNotificationListener {
Henry Yue20926e2016-08-25 22:58:02 -0400384
385 @Override
386 public void event(YangNotificationEvent event) {
387 if (event.type() != YangNotificationEvent.Type.YANG_NOTIFICATION) {
388 // For now, we only handle YANG notification events.
389 return;
390 }
391
392 if (eventQueueList.isEmpty()) {
Henry Yu528007c2016-11-01 15:49:59 -0400393 /*
Henry Yue20926e2016-08-25 22:58:02 -0400394 * There is no consumer waiting to consume, so don't have to
395 * produce this event.
Henry Yu528007c2016-11-01 15:49:59 -0400396 */
Henry Yudc747af2016-11-16 13:29:54 -0500397 log.debug("Q list is empty");
Henry Yue20926e2016-08-25 22:58:02 -0400398 return;
399 }
400
401 try {
Henry Yudc747af2016-11-16 13:29:54 -0500402 YdtContext ydtNode = event.subject().getNotificationRootContext();
403 ObjectNode jsonNode = convertYdtToJson(getJsonNameFromYdtNode(ydtNode),
404 ydtNode,
405 ymsService.getYdtWalker());
Henry Yu528007c2016-11-01 15:49:59 -0400406 /*
Henry Yue20926e2016-08-25 22:58:02 -0400407 * Put the event to every queue out there. Each queue is
408 * corresponding to an event stream session. The queue is
409 * removed when the session terminates.
Henry Yu528007c2016-11-01 15:49:59 -0400410 */
Henry Yue20926e2016-08-25 22:58:02 -0400411 for (Entry<String, BlockingQueue<ObjectNode>> entry : eventQueueList
412 .entrySet()) {
Henry Yu528007c2016-11-01 15:49:59 -0400413 entry.getValue().put(jsonNode);
Henry Yue20926e2016-08-25 22:58:02 -0400414 }
415 } catch (InterruptedException e) {
Henry Yu528007c2016-11-01 15:49:59 -0400416 log.error("Failed to put event in queue: {}", e.getMessage());
417 log.debug("Exception trace in InternalYangNotificationListener: ", e);
418 throw new RestconfException("Failed to put event in queue",
419 INTERNAL_SERVER_ERROR);
Henry Yue20926e2016-08-25 22:58:02 -0400420 }
421 }
Henry Yu528007c2016-11-01 15:49:59 -0400422 }
Henry Yue20926e2016-08-25 22:58:02 -0400423}