diff --git a/apps/kafka-integration/core/src/main/java/org/onosproject/kafkaintegration/converter/DeviceEventConverter.java b/apps/kafka-integration/core/src/main/java/org/onosproject/kafkaintegration/converter/DeviceEventConverter.java
index 3d2eea1..b5bd01b 100644
--- a/apps/kafka-integration/core/src/main/java/org/onosproject/kafkaintegration/converter/DeviceEventConverter.java
+++ b/apps/kafka-integration/core/src/main/java/org/onosproject/kafkaintegration/converter/DeviceEventConverter.java
@@ -35,7 +35,7 @@
     private final Logger log = LoggerFactory.getLogger(getClass());
 
     @Override
-    public GeneratedMessageV3 convertToProtoMessage(Event<?, ?> event) {
+    public byte[] convertToProtoMessage(Event<?, ?> event) {
 
         DeviceEvent deviceEvent = (DeviceEvent) event;
 
@@ -45,7 +45,7 @@
             return null;
         }
 
-        return buildDeviceProtoMessage(deviceEvent);
+        return ((GeneratedMessageV3) buildDeviceProtoMessage(deviceEvent)).toByteArray();
     }
 
     /**
diff --git a/apps/kafka-integration/core/src/main/java/org/onosproject/kafkaintegration/converter/EventConverter.java b/apps/kafka-integration/core/src/main/java/org/onosproject/kafkaintegration/converter/EventConverter.java
index 42736b4..83c62d6 100644
--- a/apps/kafka-integration/core/src/main/java/org/onosproject/kafkaintegration/converter/EventConverter.java
+++ b/apps/kafka-integration/core/src/main/java/org/onosproject/kafkaintegration/converter/EventConverter.java
@@ -16,7 +16,6 @@
 
 import org.onosproject.event.Event;
 
-import com.google.protobuf.GeneratedMessageV3;
 
 /**
  *
@@ -30,8 +29,7 @@
      * to Kafka.
      *
      * @param event ONOS Event object
-     * @return converted data in protobuf format.
+     * @return converted data in protobuf format as a byte array.
      */
-    // FIXME reconsider return type, something similar to "OnosEvent"?
-    GeneratedMessageV3 convertToProtoMessage(Event<?, ?> event);
+    byte[] convertToProtoMessage(Event<?, ?> event);
 }
diff --git a/apps/kafka-integration/core/src/main/java/org/onosproject/kafkaintegration/converter/LinkEventConverter.java b/apps/kafka-integration/core/src/main/java/org/onosproject/kafkaintegration/converter/LinkEventConverter.java
index c878a26..c2cf60d 100644
--- a/apps/kafka-integration/core/src/main/java/org/onosproject/kafkaintegration/converter/LinkEventConverter.java
+++ b/apps/kafka-integration/core/src/main/java/org/onosproject/kafkaintegration/converter/LinkEventConverter.java
@@ -35,7 +35,7 @@
     private final Logger log = LoggerFactory.getLogger(getClass());
 
     @Override
