blob: 6397cba9d9838453f018550e7b8be9b4136c4a3b [file] [log] [blame]
/*
 * Copyright 2017-present Open Networking Laboratory
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
jingan7c5bf1f2017-02-09 02:58:09 -080016
Jin Gan79f75372017-01-05 15:08:11 -080017package org.onosproject.restconf.restconfmanager;
18
19import com.fasterxml.jackson.databind.node.ObjectNode;
20import com.google.common.util.concurrent.ThreadFactoryBuilder;
21import org.apache.felix.scr.annotations.Activate;
22import org.apache.felix.scr.annotations.Component;
23import org.apache.felix.scr.annotations.Deactivate;
jingan7c5bf1f2017-02-09 02:58:09 -080024import org.apache.felix.scr.annotations.Reference;
25import org.apache.felix.scr.annotations.ReferenceCardinality;
Jin Gan79f75372017-01-05 15:08:11 -080026import org.apache.felix.scr.annotations.Service;
27import org.glassfish.jersey.server.ChunkedOutput;
jingan7c5bf1f2017-02-09 02:58:09 -080028import org.onlab.osgi.DefaultServiceDirectory;
Jin Gan79f75372017-01-05 15:08:11 -080029import org.onosproject.event.ListenerTracker;
Jin Gan79f75372017-01-05 15:08:11 -080030import org.slf4j.Logger;
31import org.slf4j.LoggerFactory;
32
33import java.io.IOException;
jingan7c5bf1f2017-02-09 02:58:09 -080034import java.util.List;
Jin Gan79f75372017-01-05 15:08:11 -080035import java.util.concurrent.BlockingQueue;
36import java.util.concurrent.ConcurrentHashMap;
37import java.util.concurrent.ConcurrentMap;
38import java.util.concurrent.ExecutorService;
39import java.util.concurrent.Executors;
40import java.util.concurrent.LinkedBlockingQueue;
41import java.util.concurrent.ThreadPoolExecutor;
Jin Gan79f75372017-01-05 15:08:11 -080042import static java.util.concurrent.TimeUnit.SECONDS;
43import static javax.ws.rs.core.Response.Status.INTERNAL_SERVER_ERROR;
44
jingan7c5bf1f2017-02-09 02:58:09 -080045import org.onosproject.config.FailedException;
46import org.onosproject.config.DynamicConfigService;
47import org.onosproject.config.Filter;
48import org.onosproject.restconf.api.RestconfException;
49import org.onosproject.restconf.api.RestconfService;
50import org.onosproject.yang.model.ResourceData;
51import org.onosproject.yang.model.ResourceId;
52import org.onosproject.yang.model.DataNode;
53import static org.onosproject.restconf.utils.RestconfUtils.convertUriToRid;
54import static org.onosproject.restconf.utils.RestconfUtils.convertJsonToDataNode;
55import static org.onosproject.restconf.utils.RestconfUtils.convertDataNodeToJson;
56
/*
 * ONOS RESTCONF application. The RESTCONF Manager
 * implements the main logic of the RESTCONF application.
 *
 * The design of the RESTCONF subsystem contains 2 major bundles.
 * This bundle module is the back-end of the server.
 * It provides the main logic of the RESTCONF server. It interacts with
 * the Dynamic Config Service and the YANG runtime service to run operations
 * on the YANG data objects (i.e., resource id, yang data node).
 */
