Kafka Application - Updated the Application to a Distributed Application

1. Distributed Work Queue to publish events.
2. Only one instance can publish to the queue and only one instance can consume from the queue.
3. KafkaPublisherAdminService, which starts, stops and restarts a Kafka producer.
4. Other cosmetic changes (some files have been renamed)

Change-Id: I15b2015ed303eae1c9fb9f4820fb14476726161e
diff --git a/apps/kafka-integration/core/src/main/java/org/onosproject/kafkaintegration/kafka/EventListener.java b/apps/kafka-integration/core/src/main/java/org/onosproject/kafkaintegration/kafka/EventListener.java
index 13a24c2..c63842f 100644
--- a/apps/kafka-integration/core/src/main/java/org/onosproject/kafkaintegration/kafka/EventListener.java
+++ b/apps/kafka-integration/core/src/main/java/org/onosproject/kafkaintegration/kafka/EventListener.java
@@ -20,12 +20,13 @@
 import org.apache.felix.scr.annotations.Component;
 import org.apache.felix.scr.annotations.Deactivate;
 import org.apache.felix.scr.annotations.Reference;
+import org.onosproject.cluster.ClusterService;
+import org.onosproject.cluster.NodeId;
 import org.apache.felix.scr.annotations.ReferenceCardinality;
-import org.apache.kafka.clients.producer.ProducerRecord;
+import org.onosproject.cluster.LeadershipService;
 import org.onosproject.kafkaintegration.api.EventConversionService;
 import org.onosproject.kafkaintegration.api.EventSubscriptionService;
-import org.onosproject.kafkaintegration.api.KafkaPublisherService;
-import org.onosproject.kafkaintegration.api.KafkaConfigService;
+import org.onosproject.kafkaintegration.api.KafkaEventStorageService;
 import org.onosproject.kafkaintegration.api.dto.OnosEvent;
 import org.onosproject.net.device.DeviceEvent;
 import org.onosproject.net.device.DeviceListener;
@@ -36,7 +37,7 @@
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import java.util.concurrent.ExecutionException;
+import java.util.Objects;
 import java.util.concurrent.ExecutorService;
 
 import static java.util.concurrent.Executors.newSingleThreadScheduledExecutor;
@@ -59,22 +60,29 @@
     protected EventConversionService eventConversionService;
 
     @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
-    protected KafkaPublisherService kafkaProducer;
-
-    @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
     protected DeviceService deviceService;
 
     @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
     protected LinkService linkService;
 
     @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
-    protected KafkaConfigService kafkaConfigService;
+    protected KafkaEventStorageService kafkaStoreService;
+
+    @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
+    protected LeadershipService leadershipService;
+
+    @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
+    protected ClusterService clusterService;
 
     private final DeviceListener deviceListener = new InternalDeviceListener();
     private final LinkListener linkListener = new InternalLinkListener();
 
     protected ExecutorService eventExecutor;
 
+    private static final String PUBLISHER_TOPIC = "WORK_QUEUE_PUBLISHER";
+
+    private NodeId localNodeId;
+
     @Activate
     protected void activate() {
 
@@ -82,7 +90,9 @@
         deviceService.addListener(deviceListener);
         linkService.addListener(linkListener);
 
-        kafkaProducer.start(kafkaConfigService.getConfigParams());
+        localNodeId = clusterService.getLocalNode().id();
+
+        leadershipService.runForLeadership(PUBLISHER_TOPIC);
 
         log.info("Started");
     }
@@ -95,8 +105,6 @@
         eventExecutor.shutdownNow();
         eventExecutor = null;
 
-        kafkaProducer.stop();
-
         log.info("Stopped");
     }
 
@@ -105,25 +113,21 @@
         @Override
         public void event(DeviceEvent event) {
 
+            // do not allow to proceed without leadership
+            NodeId leaderNodeId = leadershipService.getLeader(PUBLISHER_TOPIC);
+            if (!Objects.equals(localNodeId, leaderNodeId)) {
+                log.debug("Not a Leader, cannot publish!");
+                return;
+            }
+
             if (!eventSubscriptionService.getEventSubscribers(DEVICE).isEmpty()) {
                 OnosEvent onosEvent = eventConversionService.convertEvent(event);
                 eventExecutor.execute(() -> {
-                    try {
-                        kafkaProducer.send(new ProducerRecord<>(DEVICE.toString(),
-                                                               onosEvent.subject().toByteArray())).get();
-
-                        log.debug("Event Type - {}, Subject {} sent successfully.",
-                                  DEVICE, onosEvent.subject());
-
-                    } catch (InterruptedException e1) {
-                        Thread.currentThread().interrupt();
-                    } catch (ExecutionException e2) {
-                        log.error("Exception thrown {}", e2);
-                    }
+                    kafkaStoreService.publishEvent(onosEvent);
                 });
-            } else {
-                log.debug("No device listeners");
+                log.debug("Pushed event {} to kafka storage", onosEvent);
             }
+
         }
     }
 
@@ -132,25 +136,21 @@
         @Override
         public void event(LinkEvent event) {
 
+            // do not allow to proceed without leadership
+            NodeId leaderNodeId = leadershipService.getLeader(PUBLISHER_TOPIC);
+            if (!Objects.equals(localNodeId, leaderNodeId)) {
+                log.debug("Not a Leader, cannot publish!");
+                return;
+            }
+
             if (!eventSubscriptionService.getEventSubscribers(LINK).isEmpty()) {
                 OnosEvent onosEvent = eventConversionService.convertEvent(event);
                 eventExecutor.execute(() -> {
-                    try {
-                        kafkaProducer.send(new ProducerRecord<>(LINK.toString(),
-                                onosEvent.subject().toByteArray())).get();
-
-                        log.debug("Event Type - {}, Subject {} sent successfully.",
-                              LINK, onosEvent.subject());
-
-                    } catch (InterruptedException e1) {
-                        Thread.currentThread().interrupt();
-                    } catch (ExecutionException e2) {
-                        log.error("Exception thrown {}", e2);
-                    }
+                    kafkaStoreService.publishEvent(onosEvent);
                 });
-            } else {
-                log.debug("No link listeners");
+                log.debug("Pushed event {} to kafka storage", onosEvent);
             }
+
         }
     }
-}
+}
\ No newline at end of file