-    public GeneratedMessageV3 convertToProtoMessage(Event<?, ?> event) {
+    public byte[] convertToProtoMessage(Event<?, ?> event) {
 
         LinkEvent linkEvent = (LinkEvent) event;
 
@@ -45,7 +45,7 @@
             return null;
         }
 
-        return buildDeviceProtoMessage(linkEvent);
+        return ((GeneratedMessageV3) buildDeviceProtoMessage(linkEvent)).toByteArray();
     }
 
     private boolean linkEventTypeSupported(LinkEvent event) {
diff --git a/apps/kafka-integration/core/src/main/java/org/onosproject/kafkaintegration/impl/EventSubscriptionManager.java b/apps/kafka-integration/core/src/main/java/org/onosproject/kafkaintegration/impl/EventSubscriptionManager.java
index 4fee5cb..444da71 100644
--- a/apps/kafka-integration/core/src/main/java/org/onosproject/kafkaintegration/impl/EventSubscriptionManager.java
+++ b/apps/kafka-integration/core/src/main/java/org/onosproject/kafkaintegration/impl/EventSubscriptionManager.java
@@ -40,8 +40,6 @@
 import org.onosproject.kafkaintegration.api.dto.OnosEvent.Type;
 import org.onosproject.kafkaintegration.errors.InvalidApplicationException;
 import org.onosproject.kafkaintegration.errors.InvalidGroupIdException;
-import org.onosproject.net.device.DeviceService;
-import org.onosproject.net.link.LinkService;
 import org.onosproject.store.serializers.KryoNamespaces;
 import org.onosproject.store.service.Serializer;
 import org.onosproject.store.service.StorageService;
@@ -70,12 +68,6 @@
     private static final String SUBSCRIBED_APPS = "event-subscriptions";
 
     @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
-    protected DeviceService deviceService;
-
-    @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
-    protected LinkService linkService;
-
-    @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
     protected CoreService coreService;
 
     @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
@@ -191,14 +183,6 @@
     }
 
     /**
-     * Actions that can be performed on the ONOS Event Listeners.
-     *
-     */
-    private enum ListenerAction {
-        START, STOP;
-    }
-
-    /**
      * Checks if the group id is valid for this registered application.
      *
      * @param groupId GroupId assigned to the subscriber
@@ -241,12 +225,10 @@
             return;
         }
 
-        // If this is the only subscriber listening for this event,
-        // stop the listener.
+        // update internal state.
         List<EventSubscriber> subscribers =
                 subscriptions.get(subscriber.eventType());
 
-        // update internal state.
         subscribers.remove(subscriber);
         subscriptions.put(subscriber.eventType(), subscribers);
 
diff --git a/apps/kafka-integration/core/src/main/java/org/onosproject/kafkaintegration/impl/KafkaStorageManager.java b/apps/kafka-integration/core/src/main/java/org/onosproject/kafkaintegration/impl/KafkaStorageManager.java
index 8ea15a2..bb6fa05 100644
--- a/apps/kafka-integration/core/src/main/java/org/onosproject/kafkaintegration/impl/KafkaStorageManager.java
+++ b/apps/kafka-integration/core/src/main/java/org/onosproject/kafkaintegration/impl/KafkaStorageManager.java
@@ -16,99 +16,79 @@
 
 import org.apache.felix.scr.annotations.Activate;
 import org.apache.felix.scr.annotations.Component;
+import org.apache.felix.scr.annotations.Service;
 import org.apache.felix.scr.annotations.Deactivate;
 import org.apache.felix.scr.annotations.Reference;
 import org.apache.felix.scr.annotations.ReferenceCardinality;
 import org.onosproject.kafkaintegration.api.KafkaEventStorageService;
 import org.onosproject.kafkaintegration.api.dto.OnosEvent;
-import org.onosproject.store.service.AtomicValue;
+import org.onosproject.store.serializers.KryoNamespaces;
+import org.onosproject.store.service.Serializer;
 import org.onosproject.store.service.StorageService;
+import org.onosproject.store.service.Task;
+import org.onosproject.store.service.WorkQueue;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import java.util.TreeMap;
-import java.util.concurrent.Executors;
-import java.util.concurrent.ScheduledExecutorService;
-import java.util.concurrent.TimeUnit;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutionException;
 
-/**
- * TODO: This code is not being used at the moment.
- * This will be modified to use Distributed Work Queue.
- * Please see clustering section of
- * https://wiki.onosproject.org/display/ONOS/Kafka+Integration
- */
 @Component(immediate = false)
