Kafka Application - Updated the Application to a Distributed Application

1. Distributed Work Queue to publish events.
2. Only one instance can publish to the queue and only one instance can consume from it.
3. KafkaPublisherAdminService, which starts, stops and restarts a Kafka producer.
4. Other cosmetic changes (some files have been renamed).

Change-Id: I15b2015ed303eae1c9fb9f4820fb14476726161e
diff --git a/apps/kafka-integration/api/src/main/java/org/onosproject/kafkaintegration/api/KafkaEventStorageService.java b/apps/kafka-integration/api/src/main/java/org/onosproject/kafkaintegration/api/KafkaEventStorageService.java
index ae6b720..0d0cc57 100644
--- a/apps/kafka-integration/api/src/main/java/org/onosproject/kafkaintegration/api/KafkaEventStorageService.java
+++ b/apps/kafka-integration/api/src/main/java/org/onosproject/kafkaintegration/api/KafkaEventStorageService.java
@@ -23,18 +23,16 @@
 public interface KafkaEventStorageService {
 
     /**
-     * Inserts the Generated event into the local cache.
+     * Inserts the Onos Event into the Distributed Work Queue.
      *
-     * @param e the ONOS Event
-     * @return true if the insertion was successful
+     * @param event the ONOS Event
      */
-    boolean insertCacheEntry(OnosEvent e);
+    void publishEvent(OnosEvent event);
 
     /**
-     * Updates the counter with the most recently published event's sequence
-     * number.
+     * Removes the Onos Event from the Distributed Work Queue.
      *
-     * @param sequenceNumber the updated value of sequence number.
+     * @return the Onos Event
      */
-    void updateLastPublishedEntry(Long sequenceNumber);
+    OnosEvent consumeEvent();
 }
diff --git a/apps/kafka-integration/api/src/main/java/org/onosproject/kafkaintegration/api/KafkaPublisherAdminService.java b/apps/kafka-integration/api/src/main/java/org/onosproject/kafkaintegration/api/KafkaPublisherAdminService.java
new file mode 100644
index 0000000..181e903
--- /dev/null
+++ b/apps/kafka-integration/api/src/main/java/org/onosproject/kafkaintegration/api/KafkaPublisherAdminService.java
@@ -0,0 +1,42 @@
+/**
+ * Copyright 2016-present Open Networking Laboratory
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.onosproject.kafkaintegration.api;
+
+import org.onosproject.kafkaintegration.api.dto.KafkaServerConfig;
+
+/**
+ * APIs for administering the lifecycle of the Kafka Producer.
+ */
+public interface KafkaPublisherAdminService {
+    /**
+     * Starts the Kafka Producer.
+     *
+     * @param config the Kafka Server Config
+     */
+    void start(KafkaServerConfig config);
+
+    /**
+     * Stops the Kafka Producer.
+     */
+    void stop();
+
+    /**
+     * Restarts the Kafka Producer.
+     *
+     * @param config the Kafka Server Config
+     */
+    void restart(KafkaServerConfig config);
+}
diff --git a/apps/kafka-integration/api/src/main/java/org/onosproject/kafkaintegration/api/KafkaPublisherService.java b/apps/kafka-integration/api/src/main/java/org/onosproject/kafkaintegration/api/KafkaPublisherService.java
index 6156134..d1b1f2f 100644
--- a/apps/kafka-integration/api/src/main/java/org/onosproject/kafkaintegration/api/KafkaPublisherService.java
+++ b/apps/kafka-integration/api/src/main/java/org/onosproject/kafkaintegration/api/KafkaPublisherService.java
@@ -16,41 +16,19 @@
 package org.onosproject.kafkaintegration.api;
 
 import java.util.concurrent.Future;
-
-import org.onosproject.kafkaintegration.api.dto.KafkaServerConfig;
 import org.apache.kafka.clients.producer.ProducerRecord;
 import org.apache.kafka.clients.producer.RecordMetadata;
