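Refactor OpenstackTelemetry App for better readability

In OpenstackTelemetryManager, publish() and the per-backend invoke*
helpers now take a Set<FlowInfo> instead of a single FlowInfo, the
Kafka publisher encodes the set with TinaMessageByteBufferCodec, and
the Kafka topic/key literals are extracted into the KAFKA_TOPIC and
KAFKA_KEY constants. The now-unused ByteBufferCodec import is removed.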

Change-Id: I93353de31fb9671d8670ee44fc248fe7f36ac12b
diff --git a/apps/openstacktelemetry/app/src/main/java/org/onosproject/openstacktelemetry/impl/OpenstackTelemetryManager.java b/apps/openstacktelemetry/app/src/main/java/org/onosproject/openstacktelemetry/impl/OpenstackTelemetryManager.java
index 4629a4e..da492a9 100644
--- a/apps/openstacktelemetry/app/src/main/java/org/onosproject/openstacktelemetry/impl/OpenstackTelemetryManager.java
+++ b/apps/openstacktelemetry/app/src/main/java/org/onosproject/openstacktelemetry/impl/OpenstackTelemetryManager.java
@@ -21,7 +21,6 @@
 import org.apache.felix.scr.annotations.Deactivate;
 import org.apache.felix.scr.annotations.Service;
 import org.apache.kafka.clients.producer.ProducerRecord;
-import org.onosproject.openstacktelemetry.api.ByteBufferCodec;
 import org.onosproject.openstacktelemetry.api.FlowInfo;
 import org.onosproject.openstacktelemetry.api.GrpcTelemetryService;
 import org.onosproject.openstacktelemetry.api.InfluxDbTelemetryService;
@@ -29,12 +28,13 @@
 import org.onosproject.openstacktelemetry.api.OpenstackTelemetryService;
 import org.onosproject.openstacktelemetry.api.RestTelemetryService;
 import org.onosproject.openstacktelemetry.api.TelemetryService;
-import org.onosproject.openstacktelemetry.codec.TinaFlowInfoByteBufferCodec;
+import org.onosproject.openstacktelemetry.codec.TinaMessageByteBufferCodec;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.nio.ByteBuffer;
 import java.util.List;
+import java.util.Set;
 
 /**
  * Openstack telemetry manager.
@@ -45,6 +45,9 @@
 
     private final Logger log = LoggerFactory.getLogger(getClass());
 
+    private static final String KAFKA_TOPIC = "sona.flow";
+    private static final String KAFKA_KEY = "flowdata";
+
     private List<TelemetryService> telemetryServices = Lists.newArrayList();
 
     @Activate
@@ -68,45 +71,42 @@
     }
 
     @Override
-    public void publish(FlowInfo flowInfo) {
+    public void publish(Set<FlowInfo> flowInfos) {
         telemetryServices.forEach(service -> {
 
-
             if (service instanceof GrpcTelemetryManager) {
-                invokeGrpcPublisher((GrpcTelemetryService) service, flowInfo);
+                invokeGrpcPublisher((GrpcTelemetryService) service, flowInfos);
             }
 
             if (service instanceof InfluxDbTelemetryManager) {
-                invokeInfluxDbPublisher((InfluxDbTelemetryService) service, flowInfo);
+                invokeInfluxDbPublisher((InfluxDbTelemetryService) service, flowInfos);
             }
 
             if (service instanceof KafkaTelemetryManager) {
-                invokeKafkaPublisher((KafkaTelemetryService) service, flowInfo);
+                invokeKafkaPublisher((KafkaTelemetryService) service, flowInfos);
             }
 
             if (service instanceof RestTelemetryManager) {
-                invokeRestPublisher((RestTelemetryService) service, flowInfo);
+                invokeRestPublisher((RestTelemetryService) service, flowInfos);
             }
-
         });
     }
 
-    private void invokeGrpcPublisher(GrpcTelemetryService service, FlowInfo flowInfo) {
+    private void invokeGrpcPublisher(GrpcTelemetryService service, Set<FlowInfo> flowInfos) {
         // TODO: need provide implementation
     }
 
-    private void invokeInfluxDbPublisher(InfluxDbTelemetryService service, FlowInfo flowInfo) {
+    private void invokeInfluxDbPublisher(InfluxDbTelemetryService service, Set<FlowInfo> flowInfos) {
         // TODO: need provide implementation
     }
 
-    private void invokeKafkaPublisher(KafkaTelemetryService service, FlowInfo flowInfo) {
-        ByteBufferCodec codec = new TinaFlowInfoByteBufferCodec();
-        ByteBuffer buffer = codec.encode(flowInfo);
-        service.publish(new ProducerRecord<>("sona.flow", "flowdata", buffer.array()));
+    private void invokeKafkaPublisher(KafkaTelemetryService service, Set<FlowInfo> flowInfos) {
+        TinaMessageByteBufferCodec codec = new TinaMessageByteBufferCodec();
+        ByteBuffer buffer = codec.encode(flowInfos);
+        service.publish(new ProducerRecord<>(KAFKA_TOPIC, KAFKA_KEY, buffer.array()));
     }
 
-    private void invokeRestPublisher(RestTelemetryService service, FlowInfo flowInfo) {
+    private void invokeRestPublisher(RestTelemetryService service, Set<FlowInfo> flowInfos) {
         // TODO: need provide implementation
     }
-
 }