+@Service
 public class KafkaStorageManager implements KafkaEventStorageService {
 
     @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
     protected StorageService storageService;
 
-    private TreeMap<Long, OnosEvent> kafkaEventStore;
-
-    private AtomicValue<Long> lastPublishedEvent;
-
     private final Logger log = LoggerFactory.getLogger(getClass());
 
-    private ScheduledExecutorService gcExService;
+    private static final String KAFKA_WORK_QUEUE = "Kafka-Work-Queue";
 
-    private InternalGarbageCollector gcTask;
-
-    // Thread scheduler parameters.
-    private final long delay = 0;
-    private final long period = 1;
+    private WorkQueue<OnosEvent> queue;
 
     @Activate
     protected void activate() {
-        kafkaEventStore = new TreeMap<Long, OnosEvent>();
-        lastPublishedEvent = storageService.<Long>atomicValueBuilder()
-                .withName("onos-app-kafka-published-seqNumber").build()
-                .asAtomicValue();
-
-        startGC();
+        queue = storageService.<OnosEvent>getWorkQueue(KAFKA_WORK_QUEUE,
+                                                       Serializer.using(KryoNamespaces.API,
+                                                                        OnosEvent.class,
+                                                                        OnosEvent.Type.class));
 
         log.info("Started");
     }
 
-    private void startGC() {
-        log.info("Starting Garbage Collection Service");
-        gcExService = Executors.newSingleThreadScheduledExecutor();
-        gcTask = new InternalGarbageCollector();
-        gcExService.scheduleAtFixedRate(gcTask, delay, period,
-                                        TimeUnit.SECONDS);
-    }
-
     @Deactivate
     protected void deactivate() {
-        stopGC();
+        queue = null;
         log.info("Stopped");
     }
 
-    private void stopGC() {
-        log.info("Stopping Garbage Collection Service");
-        gcExService.shutdown();
+    @Override
+    public void publishEvent(OnosEvent e) {
+        queue.addOne(e);
+        log.debug("Published {} Event to Distributed Work Queue", e.type());
     }
 
     @Override
-    public boolean insertCacheEntry(OnosEvent e) {
-        // TODO: Fill in the code once the event carries timestamp info.
-        return true;
-    }
+    public OnosEvent consumeEvent() {
+        Task<OnosEvent> task = null;
 
-    @Override
-    public void updateLastPublishedEntry(Long sequenceNumber) {
-        this.lastPublishedEvent.set(sequenceNumber);
-    }
-
-    /**
-     * Removes events from the Kafka Event Store which have been published.
-     *
-     */
-    private class InternalGarbageCollector implements Runnable {
-
-        @Override
-        public void run() {
-            kafkaEventStore.headMap(lastPublishedEvent.get(), true).clear();
+        CompletableFuture<Task<OnosEvent>> future = queue.take();
+        try {
+            task = future.get();
+        } catch (InterruptedException e) {
+            e.printStackTrace();
+        } catch (ExecutionException e) {
+            e.printStackTrace();
         }
+
+        if (task != null) {
+            queue.complete(task.taskId());
+            log.debug("Consumed {} Event from Distributed Work Queue with id {}",
+                     task.payload().type(), task.taskId());
+            return task.payload();
+        }
+
+        return null;
     }
 
 }
diff --git a/apps/kafka-integration/core/src/main/java/org/onosproject/kafkaintegration/kafka/EventListener.java b/apps/kafka-integration/core/src/main/java/org/onosproject/kafkaintegration/kafka/EventListener.java
index 13a24c2..c63842f 100644
--- a/apps/kafka-integration/core/src/main/java/org/onosproject/kafkaintegration/kafka/EventListener.java
+++ b/apps/kafka-integration/core/src/main/java/org/onosproject/kafkaintegration/kafka/EventListener.java
@@ -20,12 +20,13 @@
 import org.apache.felix.scr.annotations.Component;
 import org.apache.felix.scr.annotations.Deactivate;
 import org.apache.felix.scr.annotations.Reference;
+import org.onosproject.cluster.ClusterService;
+import org.onosproject.cluster.NodeId;
 import org.apache.felix.scr.annotations.ReferenceCardinality;
-import org.apache.kafka.clients.producer.ProducerRecord;
+import org.onosproject.cluster.LeadershipService;
 import org.onosproject.kafkaintegration.api.EventConversionService;
 import org.onosproject.kafkaintegration.api.EventSubscriptionService;
-import org.onosproject.kafkaintegration.api.KafkaPublisherService;
-import org.onosproject.kafkaintegration.api.KafkaConfigService;
+import org.onosproject.kafkaintegration.api.KafkaEventStorageService;
 import org.onosproject.kafkaintegration.api.dto.OnosEvent;
 import org.onosproject.net.device.DeviceEvent;
 import org.onosproject.net.device.DeviceListener;
@@ -36,7 +37,7 @@
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import java.util.concurrent.ExecutionException;
+import java.util.Objects;
 import java.util.concurrent.ExecutorService;
 
 import static java.util.concurrent.Executors.newSingleThreadScheduledExecutor;
@@ -59,22 +60,29 @@
     protected EventConversionService eventConversionService;
 
     @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
-    protected KafkaPublisherService kafkaProducer;
-
-    @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
     protected DeviceService deviceService;
 
     @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
     protected LinkService linkService;
 
     @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
-    protected KafkaConfigService kafkaConfigService;
+    protected KafkaEventStorageService kafkaStoreService;
+
+    @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
+    protected LeadershipService leadershipService;
+
+    @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
+    protected ClusterService clusterService;
 
     private final DeviceListener deviceListener = new InternalDeviceListener();
     private final LinkListener linkListener = new InternalLinkListener();
 
     protected ExecutorService eventExecutor;
 
+    private static final String PUBLISHER_TOPIC = "WORK_QUEUE_PUBLISHER";
+
+    private NodeId localNodeId;
+
     @Activate
     protected void activate() {
 
@@ -82,7 +90,9 @@
         deviceService.addListener(deviceListener);
         linkService.addListener(linkListener);
 
-        kafkaProducer.start(kafkaConfigService.getConfigParams());
+        localNodeId = clusterService.getLocalNode().id();
+
+        leadershipService.runForLeadership(PUBLISHER_TOPIC);
 
         log.info("Started");
     }
@@ -95,8 +105,6 @@
         eventExecutor.shutdownNow();
         eventExecutor = null;
 
-        kafkaProducer.stop();
-
         log.info("Stopped");
     }
 
@@ -105,25 +113,21 @@
         @Override
         public void event(DeviceEvent event) {
 
+            // do not allow to proceed without leadership
+            NodeId leaderNodeId = leadershipService.getLeader(PUBLISHER_TOPIC);
+            if (!Objects.equals(localNodeId, leaderNodeId)) {
+                log.debug("Not a Leader, cannot publish!");
+                return;
+            }
+
             if (!eventSubscriptionService.getEventSubscribers(DEVICE).isEmpty()) {
                 OnosEvent onosEvent = eventConversionService.convertEvent(event);
                 eventExecutor.execute(() -> {
-                    try {
-                        kafkaProducer.send(new ProducerRecord<>(DEVICE.toString(),
-                                                               onosEvent.subject().toByteArray())).get();
-
-                        log.debug("Event Type - {}, Subject {} sent successfully.",
-                                  DEVICE, onosEvent.subject());
-
-                    } catch (InterruptedException e1) {
-                        Thread.currentThread().interrupt();
-                    } catch (ExecutionException e2) {
-                        log.error("Exception thrown {}", e2);
-                    }
+                    kafkaStoreService.publishEvent(onosEvent);
                 });
-            } else {
-                log.debug("No device listeners");
+                log.debug("Pushed event {} to kafka storage", onosEvent);
             }
+
         }
     }
 
@@ -132,25 +136,21 @@
         @Override
         public void event(LinkEvent event) {
 
+            // do not allow to proceed without leadership
+            NodeId leaderNodeId = leadershipService.getLeader(PUBLISHER_TOPIC);
+            if (!Objects.equals(localNodeId, leaderNodeId)) {
+                log.debug("Not a Leader, cannot publish!");
+                return;
+            }
+
             if (!eventSubscriptionService.getEventSubscribers(LINK).isEmpty()) {
                 OnosEvent onosEvent = eventConversionService.convertEvent(event);
                 eventExecutor.execute(() -> {
-                    try {
-                        kafkaProducer.send(new ProducerRecord<>(LINK.toString(),
-                                onosEvent.subject().toByteArray())).get();
-
-                        log.debug("Event Type - {}, Subject {} sent successfully.",
-                              LINK, onosEvent.subject());
-
-                    } catch (InterruptedException e1) {
-                        Thread.currentThread().interrupt();
-                    } catch (ExecutionException e2) {
-                        log.error("Exception thrown {}", e2);
-                    }
+                    kafkaStoreService.publishEvent(onosEvent);
                 });
-            } else {
-                log.debug("No link listeners");
+                log.debug("Pushed event {} to kafka storage", onosEvent);
             }
+
         }
     }
