blob: 5d398c3aa080b98b1d63f13a85f260fb55212ba6 [file] [log] [blame]
Henry Yue20926e2016-08-25 22:58:02 -04001/*
2 * Copyright 2016-present Open Networking Laboratory
3 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
16package org.onosproject.protocol.restconf.server.restconfmanager;
17
chengfanc58d4be2016-09-20 10:33:12 +080018import com.fasterxml.jackson.databind.JsonNode;
19import com.fasterxml.jackson.databind.node.ArrayNode;
Henry Yue20926e2016-08-25 22:58:02 -040020import com.fasterxml.jackson.databind.node.ObjectNode;
21import com.google.common.util.concurrent.ThreadFactoryBuilder;
22import org.apache.felix.scr.annotations.Activate;
23import org.apache.felix.scr.annotations.Component;
24import org.apache.felix.scr.annotations.Deactivate;
chengfanc58d4be2016-09-20 10:33:12 +080025import org.apache.felix.scr.annotations.Reference;
26import org.apache.felix.scr.annotations.ReferenceCardinality;
Henry Yue20926e2016-08-25 22:58:02 -040027import org.apache.felix.scr.annotations.Service;
28import org.glassfish.jersey.server.ChunkedOutput;
29import org.onosproject.event.ListenerTracker;
30import org.onosproject.protocol.restconf.server.api.RestconfException;
31import org.onosproject.protocol.restconf.server.api.RestconfService;
chengfanc58d4be2016-09-20 10:33:12 +080032import org.onosproject.yms.ydt.YdtBuilder;
33import org.onosproject.yms.ydt.YdtContext;
34import org.onosproject.yms.ydt.YdtContextOperationType;
35import org.onosproject.yms.ydt.YdtResponse;
36import org.onosproject.yms.ydt.YmsOperationExecutionStatus;
37import org.onosproject.yms.ydt.YmsOperationType;
38import org.onosproject.yms.ymsm.YmsService;
Henry Yu528007c2016-11-01 15:49:59 -040039import org.onosproject.yms.ynh.YangNotificationEvent;
40import org.onosproject.yms.ynh.YangNotificationListener;
41import org.onosproject.yms.ynh.YangNotificationService;
Henry Yue20926e2016-08-25 22:58:02 -040042import org.slf4j.Logger;
43import org.slf4j.LoggerFactory;
44
Henry Yue20926e2016-08-25 22:58:02 -040045import java.io.IOException;
Henry Yu528007c2016-11-01 15:49:59 -040046import java.util.Map.Entry;
Henry Yue20926e2016-08-25 22:58:02 -040047import java.util.concurrent.BlockingQueue;
48import java.util.concurrent.ConcurrentHashMap;
49import java.util.concurrent.ConcurrentMap;
50import java.util.concurrent.ExecutorService;
51import java.util.concurrent.Executors;
52import java.util.concurrent.LinkedBlockingQueue;
53import java.util.concurrent.ThreadPoolExecutor;
Henry Yue20926e2016-08-25 22:58:02 -040054
Henry Yu528007c2016-11-01 15:49:59 -040055import static java.util.concurrent.TimeUnit.SECONDS;
56import static javax.ws.rs.core.Response.Status.INTERNAL_SERVER_ERROR;
57import static org.onosproject.protocol.restconf.server.utils.parser.json.ParserUtils.convertJsonToYdt;
58import static org.onosproject.protocol.restconf.server.utils.parser.json.ParserUtils.convertUriToYdt;
59import static org.onosproject.protocol.restconf.server.utils.parser.json.ParserUtils.convertYdtToJson;
60import static org.onosproject.protocol.restconf.server.utils.parser.json.ParserUtils.getJsonNameFromYdtNode;
chengfanc58d4be2016-09-20 10:33:12 +080061import static org.onosproject.yms.ydt.YdtContextOperationType.CREATE;
62import static org.onosproject.yms.ydt.YdtContextOperationType.DELETE;
chengfanc58d4be2016-09-20 10:33:12 +080063import static org.onosproject.yms.ydt.YdtContextOperationType.MERGE;
Henry Yu528007c2016-11-01 15:49:59 -040064import static org.onosproject.yms.ydt.YdtContextOperationType.NONE;
65import static org.onosproject.yms.ydt.YdtContextOperationType.REPLACE;
chengfanc58d4be2016-09-20 10:33:12 +080066import static org.onosproject.yms.ydt.YdtType.SINGLE_INSTANCE_LEAF_VALUE_NODE;
Henry Yu528007c2016-11-01 15:49:59 -040067import static org.onosproject.yms.ydt.YmsOperationExecutionStatus.EXECUTION_EXCEPTION;
chengfanc58d4be2016-09-20 10:33:12 +080068import static org.onosproject.yms.ydt.YmsOperationExecutionStatus.EXECUTION_SUCCESS;
Henry Yu528007c2016-11-01 15:49:59 -040069import static org.onosproject.yms.ydt.YmsOperationType.EDIT_CONFIG_REQUEST;
70import static org.onosproject.yms.ydt.YmsOperationType.QUERY_REQUEST;
71
Henry Yue20926e2016-08-25 22:58:02 -040072/*
73 * Skeletal ONOS RESTCONF Server application. The RESTCONF Manager
74 * implements the main logic of the RESTCONF Server.
75 *
76 * The design of the RESTCONF subsystem contains 2 major bundles:
77 *
chengfanc58d4be2016-09-20 10:33:12 +080078 * 1. RESTCONF Protocol Proxy (RPP). This bundle is implemented as a
79 * JAX-RS application. It acts as the front-end of the RESTCONF server.
80 * It intercepts/handles HTTP requests that are sent to the RESTCONF
81 * Root Path. It then calls the RESTCONF Manager to process the requests.
Henry Yue20926e2016-08-25 22:58:02 -040082 *
chengfanc58d4be2016-09-20 10:33:12 +080083 * 2. RESTCONF Manager. This bundle module is the back-end of the server.
84 * It provides the main logic of the RESTCONF server. It interacts with
85 * the YMS (YANG Management System) to run operations on the YANG data
86 * objects (i.e., data resources).
Henry Yue20926e2016-08-25 22:58:02 -040087 */
88
89/**
90 * Implementation of the RestconfService interface. The class is designed
91 * as an Apache Felix component. Note that to avoid unnecessary
92 * activation, the @Component annotation's immediate parameter is set to false.
93 * So the component is not activated until a RESTCONF request is received by
94 * the RESTCONF Protocol Proxy (RPP) module, which consumes the service.
95 */
96@Component(immediate = false)
97@Service
98public class RestconfManager implements RestconfService {
99
100 private static final String RESTCONF_ROOT = "/onos/restconf";
101 private static final int THREAD_TERMINATION_TIMEOUT = 10;
Henry Yudc747af2016-11-16 13:29:54 -0500102
103 // Jersey's default chunk parser uses "\r\n" as the chunk separator.
104 private static final String EOL = "\r\n";
Henry Yue20926e2016-08-25 22:58:02 -0400105
106 private final int maxNumOfWorkerThreads = 5;
107
108 private final Logger log = LoggerFactory.getLogger(getClass());
109
chengfanc58d4be2016-09-20 10:33:12 +0800110 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
111 protected YmsService ymsService;
Henry Yue20926e2016-08-25 22:58:02 -0400112
Henry Yu528007c2016-11-01 15:49:59 -0400113 protected YangNotificationService ymsNotificationService;
114
Henry Yue20926e2016-08-25 22:58:02 -0400115 private ListenerTracker listeners;
116
117 private ConcurrentMap<String, BlockingQueue<ObjectNode>> eventQueueList =
chengfanc58d4be2016-09-20 10:33:12 +0800118 new ConcurrentHashMap<>();
Henry Yue20926e2016-08-25 22:58:02 -0400119
120 private ExecutorService workerThreadPool;
121
122 @Activate
123 protected void activate() {
chengfanc58d4be2016-09-20 10:33:12 +0800124 workerThreadPool = Executors
125 .newFixedThreadPool(maxNumOfWorkerThreads,
126 new ThreadFactoryBuilder()
127 .setNameFormat("restconf-worker")
128 .build());
Henry Yu528007c2016-11-01 15:49:59 -0400129 ymsNotificationService = ymsService.getYangNotificationService();
Henry Yue20926e2016-08-25 22:58:02 -0400130 listeners = new ListenerTracker();
Henry Yu528007c2016-11-01 15:49:59 -0400131 listeners.addListener(ymsNotificationService, new InternalYangNotificationListener());
Henry Yue20926e2016-08-25 22:58:02 -0400132 log.info("Started");
133 }
134
135 @Deactivate
136 protected void deactivate() {
137 listeners.removeListeners();
138 shutdownAndAwaitTermination(workerThreadPool);
139 log.info("Stopped");
140 }
141
142 @Override
chengfanc58d4be2016-09-20 10:33:12 +0800143 public ObjectNode runGetOperationOnDataResource(String uri)
144 throws RestconfException {
145 YdtBuilder ydtBuilder = getYdtBuilder(QUERY_REQUEST);
146 //Convert the URI to ydtBuilder
147 convertUriToYdt(uri, ydtBuilder, NONE);
148 YdtResponse ydtResponse = ymsService.executeOperation(ydtBuilder);
149 YmsOperationExecutionStatus status = ydtResponse
150 .getYmsOperationResult();
151 if (status != EXECUTION_SUCCESS) {
152 throw new RestconfException("YMS GET operation failed",
153 INTERNAL_SERVER_ERROR);
154 }
155
156 YdtContext rootNode = ydtResponse.getRootNode();
157 YdtContext curNode = ydtBuilder.getCurNode();
158
Henry Yu528007c2016-11-01 15:49:59 -0400159 ObjectNode result = convertYdtToJson(getJsonNameFromYdtNode(curNode),
160 rootNode,
chengfanc58d4be2016-09-20 10:33:12 +0800161 ymsService.getYdtWalker());
162 //if the query URI contain a key, something like list=key
163 //here should only get get child with the specific key
164 YdtContext child = curNode.getFirstChild();
165 if (child != null &&
166 child.getYdtType() == SINGLE_INSTANCE_LEAF_VALUE_NODE) {
167
Henry Yu528007c2016-11-01 15:49:59 -0400168 ArrayNode jsonNode = (ArrayNode) result.get(getJsonNameFromYdtNode(curNode));
chengfanc58d4be2016-09-20 10:33:12 +0800169 for (JsonNode next : jsonNode) {
Henry Yu528007c2016-11-01 15:49:59 -0400170 if (next.findValue(getJsonNameFromYdtNode(child))
chengfanc58d4be2016-09-20 10:33:12 +0800171 .asText().equals(child.getValue())) {
172 return (ObjectNode) next;
173 }
174 }
175 throw new RestconfException(String.format("No content for %s = %s",
Henry Yu528007c2016-11-01 15:49:59 -0400176 getJsonNameFromYdtNode(child),
chengfanc58d4be2016-09-20 10:33:12 +0800177 child.getValue()),
Henry Yudc747af2016-11-16 13:29:54 -0500178 INTERNAL_SERVER_ERROR);
chengfanc58d4be2016-09-20 10:33:12 +0800179 }
180 return result;
181 }
182
183 private YmsOperationExecutionStatus
184 invokeYmsOp(String uri, ObjectNode rootNode,
185 YdtContextOperationType opType) {
186 YdtBuilder ydtBuilder = getYdtBuilder(EDIT_CONFIG_REQUEST);
187 //Convert the URI to ydtBuilder
188 convertUriToYdt(uri, ydtBuilder, opType);
189
190 //set default operation type for the payload node
191 ydtBuilder.setDefaultEditOperationType(opType);
192 //convert the payload json body to ydt
193 convertJsonToYdt(rootNode, ydtBuilder);
194
Henry Yu528007c2016-11-01 15:49:59 -0400195 YmsOperationExecutionStatus status = EXECUTION_EXCEPTION;
196
197 try {
198 status = ymsService.executeOperation(ydtBuilder).getYmsOperationResult();
199 } catch (Exception e) {
200 log.error("YMS operation failed: {}", e.getMessage());
201 log.debug("Exception in invokeYmsOp: ", e);
202 }
203
204 return status;
Henry Yue20926e2016-08-25 22:58:02 -0400205 }
206
207 @Override
chengfanc58d4be2016-09-20 10:33:12 +0800208 public void runPostOperationOnDataResource(String uri, ObjectNode rootNode)
209 throws RestconfException {
210 YmsOperationExecutionStatus status =
211 invokeYmsOp(uri, rootNode, CREATE);
212
213 if (status != EXECUTION_SUCCESS) {
214 throw new RestconfException("YMS post operation failed.",
215 INTERNAL_SERVER_ERROR);
216 }
Henry Yue20926e2016-08-25 22:58:02 -0400217 }
218
219 @Override
chengfanc58d4be2016-09-20 10:33:12 +0800220 public void runPutOperationOnDataResource(String uri, ObjectNode rootNode)
221 throws RestconfException {
222 YmsOperationExecutionStatus status =
223 invokeYmsOp(uri, rootNode, REPLACE);
224
225 if (status != EXECUTION_SUCCESS) {
226 throw new RestconfException("YMS put operation failed.",
227 INTERNAL_SERVER_ERROR);
228 }
Henry Yue20926e2016-08-25 22:58:02 -0400229 }
230
Henry Yue20926e2016-08-25 22:58:02 -0400231 @Override
chengfanc58d4be2016-09-20 10:33:12 +0800232 public void runDeleteOperationOnDataResource(String uri)
233 throws RestconfException {
234 //Get a root ydtBuilder
235 YdtBuilder ydtBuilder = getYdtBuilder(EDIT_CONFIG_REQUEST);
236 //Convert the URI to ydtBuilder
237 convertUriToYdt(uri, ydtBuilder, DELETE);
238 //Execute the delete operation
239 YmsOperationExecutionStatus status = ymsService
240 .executeOperation(ydtBuilder)
241 .getYmsOperationResult();
242 if (status != EXECUTION_SUCCESS) {
243 throw new RestconfException("YMS delete operation failed.",
244 INTERNAL_SERVER_ERROR);
245 }
246 }
247
248 @Override
249 public void runPatchOperationOnDataResource(String uri, ObjectNode rootNode)
250 throws RestconfException {
251 YmsOperationExecutionStatus status = invokeYmsOp(uri, rootNode, MERGE);
252
253 if (status != EXECUTION_SUCCESS) {
254 throw new RestconfException("YMS patch operation failed.",
255 INTERNAL_SERVER_ERROR);
256 }
Henry Yue20926e2016-08-25 22:58:02 -0400257 }
258
259 @Override
260 public String getRestconfRootPath() {
chengfanc58d4be2016-09-20 10:33:12 +0800261 return RESTCONF_ROOT;
Henry Yue20926e2016-08-25 22:58:02 -0400262 }
263
264 /**
265 * Creates a worker thread to listen to events and write to chunkedOutput.
266 * The worker thread blocks if no events arrive.
267 *
Henry Yudc747af2016-11-16 13:29:54 -0500268 * @param streamId the RESTCONF stream id to which the client subscribes
269 * @param output the string data stream
Henry Yue20926e2016-08-25 22:58:02 -0400270 * @throws RestconfException if the worker thread fails to create
271 */
272 @Override
chengfanc58d4be2016-09-20 10:33:12 +0800273 public void subscribeEventStream(String streamId,
274 ChunkedOutput<String> output)
275 throws RestconfException {
Henry Yue20926e2016-08-25 22:58:02 -0400276 if (workerThreadPool instanceof ThreadPoolExecutor) {
chengfanc58d4be2016-09-20 10:33:12 +0800277 if (((ThreadPoolExecutor) workerThreadPool).getActiveCount() >=
278 maxNumOfWorkerThreads) {
279 throw new RestconfException("no more work threads left to " +
280 "handle event subscription",
281 INTERNAL_SERVER_ERROR);
Henry Yue20926e2016-08-25 22:58:02 -0400282 }
283 } else {
chengfanc58d4be2016-09-20 10:33:12 +0800284 throw new RestconfException("Server ERROR: workerThreadPool NOT " +
285 "instanceof ThreadPoolExecutor",
286 INTERNAL_SERVER_ERROR);
Henry Yue20926e2016-08-25 22:58:02 -0400287
288 }
289
Henry Yudc747af2016-11-16 13:29:54 -0500290 BlockingQueue<ObjectNode> eventQueue = new LinkedBlockingQueue<>();
Henry Yue20926e2016-08-25 22:58:02 -0400291 workerThreadPool.submit(new EventConsumer(output, eventQueue));
292 }
293
Henry Yue20926e2016-08-25 22:58:02 -0400294 /**
295 * Shutdown a pool cleanly if possible.
296 *
297 * @param pool an executorService
298 */
299 private void shutdownAndAwaitTermination(ExecutorService pool) {
300 pool.shutdown(); // Disable new tasks from being submitted
301 try {
302 // Wait a while for existing tasks to terminate
chengfanc58d4be2016-09-20 10:33:12 +0800303 if (!pool.awaitTermination(THREAD_TERMINATION_TIMEOUT, SECONDS)) {
Henry Yue20926e2016-08-25 22:58:02 -0400304 pool.shutdownNow(); // Cancel currently executing tasks
305 // Wait a while for tasks to respond to being cancelled
chengfanc58d4be2016-09-20 10:33:12 +0800306 if (!pool.awaitTermination(THREAD_TERMINATION_TIMEOUT,
307 SECONDS)) {
Henry Yue20926e2016-08-25 22:58:02 -0400308 log.error("Pool did not terminate");
309 }
310 }
311 } catch (Exception ie) {
312 // (Re-)Cancel if current thread also interrupted
313 pool.shutdownNow();
314 // Preserve interrupt status
315 Thread.currentThread().interrupt();
316 }
317 }
318
Henry Yudc747af2016-11-16 13:29:54 -0500319 /**
320 * Implementation of a worker thread which reads data from a
321 * blocking queue and writes the data to a given chunk output stream.
322 * The thread is blocked when no data arrive to the queue and is
323 * terminated when the chunk output stream is closed (i.e., the
324 * HTTP-keep-alive session is closed).
325 */
Henry Yue20926e2016-08-25 22:58:02 -0400326 private class EventConsumer implements Runnable {
327
Henry Yudc747af2016-11-16 13:29:54 -0500328 private String queueId;
Henry Yue20926e2016-08-25 22:58:02 -0400329 private final ChunkedOutput<String> output;
330 private final BlockingQueue<ObjectNode> bqueue;
331
chengfanc58d4be2016-09-20 10:33:12 +0800332 public EventConsumer(ChunkedOutput<String> output,
333 BlockingQueue<ObjectNode> q) {
Henry Yue20926e2016-08-25 22:58:02 -0400334 this.output = output;
335 this.bqueue = q;
Henry Yue20926e2016-08-25 22:58:02 -0400336 }
337
338 @Override
339 public void run() {
340 try {
Henry Yudc747af2016-11-16 13:29:54 -0500341 queueId = String.valueOf(Thread.currentThread().getId());
342 eventQueueList.put(queueId, bqueue);
343 log.debug("EventConsumer thread created: {}", queueId);
344
Henry Yue20926e2016-08-25 22:58:02 -0400345 ObjectNode chunk;
346 while ((chunk = bqueue.take()) != null) {
347 output.write(chunk.toString().concat(EOL));
348 }
349 } catch (IOException e) {
350 log.debug("chunkedOuput is closed: {}", this.bqueue.toString());
351 /*
352 * Remove queue from the queue list, so that the event producer
353 * (i.e., listener) would stop working.
354 */
355 eventQueueList.remove(this.queueId);
356 } catch (InterruptedException e) {
chengfanc58d4be2016-09-20 10:33:12 +0800357 log.error("ERROR: EventConsumer: bqueue.take() " +
358 "has been interrupted.");
Henry Yue20926e2016-08-25 22:58:02 -0400359 log.debug("EventConsumer Exception:", e);
360 } finally {
361 try {
362 output.close();
363 log.debug("EventConsumer thread terminated: {}", queueId);
364 } catch (IOException e) {
365 log.error("ERROR: EventConsumer: ", e);
366 }
367 }
368 }
Henry Yue20926e2016-08-25 22:58:02 -0400369 }
370
chengfanc58d4be2016-09-20 10:33:12 +0800371 private YdtBuilder getYdtBuilder(YmsOperationType ymsOperationType) {
372 return ymsService.getYdtBuilder(RESTCONF_ROOT, null, ymsOperationType);
373 }
374
Henry Yue20926e2016-08-25 22:58:02 -0400375 /**
376 * The listener class acts as the event producer for the event queues. The
377 * queues are created by the event consumer threads and are removed when the
378 * threads terminate.
379 */
Henry Yu528007c2016-11-01 15:49:59 -0400380 private class InternalYangNotificationListener implements YangNotificationListener {
Henry Yue20926e2016-08-25 22:58:02 -0400381
382 @Override
383 public void event(YangNotificationEvent event) {
384 if (event.type() != YangNotificationEvent.Type.YANG_NOTIFICATION) {
385 // For now, we only handle YANG notification events.
386 return;
387 }
388
389 if (eventQueueList.isEmpty()) {
Henry Yu528007c2016-11-01 15:49:59 -0400390 /*
Henry Yue20926e2016-08-25 22:58:02 -0400391 * There is no consumer waiting to consume, so don't have to
392 * produce this event.
Henry Yu528007c2016-11-01 15:49:59 -0400393 */
Henry Yudc747af2016-11-16 13:29:54 -0500394 log.debug("Q list is empty");
Henry Yue20926e2016-08-25 22:58:02 -0400395 return;
396 }
397
398 try {
Henry Yudc747af2016-11-16 13:29:54 -0500399 YdtContext ydtNode = event.subject().getNotificationRootContext();
400 ObjectNode jsonNode = convertYdtToJson(getJsonNameFromYdtNode(ydtNode),
401 ydtNode,
402 ymsService.getYdtWalker());
Henry Yu528007c2016-11-01 15:49:59 -0400403 /*
Henry Yue20926e2016-08-25 22:58:02 -0400404 * Put the event to every queue out there. Each queue is
405 * corresponding to an event stream session. The queue is
406 * removed when the session terminates.
Henry Yu528007c2016-11-01 15:49:59 -0400407 */
Henry Yue20926e2016-08-25 22:58:02 -0400408 for (Entry<String, BlockingQueue<ObjectNode>> entry : eventQueueList
409 .entrySet()) {
Henry Yu528007c2016-11-01 15:49:59 -0400410 entry.getValue().put(jsonNode);
Henry Yue20926e2016-08-25 22:58:02 -0400411 }
412 } catch (InterruptedException e) {
Henry Yu528007c2016-11-01 15:49:59 -0400413 log.error("Failed to put event in queue: {}", e.getMessage());
414 log.debug("Exception trace in InternalYangNotificationListener: ", e);
415 throw new RestconfException("Failed to put event in queue",
416 INTERNAL_SERVER_ERROR);
Henry Yue20926e2016-08-25 22:58:02 -0400417 }
418 }
Henry Yu528007c2016-11-01 15:49:59 -0400419 }
Henry Yue20926e2016-08-25 22:58:02 -0400420}