+
 /**
  * APIs for controlling the Kafka Producer.
  *
  */
 public interface KafkaPublisherService {
-
-    /**
-     * Starts the Kafka Producer.
-     *
-     * @param config the Kafka Server Config
-     */
-    void start(KafkaServerConfig config);
-
-    /**
-     * Stops the Kafka Producer.
-     *
-     */
-    void stop();
-
-    /**
-     * Restarts the Kafka Producer.
-     *
-     * @param config the Kafka Server Config
-     */
-    void restart(KafkaServerConfig config);
-
     /**
      * Sends message to Kafka Server.
      *
      * @param record a message to be sent
      * @return metadata for a record that as been acknowledged
      */
-    public Future<RecordMetadata> send(ProducerRecord<String, byte[]> record);
+    Future<RecordMetadata> send(ProducerRecord<String, byte[]> record);
 }
diff --git a/apps/kafka-integration/api/src/main/java/org/onosproject/kafkaintegration/api/dto/OnosEvent.java b/apps/kafka-integration/api/src/main/java/org/onosproject/kafkaintegration/api/dto/OnosEvent.java
index 9b28eeb..72fa028 100644
--- a/apps/kafka-integration/api/src/main/java/org/onosproject/kafkaintegration/api/dto/OnosEvent.java
+++ b/apps/kafka-integration/api/src/main/java/org/onosproject/kafkaintegration/api/dto/OnosEvent.java
@@ -16,14 +16,12 @@
 
 import org.onosproject.event.AbstractEvent;
 
-import com.google.protobuf.GeneratedMessageV3;
-
 /**
  * Represents the converted Onos Event data into protobuf format.
  *
  */
 // FIXME lack of abstraction in subject type is biting us
