Add JSON string serializer for Kafka telemetry exporter
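
The Kafka telemetry manager now keeps separate producers for the
byte-array and string value serializers. When a string producer is
configured, flow infos are converted to link infos and encoded into a
JSON string through the JSON codec registered for LinkInfo. A new
3dv-kafka-exporter configuration is added, StatsFlowRuleManager now
resolves the input interface ID from the IP address, and the codec
register component is renamed to OpenstackRestCodecRegister.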

Change-Id: I054b10e8356c10d9b0a37fe83fbe0a5d6907ef07
diff --git a/apps/openstacktelemetry/app/src/main/java/org/onosproject/openstacktelemetry/impl/KafkaTelemetryManager.java b/apps/openstacktelemetry/app/src/main/java/org/onosproject/openstacktelemetry/impl/KafkaTelemetryManager.java
index 4ddb4c1..2078b2f 100644
--- a/apps/openstacktelemetry/app/src/main/java/org/onosproject/openstacktelemetry/impl/KafkaTelemetryManager.java
+++ b/apps/openstacktelemetry/app/src/main/java/org/onosproject/openstacktelemetry/impl/KafkaTelemetryManager.java
@@ -15,19 +15,26 @@
  */
 package org.onosproject.openstacktelemetry.impl;
 
+import com.fasterxml.jackson.databind.node.ArrayNode;
+import com.fasterxml.jackson.databind.node.ObjectNode;
 import com.google.common.collect.Maps;
 import com.google.common.collect.Sets;
 import org.apache.kafka.clients.producer.KafkaProducer;
 import org.apache.kafka.clients.producer.Producer;
 import org.apache.kafka.clients.producer.ProducerRecord;
 import org.apache.kafka.clients.producer.RecordMetadata;
+import org.onosproject.codec.CodecContext;
+import org.onosproject.codec.CodecService;
+import org.onosproject.codec.JsonCodec;
 import org.onosproject.openstacktelemetry.api.FlowInfo;
 import org.onosproject.openstacktelemetry.api.KafkaTelemetryAdminService;
+import org.onosproject.openstacktelemetry.api.LinkInfo;
 import org.onosproject.openstacktelemetry.api.OpenstackTelemetryService;
 import org.onosproject.openstacktelemetry.api.TelemetryCodec;
 import org.onosproject.openstacktelemetry.api.TelemetryConfigService;
 import org.onosproject.openstacktelemetry.api.config.KafkaTelemetryConfig;
 import org.onosproject.openstacktelemetry.api.config.TelemetryConfig;
+import org.onosproject.rest.AbstractWebResource;
 import org.osgi.service.component.annotations.Activate;
 import org.osgi.service.component.annotations.Component;
 import org.osgi.service.component.annotations.Deactivate;
@@ -58,13 +65,15 @@
 import static org.onosproject.openstacktelemetry.api.config.TelemetryConfig.ConfigType.KAFKA;
 import static org.onosproject.openstacktelemetry.api.config.TelemetryConfig.Status.ENABLED;
 import static org.onosproject.openstacktelemetry.config.DefaultKafkaTelemetryConfig.fromTelemetryConfig;
+import static org.onosproject.openstacktelemetry.util.OpenstackTelemetryUtil.flowsToLinks;
 import static org.onosproject.openstacktelemetry.util.OpenstackTelemetryUtil.testConnectivity;
 
 /**
  * Kafka telemetry manager.
  */
 @Component(immediate = true, service = KafkaTelemetryAdminService.class)
-public class KafkaTelemetryManager implements KafkaTelemetryAdminService {
+public class KafkaTelemetryManager extends AbstractWebResource
+        implements KafkaTelemetryAdminService {
 
     private final Logger log = LoggerFactory.getLogger(getClass());
 
@@ -73,13 +82,24 @@
     private static final int RETRY_BACKOFF_MS_VAL = 10000;
     private static final int RECONNECT_BACKOFF_MS_VAL = 10000;
 
+    private static final String LINK_INFOS = "linkInfos";
+
+    private static final String BYTE_ARRAY_SERIALIZER =
+            "org.apache.kafka.common.serialization.ByteArraySerializer";
+    private static final String STRING_SERIALIZER =
+            "org.apache.kafka.common.serialization.StringSerializer";
+
+    @Reference(cardinality = ReferenceCardinality.MANDATORY)
+    protected CodecService codecService;
+
     @Reference(cardinality = ReferenceCardinality.MANDATORY)
     protected OpenstackTelemetryService openstackTelemetryService;
 
     @Reference(cardinality = ReferenceCardinality.MANDATORY)
     protected TelemetryConfigService telemetryConfigService;
 
-    private Map<String, Producer<String, byte[]>> producers = Maps.newConcurrentMap();
+    private Map<String, Producer<String, String>> stringProducers = Maps.newConcurrentMap();
+    private Map<String, Producer<String, byte[]>> byteProducers = Maps.newConcurrentMap();
 
     @Activate
     protected void activate() {
@@ -101,27 +121,59 @@
     @Override
     public Set<Future<RecordMetadata>> publish(Set<FlowInfo> flowInfos) {
 
-        if (producers == null || producers.isEmpty()) {
+        boolean hasEmptyByteProducers =
+                byteProducers == null || byteProducers.isEmpty();
+        boolean hasEmptyStringProducers =
+                stringProducers == null || stringProducers.isEmpty();
+
+        if (hasEmptyByteProducers && hasEmptyStringProducers) {
             log.debug("Kafka telemetry service has not been enabled!");
             return null;
         }
 
         log.debug("Send telemetry record to kafka server...");
         Set<Future<RecordMetadata>> futureSet = Sets.newHashSet();
-        producers.forEach((k, v) -> {
+
+        byteProducers.forEach((k, v) -> {
             TelemetryConfig config = telemetryConfigService.getConfig(k);
             KafkaTelemetryConfig kafkaConfig = fromTelemetryConfig(config);
 
-            try {
-                Class codecClazz = Class.forName(kafkaConfig.codec());
-                TelemetryCodec codec = (TelemetryCodec) codecClazz.newInstance();
+            if (kafkaConfig != null &&
+                    BYTE_ARRAY_SERIALIZER.equals(kafkaConfig.valueSerializer())) {
+                try {
+                    Class codecClazz = Class.forName(kafkaConfig.codec());
+                    TelemetryCodec codec = (TelemetryCodec) codecClazz.newInstance();
 
-                ByteBuffer buffer = codec.encode(flowInfos);
-                ProducerRecord record = new ProducerRecord<>(
-                        kafkaConfig.topic(), kafkaConfig.key(), buffer.array());
-                futureSet.add(v.send(record));
-            } catch (ClassNotFoundException | IllegalAccessException | InstantiationException e) {
-                log.warn("Failed to send telemetry record due to {}", e);
+                    ByteBuffer buffer = codec.encode(flowInfos);
+                    ProducerRecord record = new ProducerRecord<>(
+                            kafkaConfig.topic(), kafkaConfig.key(), buffer.array());
+                    futureSet.add(v.send(record));
+                } catch (ClassNotFoundException |
+                        IllegalAccessException | InstantiationException e) {
+                    log.warn("Failed to send telemetry record due to {}", e);
+                }
+            }
+        });
+
+        stringProducers.forEach((k, v) -> {
+            TelemetryConfig config = telemetryConfigService.getConfig(k);
+            KafkaTelemetryConfig kafkaConfig = fromTelemetryConfig(config);
+
+            if (kafkaConfig != null &&
+                    STRING_SERIALIZER.equals(kafkaConfig.valueSerializer())) {
+
+                // TODO: this is a workaround to convert FlowInfo to LinkInfo;
+                // we need to find a better solution
+
+                Set<LinkInfo> linkInfos = flowsToLinks(flowInfos);
+
+                if (!linkInfos.isEmpty()) {
+                    ProducerRecord record = new ProducerRecord<>(
+                            kafkaConfig.topic(), kafkaConfig.key(),
+                            encodeStrings(linkInfos, this,
+                                    kafkaConfig.codec()).toString());
+                    futureSet.add(v.send(record));
+                }
             }
         });
         return futureSet;
@@ -129,7 +181,7 @@
 
     @Override
     public boolean isRunning() {
-        return !producers.isEmpty();
+        return !byteProducers.isEmpty() || !stringProducers.isEmpty();
     }
 
     @Override
@@ -161,7 +213,14 @@
             prop.put(RECONNECT_BACKOFF_MS_CONFIG, RECONNECT_BACKOFF_MS_VAL);
 
             if (testConnectivity(kafkaConfig.address(), kafkaConfig.port())) {
-                producers.put(name, new KafkaProducer<>(prop));
+                if (kafkaConfig.valueSerializer().equals(BYTE_ARRAY_SERIALIZER)) {
+                    byteProducers.put(name, new KafkaProducer<>(prop));
+                }
+
+                if (kafkaConfig.valueSerializer().equals(STRING_SERIALIZER)) {
+                    stringProducers.put(name, new KafkaProducer<>(prop));
+                }
+
                 success = true;
             } else {
                 log.warn("Unable to connect to {}:{}, " +
@@ -175,11 +234,17 @@
 
     @Override
     public void stop(String name) {
-        Producer<String, byte[]> producer = producers.get(name);
+        Producer<String, byte[]> byteProducer = byteProducers.get(name);
+        Producer<String, String> stringProducer = stringProducers.get(name);
 
-        if (producer != null) {
-            producer.close();
-            producers.remove(name);
+        if (byteProducer != null) {
+            byteProducer.close();
+            byteProducers.remove(name);
+        }
+
+        if (stringProducer != null) {
+            stringProducer.close();
+            stringProducers.remove(name);
         }
     }
 
@@ -197,11 +262,17 @@
 
     @Override
     public void stopAll() {
-        if (!producers.isEmpty()) {
-            producers.values().forEach(Producer::close);
+        if (!byteProducers.isEmpty()) {
+            byteProducers.values().forEach(Producer::close);
         }
 
-        producers.clear();
+        byteProducers.clear();
+
+        if (!stringProducers.isEmpty()) {
+            stringProducers.values().forEach(Producer::close);
+        }
+
+        stringProducers.clear();
 
         log.info("Kafka producer has Stopped");
     }
@@ -211,4 +282,30 @@
         stopAll();
         startAll();
     }
+
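+    /**
+     * Encodes the given link infos into a JSON object node, using the JSON
+     * codec registered for the class named by the codec property.
+     *
+     * @param infos     set of link infos to encode
+     * @param context   codec context used for encoding
+     * @param codecName fully qualified name of the class whose JSON codec is used
+     * @return JSON object node holding an array of the encoded link infos
+     */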
+    private ObjectNode encodeStrings(Set<LinkInfo> infos,
+                                     CodecContext context, String codecName) {
+        ObjectNode root = context.mapper().createObjectNode();
+        ArrayNode array = context.mapper().createArrayNode();
+        try {
+            Class codecClazz = Class.forName(codecName);
+            JsonCodec codec = codecService.getCodec(codecClazz);
+
+            infos.forEach(l -> array.add(codec.encode(l, context)));
+        } catch (ClassNotFoundException e) {
+            log.warn("Failed to send telemetry record due to {}", e);
+        }
+
+        root.set(LINK_INFOS, array);
+        return root;
+    }
 }
diff --git a/apps/openstacktelemetry/app/src/main/java/org/onosproject/openstacktelemetry/impl/StatsFlowRuleManager.java b/apps/openstacktelemetry/app/src/main/java/org/onosproject/openstacktelemetry/impl/StatsFlowRuleManager.java
index 30faac7..9bf7994 100644
--- a/apps/openstacktelemetry/app/src/main/java/org/onosproject/openstacktelemetry/impl/StatsFlowRuleManager.java
+++ b/apps/openstacktelemetry/app/src/main/java/org/onosproject/openstacktelemetry/impl/StatsFlowRuleManager.java
@@ -586,7 +586,7 @@
                 .withSrcMac(macAddress)
                 .withDstMac(NO_HOST_MAC)
                 .withDeviceId(deviceId)
-                .withInputInterfaceId(ARBITRARY_IN_INTF)
+                .withInputInterfaceId(getInterfaceId(ipAddress))
                 .withOutputInterfaceId(ARBITRARY_OUT_INTF)
                 .withVlanId(VlanId.vlanId());
 
diff --git a/apps/openstacktelemetry/app/src/main/java/org/onosproject/openstacktelemetry/util/OpenstackTelemetryUtil.java b/apps/openstacktelemetry/app/src/main/java/org/onosproject/openstacktelemetry/util/OpenstackTelemetryUtil.java
index afb7365..78245f2 100644
--- a/apps/openstacktelemetry/app/src/main/java/org/onosproject/openstacktelemetry/util/OpenstackTelemetryUtil.java
+++ b/apps/openstacktelemetry/app/src/main/java/org/onosproject/openstacktelemetry/util/OpenstackTelemetryUtil.java
@@ -16,8 +16,14 @@
 package org.onosproject.openstacktelemetry.util;
 
 import com.google.common.base.Strings;
+import com.google.common.collect.Sets;
 import org.onlab.packet.IPv4;
 import org.onosproject.cfg.ConfigProperty;
+import org.onosproject.openstacktelemetry.api.DefaultLinkInfo;
+import org.onosproject.openstacktelemetry.api.DefaultLinkStatsInfo;
+import org.onosproject.openstacktelemetry.api.FlowInfo;
+import org.onosproject.openstacktelemetry.api.LinkInfo;
+import org.onosproject.openstacktelemetry.api.LinkStatsInfo;
 
 import java.io.IOException;
 import java.net.InetSocketAddress;
@@ -27,6 +33,9 @@
 import java.util.Optional;
 import java.util.Set;
 
+import static org.onlab.packet.IPv4.PROTOCOL_ICMP;
+import static org.onlab.packet.IPv4.PROTOCOL_TCP;
+import static org.onlab.packet.IPv4.PROTOCOL_UDP;
 import static org.onlab.util.Tools.get;
 
 /**
@@ -36,10 +45,14 @@
 
     private static final String PROTOCOL_NAME_TCP = "tcp";
     private static final String PROTOCOL_NAME_UDP = "udp";
+    private static final String PROTOCOL_NAME_ICMP = "icmp";
     private static final String PROTOCOL_NAME_ANY = "any";
     private static final int ARBITRARY_PROTOCOL = 0x0;
     private static final int TIMEOUT = 2000;
 
+    private static final String ALL_IP_ADDRESSES = "0.0.0.0";
+    private static final String LINK_ID_DELIMITER = "_";
+
     /**
      * Prevents object instantiation from external.
      */
@@ -137,4 +150,91 @@
 
         return isConnected;
     }
+
+    /**
+     * Converts flow info to link info.
+     *
+     * @param flowInfos a set of flow infos
+     * @return converted link infos
+     */
+    public static Set<LinkInfo> flowsToLinks(Set<FlowInfo> flowInfos) {
+
+        Set<LinkInfo> linkInfos = Sets.newConcurrentHashSet();
+
+        for (FlowInfo flowInfo : flowInfos) {
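+            // pair each flow from a concrete source IP towards 0.0.0.0 with the
+            // first flow destined to that source, so that the TX and RX counters
+            // can be combined into a single link statistics record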
+            if (!ALL_IP_ADDRESSES.equals(flowInfo.srcIp().address().toString()) &&
+                 ALL_IP_ADDRESSES.equals(flowInfo.dstIp().address().toString())) {
+                FlowInfo dstFlowInfo = flowInfos.stream()
+                        .filter(f -> f.dstIp().address().toString()
+                                .equals(flowInfo.srcIp().address().toString()))
+                        .findFirst().orElse(null);
+
+                if (dstFlowInfo == null) {
+                    continue;
+                }
+
+                String linkId = flowInfo.deviceId().toString() +
+                        LINK_ID_DELIMITER + flowInfo.inputInterfaceId();
+                String srcIp = flowInfo.srcIp().address().toString();
+                String dstIp = flowInfo.dstIp().address().toString();
+                int srcPort = flowInfo.srcPort() == null ? 0 : flowInfo.srcPort().toInt();
+                int dstPort = flowInfo.dstPort() == null ? 0 : flowInfo.dstPort().toInt();
+                String protocol = protocolByteToString(flowInfo.protocol());
+
+                long txPacket = flowInfo.statsInfo().currAccPkts();
+                long txByte = flowInfo.statsInfo().currAccBytes();
+                long txDrop = flowInfo.statsInfo().dropPkts();
+
+                long rxPacket = dstFlowInfo.statsInfo().currAccPkts();
+                long rxByte = dstFlowInfo.statsInfo().currAccBytes();
+                long rxDrop = dstFlowInfo.statsInfo().dropPkts();
+
+                LinkStatsInfo statsInfo = DefaultLinkStatsInfo.builder()
+                        .withTxPacket(txPacket)
+                        .withRxPacket(rxPacket)
+                        .withTxByte(txByte)
+                        .withRxByte(rxByte)
+                        .withTxDrop(txDrop)
+                        .withRxDrop(rxDrop)
+                        .withTimestamp(System.currentTimeMillis())
+                        .build();
+
+                LinkInfo linkInfo = DefaultLinkInfo.builder()
+                        .withLinkId(linkId)
+                        .withSrcIp(srcIp)
+                        .withDstIp(dstIp)
+                        .withSrcPort(srcPort)
+                        .withDstPort(dstPort)
+                        .withProtocol(protocol)
+                        .withLinkStats(statsInfo)
+                        .build();
+
+                linkInfos.add(linkInfo);
+            }
+        }
+
+        return linkInfos;
+    }
+
+    /**
+     * Converts byte formatted protocol to string type.
+     *
+     * @param protocol byte formatted protocol
+     * @return string formatted protocol
+     */
+    public static String protocolByteToString(byte protocol) {
+        switch (protocol) {
+            case PROTOCOL_TCP:
+                return PROTOCOL_NAME_TCP;
+            case PROTOCOL_UDP:
+                return PROTOCOL_NAME_UDP;
+            case PROTOCOL_ICMP:
+                return PROTOCOL_NAME_ICMP;
+            default:
+                return "";
+        }
+    }
 }
diff --git a/apps/openstacktelemetry/app/src/main/resources/org/onosproject/openstacktelemetry/impl/kafka-configs.xml b/apps/openstacktelemetry/app/src/main/resources/org/onosproject/openstacktelemetry/impl/kafka-configs.xml
index e33b7f0..b77e1d1 100644
--- a/apps/openstacktelemetry/app/src/main/resources/org/onosproject/openstacktelemetry/impl/kafka-configs.xml
+++ b/apps/openstacktelemetry/app/src/main/resources/org/onosproject/openstacktelemetry/impl/kafka-configs.xml
@@ -20,6 +20,8 @@
         <property name="port">9092</property>
         <property name="retries">0</property>
     </config>
+
+    <!-- a kafka exporter used for exporting flow metrics to TINA monitoring system -->
     <config name="tina-kafka-exporter" manufacturer="SK Telecom"
             swVersion="1.0" extends="kafka" status="disabled">
         <property name="batchSize">16384</property>
@@ -38,4 +40,26 @@
             org.onosproject.openstacktelemetry.codec.bytebuffer.TinaMessageByteBufferCodec
         </property>
     </config>
+
+    <!-- a kafka exporter used for exporting link metrics to 3DV monitoring system -->
+    <config name="3dv-kafka-exporter" manufacturer="SK Telecom"
+            swVersion="1.0" extends="kafka" status="disabled">
+        <property name="batchSize">16384</property>
+        <property name="lingerMs">1</property>
+        <property name="memoryBuffer">33554432</property>
+        <property name="requiredAcks">all</property>
+        <property name="keySerializer">
+            org.apache.kafka.common.serialization.StringSerializer
+        </property>
+        <property name="valueSerializer">
+            org.apache.kafka.common.serialization.StringSerializer
+        </property>
+        <property name="topic">sona.data.link</property>
+        <property name="key">flowdata</property>
+        <property name="codec">
+            org.onosproject.openstacktelemetry.api.LinkInfo
+        </property>
+    </config>
 </configs>
\ No newline at end of file
diff --git a/apps/openstacktelemetry/web/src/main/java/org/onosproject/openstacktelemetry/web/OpenstackTelemetryCodecRegister.java b/apps/openstacktelemetry/web/src/main/java/org/onosproject/openstacktelemetry/web/OpenstackRestCodecRegister.java
similarity index 95%
rename from apps/openstacktelemetry/web/src/main/java/org/onosproject/openstacktelemetry/web/OpenstackTelemetryCodecRegister.java
rename to apps/openstacktelemetry/web/src/main/java/org/onosproject/openstacktelemetry/web/OpenstackRestCodecRegister.java
index 6bd8636..acc0679 100644
--- a/apps/openstacktelemetry/web/src/main/java/org/onosproject/openstacktelemetry/web/OpenstackTelemetryCodecRegister.java
+++ b/apps/openstacktelemetry/web/src/main/java/org/onosproject/openstacktelemetry/web/OpenstackRestCodecRegister.java
@@ -29,6 +29,7 @@
 import org.osgi.service.component.annotations.Deactivate;
 import org.osgi.service.component.annotations.Reference;
 import org.osgi.service.component.annotations.ReferenceCardinality;
+import org.slf4j.Logger;
 
 import static org.slf4j.LoggerFactory.getLogger;
 
@@ -36,9 +37,9 @@
  * Implementation of the JSON codec brokering service for OpenstackTelemetry.
  */
 @Component(immediate = true)
-public class OpenstackTelemetryCodecRegister {
+public class OpenstackRestCodecRegister {
 
-    private final org.slf4j.Logger log = getLogger(getClass());
+    private final Logger log = getLogger(getClass());
 
     @Reference(cardinality = ReferenceCardinality.MANDATORY)
     protected CodecService codecService;
diff --git a/apps/openstacktelemetry/web/src/test/java/org/onosproject/openstacktelemetry/web/OpenstackTelemetryCodecRegisterTest.java b/apps/openstacktelemetry/web/src/test/java/org/onosproject/openstacktelemetry/web/OpenstackRestCodecRegisterTest.java
similarity index 95%
rename from apps/openstacktelemetry/web/src/test/java/org/onosproject/openstacktelemetry/web/OpenstackTelemetryCodecRegisterTest.java
rename to apps/openstacktelemetry/web/src/test/java/org/onosproject/openstacktelemetry/web/OpenstackRestCodecRegisterTest.java
index 6eee616..af91100 100644
--- a/apps/openstacktelemetry/web/src/test/java/org/onosproject/openstacktelemetry/web/OpenstackTelemetryCodecRegisterTest.java
+++ b/apps/openstacktelemetry/web/src/test/java/org/onosproject/openstacktelemetry/web/OpenstackRestCodecRegisterTest.java
@@ -39,16 +39,16 @@
 /**
  * Unit test for openstack telemetry codec register.
  */
-public final class OpenstackTelemetryCodecRegisterTest {
+public final class OpenstackRestCodecRegisterTest {
 
-    private OpenstackTelemetryCodecRegister register;
+    private OpenstackRestCodecRegister register;
 
     /**
      * Tests codec register activation and deactivation.
      */
     @Test
     public void testActivateDeactivate() {
-        register = new OpenstackTelemetryCodecRegister();
+        register = new OpenstackRestCodecRegister();
         CodecService codecService = new TestCodecService();
 
         TestUtils.setField(register, "codecService", codecService);