Fix: resolve potential NPE issues reported by SonarQube

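The previous combined check in publish() only returned early when both
byteProducers and stringProducers were null or empty, so forEach() could
still be invoked on a null map when only one of the two producers was
configured. Guard each map separately and skip the null/empty one instead
of returning null. Illustrative sketch of the guard pattern applied in the
patch below (identifiers mirror the touched class):

    // skip a producer map that was never initialized instead of dereferencing it
    if (byteProducers == null || byteProducers.isEmpty()) {
        log.debug("Byte producer is empty!");
    } else {
        byteProducers.forEach((k, v) -> {
            // resolve config and send the record as before
        });
    }
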
Change-Id: I2947c9a7d929c441f767e8382561825e9f9d7dce
diff --git a/apps/openstacktelemetry/app/src/main/java/org/onosproject/openstacktelemetry/impl/KafkaTelemetryManager.java b/apps/openstacktelemetry/app/src/main/java/org/onosproject/openstacktelemetry/impl/KafkaTelemetryManager.java
index 2078b2f..4d8769c 100644
--- a/apps/openstacktelemetry/app/src/main/java/org/onosproject/openstacktelemetry/impl/KafkaTelemetryManager.java
+++ b/apps/openstacktelemetry/app/src/main/java/org/onosproject/openstacktelemetry/impl/KafkaTelemetryManager.java
@@ -121,61 +121,60 @@
     @Override
     public Set<Future<RecordMetadata>> publish(Set<FlowInfo> flowInfos) {
 
-        boolean hasEmptyByteProducers =
-                byteProducers == null || byteProducers.isEmpty();
-        boolean hasEmptyStringProducers =
-                stringProducers == null || stringProducers.isEmpty();
-
-        if (hasEmptyByteProducers && hasEmptyStringProducers) {
-            log.debug("Kafka telemetry service has not been enabled!");
-            return null;
-        }
-
         log.debug("Send telemetry record to kafka server...");
         Set<Future<RecordMetadata>> futureSet = Sets.newHashSet();
 
-        byteProducers.forEach((k, v) -> {
-            TelemetryConfig config = telemetryConfigService.getConfig(k);
-            KafkaTelemetryConfig kafkaConfig = fromTelemetryConfig(config);
+        if (byteProducers == null || byteProducers.isEmpty()) {
+            log.debug("Byte producer is empty!");
+        } else {
+            byteProducers.forEach((k, v) -> {
+                TelemetryConfig config = telemetryConfigService.getConfig(k);
+                KafkaTelemetryConfig kafkaConfig = fromTelemetryConfig(config);
 
-            if (kafkaConfig != null &&
-                    BYTE_ARRAY_SERIALIZER.equals(kafkaConfig.valueSerializer())) {
-                try {
-                    Class codecClazz = Class.forName(kafkaConfig.codec());
-                    TelemetryCodec codec = (TelemetryCodec) codecClazz.newInstance();
+                if (kafkaConfig != null &&
+                        BYTE_ARRAY_SERIALIZER.equals(kafkaConfig.valueSerializer())) {
+                    try {
+                        Class codecClazz = Class.forName(kafkaConfig.codec());
+                        TelemetryCodec codec = (TelemetryCodec) codecClazz.newInstance();
 
-                    ByteBuffer buffer = codec.encode(flowInfos);
-                    ProducerRecord record = new ProducerRecord<>(
-                            kafkaConfig.topic(), kafkaConfig.key(), buffer.array());
-                    futureSet.add(v.send(record));
-                } catch (ClassNotFoundException |
-                        IllegalAccessException | InstantiationException e) {
-                    log.warn("Failed to send telemetry record due to {}", e);
+                        ByteBuffer buffer = codec.encode(flowInfos);
+                        ProducerRecord record = new ProducerRecord<>(
+                                kafkaConfig.topic(), kafkaConfig.key(), buffer.array());
+                        futureSet.add(v.send(record));
+                    } catch (ClassNotFoundException |
+                            IllegalAccessException | InstantiationException e) {
+                        log.warn("Failed to send telemetry record due to {}", e);
+                    }
                 }
-            }
-        });
+            });
+        }
 
-        stringProducers.forEach((k, v) -> {
-            TelemetryConfig config = telemetryConfigService.getConfig(k);
-            KafkaTelemetryConfig kafkaConfig = fromTelemetryConfig(config);
+        if (stringProducers == null || stringProducers.isEmpty()) {
+            log.debug("String producer is empty!");
+        } else {
+            stringProducers.forEach((k, v) -> {
+                TelemetryConfig config = telemetryConfigService.getConfig(k);
+                KafkaTelemetryConfig kafkaConfig = fromTelemetryConfig(config);
 
-            if (kafkaConfig != null &&
-                    STRING_SERIALIZER.equals(kafkaConfig.valueSerializer())) {
+                if (kafkaConfig != null &&
+                        STRING_SERIALIZER.equals(kafkaConfig.valueSerializer())) {
 
-                // TODO: this is a workaround to convert flowInfo to linkInfo
-                // need to find a better solution
+                    // TODO: this is a workaround to convert flowInfo to linkInfo
+                    // need to find a better solution
 
-                Set<LinkInfo> linkInfos = flowsToLinks(flowInfos);
+                    Set<LinkInfo> linkInfos = flowsToLinks(flowInfos);
 
-                if (!linkInfos.isEmpty()) {
-                    ProducerRecord record = new ProducerRecord<>(
-                            kafkaConfig.topic(), kafkaConfig.key(),
-                            encodeStrings(linkInfos, this,
-                                    kafkaConfig.codec()).toString());
-                    futureSet.add(v.send(record));
+                    if (!linkInfos.isEmpty()) {
+                        ProducerRecord record = new ProducerRecord<>(
+                                kafkaConfig.topic(), kafkaConfig.key(),
+                                encodeStrings(linkInfos, this,
+                                        kafkaConfig.codec()).toString());
+                        futureSet.add(v.send(record));
+                    }
                 }
-            }
-        });
+            });
+        }
+
         return futureSet;
     }