-public class OnosEvent extends AbstractEvent<OnosEvent.Type, GeneratedMessageV3> {
+public class OnosEvent extends AbstractEvent<OnosEvent.Type, byte[]> {
 
     /**
      * Creates a new Onos Event.
@@ -31,7 +29,7 @@
      * @param type The Type of Onos Event
      * @param subject Protobuf message corresponding to the Onos Event
      */
-    public OnosEvent(Type type, GeneratedMessageV3 subject) {
+    public OnosEvent(Type type, byte[] subject) {
         super(type, subject);
     }
 
diff --git a/apps/kafka-integration/core/src/main/java/org/onosproject/kafkaintegration/converter/DeviceEventConverter.java b/apps/kafka-integration/core/src/main/java/org/onosproject/kafkaintegration/converter/DeviceEventConverter.java
index 3d2eea1..b5bd01b 100644
--- a/apps/kafka-integration/core/src/main/java/org/onosproject/kafkaintegration/converter/DeviceEventConverter.java
+++ b/apps/kafka-integration/core/src/main/java/org/onosproject/kafkaintegration/converter/DeviceEventConverter.java
@@ -35,7 +35,7 @@
     private final Logger log = LoggerFactory.getLogger(getClass());
 
     @Override
-    public GeneratedMessageV3 convertToProtoMessage(Event<?, ?> event) {
+    public byte[] convertToProtoMessage(Event<?, ?> event) {
 
         DeviceEvent deviceEvent = (DeviceEvent) event;
 
@@ -45,7 +45,7 @@
             return null;
         }
 
-        return buildDeviceProtoMessage(deviceEvent);
+        return ((GeneratedMessageV3) buildDeviceProtoMessage(deviceEvent)).toByteArray();
     }
 
     /**
diff --git a/apps/kafka-integration/core/src/main/java/org/onosproject/kafkaintegration/converter/EventConverter.java b/apps/kafka-integration/core/src/main/java/org/onosproject/kafkaintegration/converter/EventConverter.java
index 42736b4..83c62d6 100644
--- a/apps/kafka-integration/core/src/main/java/org/onosproject/kafkaintegration/converter/EventConverter.java
+++ b/apps/kafka-integration/core/src/main/java/org/onosproject/kafkaintegration/converter/EventConverter.java
@@ -16,7 +16,6 @@
 
 import org.onosproject.event.Event;
 
-import com.google.protobuf.GeneratedMessageV3;
 
 /**
  *
@@ -30,8 +29,7 @@
      * to Kafka.
      *
      * @param event ONOS Event object
-     * @return converted data in protobuf format.
+     * @return converted data in protobuf format as a byte array.
      */
-    // FIXME reconsider return type, something similar to "OnosEvent"?
-    GeneratedMessageV3 convertToProtoMessage(Event<?, ?> event);
+    byte[] convertToProtoMessage(Event<?, ?> event);
 }
diff --git a/apps/kafka-integration/core/src/main/java/org/onosproject/kafkaintegration/converter/LinkEventConverter.java b/apps/kafka-integration/core/src/main/java/org/onosproject/kafkaintegration/converter/LinkEventConverter.java
index c878a26..c2cf60d 100644
--- a/apps/kafka-integration/core/src/main/java/org/onosproject/kafkaintegration/converter/LinkEventConverter.java
+++ b/apps/kafka-integration/core/src/main/java/org/onosproject/kafkaintegration/converter/LinkEventConverter.java
@@ -35,7 +35,7 @@
     private final Logger log = LoggerFactory.getLogger(getClass());
 
     @Override
-    public GeneratedMessageV3 convertToProtoMessage(Event<?, ?> event) {
+    public byte[] convertToProtoMessage(Event<?, ?> event) {
 
         LinkEvent linkEvent = (LinkEvent) event;
 
@@ -45,7 +45,7 @@
             return null;
         }
 
-        return buildDeviceProtoMessage(linkEvent);
+        return ((GeneratedMessageV3) buildDeviceProtoMessage(linkEvent)).toByteArray();
     }
 
     private boolean linkEventTypeSupported(LinkEvent event) {
diff --git a/apps/kafka-integration/core/src/main/java/org/onosproject/kafkaintegration/impl/EventSubscriptionManager.java b/apps/kafka-integration/core/src/main/java/org/onosproject/kafkaintegration/impl/EventSubscriptionManager.java
index 4fee5cb..444da71 100644
--- a/apps/kafka-integration/core/src/main/java/org/onosproject/kafkaintegration/impl/EventSubscriptionManager.java
+++ b/apps/kafka-integration/core/src/main/java/org/onosproject/kafkaintegration/impl/EventSubscriptionManager.java
@@ -40,8 +40,6 @@
 import org.onosproject.kafkaintegration.api.dto.OnosEvent.Type;
 import org.onosproject.kafkaintegration.errors.InvalidApplicationException;
 import org.onosproject.kafkaintegration.errors.InvalidGroupIdException;
-import org.onosproject.net.device.DeviceService;
-import org.onosproject.net.link.LinkService;
 import org.onosproject.store.serializers.KryoNamespaces;
 import org.onosproject.store.service.Serializer;
 import org.onosproject.store.service.StorageService;
@@ -70,12 +68,6 @@
     private static final String SUBSCRIBED_APPS = "event-subscriptions";
 
     @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
-    protected DeviceService deviceService;
-
-    @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
-    protected LinkService linkService;
-
-    @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
     protected CoreService coreService;
 
     @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
@@ -191,14 +183,6 @@
     }
 
     /**
-     * Actions that can be performed on the ONOS Event Listeners.
-     *
-     */
-    private enum ListenerAction {
-        START, STOP;
-    }
-
-    /**
      * Checks if the group id is valid for this registered application.
      *
      * @param groupId GroupId assigned to the subscriber
@@ -241,12 +225,10 @@
             return;
         }
 
-        // If this is the only subscriber listening for this event,
-        // stop the listener.
+        // update internal state.
         List<EventSubscriber> subscribers =
                 subscriptions.get(subscriber.eventType());
 
-        // update internal state.
         subscribers.remove(subscriber);
         subscriptions.put(subscriber.eventType(), subscribers);
 
diff --git a/apps/kafka-integration/core/src/main/java/org/onosproject/kafkaintegration/impl/KafkaStorageManager.java b/apps/kafka-integration/core/src/main/java/org/onosproject/kafkaintegration/impl/KafkaStorageManager.java
index 8ea15a2..bb6fa05 100644
--- a/apps/kafka-integration/core/src/main/java/org/onosproject/kafkaintegration/impl/KafkaStorageManager.java
+++ b/apps/kafka-integration/core/src/main/java/org/onosproject/kafkaintegration/impl/KafkaStorageManager.java
@@ -16,99 +16,79 @@
 
 import org.apache.felix.scr.annotations.Activate;
 import org.apache.felix.scr.annotations.Component;
+import org.apache.felix.scr.annotations.Service;
 import org.apache.felix.scr.annotations.Deactivate;
 import org.apache.felix.scr.annotations.Reference;
 import org.apache.felix.scr.annotations.ReferenceCardinality;
 import org.onosproject.kafkaintegration.api.KafkaEventStorageService;
 import org.onosproject.kafkaintegration.api.dto.OnosEvent;
-import org.onosproject.store.service.AtomicValue;
+import org.onosproject.store.serializers.KryoNamespaces;
+import org.onosproject.store.service.Serializer;
 import org.onosproject.store.service.StorageService;
+import org.onosproject.store.service.Task;
+import org.onosproject.store.service.WorkQueue;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import java.util.TreeMap;
-import java.util.concurrent.Executors;
-import java.util.concurrent.ScheduledExecutorService;
-import java.util.concurrent.TimeUnit;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutionException;
 
-/**
- * TODO: This code is not being used at the moment.
- * This will be modified to use Distributed Work Queue.
- * Please see clustering section of
- * https://wiki.onosproject.org/display/ONOS/Kafka+Integration
- */
 @Component(immediate = false)
+@Service
 public class KafkaStorageManager implements KafkaEventStorageService {
 
     @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
     protected StorageService storageService;
 
-    private TreeMap<Long, OnosEvent> kafkaEventStore;
-
-    private AtomicValue<Long> lastPublishedEvent;
-
     private final Logger log = LoggerFactory.getLogger(getClass());
 
-    private ScheduledExecutorService gcExService;
+    private static final String KAFKA_WORK_QUEUE = "Kafka-Work-Queue";
 
-    private InternalGarbageCollector gcTask;
-
-    // Thread scheduler parameters.
-    private final long delay = 0;
-    private final long period = 1;
+    private WorkQueue<OnosEvent> queue;
 
     @Activate
     protected void activate() {
-        kafkaEventStore = new TreeMap<Long, OnosEvent>();
-        lastPublishedEvent = storageService.<Long>atomicValueBuilder()
-                .withName("onos-app-kafka-published-seqNumber").build()
-                .asAtomicValue();
-
-        startGC();
+        queue = storageService.<OnosEvent>getWorkQueue(KAFKA_WORK_QUEUE,
+                                                       Serializer.using(KryoNamespaces.API,
+                                                                        OnosEvent.class,
+                                                                        OnosEvent.Type.class));
 
         log.info("Started");
     }
 
-    private void startGC() {
-        log.info("Starting Garbage Collection Service");
-        gcExService = Executors.newSingleThreadScheduledExecutor();
-        gcTask = new InternalGarbageCollector();
-        gcExService.scheduleAtFixedRate(gcTask, delay, period,
-                                        TimeUnit.SECONDS);
-    }
-
     @Deactivate
     protected void deactivate() {
-        stopGC();
+        queue = null;
         log.info("Stopped");
     }
 
-    private void stopGC() {
-        log.info("Stopping Garbage Collection Service");
-        gcExService.shutdown();
+    @Override
+    public void publishEvent(OnosEvent e) {
+        queue.addOne(e);
+        log.debug("Published {} Event to Distributed Work Queue", e.type());
     }
 
     @Override
-    public boolean insertCacheEntry(OnosEvent e) {
-        // TODO: Fill in the code once the event carries timestamp info.
-        return true;
-    }
+    public OnosEvent consumeEvent() {
+        Task<OnosEvent> task = null;
 
-    @Override
-    public void updateLastPublishedEntry(Long sequenceNumber) {
-        this.lastPublishedEvent.set(sequenceNumber);
-    }
-
-    /**
-     * Removes events from the Kafka Event Store which have been published.
-     *
-     */
-    private class InternalGarbageCollector implements Runnable {
-
-        @Override
-        public void run() {
-            kafkaEventStore.headMap(lastPublishedEvent.get(), true).clear();
+        CompletableFuture<Task<OnosEvent>> future = queue.take();
+        try {
+            task = future.get();
+        } catch (InterruptedException e) {
+            Thread.currentThread().interrupt();
+        } catch (ExecutionException e) {
+            log.error("Unable to consume event from the work queue", e);
         }
+
+        if (task != null) {
+            queue.complete(task.taskId());
+            log.debug("Consumed {} Event from Distributed Work Queue with id {}",
+                     task.payload().type(), task.taskId());
+            return task.payload();
+        }
+
+        return null;
     }
 
 }
diff --git a/apps/kafka-integration/core/src/main/java/org/onosproject/kafkaintegration/kafka/EventListener.java b/apps/kafka-integration/core/src/main/java/org/onosproject/kafkaintegration/kafka/EventListener.java
index 13a24c2..c63842f 100644
--- a/apps/kafka-integration/core/src/main/java/org/onosproject/kafkaintegration/kafka/EventListener.java
+++ b/apps/kafka-integration/core/src/main/java/org/onosproject/kafkaintegration/kafka/EventListener.java
@@ -20,12 +20,13 @@
 import org.apache.felix.scr.annotations.Component;
 import org.apache.felix.scr.annotations.Deactivate;
 import org.apache.felix.scr.annotations.Reference;
+import org.onosproject.cluster.ClusterService;
+import org.onosproject.cluster.NodeId;
 import org.apache.felix.scr.annotations.ReferenceCardinality;
-import org.apache.kafka.clients.producer.ProducerRecord;
+import org.onosproject.cluster.LeadershipService;
 import org.onosproject.kafkaintegration.api.EventConversionService;
 import org.onosproject.kafkaintegration.api.EventSubscriptionService;
-import org.onosproject.kafkaintegration.api.KafkaPublisherService;
-import org.onosproject.kafkaintegration.api.KafkaConfigService;
+import org.onosproject.kafkaintegration.api.KafkaEventStorageService;
 import org.onosproject.kafkaintegration.api.dto.OnosEvent;
 import org.onosproject.net.device.DeviceEvent;
 import org.onosproject.net.device.DeviceListener;
@@ -36,7 +37,7 @@
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import java.util.concurrent.ExecutionException;
+import java.util.Objects;
 import java.util.concurrent.ExecutorService;
 
 import static java.util.concurrent.Executors.newSingleThreadScheduledExecutor;
@@ -59,22 +60,29 @@
     protected EventConversionService eventConversionService;
 
     @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
-    protected KafkaPublisherService kafkaProducer;
-
-    @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
     protected DeviceService deviceService;
 
     @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
     protected LinkService linkService;
 
     @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
-    protected KafkaConfigService kafkaConfigService;
+    protected KafkaEventStorageService kafkaStoreService;
+
+    @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
+    protected LeadershipService leadershipService;
+
+    @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
+    protected ClusterService clusterService;
 
     private final DeviceListener deviceListener = new InternalDeviceListener();
     private final LinkListener linkListener = new InternalLinkListener();
 
     protected ExecutorService eventExecutor;
 
+    private static final String PUBLISHER_TOPIC = "WORK_QUEUE_PUBLISHER";
+
+    private NodeId localNodeId;
+
     @Activate
     protected void activate() {
 
@@ -82,7 +90,9 @@
         deviceService.addListener(deviceListener);
         linkService.addListener(linkListener);
 
-        kafkaProducer.start(kafkaConfigService.getConfigParams());
+        localNodeId = clusterService.getLocalNode().id();
+
+        leadershipService.runForLeadership(PUBLISHER_TOPIC);
 
         log.info("Started");
     }
@@ -95,8 +105,6 @@
         eventExecutor.shutdownNow();
         eventExecutor = null;
 
-        kafkaProducer.stop();
-
         log.info("Stopped");
     }
 
@@ -105,25 +113,21 @@
         @Override
         public void event(DeviceEvent event) {
 
+            // do not allow to proceed without leadership
+            NodeId leaderNodeId = leadershipService.getLeader(PUBLISHER_TOPIC);
+            if (!Objects.equals(localNodeId, leaderNodeId)) {
+                log.debug("Not a Leader, cannot publish!");
+                return;
+            }
+
             if (!eventSubscriptionService.getEventSubscribers(DEVICE).isEmpty()) {
                 OnosEvent onosEvent = eventConversionService.convertEvent(event);
                 eventExecutor.execute(() -> {
-                    try {
-                        kafkaProducer.send(new ProducerRecord<>(DEVICE.toString(),
-                                                               onosEvent.subject().toByteArray())).get();
-
-                        log.debug("Event Type - {}, Subject {} sent successfully.",
-                                  DEVICE, onosEvent.subject());
-
-                    } catch (InterruptedException e1) {
-                        Thread.currentThread().interrupt();
-                    } catch (ExecutionException e2) {
-                        log.error("Exception thrown {}", e2);
-                    }
+                    kafkaStoreService.publishEvent(onosEvent);
                 });
-            } else {
-                log.debug("No device listeners");
+                log.debug("Pushed event {} to kafka storage", onosEvent);
             }
+
         }
     }
 
@@ -132,25 +136,21 @@
         @Override
         public void event(LinkEvent event) {
 
+            // do not allow to proceed without leadership
+            NodeId leaderNodeId = leadershipService.getLeader(PUBLISHER_TOPIC);
+            if (!Objects.equals(localNodeId, leaderNodeId)) {
+                log.debug("Not a Leader, cannot publish!");
+                return;
+            }
+
             if (!eventSubscriptionService.getEventSubscribers(LINK).isEmpty()) {
                 OnosEvent onosEvent = eventConversionService.convertEvent(event);
                 eventExecutor.execute(() -> {
-                    try {
-                        kafkaProducer.send(new ProducerRecord<>(LINK.toString(),
-                                onosEvent.subject().toByteArray())).get();
-
-                        log.debug("Event Type - {}, Subject {} sent successfully.",
-                              LINK, onosEvent.subject());
-
-                    } catch (InterruptedException e1) {
-                        Thread.currentThread().interrupt();
-                    } catch (ExecutionException e2) {
-                        log.error("Exception thrown {}", e2);
-                    }
+                    kafkaStoreService.publishEvent(onosEvent);
                 });
-            } else {
-                log.debug("No link listeners");
+                log.debug("Pushed event {} to kafka storage", onosEvent);
             }
+
         }
     }
-}
+}
\ No newline at end of file
diff --git a/apps/kafka-integration/core/src/main/java/org/onosproject/kafkaintegration/kafka/EventPublisher.java b/apps/kafka-integration/core/src/main/java/org/onosproject/kafkaintegration/kafka/EventPublisher.java
index c471312..fba2404 100644
--- a/apps/kafka-integration/core/src/main/java/org/onosproject/kafkaintegration/kafka/EventPublisher.java
+++ b/apps/kafka-integration/core/src/main/java/org/onosproject/kafkaintegration/kafka/EventPublisher.java
@@ -13,88 +13,119 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-
 package org.onosproject.kafkaintegration.kafka;
 