67
68/**
69 * Implementation of the RestconfService interface. The class is designed
70 * as a Apache Flex component. Note that to avoid unnecessary
71 * activation, the @Component annotation's immediate parameter is set to false.
72 * So the component is not activated until a RESTCONF request is received by
73 * the RESTCONF Protocol Proxy (RPP) module, which consumes the service.
74 */
75@Component(immediate = false)
76@Service
77public class RestconfManager implements RestconfService {
78
79 private static final String RESTCONF_ROOT = "/onos/restconf";
80 private static final int THREAD_TERMINATION_TIMEOUT = 10;
81
82 // Jersey's default chunk parser uses "\r\n" as the chunk separator.
83 private static final String EOL = "\r\n";
84
85 private final int maxNumOfWorkerThreads = 5;
86
87 private final Logger log = LoggerFactory.getLogger(getClass());
88
jingan7c5bf1f2017-02-09 02:58:09 -080089 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
90
91 private static final DynamicConfigService DYNAMIC_CONFIG_SERVICE =
92 DefaultServiceDirectory.getService(DynamicConfigService.class);
93
Jin Gan79f75372017-01-05 15:08:11 -080094 private ListenerTracker listeners;
95
96 private ConcurrentMap<String, BlockingQueue<ObjectNode>> eventQueueList =
97 new ConcurrentHashMap<>();
98
99 private ExecutorService workerThreadPool;
100
101 @Activate
102 protected void activate() {
103 workerThreadPool = Executors
104 .newFixedThreadPool(maxNumOfWorkerThreads,
105 new ThreadFactoryBuilder()
106 .setNameFormat("restconf-worker")
107 .build());
108 log.info("Started");
109 }
110
111 @Deactivate
112 protected void deactivate() {
113 shutdownAndAwaitTermination(workerThreadPool);
114 log.info("Stopped");
115 }
116
117 @Override
118 public ObjectNode runGetOperationOnDataResource(String uri)
119 throws RestconfException {
jingan7c5bf1f2017-02-09 02:58:09 -0800120 ResourceId rid = convertUriToRid(uri);
121 // TODO: define Filter (if there is any requirement).
122 Filter filter = new Filter();
123 DataNode dataNode;
124 try {
125 dataNode = DYNAMIC_CONFIG_SERVICE.readNode(rid, filter);
126 } catch (FailedException e) {
127 log.error("ERROR: DynamicConfigService: ", e);
128 throw new RestconfException("ERROR: DynamicConfigService",
129 INTERNAL_SERVER_ERROR);
130 }
131 ObjectNode rootNode = convertDataNodeToJson(rid, dataNode);
132 return rootNode;
Jin Gan79f75372017-01-05 15:08:11 -0800133 }
134
135 @Override
136 public void runPostOperationOnDataResource(String uri, ObjectNode rootNode)
137 throws RestconfException {
jingan7c5bf1f2017-02-09 02:58:09 -0800138 ResourceData resourceData = convertJsonToDataNode(uri, rootNode);
139 ResourceId rid = resourceData.resourceId();
140 List<DataNode> dataNodeList = resourceData.dataNodes();
141 // TODO: Error message needs to be fixed
142 if (dataNodeList.size() > 1) {
143 log.warn("ERROR: There are more than one Data Node can be proceed");
144 }
145 DataNode dataNode = dataNodeList.get(0);
146 try {
147 DYNAMIC_CONFIG_SERVICE.createNode(rid, dataNode);
148 } catch (FailedException e) {
149 log.error("ERROR: DynamicConfigService: ", e);
150 throw new RestconfException("ERROR: DynamicConfigService",
151 INTERNAL_SERVER_ERROR);
152 }
Jin Gan79f75372017-01-05 15:08:11 -0800153 }
154
155 @Override
156 public void runPutOperationOnDataResource(String uri, ObjectNode rootNode)
157 throws RestconfException {
jingan7c5bf1f2017-02-09 02:58:09 -0800158 runPostOperationOnDataResource(uri, rootNode);
Jin Gan79f75372017-01-05 15:08:11 -0800159 }
160
161 @Override
162 public void runDeleteOperationOnDataResource(String uri)
163 throws RestconfException {
jingan7c5bf1f2017-02-09 02:58:09 -0800164 ResourceId rid = convertUriToRid(uri);
165 try {
166 DYNAMIC_CONFIG_SERVICE.deleteNode(rid);
167 } catch (FailedException e) {
168 log.error("ERROR: DynamicConfigService: ", e);
169 throw new RestconfException("ERROR: DynamicConfigService",
170 INTERNAL_SERVER_ERROR);
171 }
Jin Gan79f75372017-01-05 15:08:11 -0800172 }
173
174 @Override
175 public void runPatchOperationOnDataResource(String uri, ObjectNode rootNode)
176 throws RestconfException {
177 }
178
179 @Override
180 public String getRestconfRootPath() {
181 return RESTCONF_ROOT;
182 }
183
184 /**
185 * Creates a worker thread to listen to events and write to chunkedOutput.
186 * The worker thread blocks if no events arrive.
187 *
188 * @param streamId the RESTCONF stream id to which the client subscribes
189 * @param output the string data stream
190 * @throws RestconfException if the worker thread fails to create
191 */
192 @Override
193 public void subscribeEventStream(String streamId,
194 ChunkedOutput<String> output)
195 throws RestconfException {
196 if (workerThreadPool instanceof ThreadPoolExecutor) {
197 if (((ThreadPoolExecutor) workerThreadPool).getActiveCount() >=
198 maxNumOfWorkerThreads) {
199 throw new RestconfException("no more work threads left to " +
200 "handle event subscription",
201 INTERNAL_SERVER_ERROR);
202 }
203 } else {
204 throw new RestconfException("Server ERROR: workerThreadPool NOT " +
205 "instanceof ThreadPoolExecutor",
206 INTERNAL_SERVER_ERROR);
207
208 }
209
210 BlockingQueue<ObjectNode> eventQueue = new LinkedBlockingQueue<>();
211 workerThreadPool.submit(new EventConsumer(output, eventQueue));
212 }
213
214 /**
215 * Shutdown a pool cleanly if possible.
216 *
217 * @param pool an executorService
218 */
219 private void shutdownAndAwaitTermination(ExecutorService pool) {
220 pool.shutdown(); // Disable new tasks from being submitted
221 try {
222 // Wait a while for existing tasks to terminate
223 if (!pool.awaitTermination(THREAD_TERMINATION_TIMEOUT, SECONDS)) {
224 pool.shutdownNow(); // Cancel currently executing tasks
225 // Wait a while for tasks to respond to being cancelled
226 if (!pool.awaitTermination(THREAD_TERMINATION_TIMEOUT,
227 SECONDS)) {
228 log.error("Pool did not terminate");
229 }
230 }
231 } catch (Exception ie) {
232 // (Re-)Cancel if current thread also interrupted
233 pool.shutdownNow();
234 // Preserve interrupt status
235 Thread.currentThread().interrupt();
236 }
237 }
238
239 /**
240 * Implementation of a worker thread which reads data from a
241 * blocking queue and writes the data to a given chunk output stream.
242 * The thread is blocked when no data arrive to the queue and is
243 * terminated when the chunk output stream is closed (i.e., the
244 * HTTP-keep-alive session is closed).
245 */
246 private class EventConsumer implements Runnable {
247
248 private String queueId;
249 private final ChunkedOutput<String> output;
250 private final BlockingQueue<ObjectNode> bqueue;
251
252 public EventConsumer(ChunkedOutput<String> output,
253 BlockingQueue<ObjectNode> q) {
254 this.output = output;
255 this.bqueue = q;
256 }
257
258 @Override
259 public void run() {
260 try {
261 queueId = String.valueOf(Thread.currentThread().getId());
262 eventQueueList.put(queueId, bqueue);
263 log.debug("EventConsumer thread created: {}", queueId);
264
265 ObjectNode chunk;
266 while ((chunk = bqueue.take()) != null) {
267 output.write(chunk.toString().concat(EOL));
268 }
269 } catch (IOException e) {
270 log.debug("chunkedOuput is closed: {}", this.bqueue.toString());
271 /*
272 * Remove queue from the queue list, so that the event producer
273 * (i.e., listener) would stop working.
274 */
275 eventQueueList.remove(this.queueId);
276 } catch (InterruptedException e) {
277 log.error("ERROR: EventConsumer: bqueue.take() " +
278 "has been interrupted.");
279 log.debug("EventConsumer Exception:", e);
280 } finally {
281 try {
282 output.close();
283 log.debug("EventConsumer thread terminated: {}", queueId);
284 } catch (IOException e) {
285 log.error("ERROR: EventConsumer: ", e);
286 }
287 }
288 }
289 }
290}