ONOS-5755: RESTCONF App: App Skeleton creation

pom file will stay the same as before, while testing, please add <module>restconf</module> at apps/pom.xml

modified by Henry's comments.

Change-Id: I55d6bd4de07f03dcad77dfa575cb54c5563c937c
diff --git a/apps/restconf/restconfmgr/src/main/java/org/onosproject/restconf/restconfmanager/RestconfManager.java b/apps/restconf/restconfmgr/src/main/java/org/onosproject/restconf/restconfmanager/RestconfManager.java
new file mode 100644
index 0000000..45c1a4c
--- /dev/null
+++ b/apps/restconf/restconfmgr/src/main/java/org/onosproject/restconf/restconfmanager/RestconfManager.java
@@ -0,0 +1,242 @@
+/*
+ * Copyright 2016-present Open Networking Laboratory
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.onosproject.restconf.restconfmanager;
+
+import com.fasterxml.jackson.databind.node.ObjectNode;
+import com.google.common.util.concurrent.ThreadFactoryBuilder;
+import org.apache.felix.scr.annotations.Activate;
+import org.apache.felix.scr.annotations.Component;
+import org.apache.felix.scr.annotations.Deactivate;
+import org.apache.felix.scr.annotations.Service;
+import org.glassfish.jersey.server.ChunkedOutput;
+import org.onosproject.event.ListenerTracker;
+import org.onosproject.restconf.api.RestconfException;
+import org.onosproject.restconf.api.RestconfService;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.ThreadPoolExecutor;
+
+import static java.util.concurrent.TimeUnit.SECONDS;
+import static javax.ws.rs.core.Response.Status.INTERNAL_SERVER_ERROR;
+
+/*
+ * Skeletal ONOS RESTCONF Server application. The RESTCONF Manager
+ * implements the main logic of the RESTCONF Server.
+ *
+ * The design of the RESTCONF subsystem contains 2 major bundles:
+ *
+ * 1. RESTCONF Protocol Proxy (RPP). This bundle is implemented as a
+ *    JAX-RS application. It acts as the frond-end of the RESTCONF server.
+ *    It intercepts/handles HTTP requests that are sent to the RESTCONF
+ *    Root Path. It then calls the RESTCONF Manager to process the requests.
+ *
+ * 2. RESTCONF Manager. This bundle module is the back-end of the server.
+ *    It provides the main logic of the RESTCONF server. It interacts with
+ *    the YMS (YANG Management System) to run operations on the YANG data
+ *    objects (i.e., data resources).
+ */
+
+/**
+ * Implementation of the RestconfService interface. The class is designed
+ * as a Apache Flex component. Note that to avoid unnecessary
+ * activation, the @Component annotation's immediate parameter is set to false.
+ * So the component is not activated until a RESTCONF request is received by
+ * the RESTCONF Protocol Proxy (RPP) module, which consumes the service.
+ */
+@Component(immediate = false)
+@Service
+public class RestconfManager implements RestconfService {
+
+    private static final String RESTCONF_ROOT = "/onos/restconf";
+    private static final int THREAD_TERMINATION_TIMEOUT = 10;
+
+    // Jersey's default chunk parser uses "\r\n" as the chunk separator.
+    private static final String EOL = "\r\n";
+
+    private final int maxNumOfWorkerThreads = 5;
+
+    private final Logger log = LoggerFactory.getLogger(getClass());
+
+    private ListenerTracker listeners;
+
+    private ConcurrentMap<String, BlockingQueue<ObjectNode>> eventQueueList =
+            new ConcurrentHashMap<>();
+
+    private ExecutorService workerThreadPool;
+
+    @Activate
+    protected void activate() {
+        workerThreadPool = Executors
+                .newFixedThreadPool(maxNumOfWorkerThreads,
+                                    new ThreadFactoryBuilder()
+                                            .setNameFormat("restconf-worker")
+                                            .build());
+        log.info("Started");
+    }
+
+    @Deactivate
+    protected void deactivate() {
+        shutdownAndAwaitTermination(workerThreadPool);
+        log.info("Stopped");
+    }
+
+    @Override
+    public ObjectNode runGetOperationOnDataResource(String uri)
+            throws RestconfException {
+
+        return null;
+    }
+
+    @Override
+    public void runPostOperationOnDataResource(String uri, ObjectNode rootNode)
+            throws RestconfException {
+    }
+
+    @Override
+    public void runPutOperationOnDataResource(String uri, ObjectNode rootNode)
+            throws RestconfException {
+    }
+
+    @Override
+    public void runDeleteOperationOnDataResource(String uri)
+            throws RestconfException {
+    }
+
+    @Override
+    public void runPatchOperationOnDataResource(String uri, ObjectNode rootNode)
+            throws RestconfException {
+    }
+
+    @Override
+    public String getRestconfRootPath() {
+        return RESTCONF_ROOT;
+    }
+
+    /**
+     * Creates a worker thread to listen to events and write to chunkedOutput.
+     * The worker thread blocks if no events arrive.
+     *
+     * @param streamId the RESTCONF stream id to which the client subscribes
+     * @param output   the string data stream
+     * @throws RestconfException if the worker thread fails to create
+     */
+    @Override
+    public void subscribeEventStream(String streamId,
+                                     ChunkedOutput<String> output)
+            throws RestconfException {
+        if (workerThreadPool instanceof ThreadPoolExecutor) {
+            if (((ThreadPoolExecutor) workerThreadPool).getActiveCount() >=
+                    maxNumOfWorkerThreads) {
+                throw new RestconfException("no more work threads left to " +
+                                                    "handle event subscription",
+                                            INTERNAL_SERVER_ERROR);
+            }
+        } else {
+            throw new RestconfException("Server ERROR: workerThreadPool NOT " +
+                                                "instanceof ThreadPoolExecutor",
+                                        INTERNAL_SERVER_ERROR);
+
+        }
+
+        BlockingQueue<ObjectNode> eventQueue = new LinkedBlockingQueue<>();
+        workerThreadPool.submit(new EventConsumer(output, eventQueue));
+    }
+
+    /**
+     * Shutdown a pool cleanly if possible.
+     *
+     * @param pool an executorService
+     */
+    private void shutdownAndAwaitTermination(ExecutorService pool) {
+        pool.shutdown(); // Disable new tasks from being submitted
+        try {
+            // Wait a while for existing tasks to terminate
+            if (!pool.awaitTermination(THREAD_TERMINATION_TIMEOUT, SECONDS)) {
+                pool.shutdownNow(); // Cancel currently executing tasks
+                // Wait a while for tasks to respond to being cancelled
+                if (!pool.awaitTermination(THREAD_TERMINATION_TIMEOUT,
+                                           SECONDS)) {
+                    log.error("Pool did not terminate");
+                }
+            }
+        } catch (Exception ie) {
+            // (Re-)Cancel if current thread also interrupted
+            pool.shutdownNow();
+            // Preserve interrupt status
+            Thread.currentThread().interrupt();
+        }
+    }
+
+    /**
+     * Implementation of a worker thread which reads data from a
+     * blocking queue and writes the data to a given chunk output stream.
+     * The thread is blocked when no data arrive to the queue and is
+     * terminated when the chunk output stream is closed (i.e., the
+     * HTTP-keep-alive session is closed).
+     */
+    private class EventConsumer implements Runnable {
+
+        private String queueId;
+        private final ChunkedOutput<String> output;
+        private final BlockingQueue<ObjectNode> bqueue;
+
+        public EventConsumer(ChunkedOutput<String> output,
+                             BlockingQueue<ObjectNode> q) {
+            this.output = output;
+            this.bqueue = q;
+        }
+
+        @Override
+        public void run() {
+            try {
+                queueId = String.valueOf(Thread.currentThread().getId());
+                eventQueueList.put(queueId, bqueue);
+                log.debug("EventConsumer thread created: {}", queueId);
+
+                ObjectNode chunk;
+                while ((chunk = bqueue.take()) != null) {
+                    output.write(chunk.toString().concat(EOL));
+                }
+            } catch (IOException e) {
+                log.debug("chunkedOuput is closed: {}", this.bqueue.toString());
+                /*
+                 * Remove queue from the queue list, so that the event producer
+                 * (i.e., listener) would stop working.
+                 */
+                eventQueueList.remove(this.queueId);
+            } catch (InterruptedException e) {
+                log.error("ERROR: EventConsumer: bqueue.take() " +
+                                  "has been interrupted.");
+                log.debug("EventConsumer Exception:", e);
+            } finally {
+                try {
+                    output.close();
+                    log.debug("EventConsumer thread terminated: {}", queueId);
+                } catch (IOException e) {
+                    log.error("ERROR: EventConsumer: ", e);
+                }
+            }
+        }
+    }
+}