Fix: resolve potential NPE issues reported by sonarcube
Change-Id: I2947c9a7d929c441f767e8382561825e9f9d7dce
diff --git a/apps/openstacktelemetry/app/src/main/java/org/onosproject/openstacktelemetry/impl/KafkaTelemetryManager.java b/apps/openstacktelemetry/app/src/main/java/org/onosproject/openstacktelemetry/impl/KafkaTelemetryManager.java
index 2078b2f..4d8769c 100644
--- a/apps/openstacktelemetry/app/src/main/java/org/onosproject/openstacktelemetry/impl/KafkaTelemetryManager.java
+++ b/apps/openstacktelemetry/app/src/main/java/org/onosproject/openstacktelemetry/impl/KafkaTelemetryManager.java
@@ -121,61 +121,60 @@
@Override
public Set<Future<RecordMetadata>> publish(Set<FlowInfo> flowInfos) {
- boolean hasEmptyByteProducers =
- byteProducers == null || byteProducers.isEmpty();
- boolean hasEmptyStringProducers =
- stringProducers == null || stringProducers.isEmpty();
-
- if (hasEmptyByteProducers && hasEmptyStringProducers) {
- log.debug("Kafka telemetry service has not been enabled!");
- return null;
- }
-
log.debug("Send telemetry record to kafka server...");
Set<Future<RecordMetadata>> futureSet = Sets.newHashSet();
- byteProducers.forEach((k, v) -> {
- TelemetryConfig config = telemetryConfigService.getConfig(k);
- KafkaTelemetryConfig kafkaConfig = fromTelemetryConfig(config);
+ if (byteProducers == null || byteProducers.isEmpty()) {
+ log.debug("Byte producer is empty!");
+ } else {
+ byteProducers.forEach((k, v) -> {
+ TelemetryConfig config = telemetryConfigService.getConfig(k);
+ KafkaTelemetryConfig kafkaConfig = fromTelemetryConfig(config);
- if (kafkaConfig != null &&
- BYTE_ARRAY_SERIALIZER.equals(kafkaConfig.valueSerializer())) {
- try {
- Class codecClazz = Class.forName(kafkaConfig.codec());
- TelemetryCodec codec = (TelemetryCodec) codecClazz.newInstance();
+ if (kafkaConfig != null &&
+ BYTE_ARRAY_SERIALIZER.equals(kafkaConfig.valueSerializer())) {
+ try {
+ Class codecClazz = Class.forName(kafkaConfig.codec());
+ TelemetryCodec codec = (TelemetryCodec) codecClazz.newInstance();
- ByteBuffer buffer = codec.encode(flowInfos);
- ProducerRecord record = new ProducerRecord<>(
- kafkaConfig.topic(), kafkaConfig.key(), buffer.array());
- futureSet.add(v.send(record));
- } catch (ClassNotFoundException |
- IllegalAccessException | InstantiationException e) {
- log.warn("Failed to send telemetry record due to {}", e);
+ ByteBuffer buffer = codec.encode(flowInfos);
+ ProducerRecord record = new ProducerRecord<>(
+ kafkaConfig.topic(), kafkaConfig.key(), buffer.array());
+ futureSet.add(v.send(record));
+ } catch (ClassNotFoundException |
+ IllegalAccessException | InstantiationException e) {
+ log.warn("Failed to send telemetry record due to {}", e);
+ }
}
- }
- });
+ });
+ }
- stringProducers.forEach((k, v) -> {
- TelemetryConfig config = telemetryConfigService.getConfig(k);
- KafkaTelemetryConfig kafkaConfig = fromTelemetryConfig(config);
+ if (stringProducers == null || stringProducers.isEmpty()) {
+ log.debug("String producer is empty!");
+ } else {
+ stringProducers.forEach((k, v) -> {
+ TelemetryConfig config = telemetryConfigService.getConfig(k);
+ KafkaTelemetryConfig kafkaConfig = fromTelemetryConfig(config);
- if (kafkaConfig != null &&
- STRING_SERIALIZER.equals(kafkaConfig.valueSerializer())) {
+ if (kafkaConfig != null &&
+ STRING_SERIALIZER.equals(kafkaConfig.valueSerializer())) {
- // TODO: this is a workaround to convert flowInfo to linkInfo
- // need to find a better solution
+ // TODO: this is a workaround to convert flowInfo to linkInfo
+ // need to find a better solution
- Set<LinkInfo> linkInfos = flowsToLinks(flowInfos);
+ Set<LinkInfo> linkInfos = flowsToLinks(flowInfos);
- if (!linkInfos.isEmpty()) {
- ProducerRecord record = new ProducerRecord<>(
- kafkaConfig.topic(), kafkaConfig.key(),
- encodeStrings(linkInfos, this,
- kafkaConfig.codec()).toString());
- futureSet.add(v.send(record));
+ if (!linkInfos.isEmpty()) {
+ ProducerRecord record = new ProducerRecord<>(
+ kafkaConfig.topic(), kafkaConfig.key(),
+ encodeStrings(linkInfos, this,
+ kafkaConfig.codec()).toString());
+ futureSet.add(v.send(record));
+ }
}
- }
- });
+ });
+ }
+
return futureSet;
}