Made changes as per comments.
kafkaProducer is now non-static.

TODO: Make KafkaPublisherManager a Service and not a Singleton.
Kafka event publishing.

Change-Id: I5ec20a6e4950c38e822468d343521ab77475b7d3
diff --git a/apps/kafka-integration/core/src/main/java/org/onosproject/kafkaintegration/converter/ConversionFactory.java b/apps/kafka-integration/core/src/main/java/org/onosproject/kafkaintegration/converter/ConversionFactory.java
new file mode 100644
index 0000000..26b5060
--- /dev/null
+++ b/apps/kafka-integration/core/src/main/java/org/onosproject/kafkaintegration/converter/ConversionFactory.java
@@ -0,0 +1,68 @@
+/**
+ * Copyright 2016 Open Networking Laboratory
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+
+ * http://www.apache.org/licenses/LICENSE-2.0
+
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.onosproject.kafkaintegration.converter;
+
+import org.onosproject.kafkaintegration.api.dto.OnosEvent.Type;
+
+import java.util.HashMap;
+import java.util.Map;
+
+import static org.onosproject.kafkaintegration.api.dto.OnosEvent.Type.DEVICE;
+import static org.onosproject.kafkaintegration.api.dto.OnosEvent.Type.LINK;
+
+/**
+ * Returns the appropriate converter object based on the ONOS event type.
+ *
+ */
+public final class ConversionFactory {
+
+    // Converters for all supported ONOS event types, keyed by event type.
+    // Populated once in the constructor and never modified afterwards.
+    private final Map<Type, EventConverter> converters = new HashMap<>();
+
+    // Private to defeat external instantiation; registers the converters.
+    private ConversionFactory() {
+        converters.put(DEVICE, new DeviceEventConverter());
+        converters.put(LINK, new LinkEventConverter());
+    }
+
+    // Initialization-on-demand holder: INSTANCE is created when
+    // SingletonHolder is first initialized.
+    private static class SingletonHolder {
+        private static final ConversionFactory INSTANCE =
+                new ConversionFactory();
+    }
+
+    /**
+     * Returns a static reference to the Conversion Factory.
+     *
+     * @return singleton object
+     */
+    public static ConversionFactory getInstance() {
+        return SingletonHolder.INSTANCE;
+    }
+
+    /**
+     * Returns an Event converter object for the specified ONOS event type.
+     *
+     * @param event ONOS event type
+     * @return Event Converter object, or null if no converter is registered
+     *         for the given type
+     */
+    public EventConverter getConverter(Type event) {
+        return converters.get(event);
+    }
+
+}
diff --git a/apps/kafka-integration/core/src/main/java/org/onosproject/kafkaintegration/converter/DeviceEventConverter.java b/apps/kafka-integration/core/src/main/java/org/onosproject/kafkaintegration/converter/DeviceEventConverter.java
new file mode 100644
index 0000000..6b2ee24
--- /dev/null
+++ b/apps/kafka-integration/core/src/main/java/org/onosproject/kafkaintegration/converter/DeviceEventConverter.java
@@ -0,0 +1,122 @@
+/**
+ * Copyright 2016 Open Networking Laboratory
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+
+ * http://www.apache.org/licenses/LICENSE-2.0
+
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.onosproject.kafkaintegration.converter;
+
+import com.google.protobuf.GeneratedMessage;
+import org.onosproject.event.Event;
+import org.onosproject.grpc.net.Device.DeviceCore;
+import org.onosproject.grpc.net.Device.DeviceType;
+import org.onosproject.grpc.net.DeviceEvent.DeviceEventType;
+import org.onosproject.grpc.net.DeviceEvent.DeviceNotification;
+import org.onosproject.grpc.net.Port.PortCore;
+import org.onosproject.grpc.net.Port.PortType;
+import org.onosproject.net.device.DeviceEvent;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Converts ONOS Device event message to protobuf format.
+ */
+public class DeviceEventConverter implements EventConverter {
+
+    private final Logger log = LoggerFactory.getLogger(getClass());
+
+    @Override
+    public GeneratedMessage convertToProtoMessage(Event<?, ?> event) {
+        // Callers are expected to pass a DeviceEvent; the cast fails otherwise.
+        DeviceEvent deviceEvent = (DeviceEvent) event;
+
+        // Returns null when the event type has no protobuf counterpart.
+        if (!deviceEventTypeSupported(deviceEvent)) {
+            log.error("Unsupported Onos Device Event {}. There is no matching "
+                    + "proto Device Event type", deviceEvent.type().toString());
+            return null;
+        }
+        return buildDeviceProtoMessage(deviceEvent);
+    }
+
+    /**
+     * Checks if the ONOS Device Event type is supported.
+     *
+     * @param event ONOS Device event
+     * @return true if there is a match and false otherwise
+     */
+    private boolean deviceEventTypeSupported(DeviceEvent event) {
+        DeviceEventType[] deviceEvents = DeviceEventType.values();
+        for (DeviceEventType deviceEventType : deviceEvents) {
+            if (deviceEventType.name().equals(event.type().name())) {
+                return true;
+            }
+        }
+
+        return false;
+    }
+
+    private DeviceNotification buildDeviceProtoMessage(DeviceEvent deviceEvent) {
+        DeviceNotification.Builder notificationBuilder =
+                DeviceNotification.newBuilder();
+
+        DeviceCore deviceCore =
+                DeviceCore.newBuilder()
+                        .setChassisId(deviceEvent.subject().chassisId().id()
+                                              .toString())
+                        .setDeviceId(deviceEvent.subject().id().toString())
+                        .setHwVersion(deviceEvent.subject().hwVersion())
+                        .setManufacturer(deviceEvent.subject().manufacturer())
+                        .setSerialNumber(deviceEvent.subject().serialNumber())
+                        .setSwVersion(deviceEvent.subject().swVersion())
+                        .setType(DeviceType
+                                         .valueOf(deviceEvent.subject().type().name()))
+                        .build();
+
+        PortCore portCore = null;
+        if (deviceEvent.port() != null) {
+            portCore =
+                    PortCore.newBuilder()
+                            .setIsEnabled(deviceEvent.port().isEnabled())
+                            .setPortNumber(deviceEvent.port().number()
+                                                   .toString())
+                            .setPortSpeed(deviceEvent.port().portSpeed())
+                            .setType(PortType
+                                             .valueOf(deviceEvent.port().type().name()))
+                            .build();
+
+            notificationBuilder.setPort(portCore);
+        }
+
+        notificationBuilder.setDeviceEventType(getProtoType(deviceEvent))
+                .setDevice(deviceCore);
+
+        return notificationBuilder.build();
+    }
+
+    /**
+     * Finds the protobuf Device Event Type named like the ONOS event's type.
+     *
+     * @param event ONOS Device Event
+     * @return matching protobuf Device Event Type, or null if none matches
+     */
+    private DeviceEventType getProtoType(DeviceEvent event) {
+        // Enum constant names are unique, so the first match is the only one.
+        for (DeviceEventType candidate : DeviceEventType.values()) {
+            boolean sameName = candidate.name().equals(event.type().name());
+            if (sameName) {
+                return candidate;
+            }
+        }
+        // No protobuf type carries the same name as the ONOS event type.
+        return null;
+    }
+}
diff --git a/apps/kafka-integration/core/src/main/java/org/onosproject/kafkaintegration/converter/EventConverter.java b/apps/kafka-integration/core/src/main/java/org/onosproject/kafkaintegration/converter/EventConverter.java
new file mode 100644
index 0000000..c1e7739
--- /dev/null
+++ b/apps/kafka-integration/core/src/main/java/org/onosproject/kafkaintegration/converter/EventConverter.java
@@ -0,0 +1,36 @@
+/**
+ * Copyright 2016 Open Networking Laboratory
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+
+ * http://www.apache.org/licenses/LICENSE-2.0
+
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.onosproject.kafkaintegration.converter;
+
+import org.onosproject.event.Event;
+
+import com.google.protobuf.GeneratedMessage;
+
+/**
+ *
+ * APIs for converting between ONOS event objects and protobuf data objects.
+ *
+ */
+public interface EventConverter {
+
+    /**
+     * Converts ONOS specific event data to a format that is suitable for export
+     * to Kafka.
+     *
+     * @param event ONOS Event object
+     * @return converted data in protobuf format.
+     */
+    GeneratedMessage convertToProtoMessage(Event<?, ?> event);
+}
diff --git a/apps/kafka-integration/core/src/main/java/org/onosproject/kafkaintegration/converter/LinkEventConverter.java b/apps/kafka-integration/core/src/main/java/org/onosproject/kafkaintegration/converter/LinkEventConverter.java
new file mode 100644
index 0000000..febe020
--- /dev/null
+++ b/apps/kafka-integration/core/src/main/java/org/onosproject/kafkaintegration/converter/LinkEventConverter.java
@@ -0,0 +1,101 @@
+/**
+ * Copyright 2016 Open Networking Laboratory
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+
+ * http://www.apache.org/licenses/LICENSE-2.0
+
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.onosproject.kafkaintegration.converter;
+
+import org.onosproject.event.Event;
+import org.onosproject.grpc.net.Link.ConnectPoint;
+import org.onosproject.grpc.net.Link.LinkCore;
+import org.onosproject.grpc.net.Link.LinkState;
+import org.onosproject.grpc.net.Link.LinkType;
+import org.onosproject.grpc.net.LinkEvent.LinkEventType;
+import org.onosproject.grpc.net.LinkEvent.LinkNotification;
+import org.onosproject.net.link.LinkEvent;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.protobuf.GeneratedMessage;
+
+/**
+ * Converts ONOS Link event message to protobuf format.
+ */
+public class LinkEventConverter implements EventConverter {
+
+    private final Logger log = LoggerFactory.getLogger(getClass());
+
+    @Override
+    public GeneratedMessage convertToProtoMessage(Event<?, ?> event) {
+
+        LinkEvent linkEvent = (LinkEvent) event;
+
+        if (!linkEventTypeSupported(linkEvent)) {
+            log.error("Unsupported Onos Event {}. There is no matching "
+                    + "proto Event type", linkEvent.type().toString());
+            return null;
+        }
+
+        return buildDeviceProtoMessage(linkEvent);
+    }
+
+    private boolean linkEventTypeSupported(LinkEvent event) {
+        LinkEventType[] kafkaLinkEvents = LinkEventType.values();
+        for (LinkEventType linkEventType : kafkaLinkEvents) {
+            if (linkEventType.name().equals(event.type().name())) {
+                return true;
+            }
+        }
+        return false;
+    }
+
+    private LinkNotification buildDeviceProtoMessage(LinkEvent linkEvent) {
+        LinkNotification notification = LinkNotification.newBuilder()
+                .setLinkEventType(getProtoType(linkEvent))
+                .setLink(LinkCore.newBuilder()
+                        .setState(LinkState
+                                .valueOf(linkEvent.subject().state().name()))
+                        .setType(LinkType.valueOf(linkEvent.subject().type().name()))
+                        .setDst(ConnectPoint.newBuilder()
+                                .setDeviceId(linkEvent.subject().dst()
+                                        .deviceId().toString())
+                                .setPortNumber(linkEvent.subject().dst().port()
+                                        .toString()))
+                        .setSrc(ConnectPoint.newBuilder()
+                                .setDeviceId(linkEvent.subject().src()
+                                        .deviceId().toString())
+                                .setPortNumber(linkEvent.subject().src().port()
+                                        .toString())))
+                .build();
+
+        return notification;
+    }
+
+    /**
+     * Looks up the protobuf Link Event Type whose name matches the type of
+     * the given ONOS Link Event.
+     *
+     * @param event ONOS Link Event
+     * @return matching protobuf Link Event Type, or null if none matches
+     */
+    private LinkEventType getProtoType(LinkEvent event) {
+        // Enum constant names are unique, so the first match is the only one.
+        for (LinkEventType candidate : LinkEventType.values()) {
+            boolean sameName = candidate.name().equals(event.type().name());
+            if (sameName) {
+                return candidate;
+            }
+        }
+        // No protobuf type carries the same name as the ONOS event type.
+        return null;
+    }
+}
diff --git a/apps/kafka-integration/core/src/main/java/org/onosproject/kafkaintegration/converter/package-info.java b/apps/kafka-integration/core/src/main/java/org/onosproject/kafkaintegration/converter/package-info.java
new file mode 100644
index 0000000..d33c0d7
--- /dev/null
+++ b/apps/kafka-integration/core/src/main/java/org/onosproject/kafkaintegration/converter/package-info.java
@@ -0,0 +1,20 @@
+/**
+ * Copyright 2016-present Open Networking Laboratory Licensed under the Apache
+ * License, Version 2.0 (the "License"); you may not use this file except in
+ * compliance with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+/**
+ * Converters for converting various ONOS events to their corresponding Protocol
+ * Buffer format.
+ *
+ */
+package org.onosproject.kafkaintegration.converter;
diff --git a/apps/kafka-integration/core/src/main/java/org/onosproject/kafkaintegration/impl/EventConversionManager.java b/apps/kafka-integration/core/src/main/java/org/onosproject/kafkaintegration/impl/EventConversionManager.java
new file mode 100644
index 0000000..f1d0c97
--- /dev/null
+++ b/apps/kafka-integration/core/src/main/java/org/onosproject/kafkaintegration/impl/EventConversionManager.java
@@ -0,0 +1,72 @@
+/**
+ * Copyright 2016-present Open Networking Laboratory
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.onosproject.kafkaintegration.impl;
+
+import org.apache.felix.scr.annotations.Activate;
+import org.apache.felix.scr.annotations.Component;
+import org.apache.felix.scr.annotations.Deactivate;
+import org.apache.felix.scr.annotations.Service;
+import org.onosproject.event.Event;
+import org.onosproject.kafkaintegration.api.EventConversionService;
+import org.onosproject.kafkaintegration.api.dto.OnosEvent;
+import org.onosproject.kafkaintegration.converter.DeviceEventConverter;
+import org.onosproject.kafkaintegration.converter.EventConverter;
+import org.onosproject.kafkaintegration.converter.LinkEventConverter;
+import org.onosproject.net.device.DeviceEvent;
+import org.onosproject.net.link.LinkEvent;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import static org.onosproject.kafkaintegration.api.dto.OnosEvent.Type.DEVICE;
+import static org.onosproject.kafkaintegration.api.dto.OnosEvent.Type.LINK;
+
+/**
+ * Implementation of Event Conversion Service.
+ *
+ */
+@Component(immediate = true)
+@Service
+public class EventConversionManager implements EventConversionService {
+
+    private final Logger log = LoggerFactory.getLogger(getClass());
+    private EventConverter deviceEventConverter;
+    private EventConverter linkEventConverter;
+
+    @Activate
+    protected void activate() {
+        deviceEventConverter = new DeviceEventConverter();
+        linkEventConverter = new LinkEventConverter();
+
+        log.info("Started");
+    }
+
+    @Deactivate
+    protected void deactivate() {
+        log.info("Stopped");
+    }
+
+    @Override
+    public OnosEvent convertEvent(Event<?, ?> event) {
+        if (event instanceof DeviceEvent) {
+            return new OnosEvent(DEVICE, deviceEventConverter.convertToProtoMessage(event));
+        } else if (event instanceof LinkEvent) {
+            return new OnosEvent(LINK, linkEventConverter.convertToProtoMessage(event));
+        } else {
+            throw new IllegalArgumentException("Unsupported event type");
+        }
+    }
+}
diff --git a/apps/kafka-integration/core/src/main/java/org/onosproject/kafkaintegration/impl/EventSubscriptionManager.java b/apps/kafka-integration/core/src/main/java/org/onosproject/kafkaintegration/impl/EventSubscriptionManager.java
new file mode 100644
index 0000000..ebbac6b
--- /dev/null
+++ b/apps/kafka-integration/core/src/main/java/org/onosproject/kafkaintegration/impl/EventSubscriptionManager.java
@@ -0,0 +1,333 @@
+/**
+ * Copyright 2016-present Open Networking Laboratory
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.onosproject.kafkaintegration.impl;
+
+import com.google.common.collect.ImmutableList;
+import org.apache.felix.scr.annotations.Activate;
+import org.apache.felix.scr.annotations.Component;
+import org.apache.felix.scr.annotations.Deactivate;
+import org.apache.felix.scr.annotations.Reference;
+import org.apache.felix.scr.annotations.ReferenceCardinality;
+import org.apache.felix.scr.annotations.Service;
+import org.onosproject.core.ApplicationId;
+import org.onosproject.core.CoreService;
+import org.onosproject.kafkaintegration.api.EventSubscriptionService;
+import org.onosproject.kafkaintegration.api.dto.DefaultEventSubscriber;
+import org.onosproject.kafkaintegration.api.dto.EventSubscriber;
+import org.onosproject.kafkaintegration.api.dto.EventSubscriberGroupId;
+import org.onosproject.kafkaintegration.api.dto.OnosEvent;
+import org.onosproject.kafkaintegration.api.dto.OnosEvent.Type;
+import org.onosproject.kafkaintegration.errors.InvalidApplicationException;
+import org.onosproject.kafkaintegration.errors.InvalidGroupIdException;
+import org.onosproject.kafkaintegration.listener.ListenerFactory;
+import org.onosproject.kafkaintegration.listener.OnosEventListener;
+import org.onosproject.net.device.DeviceService;
+import org.onosproject.net.link.LinkService;
+import org.onosproject.store.serializers.KryoNamespaces;
+import org.onosproject.store.service.Serializer;
+import org.onosproject.store.service.StorageService;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.UUID;
+
+import static com.google.common.base.Preconditions.checkNotNull;
+import static org.onosproject.kafkaintegration.api.dto.OnosEvent.Type.DEVICE;
+import static org.onosproject.kafkaintegration.api.dto.OnosEvent.Type.LINK;
+
+/**
+ * Implementation of Event Subscription Manager.
+ *
+ */
+@Component(immediate = true)
+@Service
+public class EventSubscriptionManager implements EventSubscriptionService {
+
+    private final Logger log = LoggerFactory.getLogger(getClass());
+
+    // Stores the currently registered applications for event export service.
+    private Map<ApplicationId, EventSubscriberGroupId> registeredApps;
+
+    private Map<Type, List<EventSubscriber>> subscriptions;
+
+    private static final String REGISTERED_APPS = "registered-applications";
+
+    private static final String SUBSCRIBED_APPS = "event-subscriptions";
+
+    @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
+    protected DeviceService deviceService;
+
+    @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
+    protected LinkService linkService;
+
+    @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
+    protected CoreService coreService;
+
+    @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
+    protected StorageService storageService;
+
+    private ApplicationId appId;
+
+    @Activate
+    protected void activate() {
+        appId = coreService
+                .registerApplication("org.onosproject.kafkaintegration");
+
+        registeredApps = storageService
+                .<ApplicationId, EventSubscriberGroupId>consistentMapBuilder()
+                .withName(REGISTERED_APPS)
+                .withSerializer(Serializer.using(KryoNamespaces.API,
+                                                 EventSubscriberGroupId.class,
+                                                 UUID.class))
+                .build().asJavaMap();
+
+        subscriptions = storageService
+                .<Type, List<EventSubscriber>>consistentMapBuilder()
+                .withName(SUBSCRIBED_APPS)
+                .withSerializer(Serializer.using(KryoNamespaces.API,
+                                                 EventSubscriber.class,
+                                                 OnosEvent.class,
+                                                 OnosEvent.Type.class,
+                                                 DefaultEventSubscriber.class,
+                                                 EventSubscriberGroupId.class,
+                                                 UUID.class))
+                .build().asJavaMap();
+
+        log.info("Started");
+    }
+
+    @Deactivate
+    protected void deactivate() {
+        log.info("Stopped");
+    }
+
+    @Override
+    public EventSubscriberGroupId registerListener(String appName) {
+
+        // TODO: Remove it once ONOS provides a mechanism for external apps
+        // to register with the core service. See Jira - 4409
+        ApplicationId externalAppId = coreService.registerApplication(appName);
+
+        return registeredApps.computeIfAbsent(externalAppId,
+                                              (key) -> new EventSubscriberGroupId(UUID
+                                                      .randomUUID()));
+    }
+
+    @Override
+    public void unregisterListener(String appName) {
+        ApplicationId externalAppId =
+                checkNotNull(coreService.getAppId(appName));
+        registeredApps.remove(externalAppId);
+    }
+
+    @Override
+    public void subscribe(EventSubscriber subscriber)
+            throws InvalidGroupIdException, InvalidApplicationException {
+        checkNotNull(subscriber);
+
+        if (!registeredApplication(subscriber.appName())) {
+            throw new InvalidApplicationException("Application is not "
+                    + "registered to make this request.");
+        }
+
+        if (!validGroupId(subscriber.subscriberGroupId(), subscriber.appName())) {
+            throw new InvalidGroupIdException("Incorrect group id in the request");
+        }
+
+        // Guava's Preconditions templates only substitute %s placeholders.
+        OnosEventListener onosListener = getListener(subscriber.eventType());
+        checkNotNull(onosListener, "No listener for the supported event type - %s", subscriber.eventType());
+
+        applyListenerAction(subscriber.eventType(), onosListener,
+                            ListenerAction.START);
+
+        // update internal state
+        List<EventSubscriber> subscriptionList =
+                subscriptions.get(subscriber.eventType());
+        if (subscriptionList == null) {
+            subscriptionList = new ArrayList<EventSubscriber>();
+        }
+        subscriptionList.add(subscriber);
+        subscriptions.put(subscriber.eventType(), subscriptionList);
+
+        log.info("Subscription for {} event by {} successful",
+                 subscriber.eventType(), subscriber.appName());
+    }
+
+    /**
+     * Checks if the application has registered.
+     *
+     * @param appName application name
+     * @return true if application has registered, false otherwise
+     */
+    private boolean registeredApplication(String appName) {
+        checkNotNull(appName);
+        // An unknown name is reported as "not registered" rather than an NPE,
+        // so that callers can raise InvalidApplicationException.
+        ApplicationId externalAppId = coreService.getAppId(appName);
+        if (externalAppId != null && registeredApps.containsKey(externalAppId)) {
+            return true;
+        }
+        log.debug("{} is not registered", appName);
+        return false;
+    }
+
+    /**
+     * Actions that can be performed on the ONOS Event Listeners.
+     *
+     */
+    private enum ListenerAction {
+        START, STOP;
+    }
+
+    /**
+     * Applies the specified action on the Listener.
+     *
+     * @param eventType the ONOS Event type registered by the application
+     * @param onosListener ONOS event listener
+     * @param action to be performed on the listener
+     */
+    private void applyListenerAction(Type eventType,
+                                     OnosEventListener onosListener,
+                                     ListenerAction action) {
+        switch (eventType) {
+        case DEVICE:
+            if (action == ListenerAction.START) {
+                onosListener.startListener(DEVICE, deviceService);
+            } else {
+                onosListener.stopListener(DEVICE, deviceService);
+            }
+            break;
+        case LINK:
+            if (action == ListenerAction.START) {
+                onosListener.startListener(LINK, linkService);
+            } else {
+                onosListener.stopListener(LINK, linkService);
+            }
+            break;
+        default:
+            log.error("Cannot {} listener. Unsupported event type {} ",
+                      action.toString(), eventType.toString());
+        }
+    }
+
+    /**
+     * Returns the ONOS event listener corresponding to the ONOS Event type.
+     *
+     * @param eventType ONOS event type
+     * @return ONOS event listener
+     */
+    private OnosEventListener getListener(Type eventType) {
+        checkNotNull(eventType);
+        ListenerFactory factory = ListenerFactory.getInstance();
+        OnosEventListener onosListener = factory.getListener(eventType);
+        return onosListener;
+    }
+
+    /**
+     * Checks if the group id is valid for this registered application.
+     *
+     * @param groupId GroupId assigned to the subscriber
+     * @param appName Registered Application name
+     * @return true if valid groupId and false otherwise
+     */
+    private boolean validGroupId(EventSubscriberGroupId groupId,
+                                 String appName) {
+        checkNotNull(groupId);
+
+        ApplicationId externalAppId = coreService.getAppId(appName);
+        if (externalAppId == null) {
+            return false;
+        }
+
+        // groupId is non-null here, so comparing from its side tolerates a
+        // missing (null) registration instead of throwing an NPE.
+        return groupId.equals(registeredApps.get(externalAppId));
+    }
+
+    @Override
+    public void unsubscribe(EventSubscriber subscriber)
+            throws InvalidGroupIdException, InvalidApplicationException {
+        checkNotNull(subscriber);
+
+        if (!registeredApplication(subscriber.appName())) {
+            throw new InvalidApplicationException("Application is not "
+                    + "registered to make this request.");
+        }
+
+        if (!validGroupId(subscriber.subscriberGroupId(),
+                          subscriber.appName())) {
+            throw new InvalidGroupIdException("Incorrect group id in the request");
+        }
+
+        if (!eventSubscribed(subscriber)) {
+            log.error("No subscription to {} was found",
+                      subscriber.eventType());
+            return;
+        }
+
+        // If this is the only subscriber listening for this event,
+        // stop the listener.
+        List<EventSubscriber> subscribers =
+                subscriptions.get(subscriber.eventType());
+        if (subscribers.size() == 1) {
+            OnosEventListener onosListener =
+                    getListener(subscriber.eventType());
+            // Guava's Preconditions templates only substitute %s placeholders.
+            checkNotNull(onosListener,
+                         "No listener for the supported event type - %s",
+                         subscriber.eventType());
+            applyListenerAction(subscriber.eventType(), onosListener,
+                                ListenerAction.STOP);
+        }
+
+        // update internal state.
+        subscribers.remove(subscriber);
+        subscriptions.put(subscriber.eventType(), subscribers);
+
+        log.info("Unsubscribed {} for {} events", subscriber.appName(),
+                 subscriber.eventType());
+    }
+
+    @Override
+    public List<EventSubscriber> getEventSubscribers(Type type) {
+        return subscriptions.getOrDefault(type, ImmutableList.of());
+
+    }
+
+    /**
+     * Checks if the subscriber has already subscribed to the requested event
+     * type.
+     *
+     * @param subscriber the subscriber to a specific ONOS event
+     * @return true if subscriber has subscribed to the ONOS event
+     */
+    private boolean eventSubscribed(EventSubscriber subscriber) {
+
+        List<EventSubscriber> subscriberList =
+                subscriptions.get(subscriber.eventType());
+
+        if (subscriberList == null) {
+            return false;
+        }
+
+        return subscriberList.contains(subscriber);
+    }
+
+}
diff --git a/apps/kafka-integration/core/src/main/java/org/onosproject/kafkaintegration/impl/KafkaCodecRegistrator.java b/apps/kafka-integration/core/src/main/java/org/onosproject/kafkaintegration/impl/KafkaCodecRegistrator.java
new file mode 100644
index 0000000..22705d5
--- /dev/null
+++ b/apps/kafka-integration/core/src/main/java/org/onosproject/kafkaintegration/impl/KafkaCodecRegistrator.java
@@ -0,0 +1,47 @@
+/**
+ * Copyright 2016-present Open Networking Laboratory
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+
+ * http://www.apache.org/licenses/LICENSE-2.0
+
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.onosproject.kafkaintegration.impl;
+
+import org.apache.felix.scr.annotations.Activate;
+import org.apache.felix.scr.annotations.Component;
+import org.apache.felix.scr.annotations.Deactivate;
+import org.apache.felix.scr.annotations.Reference;
+import org.apache.felix.scr.annotations.ReferenceCardinality;
+import org.onosproject.codec.CodecService;
+import org.onosproject.kafkaintegration.api.dto.EventSubscriber;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Registers the {@link EventSubscriber} JSON codec on activation and withdraws it on deactivation.
+ */
+@Component(immediate = true)
+public class KafkaCodecRegistrator {
+    private static final Logger log = LoggerFactory.getLogger(KafkaCodecRegistrator.class);
+    @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
+    protected CodecService codecService;
+
+    @Activate
+    public void activate() {
+        codecService.registerCodec(EventSubscriber.class, new SubscriberCodec());
+        log.info("Started");
+    }
+
+    @Deactivate
+    public void deactivate() {
+        codecService.unregisterCodec(EventSubscriber.class); // registered in activate()
+        log.info("Stopped");
+    }
+}
diff --git a/apps/kafka-integration/core/src/main/java/org/onosproject/kafkaintegration/impl/KafkaPublisherManager.java b/apps/kafka-integration/core/src/main/java/org/onosproject/kafkaintegration/impl/KafkaPublisherManager.java
new file mode 100644
index 0000000..104619a
--- /dev/null
+++ b/apps/kafka-integration/core/src/main/java/org/onosproject/kafkaintegration/impl/KafkaPublisherManager.java
@@ -0,0 +1,60 @@
+/**
+ * Copyright 2016 Open Networking Laboratory
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+
+ * http://www.apache.org/licenses/LICENSE-2.0
+
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.onosproject.kafkaintegration.impl;
+
+import org.onosproject.event.AbstractListenerManager;
+import org.onosproject.kafkaintegration.api.ExportableEventListener;
+import org.onosproject.kafkaintegration.api.KafkaPublisherService;
+import org.onosproject.kafkaintegration.api.dto.OnosEvent;
+import org.onosproject.kafkaintegration.api.dto.OnosEvent.Type;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.protobuf.GeneratedMessage;
+
+/**
+ * Dispatch ONOS Events to all interested Listeners.
+ *
+ */
+
+public final class KafkaPublisherManager
+        extends AbstractListenerManager<OnosEvent, ExportableEventListener> implements KafkaPublisherService {
+
+    private final Logger log = LoggerFactory.getLogger(getClass());
+
+    // Exists to defeat instantiation
+    private KafkaPublisherManager() {
+    }
+
+    //TODO: If possible, get rid of Singleton implementation.
+    private static class SingletonHolder {
+        private static final KafkaPublisherManager INSTANCE = new KafkaPublisherManager();
+    }
+
+    /**
+     * Returns a static reference to the Listener Factory.
+     *
+     * @return singleton object
+     */
+    public static KafkaPublisherManager getInstance() {
+        return SingletonHolder.INSTANCE;
+    }
+
+    @Override
+    public void publish(Type eventType, GeneratedMessage message) {
+        log.debug("Dispatching ONOS Event {}", eventType);
+        post(new OnosEvent(eventType, message));
+    }
+}
diff --git a/apps/kafka-integration/core/src/main/java/org/onosproject/kafkaintegration/impl/KafkaStorageManager.java b/apps/kafka-integration/core/src/main/java/org/onosproject/kafkaintegration/impl/KafkaStorageManager.java
new file mode 100644
index 0000000..64af4e8
--- /dev/null
+++ b/apps/kafka-integration/core/src/main/java/org/onosproject/kafkaintegration/impl/KafkaStorageManager.java
@@ -0,0 +1,108 @@
+/**
+ * Copyright 2016 Open Networking Laboratory
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+
+ * http://www.apache.org/licenses/LICENSE-2.0
+
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.onosproject.kafkaintegration.impl;
+
+import org.apache.felix.scr.annotations.Activate;
+import org.apache.felix.scr.annotations.Component;
+import org.apache.felix.scr.annotations.Deactivate;
+import org.apache.felix.scr.annotations.Reference;
+import org.apache.felix.scr.annotations.ReferenceCardinality;
+import org.onosproject.kafkaintegration.api.KafkaEventStorageService;
+import org.onosproject.kafkaintegration.api.dto.OnosEvent;
+import org.onosproject.store.service.AtomicValue;
+import org.onosproject.store.service.StorageService;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.TreeMap;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+
+@Component(immediate = true)
+public class KafkaStorageManager implements KafkaEventStorageService {
+
+    @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
+    protected StorageService storageService;
+
+    private TreeMap<Long, OnosEvent> kafkaEventStore;
+
+    private AtomicValue<Long> lastPublishedEvent;
+
+    private final Logger log = LoggerFactory.getLogger(getClass());
+
+    private ScheduledExecutorService gcExService;
+
+    private InternalGarbageCollector gcTask;
+
+    // Thread scheduler parameters.
+    private final long delay = 0;
+    private final long period = 1;
+
+    @Activate
+    protected void activate() {
+        kafkaEventStore = new TreeMap<Long, OnosEvent>();
+        lastPublishedEvent = storageService.<Long>atomicValueBuilder()
+                .withName("onos-app-kafka-published-seqNumber").build()
+                .asAtomicValue();
+
+        startGC();
+
+        log.info("Started");
+    }
+
+    private void startGC() {
+        log.info("Starting Garbage Collection Service");
+        gcExService = Executors.newSingleThreadScheduledExecutor();
+        gcTask = new InternalGarbageCollector();
+        gcExService.scheduleAtFixedRate(gcTask, delay, period,
+                                        TimeUnit.SECONDS);
+    }
+
+    @Deactivate
+    protected void deactivate() {
+        stopGC();
+        log.info("Stopped");
+    }
+
+    private void stopGC() {
+        log.info("Stopping Garbage Collection Service");
+        gcExService.shutdown();
+    }
+
+    @Override
+    public boolean insertCacheEntry(OnosEvent e) {
+        // TODO: Fill in the code once the event carries timestamp info.
+        return true;
+    }
+
+    @Override
+    public void updateLastPublishedEntry(Long sequenceNumber) {
+        this.lastPublishedEvent.set(sequenceNumber);
+    }
+
+    /**
+     * Removes events from the Kafka Event Store which have been published.
+     *
+     */
+    private class InternalGarbageCollector implements Runnable {
+
+        @Override
+        public void run() {
+            kafkaEventStore.headMap(lastPublishedEvent.get(), true).clear();
+        }
+    }
+
+}
diff --git a/apps/kafka-integration/core/src/main/java/org/onosproject/kafkaintegration/impl/SubscriberCodec.java b/apps/kafka-integration/core/src/main/java/org/onosproject/kafkaintegration/impl/SubscriberCodec.java
new file mode 100644
index 0000000..3489f6f
--- /dev/null
+++ b/apps/kafka-integration/core/src/main/java/org/onosproject/kafkaintegration/impl/SubscriberCodec.java
@@ -0,0 +1,64 @@
+/**
+ * Copyright 2016-present Open Networking Laboratory
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+
+ * http://www.apache.org/licenses/LICENSE-2.0
+
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.onosproject.kafkaintegration.impl;
+
+import com.fasterxml.jackson.databind.node.ObjectNode;
+import org.onosproject.codec.CodecContext;
+import org.onosproject.codec.JsonCodec;
+import org.onosproject.kafkaintegration.api.dto.DefaultEventSubscriber;
+import org.onosproject.kafkaintegration.api.dto.EventSubscriber;
+import org.onosproject.kafkaintegration.api.dto.EventSubscriberGroupId;
+import org.onosproject.kafkaintegration.api.dto.OnosEvent.Type;
+
+import java.util.UUID;
+
+import static com.google.common.base.Preconditions.checkNotNull;
+
+/**
+ * Codec for encoding/decoding a Subscriber object to/from JSON.
+ */
+public final class SubscriberCodec extends JsonCodec<EventSubscriber> {
+
+    // JSON field names
+    private static final String NAME = "appName";
+    private static final String GROUP_ID = "groupId";
+    private static final String EVENT_TYPE = "eventType";
+
+    @Override
+    public ObjectNode encode(EventSubscriber data, CodecContext context) {
+        checkNotNull(data, "Subscriber cannot be null");
+        return context.mapper().createObjectNode().put(NAME, data.appName())
+                .put(GROUP_ID, data.subscriberGroupId().getId().toString())
+                .put(EVENT_TYPE, data.eventType().toString());
+    }
+
+    /**
+     * Builds a subscriber from its JSON form; all three fields are mandatory.
+     * @throws NullPointerException if json is null or one of the fields is missing
+     * @throws IllegalArgumentException if groupId or eventType cannot be parsed
+     */
+    @Override
+    public EventSubscriber decode(ObjectNode json, CodecContext context) {
+        checkNotNull(json, "Subscriber JSON cannot be null");
+        String appName = checkNotNull(json.get(NAME), "Missing field %s", NAME).asText();
+        String groupId = checkNotNull(json.get(GROUP_ID), "Missing field %s", GROUP_ID).asText();
+        String eventType = checkNotNull(json.get(EVENT_TYPE), "Missing field %s", EVENT_TYPE).asText();
+        EventSubscriber.Builder resultBuilder = new DefaultEventSubscriber.Builder();
+        resultBuilder.setAppName(appName);
+        resultBuilder.setSubscriberGroupId(new EventSubscriberGroupId(UUID.fromString(groupId)));
+        resultBuilder.setEventType(Type.valueOf(eventType));
+        return resultBuilder.build();
+    }
+}
\ No newline at end of file
diff --git a/apps/kafka-integration/core/src/main/java/org/onosproject/kafkaintegration/impl/package-info.java b/apps/kafka-integration/core/src/main/java/org/onosproject/kafkaintegration/impl/package-info.java
new file mode 100644
index 0000000..a2950fa
--- /dev/null
+++ b/apps/kafka-integration/core/src/main/java/org/onosproject/kafkaintegration/impl/package-info.java
@@ -0,0 +1,18 @@
+/**
+ * Copyright 2016-present Open Networking Laboratory Licensed under the Apache
+ * License, Version 2.0 (the "License"); you may not use this file except in
+ * compliance with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+/**
+ * API implementation classes.
+ */
+package org.onosproject.kafkaintegration.impl;
diff --git a/apps/kafka-integration/core/src/main/java/org/onosproject/kafkaintegration/kafka/EventMonitor.java b/apps/kafka-integration/core/src/main/java/org/onosproject/kafkaintegration/kafka/EventMonitor.java
new file mode 100644
index 0000000..6e7291b
--- /dev/null
+++ b/apps/kafka-integration/core/src/main/java/org/onosproject/kafkaintegration/kafka/EventMonitor.java
@@ -0,0 +1,242 @@
+/**
+ * Copyright 2016-present Open Networking Laboratory
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.onosproject.kafkaintegration.kafka;
+
+import org.apache.felix.scr.annotations.Activate;
+import org.apache.felix.scr.annotations.Component;
+import org.apache.felix.scr.annotations.Deactivate;
+import org.apache.felix.scr.annotations.Modified;
+import org.apache.felix.scr.annotations.Property;
+import org.apache.felix.scr.annotations.Reference;
+import org.apache.felix.scr.annotations.ReferenceCardinality;
+import org.apache.kafka.clients.producer.ProducerRecord;
+import org.onosproject.cfg.ComponentConfigService;
+import org.onosproject.kafkaintegration.api.EventConversionService;
+import org.onosproject.kafkaintegration.api.EventSubscriptionService;
+import org.onosproject.net.device.DeviceEvent;
+import org.onosproject.net.device.DeviceListener;
+import org.onosproject.net.device.DeviceService;
+import org.onosproject.net.link.LinkEvent;
+import org.onosproject.net.link.LinkListener;
+import org.onosproject.net.link.LinkService;
+import org.osgi.service.component.ComponentContext;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Dictionary;
+import java.util.UUID;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+
+import static com.google.common.base.Strings.isNullOrEmpty;
+import static java.util.concurrent.Executors.newSingleThreadScheduledExecutor;
+import static org.onlab.util.Tools.get;
+import static org.onlab.util.Tools.groupedThreads;
+import static org.onosproject.kafkaintegration.api.dto.OnosEvent.Type.DEVICE;
+import static org.onosproject.kafkaintegration.api.dto.OnosEvent.Type.LINK;
+
+/**
+ * Encapsulates the behavior of monitoring various ONOS events.
+ * */
+@Component(immediate = true)
+public class EventMonitor {
+    private final Logger log = LoggerFactory.getLogger(getClass());
+
+    @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
+    protected EventSubscriptionService eventSubscriptionService;
+
+    @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
+    protected EventConversionService eventConversionService;
+
+    @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
+    protected DeviceService deviceService;
+
+    @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
+    protected LinkService linkService;
+
+    @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
+    protected ComponentConfigService componentConfigService;
+
+    private final DeviceListener deviceListener = new InternalDeviceListener();
+    private final LinkListener linkListener = new InternalLinkListener();
+
+    protected ExecutorService eventExecutor;
+
+    private static final String BOOTSTRAP_SERVERS = "localhost:9092";
+    private static final int RETRIES = 1;
+    private static final int MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION = 5;
+    private static final int REQUEST_REQUIRED_ACKS = 1;
+    private static final String KEY_SERIALIZER = "org.apache.kafka.common.serialization.StringSerializer";
+    private static final String VALUE_SERIALIZER = "org.apache.kafka.common.serialization.ByteArraySerializer";
+
+    @Property(name = "bootstrap.servers", value = BOOTSTRAP_SERVERS,
+            label = "Default host/post pair to establish initial connection to Kafka cluster.")
+    private String bootstrapServers = BOOTSTRAP_SERVERS;
+
+    @Property(name = "retries", intValue = RETRIES,
+            label = "Number of times the producer can retry to send after first failure")
+    private int retries = RETRIES;
+
+    @Property(name = "max.in.flight.requests.per.connection", intValue = MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION,
+            label = "The maximum number of unacknowledged requests the client will send before blocking")
+    private int maxInFlightRequestsPerConnection = MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION;
+
+    @Property(name = "request.required.acks", intValue = 1,
+            label = "Producer will get an acknowledgement after the leader has replicated the data")
+    private int requestRequiredAcks = REQUEST_REQUIRED_ACKS;
+
+    @Property(name = "key.serializer", value = KEY_SERIALIZER,
+            label = "Serializer class for key that implements the Serializer interface.")
+    private String keySerializer = KEY_SERIALIZER;
+
+    @Property(name = "value.serializer", value = VALUE_SERIALIZER,
+            label = "Serializer class for value that implements the Serializer interface.")
+    private String valueSerializer = VALUE_SERIALIZER;
+
+    private Producer producer;
+
+    @Activate
+    protected void activate(ComponentContext context) {
+        componentConfigService.registerProperties(getClass());
+        eventExecutor = newSingleThreadScheduledExecutor(groupedThreads("onos/onosEvents", "events-%d", log));
+        deviceService.addListener(deviceListener);
+        linkService.addListener(linkListener);
+        producer = new Producer(bootstrapServers, retries, maxInFlightRequestsPerConnection,
+                                requestRequiredAcks, keySerializer, valueSerializer);
+        producer.start();
+
+        log.info("Started");
+    }
+
+    @Deactivate
+    protected void deactivate() {
+        componentConfigService.unregisterProperties(getClass(), false);
+        deviceService.removeListener(deviceListener);
+        linkService.removeListener(linkListener);
+        producer.stop();
+        eventExecutor.shutdownNow();
+        eventExecutor = null;
+
+        log.info("Stopped");
+    }
+
+    @Modified
+    private void modified(ComponentContext context) {
+        if (context == null) {
+            bootstrapServers = BOOTSTRAP_SERVERS;
+            retries = RETRIES;
+            maxInFlightRequestsPerConnection = MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION;
+            requestRequiredAcks = REQUEST_REQUIRED_ACKS;
+            keySerializer = KEY_SERIALIZER;
+            valueSerializer = VALUE_SERIALIZER;
+            return;
+        }
+        Dictionary properties = context.getProperties();
+
+        String newBootstrapServers = BOOTSTRAP_SERVERS;
+        int newRetries = RETRIES;
+        int newMaxInFlightRequestsPerConnection = MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION;
+        int newRequestRequiredAcks = REQUEST_REQUIRED_ACKS;
+        try {
+            String s = get(properties, "bootstrapServers");
+            newBootstrapServers = isNullOrEmpty(s)
+                    ? bootstrapServers : s.trim();
+
+            s = get(properties, "retries");
+            newRetries = isNullOrEmpty(s)
+                    ? retries : Integer.parseInt(s.trim());
+
+            s = get(properties, "maxInFlightRequestsPerConnection");
+            newMaxInFlightRequestsPerConnection = isNullOrEmpty(s)
+                    ? maxInFlightRequestsPerConnection : Integer.parseInt(s.trim());
+
+            s = get(properties, "requestRequiredAcks");
+            newRequestRequiredAcks = isNullOrEmpty(s)
+                    ? requestRequiredAcks : Integer.parseInt(s.trim());
+
+        } catch (NumberFormatException | ClassCastException e) {
+            return;
+        }
+
+        boolean modified = newBootstrapServers != bootstrapServers ||
+                newRetries != retries ||
+                newMaxInFlightRequestsPerConnection != maxInFlightRequestsPerConnection ||
+                newRequestRequiredAcks != requestRequiredAcks;
+
+        if (modified) {
+            bootstrapServers = newBootstrapServers;
+            retries = newRetries;
+            maxInFlightRequestsPerConnection = newMaxInFlightRequestsPerConnection;
+            requestRequiredAcks = newRequestRequiredAcks;
+            if (producer != null) {
+                producer.stop();
+            }
+            producer = new Producer(bootstrapServers, retries, maxInFlightRequestsPerConnection,
+                                        requestRequiredAcks, keySerializer, valueSerializer);
+            producer.start();
+            log.info("Modified");
+        } else {
+            return;
+        }
+    }
+
+    private class InternalDeviceListener implements DeviceListener {
+
+        @Override
+        public void event(DeviceEvent event) {
+            if (!eventSubscriptionService.getEventSubscribers(DEVICE).isEmpty()) {
+                eventExecutor.execute(() -> {
+                    try {
+                        String id = UUID.randomUUID().toString();
+                        producer.send(new ProducerRecord<>(DEVICE.toString(),
+                                                                id, event.subject().toString().getBytes())).get();
+                        log.debug("Device event sent successfully.");
+                    } catch (InterruptedException e) {
+                        Thread.currentThread().interrupt();
+                    } catch (ExecutionException e) {
+                        log.error("Exception thrown {}", e);
+                    }
+                });
+            } else {
+                log.debug("No device listeners");
+            }
+        }
+    }
+
+    private class InternalLinkListener implements LinkListener {
+
+        @Override
+        public void event(LinkEvent event) {
+            if (!eventSubscriptionService.getEventSubscribers(LINK).isEmpty()) {
+                eventExecutor.execute(() -> {
+                    try {
+                        String id = UUID.randomUUID().toString();
+                        producer.send(new ProducerRecord<>(LINK.toString(),
+                                                                id, event.subject().toString().getBytes())).get();
+                        log.debug("Link event sent successfully.");
+                    } catch (InterruptedException e) {
+                        Thread.currentThread().interrupt();
+                    } catch (ExecutionException e) {
+                        log.error("Exception thrown {}", e);
+                    }
+                });
+            } else {
+                log.debug("No link listeners");
+            }
+        }
+    }
+}
diff --git a/apps/kafka-integration/core/src/main/java/org/onosproject/kafkaintegration/kafka/Producer.java b/apps/kafka-integration/core/src/main/java/org/onosproject/kafkaintegration/kafka/Producer.java
new file mode 100644
index 0000000..bbdff81
--- /dev/null
+++ b/apps/kafka-integration/core/src/main/java/org/onosproject/kafkaintegration/kafka/Producer.java
@@ -0,0 +1,66 @@
+/**
+ * Copyright 2016-present Open Networking Laboratory
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.onosproject.kafkaintegration.kafka;
+
+import org.apache.kafka.clients.producer.KafkaProducer;
+import org.apache.kafka.clients.producer.ProducerRecord;
+import org.apache.kafka.clients.producer.RecordMetadata;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Properties;
+import java.util.concurrent.Future;
+
+/**
+ * Thin wrapper around a {@link KafkaProducer} with String keys and byte[] values.
+ */
+public class Producer {
+    private KafkaProducer<String, byte[]> kafkaProducer;
+    private final Logger log = LoggerFactory.getLogger(getClass());
+
+    /** Creates the underlying Kafka producer from the given settings. */
+    Producer(String bootstrapServers, int retries, int maxInFlightRequestsPerConnection,
+             int requestRequiredAcks, String keySerializer, String valueSerializer) {
+        Properties prop = new Properties();
+        prop.put("bootstrap.servers", bootstrapServers);
+        prop.put("retries", retries);
+        prop.put("max.in.flight.requests.per.connection", maxInFlightRequestsPerConnection);
+        // KafkaProducer takes the acknowledgement level under "acks", as a string value.
+        prop.put("acks", Integer.toString(requestRequiredAcks));
+        prop.put("key.serializer", keySerializer);
+        prop.put("value.serializer", valueSerializer);
+        kafkaProducer = new KafkaProducer<>(prop);
+    }
+
+    public void start() {
+        log.info("Started");
+    }
+
+    /** Closes the Kafka producer if it is still open; send() must not be called afterwards. */
+    public void stop() {
+        if (kafkaProducer != null) {
+            kafkaProducer.close();
+            kafkaProducer = null;
+        }
+        log.info("Stopped");
+    }
+
+    /** Hands the record to the Kafka producer and returns the future for its metadata. */
+    public Future<RecordMetadata> send(ProducerRecord<String, byte[]> record) {
+        return kafkaProducer.send(record);
+    }
+}
diff --git a/apps/kafka-integration/core/src/main/java/org/onosproject/kafkaintegration/kafka/package-info.java b/apps/kafka-integration/core/src/main/java/org/onosproject/kafkaintegration/kafka/package-info.java
new file mode 100644
index 0000000..e222a64
--- /dev/null
+++ b/apps/kafka-integration/core/src/main/java/org/onosproject/kafkaintegration/kafka/package-info.java
@@ -0,0 +1,18 @@
+/**
+ * Copyright 2016-present Open Networking Laboratory Licensed under the Apache
+ * License, Version 2.0 (the "License"); you may not use this file except in
+ * compliance with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+/**
+ * Kafka producer and ONOS event monitor classes.
+ */
+package org.onosproject.kafkaintegration.kafka;
diff --git a/apps/kafka-integration/core/src/main/java/org/onosproject/kafkaintegration/listener/DeviceEventsListener.java b/apps/kafka-integration/core/src/main/java/org/onosproject/kafkaintegration/listener/DeviceEventsListener.java
new file mode 100644
index 0000000..e2f23c0
--- /dev/null
+++ b/apps/kafka-integration/core/src/main/java/org/onosproject/kafkaintegration/listener/DeviceEventsListener.java
@@ -0,0 +1,96 @@
+/**
+ * Copyright 2016 Open Networking Laboratory
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+
+ * http://www.apache.org/licenses/LICENSE-2.0
+
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.onosproject.kafkaintegration.listener;
+
+import com.google.protobuf.GeneratedMessage;
+import org.onosproject.event.ListenerService;
+import org.onosproject.kafkaintegration.api.dto.OnosEvent.Type;
+import org.onosproject.kafkaintegration.converter.ConversionFactory;
+import org.onosproject.kafkaintegration.converter.EventConverter;
+import org.onosproject.kafkaintegration.impl.KafkaPublisherManager;
+import org.onosproject.net.device.DeviceEvent;
+import org.onosproject.net.device.DeviceListener;
+import org.onosproject.net.device.DeviceService;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import static org.onosproject.kafkaintegration.api.dto.OnosEvent.Type.DEVICE;
+
+/**
+ * Listens for ONOS Device events, converts each one to its protobuf message
+ * and publishes it through the {@link KafkaPublisherManager}.
+ */
+public final class DeviceEventsListener implements OnosEventListener {
+
+    private final Logger log = LoggerFactory.getLogger(getClass());
+
+    // True between a successful startListener() and the next stopListener().
+    private boolean listenerRunning = false;
+
+    // The listener currently registered with the device service, if any.
+    private InnerListener listener = null;
+
+    // Private so that the only instance is the one returned by getInstance().
+    private DeviceEventsListener() {
+    }
+
+    private static final class Holder {
+        private static final DeviceEventsListener SHARED = new DeviceEventsListener();
+    }
+
+    /**
+     * Returns the shared device events listener.
+     *
+     * @return singleton object
+     */
+    public static DeviceEventsListener getInstance() {
+        return Holder.SHARED;
+    }
+
+    @Override
+    public void startListener(Type e, ListenerService<?, ?> service) {
+        if (listenerRunning) {
+            // Already registered; starting again is a no-op.
+            return;
+        }
+        listener = new InnerListener();
+        ((DeviceService) service).addListener(listener);
+        listenerRunning = true;
+    }
+
+    @Override
+    public void stopListener(Type e, ListenerService<?, ?> service) {
+        if (!listenerRunning) {
+            // Nothing registered; stopping is a no-op.
+            return;
+        }
+        ((DeviceService) service).removeListener(listener);
+        listener = null;
+        listenerRunning = false;
+    }
+
+    /**
+     * Forwards every device event to the Kafka publisher as a protobuf message.
+     */
+    private class InnerListener implements DeviceListener {
+        @Override
+        public void event(DeviceEvent deviceEvent) {
+            EventConverter toProto = ConversionFactory.getInstance().getConverter(DEVICE);
+            GeneratedMessage message = toProto.convertToProtoMessage(deviceEvent);
+            KafkaPublisherManager.getInstance().publish(DEVICE, message);
+        }
+    }
+
+}
\ No newline at end of file
diff --git a/apps/kafka-integration/core/src/main/java/org/onosproject/kafkaintegration/listener/LinkEventsListener.java b/apps/kafka-integration/core/src/main/java/org/onosproject/kafkaintegration/listener/LinkEventsListener.java
new file mode 100644
index 0000000..39ab078
--- /dev/null
+++ b/apps/kafka-integration/core/src/main/java/org/onosproject/kafkaintegration/listener/LinkEventsListener.java
@@ -0,0 +1,90 @@
+/**
+ * Copyright 2016 Open Networking Laboratory
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+
+ * http://www.apache.org/licenses/LICENSE-2.0
+
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.onosproject.kafkaintegration.listener;
+
+import com.google.protobuf.GeneratedMessage;
+import org.onosproject.event.ListenerService;
+import org.onosproject.kafkaintegration.api.dto.OnosEvent.Type;
+import org.onosproject.kafkaintegration.converter.ConversionFactory;
+import org.onosproject.kafkaintegration.converter.EventConverter;
+import org.onosproject.kafkaintegration.impl.KafkaPublisherManager;
+import org.onosproject.net.link.LinkEvent;
+import org.onosproject.net.link.LinkListener;
+import org.onosproject.net.link.LinkService;
+
+import static org.onosproject.kafkaintegration.api.dto.OnosEvent.Type.LINK;
+
+/**
+ * Listens for ONOS Link Events and publishes them to Kafka.
+ */
+public final class LinkEventsListener implements OnosEventListener {
+
+    // Registration state, updated by startListener and stopListener
+    private boolean listenerRunning = false;
+    private InnerListener listener = null;
+
+    // Exists to defeat instantiation
+    private LinkEventsListener() {
+    }
+
+    private static class SingletonHolder {
+        private static final LinkEventsListener INSTANCE =
+                new LinkEventsListener();
+    }
+
+    /**
+     * Returns a static reference to the Link Events Listener.
+     *
+     * @return singleton object
+     */
+    public static LinkEventsListener getInstance() {
+        return SingletonHolder.INSTANCE;
+    }
+
+    @Override
+    public void startListener(Type e, ListenerService<?, ?> service) {
+        if (!listenerRunning) {
+            listener = new InnerListener();
+            LinkService linkService = (LinkService) service;
+            linkService.addListener(listener);
+            listenerRunning = true;
+        }
+    }
+
+    private class InnerListener implements LinkListener {
+
+        @Override
+        public void event(LinkEvent event) {
+            // Convert the event to protobuf format
+            EventConverter converter =
+                    ConversionFactory.getInstance().getConverter(LINK);
+            GeneratedMessage message = converter.convertToProtoMessage(event);
+
+            // Call Dispatcher and publish event
+            KafkaPublisherManager.getInstance().publish(LINK, message);
+        }
+    }
+
+    @Override
+    public void stopListener(Type e, ListenerService<?, ?> service) {
+        if (listenerRunning) {
+            LinkService linkService = (LinkService) service;
+            linkService.removeListener(listener);
+            // Drop the reference to the now-unregistered listener
+            listener = null;
+            listenerRunning = false;
+        }
+    }
+}
diff --git a/apps/kafka-integration/core/src/main/java/org/onosproject/kafkaintegration/listener/ListenerFactory.java b/apps/kafka-integration/core/src/main/java/org/onosproject/kafkaintegration/listener/ListenerFactory.java
new file mode 100644
index 0000000..65b7cf1
--- /dev/null
+++ b/apps/kafka-integration/core/src/main/java/org/onosproject/kafkaintegration/listener/ListenerFactory.java
@@ -0,0 +1,67 @@
+/**
+ * Copyright 2016 Open Networking Laboratory
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+
+ * http://www.apache.org/licenses/LICENSE-2.0
+
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.onosproject.kafkaintegration.listener;
+
+import org.onosproject.kafkaintegration.api.dto.OnosEvent.Type;
+
+import java.util.HashMap;
+import java.util.Map;
+
+import static org.onosproject.kafkaintegration.api.dto.OnosEvent.Type.DEVICE;
+import static org.onosproject.kafkaintegration.api.dto.OnosEvent.Type.LINK;
+
+/**
+ * Returns the appropriate listener object based on the ONOS event type.
+ * Listeners are registered for the DEVICE and LINK event types.
+ *
+ */
+public final class ListenerFactory {
+
+    // Store listeners for all supported events. The map is filled once in
+    // the constructor and only read afterwards, which avoids creating an
+    // anonymous HashMap subclass through double-brace initialization.
+    private final Map<Type, OnosEventListener> listeners = new HashMap<>();
+
+    // Exists to defeat instantiation; registers the supported listeners
+    private ListenerFactory() {
+        listeners.put(DEVICE, DeviceEventsListener.getInstance());
+        listeners.put(LINK, LinkEventsListener.getInstance());
+    }
+
+    private static class SingletonHolder {
+        private static final ListenerFactory INSTANCE = new ListenerFactory();
+    }
+
+    /**
+     * Returns a static reference to the Listener Factory.
+     *
+     * @return singleton object
+     */
+    public static ListenerFactory getInstance() {
+        return SingletonHolder.INSTANCE;
+    }
+
+    /**
+     * Returns the listener object for the specified ONOS event type.
+     *
+     * @param event ONOS Event type
+     * @return listener object, or null if no listener is registered for
+     *         the given event type
+     */
+    public OnosEventListener getListener(Type event) {
+        return listeners.get(event);
+    }
+
+}
diff --git a/apps/kafka-integration/core/src/main/java/org/onosproject/kafkaintegration/listener/OnosEventListener.java b/apps/kafka-integration/core/src/main/java/org/onosproject/kafkaintegration/listener/OnosEventListener.java
new file mode 100644
index 0000000..ad14104
--- /dev/null
+++ b/apps/kafka-integration/core/src/main/java/org/onosproject/kafkaintegration/listener/OnosEventListener.java
@@ -0,0 +1,41 @@
+/**
+ * Copyright 2016 Open Networking Laboratory
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+
+ * http://www.apache.org/licenses/LICENSE-2.0
+
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.onosproject.kafkaintegration.listener;
+
+import org.onosproject.event.ListenerService;
+import org.onosproject.kafkaintegration.api.dto.OnosEvent.Type;
+
+/**
+ * APIs for starting and stopping an ONOS event listener.
+ *
+ */
+public interface OnosEventListener {
+
+    /**
+     * Start the listener for the specific ONOS event type.
+     *
+     * @param event ONOS event type
+     * @param service ONOS service on which the listener is registered
+     */
+    void startListener(Type event, ListenerService<?, ?> service);
+    /**
+     * Stop the Listener for the specific ONOS event type.
+     *
+     * @param event ONOS event type
+     * @param service ONOS service from which the listener is removed
+     */
+    void stopListener(Type event, ListenerService<?, ?> service);
+
+}
diff --git a/apps/kafka-integration/core/src/main/java/org/onosproject/kafkaintegration/listener/package-info.java b/apps/kafka-integration/core/src/main/java/org/onosproject/kafkaintegration/listener/package-info.java
new file mode 100644
index 0000000..f6f602f
--- /dev/null
+++ b/apps/kafka-integration/core/src/main/java/org/onosproject/kafkaintegration/listener/package-info.java
@@ -0,0 +1,19 @@
+/**
+ * Copyright 2016-present Open Networking Laboratory Licensed under the Apache
+ * License, Version 2.0 (the "License"); you may not use this file except in
+ * compliance with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+/**
+ * Listeners for listening to various ONOS events.
+ *
+ */
+package org.onosproject.kafkaintegration.listener;