Refactor: move telemetry config from componentCfg to configs.xml

1. Support exporting metrics to multiple targets
2. Add a set of properties to the Kafka config (key, topic, etc.)
3. Add a distributed store to manage telemetry configs
4. Add CLI to query stored telemetry configs
5. Add a set of telemetry loaders to import xml definitions
6. Add unit tests for telemetry cfg, xml cfg loader and dist store
7. Add missing javadoc for a set of implementation classes

Change-Id: I39480c9a6ac07357184d2e1094b9c9f4d36fd8b1
diff --git a/apps/openstacktelemetry/app/src/main/java/org/onosproject/openstacktelemetry/impl/RestTelemetryManager.java b/apps/openstacktelemetry/app/src/main/java/org/onosproject/openstacktelemetry/impl/RestTelemetryManager.java
index 9b4aaf9..48cef34 100644
--- a/apps/openstacktelemetry/app/src/main/java/org/onosproject/openstacktelemetry/impl/RestTelemetryManager.java
+++ b/apps/openstacktelemetry/app/src/main/java/org/onosproject/openstacktelemetry/impl/RestTelemetryManager.java
@@ -15,8 +15,11 @@
  */
 package org.onosproject.openstacktelemetry.impl;
 
+import com.google.common.collect.Maps;
+import com.google.common.collect.Sets;
 import org.onosproject.openstacktelemetry.api.OpenstackTelemetryService;
 import org.onosproject.openstacktelemetry.api.RestTelemetryAdminService;
+import org.onosproject.openstacktelemetry.api.TelemetryConfigService;
 import org.onosproject.openstacktelemetry.api.config.RestTelemetryConfig;
 import org.onosproject.openstacktelemetry.api.config.TelemetryConfig;
 import org.osgi.service.component.annotations.Activate;
@@ -32,6 +35,12 @@
 import javax.ws.rs.client.Entity;
 import javax.ws.rs.client.WebTarget;
 import javax.ws.rs.core.Response;
+import java.util.Map;
+import java.util.Set;
+
+import static org.onosproject.openstacktelemetry.api.Constants.REST_SCHEME;
+import static org.onosproject.openstacktelemetry.api.config.TelemetryConfig.ConfigType.REST;
+import static org.onosproject.openstacktelemetry.config.DefaultRestTelemetryConfig.fromTelemetryConfig;
 
 /**
  * REST telemetry manager.
@@ -48,8 +57,10 @@
     @Reference(cardinality = ReferenceCardinality.MANDATORY)
     protected OpenstackTelemetryService openstackTelemetryService;
 
-    private WebTarget target = null;
-    private RestTelemetryConfig restConfig = null;
+    @Reference(cardinality = ReferenceCardinality.MANDATORY)
+    protected TelemetryConfigService telemetryConfigService;
+
+    private Map<String, WebTarget> targets = Maps.newConcurrentMap();
 
     @Activate
     protected void activate() {
@@ -69,85 +80,71 @@
     }
 
     @Override
-    public void start(TelemetryConfig config) {
-        if (target != null) {
-            log.info("REST producer has already been started");
-            return;
-        }
+    public void start() { // builds one WebTarget per usable REST telemetry config
 
-        restConfig = (RestTelemetryConfig) config;
+        telemetryConfigService.getConfigsByType(REST).forEach(c -> {
+            RestTelemetryConfig restConfig = fromTelemetryConfig(c); // null-checked below
 
-        StringBuilder restServerBuilder = new StringBuilder();
-        restServerBuilder.append(PROTOCOL);
-        restServerBuilder.append(":");
-        restServerBuilder.append("//");
-        restServerBuilder.append(restConfig.address());
-        restServerBuilder.append(":");
-        restServerBuilder.append(restConfig.port());
-        restServerBuilder.append("/");
+            if (restConfig != null && !c.name().equals(REST_SCHEME) && c.enabled()) {
+                StringBuilder restServerBuilder = new StringBuilder(); // only enabled configs not named REST_SCHEME
+                restServerBuilder.append(PROTOCOL);
+                restServerBuilder.append(":");
+                restServerBuilder.append("//");
+                restServerBuilder.append(restConfig.address());
+                restServerBuilder.append(":");
+                restServerBuilder.append(restConfig.port());
+                restServerBuilder.append("/"); // yields PROTOCOL://address:port/
 
-        Client client = ClientBuilder.newBuilder().build();
+                Client client = ClientBuilder.newBuilder().build(); // NOTE(review): not retained, so never closed
 
-        target = client.target(restServerBuilder.toString()).path(restConfig.endpoint());
+                WebTarget target = client.target(restServerBuilder.toString()).path(restConfig.endpoint());
+
+                targets.put(c.name(), target); // keyed by config name; publish() looks the config up by this key
+            }
+        });
 
         log.info("REST producer has Started");
     }
 
     @Override
     public void stop() {
-        if (target != null) {
-            target = null;
-        }
-
+        targets.clear(); // drop every target so isRunning() reports false and restart() rebuilds from scratch
         log.info("REST producer has Stopped");
     }
 
     @Override
-    public void restart(TelemetryConfig config) {
+    public void restart() { // stop() followed by start(), which re-reads the REST configs
         stop();
-        start(config);
+        start();
     }
 
     @Override
-    public Response publish(String endpoint, String method, String record) {
-        // TODO: need to find a way to invoke REST endpoint using target
-        return null;
-    }
+    public Set<Response> publish(String record) { // sends record to every registered target; returns the responses
 
-    @Override
-    public Response publish(String method, String record) {
-        switch (method) {
-            case POST_METHOD:
-                return target.request(restConfig.requestMediaType())
-                        .post(Entity.json(record));
-            case GET_METHOD:
-                return target.request(restConfig.requestMediaType()).get();
-            default:
-                return null;
-        }
-    }
+        Set<Response> responses = Sets.newConcurrentHashSet();
 
-    @Override
-    public Response publish(String record) {
+        targets.forEach((k, v) -> { // k = config name, v = WebTarget stored by start()
+            TelemetryConfig config = telemetryConfigService.getConfig(k); // re-read on every publish
+            RestTelemetryConfig restConfig = fromTelemetryConfig(config); // NOTE(review): not null-checked here
 
-        if (target == null) {
-            log.debug("REST telemetry service has not been enabled!");
-            return null;
-        }
+            switch (restConfig.method()) {
+                case POST_METHOD:
+                    responses.add(v.request(restConfig.requestMediaType())
+                            .post(Entity.json(record)));
+                    break;
+                case GET_METHOD:
+                    responses.add(v.request(restConfig.requestMediaType()).get());
+                    break;
+                default: // any other method: nothing is sent or recorded
+                    break;
+            }
+        });
 
-        switch (restConfig.method()) {
-            case POST_METHOD:
-                return target.request(restConfig.requestMediaType())
-                        .post(Entity.json(record));
-            case GET_METHOD:
-                return target.request(restConfig.requestMediaType()).get();
-            default:
-                return null;
-        }
+        return responses; // empty when no targets are registered
     }
 
     @Override
     public boolean isRunning() {
-        return target != null;
+        return !targets.isEmpty(); // running means at least one REST target is registered
     }
 }