[ONOS-7683] Initial implementation of OpenStack telemetry service

Change-Id: I621bed9cff108af194998b7a1e8269a9a6ffd32c
diff --git a/apps/openstacktelemetry/app/src/main/java/org/onosproject/openstacktelemetry/impl/KafkaTelemetryManager.java b/apps/openstacktelemetry/app/src/main/java/org/onosproject/openstacktelemetry/impl/KafkaTelemetryManager.java
index e935111..8d6f62f 100644
--- a/apps/openstacktelemetry/app/src/main/java/org/onosproject/openstacktelemetry/impl/KafkaTelemetryManager.java
+++ b/apps/openstacktelemetry/app/src/main/java/org/onosproject/openstacktelemetry/impl/KafkaTelemetryManager.java
@@ -18,12 +18,15 @@
 import org.apache.felix.scr.annotations.Activate;
 import org.apache.felix.scr.annotations.Component;
 import org.apache.felix.scr.annotations.Deactivate;
+import org.apache.felix.scr.annotations.Reference;
+import org.apache.felix.scr.annotations.ReferenceCardinality;
 import org.apache.felix.scr.annotations.Service;
 import org.apache.kafka.clients.producer.KafkaProducer;
 import org.apache.kafka.clients.producer.Producer;
 import org.apache.kafka.clients.producer.ProducerRecord;
 import org.apache.kafka.clients.producer.RecordMetadata;
 import org.onosproject.openstacktelemetry.api.KafkaTelemetryAdminService;
+import org.onosproject.openstacktelemetry.api.OpenstackTelemetryService;
 import org.onosproject.openstacktelemetry.api.config.KafkaTelemetryConfig;
 import org.onosproject.openstacktelemetry.api.config.TelemetryConfig;
 import org.slf4j.Logger;
@@ -50,16 +53,25 @@
     private static final String KEY_SERIALIZER = "key.serializer";
     private static final String VALUE_SERIALIZER = "value.serializer";
 
+    @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
+    protected OpenstackTelemetryService openstackTelemetryService;
+
     private Producer<String, byte[]> producer = null;
 
     @Activate
     protected void activate() {
+        // Add this manager to the injected OpenstackTelemetryService.
+        openstackTelemetryService.addTelemetryService(this);
+
         log.info("Started");
     }
 
     @Deactivate
     protected void deactivate() {
         stop();
+        // Once stop() has returned, remove this manager from the OpenstackTelemetryService.
+        openstackTelemetryService.removeTelemetryService(this);
+
         log.info("Stopped");
     }
 
@@ -110,6 +122,17 @@
 
     @Override
     public Future<RecordMetadata> publish(ProducerRecord<String, byte[]> record) {
+        // No producer instance: log a warning and return null (not a Future), so callers must null-check.
+        if (producer == null) {
+            log.warn("Kafka telemetry service has not been enabled!");
+            return null;
+        }
+
         return producer.send(record);
     }
+
+    @Override
+    public boolean isRunning() {
+        return producer != null; // true iff a producer instance is currently held
+    }
 }