Add JSON string serializer for kafka telemetry exporter
Change-Id: I054b10e8356c10d9b0a37fe83fbe0a5d6907ef07
diff --git a/apps/openstacktelemetry/app/src/main/java/org/onosproject/openstacktelemetry/impl/KafkaTelemetryManager.java b/apps/openstacktelemetry/app/src/main/java/org/onosproject/openstacktelemetry/impl/KafkaTelemetryManager.java
index 4ddb4c1..2078b2f 100644
--- a/apps/openstacktelemetry/app/src/main/java/org/onosproject/openstacktelemetry/impl/KafkaTelemetryManager.java
+++ b/apps/openstacktelemetry/app/src/main/java/org/onosproject/openstacktelemetry/impl/KafkaTelemetryManager.java
@@ -15,19 +15,26 @@
*/
package org.onosproject.openstacktelemetry.impl;
+import com.fasterxml.jackson.databind.node.ArrayNode;
+import com.fasterxml.jackson.databind.node.ObjectNode;
import com.google.common.collect.Maps;
import com.google.common.collect.Sets;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
+import org.onosproject.codec.CodecContext;
+import org.onosproject.codec.CodecService;
+import org.onosproject.codec.JsonCodec;
import org.onosproject.openstacktelemetry.api.FlowInfo;
import org.onosproject.openstacktelemetry.api.KafkaTelemetryAdminService;
+import org.onosproject.openstacktelemetry.api.LinkInfo;
import org.onosproject.openstacktelemetry.api.OpenstackTelemetryService;
import org.onosproject.openstacktelemetry.api.TelemetryCodec;
import org.onosproject.openstacktelemetry.api.TelemetryConfigService;
import org.onosproject.openstacktelemetry.api.config.KafkaTelemetryConfig;
import org.onosproject.openstacktelemetry.api.config.TelemetryConfig;
+import org.onosproject.rest.AbstractWebResource;
import org.osgi.service.component.annotations.Activate;
import org.osgi.service.component.annotations.Component;
import org.osgi.service.component.annotations.Deactivate;
@@ -58,13 +65,15 @@
import static org.onosproject.openstacktelemetry.api.config.TelemetryConfig.ConfigType.KAFKA;
import static org.onosproject.openstacktelemetry.api.config.TelemetryConfig.Status.ENABLED;
import static org.onosproject.openstacktelemetry.config.DefaultKafkaTelemetryConfig.fromTelemetryConfig;
+import static org.onosproject.openstacktelemetry.util.OpenstackTelemetryUtil.flowsToLinks;
import static org.onosproject.openstacktelemetry.util.OpenstackTelemetryUtil.testConnectivity;
/**
* Kafka telemetry manager.
*/
@Component(immediate = true, service = KafkaTelemetryAdminService.class)
-public class KafkaTelemetryManager implements KafkaTelemetryAdminService {
+public class KafkaTelemetryManager extends AbstractWebResource
+ implements KafkaTelemetryAdminService {
private final Logger log = LoggerFactory.getLogger(getClass());
@@ -73,13 +82,24 @@
private static final int RETRY_BACKOFF_MS_VAL = 10000;
private static final int RECONNECT_BACKOFF_MS_VAL = 10000;
+ private static final String LINK_INFOS = "linkInfos";
+
+ private static final String BYTE_ARRAY_SERIALIZER =
+ "org.apache.kafka.common.serialization.ByteArraySerializer";
+ private static final String STRING_SERIALIZER =
+ "org.apache.kafka.common.serialization.StringSerializer";
+
+ @Reference(cardinality = ReferenceCardinality.MANDATORY)
+ protected CodecService codecService;
+
@Reference(cardinality = ReferenceCardinality.MANDATORY)
protected OpenstackTelemetryService openstackTelemetryService;
@Reference(cardinality = ReferenceCardinality.MANDATORY)
protected TelemetryConfigService telemetryConfigService;
- private Map<String, Producer<String, byte[]>> producers = Maps.newConcurrentMap();
+ private Map<String, Producer<String, String>> stringProducers = Maps.newConcurrentMap();
+ private Map<String, Producer<String, byte[]>> byteProducers = Maps.newConcurrentMap();
@Activate
protected void activate() {
@@ -101,27 +121,59 @@
@Override
public Set<Future<RecordMetadata>> publish(Set<FlowInfo> flowInfos) {
- if (producers == null || producers.isEmpty()) {
+ boolean hasEmptyByteProducers =
+ byteProducers == null || byteProducers.isEmpty();
+ boolean hasEmptyStringProducers =
+ stringProducers == null || stringProducers.isEmpty();
+
+ if (hasEmptyByteProducers && hasEmptyStringProducers) {
log.debug("Kafka telemetry service has not been enabled!");
return null;
}
log.debug("Send telemetry record to kafka server...");
Set<Future<RecordMetadata>> futureSet = Sets.newHashSet();
- producers.forEach((k, v) -> {
+
+ byteProducers.forEach((k, v) -> {
TelemetryConfig config = telemetryConfigService.getConfig(k);
KafkaTelemetryConfig kafkaConfig = fromTelemetryConfig(config);
- try {
- Class codecClazz = Class.forName(kafkaConfig.codec());
- TelemetryCodec codec = (TelemetryCodec) codecClazz.newInstance();
+ if (kafkaConfig != null &&
+ BYTE_ARRAY_SERIALIZER.equals(kafkaConfig.valueSerializer())) {
+ try {
+ Class codecClazz = Class.forName(kafkaConfig.codec());
+ TelemetryCodec codec = (TelemetryCodec) codecClazz.newInstance();
- ByteBuffer buffer = codec.encode(flowInfos);
- ProducerRecord record = new ProducerRecord<>(
- kafkaConfig.topic(), kafkaConfig.key(), buffer.array());
- futureSet.add(v.send(record));
- } catch (ClassNotFoundException | IllegalAccessException | InstantiationException e) {
- log.warn("Failed to send telemetry record due to {}", e);
+ ByteBuffer buffer = codec.encode(flowInfos);
+ ProducerRecord record = new ProducerRecord<>(
+ kafkaConfig.topic(), kafkaConfig.key(), buffer.array());
+ futureSet.add(v.send(record));
+ } catch (ClassNotFoundException |
+ IllegalAccessException | InstantiationException e) {
+ log.warn("Failed to send telemetry record due to {}", e);
+ }
+ }
+ });
+
+ stringProducers.forEach((k, v) -> {
+ TelemetryConfig config = telemetryConfigService.getConfig(k);
+ KafkaTelemetryConfig kafkaConfig = fromTelemetryConfig(config);
+
+ if (kafkaConfig != null &&
+ STRING_SERIALIZER.equals(kafkaConfig.valueSerializer())) {
+
+ // TODO: this is a workaround to convert flowInfo to linkInfo
+ // need to find a better solution
+
+ Set<LinkInfo> linkInfos = flowsToLinks(flowInfos);
+
+ if (!linkInfos.isEmpty()) {
+ ProducerRecord record = new ProducerRecord<>(
+ kafkaConfig.topic(), kafkaConfig.key(),
+ encodeStrings(linkInfos, this,
+ kafkaConfig.codec()).toString());
+ futureSet.add(v.send(record));
+ }
}
});
return futureSet;
@@ -129,7 +181,7 @@
@Override
public boolean isRunning() {
- return !producers.isEmpty();
+ return !byteProducers.isEmpty();
}
@Override
@@ -161,7 +213,14 @@
prop.put(RECONNECT_BACKOFF_MS_CONFIG, RECONNECT_BACKOFF_MS_VAL);
if (testConnectivity(kafkaConfig.address(), kafkaConfig.port())) {
- producers.put(name, new KafkaProducer<>(prop));
+ if (kafkaConfig.valueSerializer().equals(BYTE_ARRAY_SERIALIZER)) {
+ byteProducers.put(name, new KafkaProducer<>(prop));
+ }
+
+ if (kafkaConfig.valueSerializer().equals(STRING_SERIALIZER)) {
+ stringProducers.put(name, new KafkaProducer<>(prop));
+ }
+
success = true;
} else {
log.warn("Unable to connect to {}:{}, " +
@@ -175,11 +234,17 @@
@Override
public void stop(String name) {
- Producer<String, byte[]> producer = producers.get(name);
+ Producer<String, byte[]> byteProducer = byteProducers.get(name);
+ Producer<String, String> stringProducer = stringProducers.get(name);
- if (producer != null) {
- producer.close();
- producers.remove(name);
+ if (byteProducer != null) {
+ byteProducer.close();
+ byteProducers.remove(name);
+ }
+
+ if (stringProducer != null) {
+ stringProducer.close();
+ stringProducers.remove(name);
}
}
@@ -197,11 +262,17 @@
@Override
public void stopAll() {
- if (!producers.isEmpty()) {
- producers.values().forEach(Producer::close);
+ if (!byteProducers.isEmpty()) {
+ byteProducers.values().forEach(Producer::close);
}
- producers.clear();
+ byteProducers.clear();
+
+ if (!stringProducers.isEmpty()) {
+ stringProducers.values().forEach(Producer::close);
+ }
+
+ stringProducers.clear();
log.info("Kafka producer has Stopped");
}
@@ -211,4 +282,21 @@
stopAll();
startAll();
}
+
+ private ObjectNode encodeStrings(Set<LinkInfo> infos,
+ CodecContext context, String codecName) {
+ ObjectNode root = context.mapper().createObjectNode();
+ ArrayNode array = context.mapper().createArrayNode();
+ try {
+ Class codecClazz = Class.forName(codecName);
+ JsonCodec codec = codecService.getCodec(codecClazz);
+
+ infos.forEach(l -> array.add(codec.encode(l, context)));
+ } catch (ClassNotFoundException e) {
+ log.warn("Failed to send telemetry record due to {}", e);
+ }
+
+ root.set(LINK_INFOS, array);
+ return root;
+ }
}
diff --git a/apps/openstacktelemetry/app/src/main/java/org/onosproject/openstacktelemetry/impl/StatsFlowRuleManager.java b/apps/openstacktelemetry/app/src/main/java/org/onosproject/openstacktelemetry/impl/StatsFlowRuleManager.java
index 30faac7..9bf7994 100644
--- a/apps/openstacktelemetry/app/src/main/java/org/onosproject/openstacktelemetry/impl/StatsFlowRuleManager.java
+++ b/apps/openstacktelemetry/app/src/main/java/org/onosproject/openstacktelemetry/impl/StatsFlowRuleManager.java
@@ -586,7 +586,7 @@
.withSrcMac(macAddress)
.withDstMac(NO_HOST_MAC)
.withDeviceId(deviceId)
- .withInputInterfaceId(ARBITRARY_IN_INTF)
+ .withInputInterfaceId(getInterfaceId(ipAddress))
.withOutputInterfaceId(ARBITRARY_OUT_INTF)
.withVlanId(VlanId.vlanId());
diff --git a/apps/openstacktelemetry/app/src/main/java/org/onosproject/openstacktelemetry/util/OpenstackTelemetryUtil.java b/apps/openstacktelemetry/app/src/main/java/org/onosproject/openstacktelemetry/util/OpenstackTelemetryUtil.java
index afb7365..78245f2 100644
--- a/apps/openstacktelemetry/app/src/main/java/org/onosproject/openstacktelemetry/util/OpenstackTelemetryUtil.java
+++ b/apps/openstacktelemetry/app/src/main/java/org/onosproject/openstacktelemetry/util/OpenstackTelemetryUtil.java
@@ -16,8 +16,14 @@
package org.onosproject.openstacktelemetry.util;
import com.google.common.base.Strings;
+import com.google.common.collect.Sets;
import org.onlab.packet.IPv4;
import org.onosproject.cfg.ConfigProperty;
+import org.onosproject.openstacktelemetry.api.DefaultLinkInfo;
+import org.onosproject.openstacktelemetry.api.DefaultLinkStatsInfo;
+import org.onosproject.openstacktelemetry.api.FlowInfo;
+import org.onosproject.openstacktelemetry.api.LinkInfo;
+import org.onosproject.openstacktelemetry.api.LinkStatsInfo;
import java.io.IOException;
import java.net.InetSocketAddress;
@@ -27,6 +33,9 @@
import java.util.Optional;
import java.util.Set;
+import static org.onlab.packet.IPv4.PROTOCOL_ICMP;
+import static org.onlab.packet.IPv4.PROTOCOL_TCP;
+import static org.onlab.packet.IPv4.PROTOCOL_UDP;
import static org.onlab.util.Tools.get;
/**
@@ -36,10 +45,14 @@
private static final String PROTOCOL_NAME_TCP = "tcp";
private static final String PROTOCOL_NAME_UDP = "udp";
+ private static final String PROTOCOL_NAME_ICMP = "icmp";
private static final String PROTOCOL_NAME_ANY = "any";
private static final int ARBITRARY_PROTOCOL = 0x0;
private static final int TIMEOUT = 2000;
+ private static final String ALL_IP_ADDRESSES = "0.0.0.0";
+ private static final String LINK_ID_DELIMITER = "_";
+
/**
* Prevents object instantiation from external.
*/
@@ -137,4 +150,88 @@
return isConnected;
}
+
+ /**
+ * Converts flow info to link info.
+ *
+ * @param flowInfos a set of flow infos
+ * @return converted link infos
+ */
+ public static Set<LinkInfo> flowsToLinks(Set<FlowInfo> flowInfos) {
+
+ Set<LinkInfo> linkInfos = Sets.newConcurrentHashSet();
+
+ for (FlowInfo flowInfo : flowInfos) {
+ if (!ALL_IP_ADDRESSES.equals(flowInfo.srcIp().address().toString()) &&
+ ALL_IP_ADDRESSES.equals(flowInfo.dstIp().address().toString())) {
+ FlowInfo dstFlowInfo = flowInfos.stream()
+ .filter(f -> f.dstIp().address().toString()
+ .equals(flowInfo.srcIp().address().toString()))
+ .findFirst().orElse(null);
+
+ if (dstFlowInfo == null) {
+ continue;
+ }
+
+ String linkId = flowInfo.deviceId().toString() +
+ LINK_ID_DELIMITER + flowInfo.inputInterfaceId();
+ String srcIp = flowInfo.srcIp().address().toString();
+ String dstIp = flowInfo.dstIp().address().toString();
+ int srcPort = flowInfo.srcPort() == null ? 0 : flowInfo.srcPort().toInt();
+ int dstPort = flowInfo.dstPort() == null ? 0 : flowInfo.dstPort().toInt();
+ String protocol = protocolByteToString(flowInfo.protocol());
+
+ long txPacket = flowInfo.statsInfo().currAccPkts();
+ long txByte = flowInfo.statsInfo().currAccBytes();
+ long txDrop = flowInfo.statsInfo().dropPkts();
+
+ long rxPacket = dstFlowInfo.statsInfo().currAccPkts();
+ long rxByte = dstFlowInfo.statsInfo().currAccBytes();
+ long rxDrop = dstFlowInfo.statsInfo().dropPkts();
+
+ LinkStatsInfo statsInfo = DefaultLinkStatsInfo.builder()
+ .withTxPacket(txPacket)
+ .withRxPacket(rxPacket)
+ .withTxByte(txByte)
+ .withRxByte(rxByte)
+ .withTxDrop(txDrop)
+ .withRxDrop(rxDrop)
+ .withTimestamp(System.currentTimeMillis())
+ .build();
+
+ LinkInfo linkInfo = DefaultLinkInfo.builder()
+ .withLinkId(linkId)
+ .withSrcIp(srcIp)
+ .withDstIp(dstIp)
+ .withSrcPort(srcPort)
+ .withDstPort(dstPort)
+ .withProtocol(protocol)
+ .withLinkStats(statsInfo)
+ .build();
+
+ linkInfos.add(linkInfo);
+ }
+ }
+
+ return linkInfos;
+ }
+
+ /**
+ * Converts byte formatted protocol to string type.
+ *
+ * @param protocol byte formatted protocol
+ * @return string formatted protocol
+ */
+ public static String protocolByteToString(byte protocol) {
+ switch (protocol) {
+ case PROTOCOL_TCP:
+ return PROTOCOL_NAME_TCP;
+ case PROTOCOL_UDP:
+ return PROTOCOL_NAME_UDP;
+ case PROTOCOL_ICMP:
+ return PROTOCOL_NAME_ICMP;
+ default:
+ return "";
+ }
+ }
}
diff --git a/apps/openstacktelemetry/app/src/main/resources/org/onosproject/openstacktelemetry/impl/kafka-configs.xml b/apps/openstacktelemetry/app/src/main/resources/org/onosproject/openstacktelemetry/impl/kafka-configs.xml
index e33b7f0..b77e1d1 100644
--- a/apps/openstacktelemetry/app/src/main/resources/org/onosproject/openstacktelemetry/impl/kafka-configs.xml
+++ b/apps/openstacktelemetry/app/src/main/resources/org/onosproject/openstacktelemetry/impl/kafka-configs.xml
@@ -20,6 +20,8 @@
<property name="port">9092</property>
<property name="retries">0</property>
</config>
+
+ <!-- a kafka exporter used for exporting flow metrics to TINA monitoring system -->
<config name="tina-kafka-exporter" manufacturer="SK Telecom"
swVersion="1.0" extends="kafka" status="disabled">
<property name="batchSize">16384</property>
@@ -38,4 +40,24 @@
org.onosproject.openstacktelemetry.codec.bytebuffer.TinaMessageByteBufferCodec
</property>
</config>
+
+ <!-- a kafka exporter used for exporting link metrics to 3DV monitoring system -->
+ <config name="3dv-kafka-exporter" manufacturer="SK Telecom"
+ swVersion="1.0" extends="kafka" status="disabled">
+ <property name="batchSize">16384</property>
+ <property name="lingerMs">1</property>
+ <property name="memoryBuffer">33554432</property>
+ <property name="requiredAcks">all</property>
+ <property name="keySerializer">
+ org.apache.kafka.common.serialization.StringSerializer
+ </property>
+ <property name="valueSerializer">
+ org.apache.kafka.common.serialization.StringSerializer
+ </property>
+ <property name="topic">sona.data.link</property>
+ <property name="key">flowdata</property>
+ <property name="codec">
+ org.onosproject.openstacktelemetry.api.LinkInfo
+ </property>
+ </config>
</configs>
\ No newline at end of file
diff --git a/apps/openstacktelemetry/web/src/main/java/org/onosproject/openstacktelemetry/web/OpenstackTelemetryCodecRegister.java b/apps/openstacktelemetry/web/src/main/java/org/onosproject/openstacktelemetry/web/OpenstackRestCodecRegister.java
similarity index 95%
rename from apps/openstacktelemetry/web/src/main/java/org/onosproject/openstacktelemetry/web/OpenstackTelemetryCodecRegister.java
rename to apps/openstacktelemetry/web/src/main/java/org/onosproject/openstacktelemetry/web/OpenstackRestCodecRegister.java
index 6bd8636..acc0679 100644
--- a/apps/openstacktelemetry/web/src/main/java/org/onosproject/openstacktelemetry/web/OpenstackTelemetryCodecRegister.java
+++ b/apps/openstacktelemetry/web/src/main/java/org/onosproject/openstacktelemetry/web/OpenstackRestCodecRegister.java
@@ -29,6 +29,7 @@
import org.osgi.service.component.annotations.Deactivate;
import org.osgi.service.component.annotations.Reference;
import org.osgi.service.component.annotations.ReferenceCardinality;
+import org.slf4j.Logger;
import static org.slf4j.LoggerFactory.getLogger;
@@ -36,9 +37,9 @@
* Implementation of the JSON codec brokering service for OpenstackTelemetry.
*/
@Component(immediate = true)
-public class OpenstackTelemetryCodecRegister {
+public class OpenstackRestCodecRegister {
- private final org.slf4j.Logger log = getLogger(getClass());
+ private final Logger log = getLogger(getClass());
@Reference(cardinality = ReferenceCardinality.MANDATORY)
protected CodecService codecService;
diff --git a/apps/openstacktelemetry/web/src/test/java/org/onosproject/openstacktelemetry/web/OpenstackTelemetryCodecRegisterTest.java b/apps/openstacktelemetry/web/src/test/java/org/onosproject/openstacktelemetry/web/OpenstackRestCodecRegisterTest.java
similarity index 95%
rename from apps/openstacktelemetry/web/src/test/java/org/onosproject/openstacktelemetry/web/OpenstackTelemetryCodecRegisterTest.java
rename to apps/openstacktelemetry/web/src/test/java/org/onosproject/openstacktelemetry/web/OpenstackRestCodecRegisterTest.java
index 6eee616..af91100 100644
--- a/apps/openstacktelemetry/web/src/test/java/org/onosproject/openstacktelemetry/web/OpenstackTelemetryCodecRegisterTest.java
+++ b/apps/openstacktelemetry/web/src/test/java/org/onosproject/openstacktelemetry/web/OpenstackRestCodecRegisterTest.java
@@ -39,16 +39,16 @@
/**
* Unit test for openstack telemetry codec register.
*/
-public final class OpenstackTelemetryCodecRegisterTest {
+public final class OpenstackRestCodecRegisterTest {
- private OpenstackTelemetryCodecRegister register;
+ private OpenstackRestCodecRegister register;
/**
* Tests codec register activation and deactivation.
*/
@Test
public void testActivateDeactivate() {
- register = new OpenstackTelemetryCodecRegister();
+ register = new OpenstackRestCodecRegister();
CodecService codecService = new TestCodecService();
TestUtils.setField(register, "codecService", codecService);