[ONOS-7683] Initial implementation of openstack telemetry service
Change-Id: I621bed9cff108af194998b7a1e8269a9a6ffd32c
diff --git a/apps/openstacktelemetry/app/src/main/java/org/onosproject/openstacktelemetry/impl/KafkaTelemetryManager.java b/apps/openstacktelemetry/app/src/main/java/org/onosproject/openstacktelemetry/impl/KafkaTelemetryManager.java
index e935111..8d6f62f 100644
--- a/apps/openstacktelemetry/app/src/main/java/org/onosproject/openstacktelemetry/impl/KafkaTelemetryManager.java
+++ b/apps/openstacktelemetry/app/src/main/java/org/onosproject/openstacktelemetry/impl/KafkaTelemetryManager.java
@@ -18,12 +18,15 @@
import org.apache.felix.scr.annotations.Activate;
import org.apache.felix.scr.annotations.Component;
import org.apache.felix.scr.annotations.Deactivate;
+import org.apache.felix.scr.annotations.Reference;
+import org.apache.felix.scr.annotations.ReferenceCardinality;
import org.apache.felix.scr.annotations.Service;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.onosproject.openstacktelemetry.api.KafkaTelemetryAdminService;
+import org.onosproject.openstacktelemetry.api.OpenstackTelemetryService;
import org.onosproject.openstacktelemetry.api.config.KafkaTelemetryConfig;
import org.onosproject.openstacktelemetry.api.config.TelemetryConfig;
import org.slf4j.Logger;
@@ -50,16 +53,25 @@
private static final String KEY_SERIALIZER = "key.serializer";
private static final String VALUE_SERIALIZER = "value.serializer";
+ @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
+ protected OpenstackTelemetryService openstackTelemetryService;
+
private Producer<String, byte[]> producer = null;
@Activate
protected void activate() {
+
+ openstackTelemetryService.addTelemetryService(this);
+
log.info("Started");
}
@Deactivate
protected void deactivate() {
stop();
+
+ openstackTelemetryService.removeTelemetryService(this);
+
log.info("Stopped");
}
@@ -110,6 +122,17 @@
@Override
public Future<RecordMetadata> publish(ProducerRecord<String, byte[]> record) {
+
+ if (producer == null) {
+ log.warn("Kafka telemetry service has not been enabled!");
+ return null;
+ }
+
return producer.send(record);
}
+
+ @Override
+ public boolean isRunning() {
+ return producer != null;
+ }
}