-}
+}
\ No newline at end of file
diff --git a/apps/kafka-integration/core/src/main/java/org/onosproject/kafkaintegration/kafka/EventPublisher.java b/apps/kafka-integration/core/src/main/java/org/onosproject/kafkaintegration/kafka/EventPublisher.java
index c471312..fba2404 100644
--- a/apps/kafka-integration/core/src/main/java/org/onosproject/kafkaintegration/kafka/EventPublisher.java
+++ b/apps/kafka-integration/core/src/main/java/org/onosproject/kafkaintegration/kafka/EventPublisher.java
@@ -13,88 +13,119 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-
 package org.onosproject.kafkaintegration.kafka;
 
-import java.util.Properties;
-import java.util.concurrent.Future;
-
 import org.apache.felix.scr.annotations.Activate;
 import org.apache.felix.scr.annotations.Component;
 import org.apache.felix.scr.annotations.Deactivate;
-import org.apache.felix.scr.annotations.Service;
-import org.apache.kafka.clients.producer.KafkaProducer;
+import org.apache.felix.scr.annotations.Reference;
+import org.apache.felix.scr.annotations.ReferenceCardinality;
 import org.apache.kafka.clients.producer.ProducerRecord;
-import org.apache.kafka.clients.producer.RecordMetadata;
+import org.onosproject.cluster.ClusterService;
+import org.onosproject.cluster.LeadershipService;
+import org.onosproject.cluster.NodeId;
+import org.onosproject.kafkaintegration.api.KafkaConfigService;
 import org.onosproject.kafkaintegration.api.KafkaPublisherService;
