Refactor OpenstackTelemetry App for better readability (publish sets of FlowInfo, name Kafka topic/key constants)
Change-Id: I93353de31fb9671d8670ee44fc248fe7f36ac12b
diff --git a/apps/openstacktelemetry/app/src/main/java/org/onosproject/openstacktelemetry/impl/OpenstackTelemetryManager.java b/apps/openstacktelemetry/app/src/main/java/org/onosproject/openstacktelemetry/impl/OpenstackTelemetryManager.java
index 4629a4e..da492a9 100644
--- a/apps/openstacktelemetry/app/src/main/java/org/onosproject/openstacktelemetry/impl/OpenstackTelemetryManager.java
+++ b/apps/openstacktelemetry/app/src/main/java/org/onosproject/openstacktelemetry/impl/OpenstackTelemetryManager.java
@@ -21,7 +21,6 @@
import org.apache.felix.scr.annotations.Deactivate;
import org.apache.felix.scr.annotations.Service;
import org.apache.kafka.clients.producer.ProducerRecord;
-import org.onosproject.openstacktelemetry.api.ByteBufferCodec;
import org.onosproject.openstacktelemetry.api.FlowInfo;
import org.onosproject.openstacktelemetry.api.GrpcTelemetryService;
import org.onosproject.openstacktelemetry.api.InfluxDbTelemetryService;
@@ -29,12 +28,13 @@
import org.onosproject.openstacktelemetry.api.OpenstackTelemetryService;
import org.onosproject.openstacktelemetry.api.RestTelemetryService;
import org.onosproject.openstacktelemetry.api.TelemetryService;
-import org.onosproject.openstacktelemetry.codec.TinaFlowInfoByteBufferCodec;
+import org.onosproject.openstacktelemetry.codec.TinaMessageByteBufferCodec;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.nio.ByteBuffer;
import java.util.List;
+import java.util.Set;
/**
* Openstack telemetry manager.
@@ -45,6 +45,9 @@
private final Logger log = LoggerFactory.getLogger(getClass());
+ private static final String KAFKA_TOPIC = "sona.flow";
+ private static final String KAFKA_KEY = "flowdata";
+
private List<TelemetryService> telemetryServices = Lists.newArrayList();
@Activate
@@ -68,45 +71,42 @@
}
@Override
- public void publish(FlowInfo flowInfo) {
+ public void publish(Set<FlowInfo> flowInfos) {
telemetryServices.forEach(service -> {
-
if (service instanceof GrpcTelemetryManager) {
- invokeGrpcPublisher((GrpcTelemetryService) service, flowInfo);
+ invokeGrpcPublisher((GrpcTelemetryService) service, flowInfos);
}
if (service instanceof InfluxDbTelemetryManager) {
- invokeInfluxDbPublisher((InfluxDbTelemetryService) service, flowInfo);
+ invokeInfluxDbPublisher((InfluxDbTelemetryService) service, flowInfos);
}
if (service instanceof KafkaTelemetryManager) {
- invokeKafkaPublisher((KafkaTelemetryService) service, flowInfo);
+ invokeKafkaPublisher((KafkaTelemetryService) service, flowInfos);
}
if (service instanceof RestTelemetryManager) {
- invokeRestPublisher((RestTelemetryService) service, flowInfo);
+ invokeRestPublisher((RestTelemetryService) service, flowInfos);
}
-
});
}
- private void invokeGrpcPublisher(GrpcTelemetryService service, FlowInfo flowInfo) {
+ private void invokeGrpcPublisher(GrpcTelemetryService service, Set<FlowInfo> flowInfos) {
// TODO: need provide implementation
}
- private void invokeInfluxDbPublisher(InfluxDbTelemetryService service, FlowInfo flowInfo) {
+ private void invokeInfluxDbPublisher(InfluxDbTelemetryService service, Set<FlowInfo> flowInfos) {
// TODO: need provide implementation
}
- private void invokeKafkaPublisher(KafkaTelemetryService service, FlowInfo flowInfo) {
- ByteBufferCodec codec = new TinaFlowInfoByteBufferCodec();
- ByteBuffer buffer = codec.encode(flowInfo);
- service.publish(new ProducerRecord<>("sona.flow", "flowdata", buffer.array()));
+ private void invokeKafkaPublisher(KafkaTelemetryService service, Set<FlowInfo> flowInfos) {
+ TinaMessageByteBufferCodec codec = new TinaMessageByteBufferCodec();
+ ByteBuffer buffer = codec.encode(flowInfos);
+ service.publish(new ProducerRecord<>(KAFKA_TOPIC, KAFKA_KEY, buffer.array()));
}
- private void invokeRestPublisher(RestTelemetryService service, FlowInfo flowInfo) {
+ private void invokeRestPublisher(RestTelemetryService service, Set<FlowInfo> flowInfos) {
// TODO: need provide implementation
}
-
}