-import java.util.Properties;
-import java.util.concurrent.Future;
-
 import org.apache.felix.scr.annotations.Activate;
 import org.apache.felix.scr.annotations.Component;
 import org.apache.felix.scr.annotations.Deactivate;
-import org.apache.felix.scr.annotations.Service;
-import org.apache.kafka.clients.producer.KafkaProducer;
+import org.apache.felix.scr.annotations.Reference;
+import org.apache.felix.scr.annotations.ReferenceCardinality;
 import org.apache.kafka.clients.producer.ProducerRecord;
-import org.apache.kafka.clients.producer.RecordMetadata;
+import org.onosproject.cluster.ClusterService;
+import org.onosproject.cluster.LeadershipService;
+import org.onosproject.cluster.NodeId;
+import org.onosproject.kafkaintegration.api.KafkaConfigService;
 import org.onosproject.kafkaintegration.api.KafkaPublisherService;
-import org.onosproject.kafkaintegration.api.dto.KafkaServerConfig;
+import org.onosproject.kafkaintegration.api.KafkaEventStorageService;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
+import org.onosproject.kafkaintegration.api.dto.OnosEvent;
 
-/**
- * Implementation of a Kafka Producer.
- */
-@Component
-@Service
-public class EventPublisher implements KafkaPublisherService {
-    private KafkaProducer<String, byte[]> kafkaProducer = null;
+import java.util.Objects;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
+
+@Component(immediate = true)
+public class EventPublisher {
 
     private final Logger log = LoggerFactory.getLogger(getClass());
 
+    @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
+    protected LeadershipService leadershipService;
+
+    @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
+    protected ClusterService clusterService;
+
+    @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
+    protected KafkaConfigService kafkaConfigService;
+
+    @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
+    protected KafkaEventStorageService kafkaStore;
+
+    @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
+    protected KafkaPublisherService kafkaPublisher;
+
+    protected ScheduledExecutorService exService;
+
+    private static final String SUBSCRIBER_TOPIC = "WORK_QUEUE_SUBSCRIBER";
+
+    private NodeId localNodeId;
+
+    // Thread Scheduler Parameters
+    private final long delay = 0;
+    private final long period = 1;
+
+    private EventCollector eventCollector;
+
     @Activate
     protected void activate() {
+
+        leadershipService.runForLeadership(SUBSCRIBER_TOPIC);
+
+        localNodeId = clusterService.getLocalNode().id();
+
+        startCollector();
+
         log.info("Started");
     }
 
+    private void startCollector() {
+        exService = Executors.newSingleThreadScheduledExecutor();
+        eventCollector = new EventCollector();
+        exService.scheduleAtFixedRate(eventCollector, delay, period, TimeUnit.SECONDS);
+    }
+
     @Deactivate
     protected void deactivate() {
+        stopCollector();
         log.info("Stopped");
     }
 
-    @Override
-    public void start(KafkaServerConfig config) {
+    private void stopCollector() {
+        exService.shutdown();
+    }
 
-        if (kafkaProducer != null) {
-            log.info("Producer has already started");
-            return;
+    private class EventCollector implements Runnable {
+
+        @Override
+        public void run() {
+
+            // do not allow to proceed without leadership
+            NodeId leaderNodeId = leadershipService.getLeader(SUBSCRIBER_TOPIC);
+            if (!Objects.equals(localNodeId, leaderNodeId)) {
+                log.debug("Not a Leader so cannot consume event");
+                return;
+            }
+
+            try {
+                OnosEvent onosEvent = kafkaStore.consumeEvent();
+
+                if (onosEvent != null) {
+                    kafkaPublisher.send(new ProducerRecord<>(onosEvent.type().toString(),
+                                                            onosEvent.subject())).get();
+
+                    log.info("Event Type - {}, Subject {} sent successfully.",
+                             onosEvent.type(), onosEvent.subject());
+                }
+            } catch (InterruptedException e1) {
+                log.error("Thread interrupted");
+                Thread.currentThread().interrupt();
+            } catch (ExecutionException e2) {
+                log.error("Cannot publish data to Kafka", e2);
+            }
         }
-
-        String bootstrapServer =
-                new StringBuilder().append(config.getIpAddress()).append(":")
-                        .append(config.getPort()).toString();
-
-        // Set Server Properties
-        Properties prop = new Properties();
-        prop.put("bootstrap.servers", bootstrapServer);
-        prop.put("retries", config.getNumOfRetries());
-        prop.put("max.in.flight.requests.per.connection",
-                 config.getMaxInFlightRequestsPerConnection());
-        prop.put("request.required.acks", config.getAcksRequired());
-        prop.put("key.serializer", config.getKeySerializer());
-        prop.put("value.serializer", config.getValueSerializer());
-
-        kafkaProducer = new KafkaProducer<>(prop);
-        log.info("Kafka Producer has started.");
     }
 
-    @Override
-    public void stop() {
-        if (kafkaProducer != null) {
-            kafkaProducer.close();
-            kafkaProducer = null;
-        }
-
-        log.info("Kafka Producer has Stopped");
-    }
-
-    @Override
-    public void restart(KafkaServerConfig config) {
-        stop();
-        start(config);
-    }
-
-    @Override
-    public Future<RecordMetadata> send(ProducerRecord<String, byte[]> record) {
-        return kafkaProducer.send(record);
-    }
 }
diff --git a/apps/kafka-integration/core/src/main/java/org/onosproject/kafkaintegration/kafka/KafkaConfigManager.java b/apps/kafka-integration/core/src/main/java/org/onosproject/kafkaintegration/kafka/KafkaConfigManager.java
index 40d1d2d..28d3d8a 100644
--- a/apps/kafka-integration/core/src/main/java/org/onosproject/kafkaintegration/kafka/KafkaConfigManager.java
+++ b/apps/kafka-integration/core/src/main/java/org/onosproject/kafkaintegration/kafka/KafkaConfigManager.java
@@ -30,7 +30,7 @@
 import org.apache.felix.scr.annotations.Service;
 import org.onosproject.cfg.ComponentConfigService;
 import org.onosproject.kafkaintegration.api.KafkaConfigService;
-import org.onosproject.kafkaintegration.api.KafkaPublisherService;
+import org.onosproject.kafkaintegration.api.KafkaPublisherAdminService;
 import org.onosproject.kafkaintegration.api.dto.KafkaServerConfig;
 import org.osgi.service.component.ComponentContext;
 import org.slf4j.Logger;
@@ -46,7 +46,7 @@
     protected ComponentConfigService componentConfigService;
 
     @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
-    protected KafkaPublisherService producer;
+    protected KafkaPublisherAdminService kafkaPublisherAdminService;
 
     public static final String BOOTSTRAP_SERVERS = "localhost:9092";
     private String kafkaServerIp =
@@ -92,12 +92,14 @@
     @Activate
     protected void activate(ComponentContext context) {
         componentConfigService.registerProperties(getClass());
+        kafkaPublisherAdminService.start(getConfigParams());
         log.info("Started");
     }
 
     @Deactivate
     protected void deactivate() {
         componentConfigService.unregisterProperties(getClass(), false);
+        kafkaPublisherAdminService.stop();
         log.info("Stopped");
     }
 
@@ -160,7 +162,7 @@
 
             requestRequiredAcks = newRequestRequiredAcks;
 
-            producer.restart(KafkaServerConfig.builder()
+            kafkaPublisherAdminService.restart(KafkaServerConfig.builder()
                     .ipAddress(kafkaServerIp).port(kafkaServerPortNum)
                     .numOfRetries(retries)
                     .maxInFlightRequestsPerConnection(maxInFlightRequestsPerConnection)
diff --git a/apps/kafka-integration/core/src/main/java/org/onosproject/kafkaintegration/kafka/PublishManager.java b/apps/kafka-integration/core/src/main/java/org/onosproject/kafkaintegration/kafka/PublishManager.java
new file mode 100644
index 0000000..f9f49e6
--- /dev/null
+++ b/apps/kafka-integration/core/src/main/java/org/onosproject/kafkaintegration/kafka/PublishManager.java
@@ -0,0 +1,106 @@
+/**
+ * Copyright 2016-present Open Networking Laboratory
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.onosproject.kafkaintegration.kafka;
+
+import java.util.Properties;
+import java.util.concurrent.Future;
+
+import org.apache.felix.scr.annotations.Activate;
+import org.apache.felix.scr.annotations.Component;
+import org.apache.felix.scr.annotations.Deactivate;
+import org.apache.felix.scr.annotations.Service;
+import org.apache.kafka.clients.producer.KafkaProducer;
+import org.apache.kafka.clients.producer.ProducerRecord;
+import org.apache.kafka.clients.producer.RecordMetadata;
+import org.onosproject.kafkaintegration.api.KafkaPublisherService;
+import org.onosproject.kafkaintegration.api.KafkaPublisherAdminService;
+import org.onosproject.kafkaintegration.api.dto.KafkaServerConfig;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Implementation of a Kafka Producer.
+ */
+@Component
+@Service
+public class PublishManager implements KafkaPublisherService, KafkaPublisherAdminService {
+    private KafkaProducer<String, byte[]> kafkaProducer = null;
+
+    private final Logger log = LoggerFactory.getLogger(getClass());
+
+    @Activate
+    protected void activate() {
+        log.info("Started");
+    }
+
+    @Deactivate
+    protected void deactivate() {
+        stop();
+        log.info("Stopped");
+    }
+
+    @Override
+    public void start(KafkaServerConfig config) {
+
+        if (kafkaProducer != null) {
+            log.info("Producer has already started");
+            return;
+        }
+
+        String bootstrapServer =
+                new StringBuilder().append(config.getIpAddress()).append(":")
+                        .append(config.getPort()).toString();
+
+        // Set Server Properties
+        Properties prop = new Properties();
+        prop.put("bootstrap.servers", bootstrapServer);
+        prop.put("retries", config.getNumOfRetries());
+        prop.put("max.in.flight.requests.per.connection",
+                 config.getMaxInFlightRequestsPerConnection());
+        prop.put("request.required.acks", config.getAcksRequired());
+        prop.put("key.serializer", config.getKeySerializer());
+        prop.put("value.serializer", config.getValueSerializer());
+
+        kafkaProducer = new KafkaProducer<>(prop);
+        log.info("Kafka Producer has started.");
+    }
+
+    @Override
+    public void stop() {
+        if (kafkaProducer != null) {
+            kafkaProducer.close();
+            kafkaProducer = null;
+        }
+
+        log.info("Kafka Producer has Stopped");
+    }
+
+    @Override
+    public void restart(KafkaServerConfig config) {
+        stop();
+        start(config);
+    }
+
+    @Override
+    public Future<RecordMetadata> send(ProducerRecord<String, byte[]> record) {
+        // Guard against use before start() / after stop().
+        if (kafkaProducer == null) {
+            throw new IllegalStateException("Kafka Producer is not running");
+        }
+        return kafkaProducer.send(record);
+    }
+}