-import org.onosproject.kafkaintegration.api.dto.KafkaServerConfig;
+import org.onosproject.kafkaintegration.api.KafkaEventStorageService;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
+import org.onosproject.kafkaintegration.api.dto.OnosEvent;
 
-/**
- * Implementation of a Kafka Producer.
- */
-@Component
-@Service
-public class EventPublisher implements KafkaPublisherService {
-    private KafkaProducer<String, byte[]> kafkaProducer = null;
+import java.util.Objects;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
+
+@Component(immediate = true)
+public class EventPublisher {
 
     private final Logger log = LoggerFactory.getLogger(getClass());
 
+    @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
+    protected LeadershipService leadershipService;
+
+    @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
+    protected ClusterService clusterService;
+
+    @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
+    protected KafkaConfigService kafkaConfigService;
+
+    @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
+    protected KafkaEventStorageService kafkaStore;
+
+    @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
+    protected KafkaPublisherService kafkaPublisher;
+
+    protected ScheduledExecutorService exService;
+
+    private static final String SUBSCRIBER_TOPIC = "WORK_QUEUE_SUBSCRIBER";
+
+    private NodeId localNodeId;
+
+    // Thread Scheduler Parameters
+    private final long delay = 0;
+    private final long period = 1;
+
+    private EventCollector eventCollector;
+
     @Activate
     protected void activate() {
+
+        leadershipService.runForLeadership(SUBSCRIBER_TOPIC);
+
+        localNodeId = clusterService.getLocalNode().id();
+
+        startCollector();
+
         log.info("Started");
     }
 
+    private void startCollector() {
+        exService = Executors.newSingleThreadScheduledExecutor();
+        eventCollector = new EventCollector();
+        exService.scheduleAtFixedRate(eventCollector, delay, period, TimeUnit.SECONDS);
+    }
+
     @Deactivate
     protected void deactivate() {
+        stopCollector();
         log.info("Stopped");
     }
 
-    @Override
-    public void start(KafkaServerConfig config) {
+    private void stopCollector() {
+        exService.shutdown();
+    }
 
-        if (kafkaProducer != null) {
-            log.info("Producer has already started");
-            return;
+    private class EventCollector implements Runnable {
+
+        @Override
+        public void run() {
+
+            // do not allow to proceed without leadership
+            NodeId leaderNodeId = leadershipService.getLeader(SUBSCRIBER_TOPIC);
+            if (!Objects.equals(localNodeId, leaderNodeId)) {
+                log.debug("Not a Leader so cannot consume event");
+                return;
+            }
+
+            try {
+                OnosEvent onosEvent = kafkaStore.consumeEvent();
+
+                if (onosEvent != null) {
+                    kafkaPublisher.send(new ProducerRecord<>(onosEvent.type().toString(),
+                                                            onosEvent.subject())).get();
+
+                    log.info("Event Type - {}, Subject {} sent successfully.",
+                             onosEvent.type(), onosEvent.subject());
+                }
+            } catch (InterruptedException e1) {
+                log.error("Thread interupted");
+                Thread.currentThread().interrupt();
+            } catch (ExecutionException e2) {
+                log.error("Cannot publish data to Kafka - {}", e2);
+            }
         }
-
-        String bootstrapServer =
-                new StringBuilder().append(config.getIpAddress()).append(":")
-                        .append(config.getPort()).toString();
-
-        // Set Server Properties
-        Properties prop = new Properties();
-        prop.put("bootstrap.servers", bootstrapServer);
-        prop.put("retries", config.getNumOfRetries());
-        prop.put("max.in.flight.requests.per.connection",
-                 config.getMaxInFlightRequestsPerConnection());
-        prop.put("request.required.acks", config.getAcksRequired());
-        prop.put("key.serializer", config.getKeySerializer());
-        prop.put("value.serializer", config.getValueSerializer());
-
-        kafkaProducer = new KafkaProducer<>(prop);
-        log.info("Kafka Producer has started.");
     }
 
-    @Override
-    public void stop() {
-        if (kafkaProducer != null) {
-            kafkaProducer.close();
-            kafkaProducer = null;
-        }
-
-        log.info("Kafka Producer has Stopped");
-    }
-
-    @Override
-    public void restart(KafkaServerConfig config) {
-        stop();
-        start(config);
-    }
-
-    @Override
-    public Future<RecordMetadata> send(ProducerRecord<String, byte[]> record) {
-        return kafkaProducer.send(record);
-    }
 }
diff --git a/apps/kafka-integration/core/src/main/java/org/onosproject/kafkaintegration/kafka/KafkaConfigManager.java b/apps/kafka-integration/core/src/main/java/org/onosproject/kafkaintegration/kafka/KafkaConfigManager.java
index 40d1d2d..28d3d8a 100644
--- a/apps/kafka-integration/core/src/main/java/org/onosproject/kafkaintegration/kafka/KafkaConfigManager.java
+++ b/apps/kafka-integration/core/src/main/java/org/onosproject/kafkaintegration/kafka/KafkaConfigManager.java
@@ -30,7 +30,7 @@
 import org.apache.felix.scr.annotations.Service;
 import org.onosproject.cfg.ComponentConfigService;
 import org.onosproject.kafkaintegration.api.KafkaConfigService;
-import org.onosproject.kafkaintegration.api.KafkaPublisherService;
+import org.onosproject.kafkaintegration.api.KafkaPublisherAdminService;
 import org.onosproject.kafkaintegration.api.dto.KafkaServerConfig;
 import org.osgi.service.component.ComponentContext;
 import org.slf4j.Logger;
@@ -46,7 +46,7 @@
     protected ComponentConfigService componentConfigService;
 
     @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
-    protected KafkaPublisherService producer;
+    protected KafkaPublisherAdminService kafkaPublisherAdminService;
 
     public static final String BOOTSTRAP_SERVERS = "localhost:9092";
     private String kafkaServerIp =
@@ -92,12 +92,14 @@
     @Activate
     protected void activate(ComponentContext context) {
         componentConfigService.registerProperties(getClass());
+        kafkaPublisherAdminService.start(getConfigParams());
         log.info("Started");
     }
 
     @Deactivate
     protected void deactivate() {
         componentConfigService.unregisterProperties(getClass(), false);
+        kafkaPublisherAdminService.stop();
         log.info("Stopped");
     }
 
@@ -160,7 +162,7 @@
 
             requestRequiredAcks = newRequestRequiredAcks;
 
-            producer.restart(KafkaServerConfig.builder()
+            kafkaPublisherAdminService.restart(KafkaServerConfig.builder()
                     .ipAddress(kafkaServerIp).port(kafkaServerPortNum)
                     .numOfRetries(retries)
                     .maxInFlightRequestsPerConnection(maxInFlightRequestsPerConnection)
diff --git a/apps/kafka-integration/core/src/main/java/org/onosproject/kafkaintegration/kafka/PublishManager.java b/apps/kafka-integration/core/src/main/java/org/onosproject/kafkaintegration/kafka/PublishManager.java
new file mode 100644
index 0000000..f9f49e6
--- /dev/null
+++ b/apps/kafka-integration/core/src/main/java/org/onosproject/kafkaintegration/kafka/PublishManager.java
@@ -0,0 +1,102 @@
+/**
+ * Copyright 2016-present Open Networking Laboratory
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.onosproject.kafkaintegration.kafka;
+
+import java.util.Properties;
+import java.util.concurrent.Future;
+
+import org.apache.felix.scr.annotations.Activate;
+import org.apache.felix.scr.annotations.Component;
+import org.apache.felix.scr.annotations.Deactivate;
+import org.apache.felix.scr.annotations.Service;
+import org.apache.kafka.clients.producer.KafkaProducer;
+import org.apache.kafka.clients.producer.ProducerRecord;
+import org.apache.kafka.clients.producer.RecordMetadata;
+import org.onosproject.kafkaintegration.api.KafkaPublisherService;
+import org.onosproject.kafkaintegration.api.KafkaPublisherAdminService;
+import org.onosproject.kafkaintegration.api.dto.KafkaServerConfig;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Implementation of a Kafka Producer.
+ */
+@Component
+@Service
+public class PublishManager implements KafkaPublisherService, KafkaPublisherAdminService {
+    private KafkaProducer<String, byte[]> kafkaProducer = null;
+
+    private final Logger log = LoggerFactory.getLogger(getClass());
+
+    @Activate
+    protected void activate() {
+        log.info("Started");
+    }
+
+    @Deactivate
+    protected void deactivate() {
+        stop();
+        log.info("Stopped");
+    }
+
+    @Override
+    public void start(KafkaServerConfig config) {
+
+        if (kafkaProducer != null) {
+            log.info("Producer has already started");
+            return;
+        }
+
+        String bootstrapServer =
+                new StringBuilder().append(config.getIpAddress()).append(":")
+                        .append(config.getPort()).toString();
+
+        // Set Server Properties
+        Properties prop = new Properties();
+        prop.put("bootstrap.servers", bootstrapServer);
+        prop.put("retries", config.getNumOfRetries());
+        prop.put("max.in.flight.requests.per.connection",
+                 config.getMaxInFlightRequestsPerConnection());
+        prop.put("request.required.acks", config.getAcksRequired());
+        prop.put("key.serializer", config.getKeySerializer());
+        prop.put("value.serializer", config.getValueSerializer());
+
+        kafkaProducer = new KafkaProducer<>(prop);
+        log.info("Kafka Producer has started.");
+    }
+
+    @Override
+    public void stop() {
+        if (kafkaProducer != null) {
+            kafkaProducer.close();
+            kafkaProducer = null;
+        }
+
+        log.info("Kafka Producer has Stopped");
+    }
+
+    @Override
+    public void restart(KafkaServerConfig config) {
+        stop();
+        start(config);
+    }
+
+    @Override
+    public Future<RecordMetadata> send(ProducerRecord<String, byte[]> record) {
+        return kafkaProducer.send(record);
+    }
+}
