Starting to include kafka-integration app as part of Buck build.

- refactored to follow the api & app structure
- added buck files
- builds and loads
- cleaned-up and fixed number of pom files to function as expected
      (meaning "mvn clean install" actually works)

Change-Id: Ib896269c4986f6ee5cd6bae7cf508f71b64f59f9
diff --git a/apps/kafka-integration/app/BUCK b/apps/kafka-integration/app/BUCK
new file mode 100644
index 0000000..46cc863
--- /dev/null
+++ b/apps/kafka-integration/app/BUCK
@@ -0,0 +1,30 @@
+COMPILE_DEPS = [
+    '//lib:CORE_DEPS',
+    '//lib:JACKSON',
+    '//lib:KRYO',
+    '//lib:javax.ws.rs-api',
+    '//lib:org.apache.karaf.shell.console',
+    '//apps/kafka-integration/api:onos-apps-kafka-integration-api',
+    '//utils/rest:onlab-rest',
+    '//core/store/serializers:onos-core-serializers',
+    '//cli:onos-cli',
+    '//lib:kafka-clients',
+    '//lib:protobuf-java-3.2.0',
+    '//lib:GRPC_1.3',
+    '//incubator/protobuf/models:onos-incubator-protobuf-models',
+    '//incubator/protobuf/models:onos-incubator-protobuf-models-proto',
+]
+
+TEST_DEPS = [
+    '//lib:TEST_ADAPTERS',
+]
+
+osgi_jar_with_tests (
+    deps = COMPILE_DEPS,
+    test_deps = TEST_DEPS,
+    web_context = '/onos/kafka-integration',
+    api_title = 'Kafka Integration',
+    api_version = '1.0',
+    api_description = 'REST API for Kafka Integration',
+    api_package = 'org.onosproject.kafkaintegration.rest',
+)
diff --git a/apps/kafka-integration/app/app.xml b/apps/kafka-integration/app/app.xml
index 602bb9e..96b5458 100644
--- a/apps/kafka-integration/app/app.xml
+++ b/apps/kafka-integration/app/app.xml
@@ -19,9 +19,7 @@
      featuresRepo="mvn:${project.groupId}/${project.artifactId}/${project.version}/xml/features"
      features="${project.artifactId}" apps="org.onosproject.incubator.protobuf">
     <description>${project.description}</description>
-
     <artifact>mvn:${project.groupId}/onos-app-kafka-api/${project.version}</artifact>
-    <artifact>mvn:${project.groupId}/onos-app-kafka-core/${project.version}</artifact>
-    <artifact>mvn:${project.groupId}/onos-app-kafka-web/${project.version}</artifact>
+    <artifact>mvn:${project.groupId}/onos-app-kafka-app/${project.version}</artifact>
     <artifact>mvn:org.apache.servicemix.bundles/org.apache.servicemix.bundles.kafka-clients/0.8.2.2_1</artifact>
 </app>
diff --git a/apps/kafka-integration/app/features.xml b/apps/kafka-integration/app/features.xml
index 09380ba..c615f60 100644
--- a/apps/kafka-integration/app/features.xml
+++ b/apps/kafka-integration/app/features.xml
@@ -19,8 +19,7 @@
              description="${project.description}">
         <feature>onos-api</feature>
         <bundle>mvn:${project.groupId}/onos-app-kafka-api/${project.version}</bundle>
-        <bundle>mvn:${project.groupId}/onos-app-kafka-core/${project.version}</bundle>
-        <bundle>mvn:${project.groupId}/onos-app-kafka-web/${project.version}</bundle>
+        <bundle>mvn:${project.groupId}/onos-app-kafka-app/${project.version}</bundle>
         <bundle>mvn:org.apache.servicemix.bundles/org.apache.servicemix.bundles.kafka-clients/0.8.2.2_1</bundle>
     </feature>
 </features>
diff --git a/apps/kafka-integration/app/pom.xml b/apps/kafka-integration/app/pom.xml
index fb81cf9..36f200d 100644
--- a/apps/kafka-integration/app/pom.xml
+++ b/apps/kafka-integration/app/pom.xml
@@ -17,6 +17,7 @@
 <project xmlns="http://maven.apache.org/POM/4.0.0"
          xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
          xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+
     <parent>
         <groupId>org.onosproject</groupId>
         <artifactId>onos-kafka</artifactId>
@@ -24,7 +25,12 @@
     </parent>
     <modelVersion>4.0.0</modelVersion>
 
-    <artifactId>onos-app-kafka</artifactId>
+    <artifactId>onos-app-kafka-app</artifactId>
+    <packaging>bundle</packaging>
+
+    <url>http://onosproject.org</url>
+
+    <description>Kafka Integration Application</description>
 
     <properties>
         <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
@@ -32,21 +38,20 @@
         <onos.app.name>org.onosproject.kafkaintegration</onos.app.name>
         <onos.app.title>Kafka Integration Application</onos.app.title>
         <onos.app.origin>Calix, Inc.</onos.app.origin>
-        <web.context>/onos/kafka</web.context>
-        <api.version>1.0.0</api.version>
-        <api.package>org.onosproject.kafkaintegration.rest</api.package>
         <onos.app.category>Utility</onos.app.category>
         <onos.app.url>https://wiki.onosproject.org/display/ONOS/Kafka+Integration</onos.app.url>
-        <onos.app.requires>org.onosproject.incubator.protobuf</onos.app.requires>
+        <onos.app.requires>org.onosproject.grpc.nb.service</onos.app.requires>
+        <web.context>/onos/kafka</web.context>
+        <api.version>1.0.0</api.version>
+        <api.title>Kafka Integration Application REST API</api.title>
+        <api.package>org.onosproject.kafkaintegration.rest</api.package>
     </properties>
 
-    <packaging>pom</packaging>
-    <description>
-        Kafka Integration Application.
-        This will export ONOS Events to Northbound Kafka Server.
-    </description>
-
     <dependencies>
+        <dependency>
+            <groupId>org.onosproject</groupId>
+            <artifactId>onos-api</artifactId>
+        </dependency>
 
         <dependency>
             <groupId>org.onosproject</groupId>
@@ -55,23 +60,97 @@
         </dependency>
 
         <dependency>
-            <groupId>org.onosproject</groupId>
-            <artifactId>onos-app-kafka-core</artifactId>
-            <version>${project.version}</version>
+            <groupId>org.osgi</groupId>
+            <artifactId>org.osgi.core</artifactId>
         </dependency>
 
         <dependency>
-            <groupId>org.onosproject</groupId>
-            <artifactId>onos-app-kafka-web</artifactId>
-            <version>${project.version}</version>
-        </dependency>
-
-        <!--Also need to update the app.xml and the features.xml -->
-        <dependency>
             <groupId>org.apache.servicemix.bundles</groupId>
             <artifactId>org.apache.servicemix.bundles.kafka-clients</artifactId>
             <version>0.8.2.2_1</version>
         </dependency>
 
+        <dependency>
+            <groupId>org.apache.felix</groupId>
+            <artifactId>org.apache.felix.scr.annotations</artifactId>
+            <scope>provided</scope>
+        </dependency>
+
+        <dependency>
+            <groupId>org.onosproject</groupId>
+            <artifactId>onos-incubator-protobuf-models</artifactId>
+            <version>${project.version}</version>
+        </dependency>
+
+        <dependency>
+            <groupId>org.onosproject</groupId>
+            <artifactId>onos-core-serializers</artifactId>
+            <version>${project.version}</version>
+        </dependency>
+
+        <dependency>
+            <groupId>com.google.protobuf</groupId>
+            <artifactId>protobuf-java</artifactId>
+            <version>3.0.0</version>
+        </dependency>
+
+        <dependency>
+            <groupId>org.osgi</groupId>
+            <artifactId>org.osgi.compendium</artifactId>
+        </dependency>
+
+        <dependency>
+            <groupId>org.onosproject</groupId>
+            <artifactId>onos-rest</artifactId>
+            <version>${project.version}</version>
+        </dependency>
+        <dependency>
+            <groupId>org.onosproject</groupId>
+            <artifactId>onlab-rest</artifactId>
+            <version>${project.version}</version>
+        </dependency>
+        <dependency>
+            <groupId>javax.ws.rs</groupId>
+            <artifactId>javax.ws.rs-api</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>org.glassfish.jersey.containers</groupId>
+            <artifactId>jersey-container-servlet</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>com.fasterxml.jackson.core</groupId>
+            <artifactId>jackson-databind</artifactId>
+        </dependency>
+
+        <dependency>
+            <groupId>com.fasterxml.jackson.core</groupId>
+            <artifactId>jackson-annotations</artifactId>
+        </dependency>
     </dependencies>
+
+    <build>
+        <plugins>
+            <plugin>
+                <groupId>org.apache.felix</groupId>
+                <artifactId>maven-bundle-plugin</artifactId>
+                <extensions>true</extensions>
+                <configuration>
+                    <instructions>
+                        <_wab>src/main/webapp/</_wab>
+                        <Include-Resource>
+                            WEB-INF/classes/apidoc/swagger.json=target/swagger.json,
+                            {maven-resources}
+                        </Include-Resource>
+                        <Bundle-SymbolicName>
+                            ${project.groupId}.${project.artifactId}
+                        </Bundle-SymbolicName>
+                        <Import-Package>
+                            *,org.glassfish.jersey.servlet
+                        </Import-Package>
+                        <Web-ContextPath>${web.context}</Web-ContextPath>
+                    </instructions>
+                </configuration>
+            </plugin>
+        </plugins>
+    </build>
 </project>
\ No newline at end of file
diff --git a/apps/kafka-integration/app/src/main/java/org/onosproject/kafkaintegration/converter/ConversionFactory.java b/apps/kafka-integration/app/src/main/java/org/onosproject/kafkaintegration/converter/ConversionFactory.java
new file mode 100644
index 0000000..e0e8d76
--- /dev/null
+++ b/apps/kafka-integration/app/src/main/java/org/onosproject/kafkaintegration/converter/ConversionFactory.java
@@ -0,0 +1,69 @@
+/*
+ * Copyright 2016-present Open Networking Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.onosproject.kafkaintegration.converter;
+
+import org.onosproject.kafkaintegration.api.dto.OnosEvent.Type;
+
+import java.util.HashMap;
+import java.util.Map;
+
+import static org.onosproject.kafkaintegration.api.dto.OnosEvent.Type.DEVICE;
+import static org.onosproject.kafkaintegration.api.dto.OnosEvent.Type.LINK;
+
+/**
+ * Returns the appropriate converter object based on the ONOS event type.
+ *
+ */
+public final class ConversionFactory {
+
+    // Store converters for all supported events
+    private Map<Type, EventConverter> converters =
+            new HashMap<Type, EventConverter>() {
+                {
+                    put(DEVICE, new DeviceEventConverter());
+                    put(LINK, new LinkEventConverter());
+                }
+            };
+
+    // Exists to defeat instantiation
+    private ConversionFactory() {
+    }
+
+    private static class SingletonHolder {
+        private static final ConversionFactory INSTANCE =
+                new ConversionFactory();
+    }
+
+    /**
+     * Returns a static reference to the Conversion Factory.
+     *
+     * @return singleton object
+     */
+    public static ConversionFactory getInstance() {
+        return SingletonHolder.INSTANCE;
+    }
+
+    /**
+     * Returns an Event converter object for the specified ONOS event type.
+     *
+     * @param event ONOS event type
+     * @return Event Converter object
+     */
+    public EventConverter getConverter(Type event) {
+        return converters.get(event);
+    }
+
+}
diff --git a/apps/kafka-integration/app/src/main/java/org/onosproject/kafkaintegration/converter/DeviceEventConverter.java b/apps/kafka-integration/app/src/main/java/org/onosproject/kafkaintegration/converter/DeviceEventConverter.java
new file mode 100644
index 0000000..a445b14
--- /dev/null
+++ b/apps/kafka-integration/app/src/main/java/org/onosproject/kafkaintegration/converter/DeviceEventConverter.java
@@ -0,0 +1,124 @@
+/*
+ * Copyright 2016-present Open Networking Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.onosproject.kafkaintegration.converter;
+
+import com.google.protobuf.GeneratedMessageV3;
+
+import org.onosproject.event.Event;
+import org.onosproject.grpc.net.device.models.DeviceEnumsProto.DeviceEventTypeProto;
+import org.onosproject.grpc.net.device.models.DeviceEnumsProto.DeviceTypeProto;
+import org.onosproject.grpc.net.device.models.DeviceEventProto.DeviceNotificationProto;
+import org.onosproject.grpc.net.device.models.PortEnumsProto;
+import org.onosproject.grpc.net.models.DeviceProtoOuterClass.DeviceProto;
+import org.onosproject.grpc.net.models.PortProtoOuterClass;
+import org.onosproject.net.device.DeviceEvent;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Converts ONOS Device event message to protobuf format.
+ */
+public class DeviceEventConverter implements EventConverter {
+
+    private final Logger log = LoggerFactory.getLogger(getClass());
+
+    @Override
+    public byte[] convertToProtoMessage(Event<?, ?> event) {
+
+        DeviceEvent deviceEvent = (DeviceEvent) event;
+
+        if (!deviceEventTypeSupported(deviceEvent)) {
+            log.error("Unsupported Onos Device Event {}. There is no matching"
+                              + "proto Device Event type", deviceEvent.type().toString());
+            return null;
+        }
+
+        return ((GeneratedMessageV3) buildDeviceProtoMessage(deviceEvent)).toByteArray();
+    }
+
+    /**
+     * Checks if the ONOS Device Event type is supported.
+     *
+     * @param event ONOS Device event
+     * @return true if there is a match and false otherwise
+     */
+    private boolean deviceEventTypeSupported(DeviceEvent event) {
+        DeviceEventTypeProto[] deviceEvents = DeviceEventTypeProto.values();
+        for (DeviceEventTypeProto deviceEventType : deviceEvents) {
+            if (deviceEventType.name().equals(event.type().name())) {
+                return true;
+            }
+        }
+
+        return false;
+    }
+
+    private DeviceNotificationProto buildDeviceProtoMessage(DeviceEvent deviceEvent) {
+        DeviceNotificationProto.Builder notificationBuilder =
+                DeviceNotificationProto.newBuilder();
+
+        DeviceProto deviceCore =
+                DeviceProto.newBuilder()
+                        .setChassisId(deviceEvent.subject().chassisId().id()
+                                              .toString())
+                        .setDeviceId(deviceEvent.subject().id().toString())
+                        .setHwVersion(deviceEvent.subject().hwVersion())
+                        .setManufacturer(deviceEvent.subject().manufacturer())
+                        .setSerialNumber(deviceEvent.subject().serialNumber())
+                        .setSwVersion(deviceEvent.subject().swVersion())
+                        .setType(DeviceTypeProto
+                                         .valueOf(deviceEvent.subject().type().name()))
+                        .build();
+
+        PortProtoOuterClass.PortProto portProto = null;
+        if (deviceEvent.port() != null) {
+            portProto =
+                    PortProtoOuterClass.PortProto.newBuilder()
+                            .setIsEnabled(deviceEvent.port().isEnabled())
+                            .setPortNumber(deviceEvent.port().number()
+                                                   .toString())
+                            .setPortSpeed(deviceEvent.port().portSpeed())
+                            .setType(PortEnumsProto.PortTypeProto
+                                             .valueOf(deviceEvent.port().type().name()))
+                            .build();
+
+            notificationBuilder.setPort(portProto);
+        }
+
+        notificationBuilder.setDeviceEventType(getProtoType(deviceEvent))
+                .setDevice(deviceCore);
+
+        return notificationBuilder.build();
+    }
+
+    /**
+     * Retrieves the protobuf generated device event type.
+     *
+     * @param event ONOS Device Event
+     * @return generated Device Event Type
+     */
+    private DeviceEventTypeProto getProtoType(DeviceEvent event) {
+        DeviceEventTypeProto protobufEventType = null;
+        DeviceEventTypeProto[] deviceEvents = DeviceEventTypeProto.values();
+        for (DeviceEventTypeProto deviceEventType : deviceEvents) {
+            if (deviceEventType.name().equals(event.type().name())) {
+                protobufEventType = deviceEventType;
+            }
+        }
+
+        return protobufEventType;
+    }
+}
diff --git a/apps/kafka-integration/app/src/main/java/org/onosproject/kafkaintegration/converter/EventConverter.java b/apps/kafka-integration/app/src/main/java/org/onosproject/kafkaintegration/converter/EventConverter.java
new file mode 100644
index 0000000..884e3b1
--- /dev/null
+++ b/apps/kafka-integration/app/src/main/java/org/onosproject/kafkaintegration/converter/EventConverter.java
@@ -0,0 +1,36 @@
+/*
+ * Copyright 2016-present Open Networking Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.onosproject.kafkaintegration.converter;
+
+import org.onosproject.event.Event;
+
+
+/**
+ *
+ * APIs for converting between ONOS event objects and protobuf data objects.
+ *
+ */
+public interface EventConverter {
+
+    /**
+     * Converts ONOS specific event data to a format that is suitable for export
+     * to Kafka.
+     *
+     * @param event ONOS Event object
+     * @return converted data in protobuf format as a byte array.
+     */
+    byte[] convertToProtoMessage(Event<?, ?> event);
+}
diff --git a/apps/kafka-integration/app/src/main/java/org/onosproject/kafkaintegration/converter/LinkEventConverter.java b/apps/kafka-integration/app/src/main/java/org/onosproject/kafkaintegration/converter/LinkEventConverter.java
new file mode 100644
index 0000000..5abab45
--- /dev/null
+++ b/apps/kafka-integration/app/src/main/java/org/onosproject/kafkaintegration/converter/LinkEventConverter.java
@@ -0,0 +1,101 @@
+/*
+ * Copyright 2016-present Open Networking Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.onosproject.kafkaintegration.converter;
+
+import com.google.protobuf.GeneratedMessageV3;
+import org.onosproject.event.Event;
+import org.onosproject.grpc.net.link.models.LinkEnumsProto.LinkEventTypeProto;
+import org.onosproject.grpc.net.link.models.LinkEnumsProto.LinkStateProto;
+import org.onosproject.grpc.net.link.models.LinkEnumsProto.LinkTypeProto;
+import org.onosproject.grpc.net.link.models.LinkEventProto.LinkNotificationProto;
+import org.onosproject.grpc.net.models.ConnectPointProtoOuterClass.ConnectPointProto;
+import org.onosproject.grpc.net.models.LinkProtoOuterClass.LinkProto;
+import org.onosproject.net.link.LinkEvent;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Converts for ONOS Link event message to protobuf format.
+ */
+public class LinkEventConverter implements EventConverter {
+
+    private final Logger log = LoggerFactory.getLogger(getClass());
+
+    @Override
+    public byte[] convertToProtoMessage(Event<?, ?> event) {
+
+        LinkEvent linkEvent = (LinkEvent) event;
+
+        if (!linkEventTypeSupported(linkEvent)) {
+            log.error("Unsupported Onos Event {}. There is no matching "
+                              + "proto Event type", linkEvent.type().toString());
+            return null;
+        }
+
+        return ((GeneratedMessageV3) buildDeviceProtoMessage(linkEvent)).toByteArray();
+    }
+
+    private boolean linkEventTypeSupported(LinkEvent event) {
+        LinkEventTypeProto[] kafkaLinkEvents = LinkEventTypeProto.values();
+        for (LinkEventTypeProto linkEventType : kafkaLinkEvents) {
+            if (linkEventType.name().equals(event.type().name())) {
+                return true;
+            }
+        }
+        return false;
+    }
+
+    private LinkNotificationProto buildDeviceProtoMessage(LinkEvent linkEvent) {
+        LinkNotificationProto notification = LinkNotificationProto.newBuilder()
+                .setLinkEventType(getProtoType(linkEvent))
+                .setLink(LinkProto.newBuilder()
+                                 .setState(LinkStateProto.ACTIVE
+                                                   .valueOf(linkEvent.subject().state().name()))
+                                 .setType(LinkTypeProto.valueOf(linkEvent.subject().type().name()))
+                                 .setDst(ConnectPointProto.newBuilder()
+                                                 .setDeviceId(linkEvent.subject().dst()
+                                                                      .deviceId().toString())
+                                                 .setPortNumber(linkEvent.subject().dst().port()
+                                                                        .toString()))
+                                 .setSrc(ConnectPointProto.newBuilder()
+                                                 .setDeviceId(linkEvent.subject().src()
+                                                                      .deviceId().toString())
+                                                 .setPortNumber(linkEvent.subject().src().port()
+                                                                        .toString())))
+                .build();
+
+        return notification;
+    }
+
+    /**
+     * Returns the specific Kafka Device Event Type for the corresponding ONOS
+     * Device Event Type.
+     *
+     * @param event ONOS Device Event
+     * @return Kafka Device Event Type
+     */
+    private LinkEventTypeProto getProtoType(LinkEvent event) {
+        LinkEventTypeProto generatedEventType = null;
+        LinkEventTypeProto[] kafkaEvents = LinkEventTypeProto.values();
+        for (LinkEventTypeProto linkEventType : kafkaEvents) {
+            if (linkEventType.name().equals(event.type().name())) {
+                generatedEventType = linkEventType;
+            }
+        }
+
+        return generatedEventType;
+    }
+}
diff --git a/apps/kafka-integration/app/src/main/java/org/onosproject/kafkaintegration/converter/package-info.java b/apps/kafka-integration/app/src/main/java/org/onosproject/kafkaintegration/converter/package-info.java
new file mode 100644
index 0000000..bb61c47
--- /dev/null
+++ b/apps/kafka-integration/app/src/main/java/org/onosproject/kafkaintegration/converter/package-info.java
@@ -0,0 +1,22 @@
+/*
+ * Copyright 2016-present Open Networking Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/**
+ * Converters for converting various ONOS events to their corresponding Protocol
+ * Buffer format.
+ *
+ */
+package org.onosproject.kafkaintegration.converter;
diff --git a/apps/kafka-integration/app/src/main/java/org/onosproject/kafkaintegration/impl/EventConversionManager.java b/apps/kafka-integration/app/src/main/java/org/onosproject/kafkaintegration/impl/EventConversionManager.java
new file mode 100644
index 0000000..c96b81b
--- /dev/null
+++ b/apps/kafka-integration/app/src/main/java/org/onosproject/kafkaintegration/impl/EventConversionManager.java
@@ -0,0 +1,72 @@
+/**
+ * Copyright 2016-present Open Networking Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.onosproject.kafkaintegration.impl;
+
+import org.apache.felix.scr.annotations.Activate;
+import org.apache.felix.scr.annotations.Component;
+import org.apache.felix.scr.annotations.Deactivate;
+import org.apache.felix.scr.annotations.Service;
+import org.onosproject.event.Event;
+import org.onosproject.kafkaintegration.api.EventConversionService;
+import org.onosproject.kafkaintegration.api.dto.OnosEvent;
+import org.onosproject.kafkaintegration.converter.DeviceEventConverter;
+import org.onosproject.kafkaintegration.converter.EventConverter;
+import org.onosproject.kafkaintegration.converter.LinkEventConverter;
+import org.onosproject.net.device.DeviceEvent;
+import org.onosproject.net.link.LinkEvent;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import static org.onosproject.kafkaintegration.api.dto.OnosEvent.Type.DEVICE;
+import static org.onosproject.kafkaintegration.api.dto.OnosEvent.Type.LINK;
+
+/**
+ * Implementation of Event Conversion Service.
+ *
+ */
+@Component(immediate = true)
+@Service
+public class EventConversionManager implements EventConversionService {
+
+    private final Logger log = LoggerFactory.getLogger(getClass());
+    private EventConverter deviceEventConverter;
+    private EventConverter linkEventConverter;
+
+    @Activate
+    protected void activate() {
+        deviceEventConverter = new DeviceEventConverter();
+        linkEventConverter = new LinkEventConverter();
+
+        log.info("Started");
+    }
+
+    @Deactivate
+    protected void deactivate() {
+        log.info("Stopped");
+    }
+
+    @Override
+    public OnosEvent convertEvent(Event<?, ?> event) {
+        if (event instanceof DeviceEvent) {
+            return new OnosEvent(DEVICE, deviceEventConverter.convertToProtoMessage(event));
+        } else if (event instanceof LinkEvent) {
+            return new OnosEvent(LINK, linkEventConverter.convertToProtoMessage(event));
+        } else {
+            throw new IllegalArgumentException("Unsupported event type");
+        }
+    }
+}
diff --git a/apps/kafka-integration/app/src/main/java/org/onosproject/kafkaintegration/impl/EventSubscriptionManager.java b/apps/kafka-integration/app/src/main/java/org/onosproject/kafkaintegration/impl/EventSubscriptionManager.java
new file mode 100644
index 0000000..fdb7ed5
--- /dev/null
+++ b/apps/kafka-integration/app/src/main/java/org/onosproject/kafkaintegration/impl/EventSubscriptionManager.java
@@ -0,0 +1,263 @@
+/**
+ * Copyright 2016-present Open Networking Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.onosproject.kafkaintegration.impl;
+
+import static com.google.common.base.Preconditions.checkNotNull;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.UUID;
+
+import org.apache.felix.scr.annotations.Activate;
+import org.apache.felix.scr.annotations.Component;
+import org.apache.felix.scr.annotations.Deactivate;
+import org.apache.felix.scr.annotations.Reference;
+import org.apache.felix.scr.annotations.ReferenceCardinality;
+import org.apache.felix.scr.annotations.Service;
+import org.onosproject.core.ApplicationId;
+import org.onosproject.core.CoreService;
+import org.onosproject.kafkaintegration.api.EventSubscriptionService;
+import org.onosproject.kafkaintegration.api.KafkaConfigService;
+import org.onosproject.kafkaintegration.api.dto.DefaultEventSubscriber;
+import org.onosproject.kafkaintegration.api.dto.EventSubscriber;
+import org.onosproject.kafkaintegration.api.dto.EventSubscriberGroupId;
+import org.onosproject.kafkaintegration.api.dto.RegistrationResponse;
+import org.onosproject.kafkaintegration.api.dto.OnosEvent;
+import org.onosproject.kafkaintegration.api.dto.OnosEvent.Type;
+import org.onosproject.kafkaintegration.errors.InvalidApplicationException;
+import org.onosproject.kafkaintegration.errors.InvalidGroupIdException;
+import org.onosproject.store.serializers.KryoNamespaces;
+import org.onosproject.store.service.Serializer;
+import org.onosproject.store.service.StorageService;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.common.collect.ImmutableList;
+
+/**
+ * Implementation of Event Subscription Manager.
+ *
+ */
+@Component(immediate = true)
+@Service
+public class EventSubscriptionManager implements EventSubscriptionService {
+
+    private final Logger log = LoggerFactory.getLogger(getClass());
+
+    // Stores the currently registered applications for event export service.
+    private Map<ApplicationId, EventSubscriberGroupId> registeredApps;
+
+    private Map<Type, List<EventSubscriber>> subscriptions;
+
+    private static final String REGISTERED_APPS = "registered-applications";
+
+    private static final String SUBSCRIBED_APPS = "event-subscriptions";
+
+    @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
+    protected CoreService coreService;
+
+    @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
+    protected StorageService storageService;
+
+    @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
+    protected KafkaConfigService kafkaConfigService;
+
+    private ApplicationId appId;
+
+    @Activate
+    protected void activate() {
+        appId = coreService
+                .registerApplication("org.onosproject.kafkaintegration");
+
+        registeredApps = storageService
+                .<ApplicationId, EventSubscriberGroupId>consistentMapBuilder()
+                .withName(REGISTERED_APPS)
+                .withSerializer(Serializer.using(KryoNamespaces.API,
+                                                 EventSubscriberGroupId.class,
+                                                 UUID.class))
+                .build().asJavaMap();
+
+        subscriptions = storageService
+                .<Type, List<EventSubscriber>>consistentMapBuilder()
+                .withName(SUBSCRIBED_APPS)
+                .withSerializer(Serializer
+                        .using(KryoNamespaces.API, EventSubscriber.class,
+                               OnosEvent.class, OnosEvent.Type.class,
+                               DefaultEventSubscriber.class,
+                               EventSubscriberGroupId.class, UUID.class))
+                .build().asJavaMap();
+
+        log.info("Started");
+    }
+
+    @Deactivate
+    protected void deactivate() {
+        log.info("Stopped");
+    }
+
+    @Override
+    public RegistrationResponse registerListener(String appName) {
+
+        // TODO: Remove it once ONOS provides a mechanism for external apps
+        // to register with the core service. See Jira - 4409
+        ApplicationId externalAppId = coreService.registerApplication(appName);
+
+        EventSubscriberGroupId id =
+                registeredApps.computeIfAbsent(externalAppId,
+                                               (key) -> new EventSubscriberGroupId(UUID
+                                                       .randomUUID()));
+
+        RegistrationResponse response = new RegistrationResponse(id,
+                            kafkaConfigService.getConfigParams().getIpAddress(),
+                            kafkaConfigService.getConfigParams().getPort());
+
+        return response;
+    }
+
+    @Override
+    public void unregisterListener(String appName) {
+        ApplicationId externalAppId =
+                checkNotNull(coreService.getAppId(appName));
+        registeredApps.remove(externalAppId);
+    }
+
+    @Override
+    public void subscribe(EventSubscriber subscriber)
+            throws InvalidGroupIdException, InvalidApplicationException {
+
+        checkNotNull(subscriber);
+
+        if (!registeredApplication(subscriber.appName())) {
+            throw new InvalidApplicationException("Application is not "
+                    + "registered to make this request.");
+        }
+
+        if (!validGroupId(subscriber.subscriberGroupId(),
+                          subscriber.appName())) {
+            throw new InvalidGroupIdException("Incorrect group id in the request");
+        }
+
+        // update internal state
+        List<EventSubscriber> subscriptionList =
+                subscriptions.get(subscriber.eventType());
+        if (subscriptionList == null) {
+            subscriptionList = new ArrayList<EventSubscriber>();
+        }
+        subscriptionList.add(subscriber);
+        subscriptions.put(subscriber.eventType(), subscriptionList);
+
+        log.info("Subscription for {} event by {} successful",
+                 subscriber.eventType(), subscriber.appName());
+    }
+
+    /**
+     * Checks if the application has registered.
+     *
+     * @param appName application name
+     * @return true if application has registered
+     */
+    private boolean registeredApplication(String appName) {
+
+        checkNotNull(appName);
+        ApplicationId appId = checkNotNull(coreService.getAppId(appName));
+        if (registeredApps.containsKey(appId)) {
+            return true;
+        }
+
+        log.debug("{} is not registered", appName);
+        return false;
+    }
+
+    /**
+     * Checks if the group id is valid for this registered application.
+     *
+     * @param groupId GroupId assigned to the subscriber
+     * @param appName Registered Application name
+     * @return true if valid groupId and false otherwise
+     */
+    private boolean validGroupId(EventSubscriberGroupId groupId,
+                                 String appName) {
+
+        checkNotNull(groupId);
+
+        ApplicationId appId = coreService.getAppId(appName);
+        EventSubscriberGroupId registeredGroupId = registeredApps.get(appId);
+        if (registeredGroupId.equals(groupId)) {
+            return true;
+        }
+
+        return false;
+    }
+
+    @Override
+    public void unsubscribe(EventSubscriber subscriber)
+            throws InvalidGroupIdException, InvalidApplicationException {
+
+        checkNotNull(subscriber);
+
+        if (!registeredApplication(subscriber.appName())) {
+            throw new InvalidApplicationException("Application is not "
+                    + "registered to make this request.");
+        }
+
+        if (!validGroupId(subscriber.subscriberGroupId(),
+                          subscriber.appName())) {
+            throw new InvalidGroupIdException("Incorrect group id in the request");
+        }
+
+        if (!eventSubscribed(subscriber)) {
+            log.error("No subscription to {} was found",
+                      subscriber.eventType());
+            return;
+        }
+
+        // update internal state.
+        List<EventSubscriber> subscribers =
+                subscriptions.get(subscriber.eventType());
+
+        subscribers.remove(subscriber);
+        subscriptions.put(subscriber.eventType(), subscribers);
+
+        log.info("Unsubscribed {} for {} events", subscriber.appName(),
+                 subscriber.eventType());
+    }
+
+    @Override
+    public List<EventSubscriber> getEventSubscribers(Type type) {
+        return subscriptions.getOrDefault(type, ImmutableList.of());
+    }
+
+    /**
+     * Checks if the subscriber has already subscribed to the requested event
+     * type.
+     *
+     * @param subscriber the subscriber to a specific ONOS event
+     * @return true if subscriber has subscribed to the ONOS event
+     */
+    private boolean eventSubscribed(EventSubscriber subscriber) {
+
+        List<EventSubscriber> subscriberList =
+                subscriptions.get(subscriber.eventType());
+
+        if (subscriberList == null) {
+            return false;
+        }
+
+        return subscriberList.contains(subscriber);
+    }
+
+}
diff --git a/apps/kafka-integration/app/src/main/java/org/onosproject/kafkaintegration/impl/KafkaCodecRegistrator.java b/apps/kafka-integration/app/src/main/java/org/onosproject/kafkaintegration/impl/KafkaCodecRegistrator.java
new file mode 100644
index 0000000..4af3f50
--- /dev/null
+++ b/apps/kafka-integration/app/src/main/java/org/onosproject/kafkaintegration/impl/KafkaCodecRegistrator.java
@@ -0,0 +1,48 @@
+/*
+ * Copyright 2016-present Open Networking Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.onosproject.kafkaintegration.impl;
+
+import org.apache.felix.scr.annotations.Activate;
+import org.apache.felix.scr.annotations.Component;
+import org.apache.felix.scr.annotations.Deactivate;
+import org.apache.felix.scr.annotations.Reference;
+import org.apache.felix.scr.annotations.ReferenceCardinality;
+import org.onosproject.codec.CodecService;
+import org.onosproject.kafkaintegration.api.dto.EventSubscriber;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Implementation of the JSON codec brokering service for Kafka app.
+ */
+@Component(immediate = true)
+public class KafkaCodecRegistrator {
+    private static Logger log = LoggerFactory.getLogger(KafkaCodecRegistrator
+                                                                .class);
+    @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
+    protected CodecService codecService;
+
+    @Activate
+    public void activate() {
+        codecService.registerCodec(EventSubscriber.class, new SubscriberCodec());
+        log.info("Started");
+    }
+
+    @Deactivate
+    public void deactivate() {
+        log.info("Stopped");
+    }
+}
diff --git a/apps/kafka-integration/app/src/main/java/org/onosproject/kafkaintegration/impl/KafkaStorageManager.java b/apps/kafka-integration/app/src/main/java/org/onosproject/kafkaintegration/impl/KafkaStorageManager.java
new file mode 100644
index 0000000..66bfc18
--- /dev/null
+++ b/apps/kafka-integration/app/src/main/java/org/onosproject/kafkaintegration/impl/KafkaStorageManager.java
@@ -0,0 +1,95 @@
+/*
+ * Copyright 2016-present Open Networking Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.onosproject.kafkaintegration.impl;
+
+import org.apache.felix.scr.annotations.Activate;
+import org.apache.felix.scr.annotations.Component;
+import org.apache.felix.scr.annotations.Service;
+import org.apache.felix.scr.annotations.Deactivate;
+import org.apache.felix.scr.annotations.Reference;
+import org.apache.felix.scr.annotations.ReferenceCardinality;
+import org.onosproject.kafkaintegration.api.KafkaEventStorageService;
+import org.onosproject.kafkaintegration.api.dto.OnosEvent;
+import org.onosproject.store.serializers.KryoNamespaces;
+import org.onosproject.store.service.Serializer;
+import org.onosproject.store.service.StorageService;
+import org.onosproject.store.service.Task;
+import org.onosproject.store.service.WorkQueue;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutionException;
+
+@Component(immediate = false)
+@Service
+public class KafkaStorageManager implements KafkaEventStorageService {
+
+    @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
+    protected StorageService storageService;
+
+    private final Logger log = LoggerFactory.getLogger(getClass());
+
+    private static final String KAFKA_WORK_QUEUE = "Kafka-Work-Queue";
+
+    private WorkQueue<OnosEvent> queue;
+
+    @Activate
+    protected void activate() {
+        queue = storageService.<OnosEvent>getWorkQueue(KAFKA_WORK_QUEUE,
+                                                       Serializer.using(KryoNamespaces.API,
+                                                                        OnosEvent.class,
+                                                                        OnosEvent.Type.class));
+
+        log.info("Started");
+    }
+
+    @Deactivate
+    protected void deactivate() {
+        queue = null;
+        log.info("Stopped");
+    }
+
+    @Override
+    public void publishEvent(OnosEvent e) {
+        queue.addOne(e);
+        log.debug("Published {} Event to Distributed Work Queue", e.type());
+    }
+
+    @Override
+    public OnosEvent consumeEvent() {
+        Task<OnosEvent> task = null;
+
+        CompletableFuture<Task<OnosEvent>> future = queue.take();
+        try {
+            task = future.get();
+        } catch (InterruptedException e) {
+            e.printStackTrace();
+        } catch (ExecutionException e) {
+            e.printStackTrace();
+        }
+
+        if (task != null) {
+            queue.complete(task.taskId());
+            log.debug("Consumed {} Event from Distributed Work Queue with id {}",
+                     task.payload().type(), task.taskId());
+            return task.payload();
+        }
+
+        return null;
+    }
+
+}
diff --git a/apps/kafka-integration/app/src/main/java/org/onosproject/kafkaintegration/impl/SubscriberCodec.java b/apps/kafka-integration/app/src/main/java/org/onosproject/kafkaintegration/impl/SubscriberCodec.java
new file mode 100644
index 0000000..0c4c3b4
--- /dev/null
+++ b/apps/kafka-integration/app/src/main/java/org/onosproject/kafkaintegration/impl/SubscriberCodec.java
@@ -0,0 +1,65 @@
+/*
+ * Copyright 2016-present Open Networking Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.onosproject.kafkaintegration.impl;
+
+import com.fasterxml.jackson.databind.node.ObjectNode;
+import org.onosproject.codec.CodecContext;
+import org.onosproject.codec.JsonCodec;
+import org.onosproject.kafkaintegration.api.dto.DefaultEventSubscriber;
+import org.onosproject.kafkaintegration.api.dto.EventSubscriber;
+import org.onosproject.kafkaintegration.api.dto.EventSubscriberGroupId;
+import org.onosproject.kafkaintegration.api.dto.OnosEvent.Type;
+
+import java.util.UUID;
+
+import static com.google.common.base.Preconditions.checkNotNull;
+
+/**
+ * Codec for encoding/decoding a Subscriber object to/from JSON.
+ *
+ */
+public final class SubscriberCodec extends JsonCodec<EventSubscriber> {
+
+    // JSON field names
+    private static final String NAME = "appName";
+    private static final String GROUP_ID = "groupId";
+    private static final String EVENT_TYPE = "eventType";
+
+    @Override
+    public ObjectNode encode(EventSubscriber data, CodecContext context) {
+        checkNotNull(data, "Subscriber cannot be null");
+        return context.mapper().createObjectNode().put(NAME, data.appName())
+                .put(GROUP_ID, data.subscriberGroupId().getId().toString())
+                .put(EVENT_TYPE, data.eventType().toString());
+    }
+
+    @Override
+    public EventSubscriber decode(ObjectNode json, CodecContext context) {
+
+        EventSubscriber.Builder resultBuilder = new DefaultEventSubscriber
+                .Builder();
+        String appName = json.get(NAME).asText();
+        resultBuilder.setAppName(appName);
+
+        String subscriberGroupId = json.get(GROUP_ID).asText();
+        resultBuilder.setSubscriberGroupId(new EventSubscriberGroupId(UUID.
+                fromString(subscriberGroupId)));
+
+        String eventType = json.get(EVENT_TYPE).asText();
+        resultBuilder.setEventType(Type.valueOf(eventType));
+        return resultBuilder.build();
+    }
+}
diff --git a/apps/kafka-integration/app/src/main/java/org/onosproject/kafkaintegration/impl/package-info.java b/apps/kafka-integration/app/src/main/java/org/onosproject/kafkaintegration/impl/package-info.java
new file mode 100644
index 0000000..a422ecc
--- /dev/null
+++ b/apps/kafka-integration/app/src/main/java/org/onosproject/kafkaintegration/impl/package-info.java
@@ -0,0 +1,20 @@
+/*
+ * Copyright 2016-present Open Networking Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/**
+ * API implementation classes.
+ */
+package org.onosproject.kafkaintegration.impl;
diff --git a/apps/kafka-integration/app/src/main/java/org/onosproject/kafkaintegration/kafka/EventListener.java b/apps/kafka-integration/app/src/main/java/org/onosproject/kafkaintegration/kafka/EventListener.java
new file mode 100644
index 0000000..1b53c2a
--- /dev/null
+++ b/apps/kafka-integration/app/src/main/java/org/onosproject/kafkaintegration/kafka/EventListener.java
@@ -0,0 +1,156 @@
+/**
+ * Copyright 2016-present Open Networking Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.onosproject.kafkaintegration.kafka;
+
+import org.apache.felix.scr.annotations.Activate;
+import org.apache.felix.scr.annotations.Component;
+import org.apache.felix.scr.annotations.Deactivate;
+import org.apache.felix.scr.annotations.Reference;
+import org.onosproject.cluster.ClusterService;
+import org.onosproject.cluster.NodeId;
+import org.apache.felix.scr.annotations.ReferenceCardinality;
+import org.onosproject.cluster.LeadershipService;
+import org.onosproject.kafkaintegration.api.EventConversionService;
+import org.onosproject.kafkaintegration.api.EventSubscriptionService;
+import org.onosproject.kafkaintegration.api.KafkaEventStorageService;
+import org.onosproject.kafkaintegration.api.dto.OnosEvent;
+import org.onosproject.net.device.DeviceEvent;
+import org.onosproject.net.device.DeviceListener;
+import org.onosproject.net.device.DeviceService;
+import org.onosproject.net.link.LinkEvent;
+import org.onosproject.net.link.LinkListener;
+import org.onosproject.net.link.LinkService;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Objects;
+import java.util.concurrent.ExecutorService;
+
+import static java.util.concurrent.Executors.newSingleThreadScheduledExecutor;
+import static org.onlab.util.Tools.groupedThreads;
+import static org.onosproject.kafkaintegration.api.dto.OnosEvent.Type.DEVICE;
+import static org.onosproject.kafkaintegration.api.dto.OnosEvent.Type.LINK;
+
+
+/**
+ * Encapsulates the behavior of monitoring various ONOS events.
+ * */
+@Component(immediate = true)
+public class EventListener {
+    private final Logger log = LoggerFactory.getLogger(getClass());
+
+    @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
+    protected EventSubscriptionService eventSubscriptionService;
+
+    @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
+    protected EventConversionService eventConversionService;
+
+    @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
+    protected DeviceService deviceService;
+
+    @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
+    protected LinkService linkService;
+
+    @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
+    protected KafkaEventStorageService kafkaStoreService;
+
+    @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
+    protected LeadershipService leadershipService;
+
+    @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
+    protected ClusterService clusterService;
+
+    private final DeviceListener deviceListener = new InternalDeviceListener();
+    private final LinkListener linkListener = new InternalLinkListener();
+
+    protected ExecutorService eventExecutor;
+
+    private static final String PUBLISHER_TOPIC = "WORK_QUEUE_PUBLISHER";
+
+    private NodeId localNodeId;
+
+    @Activate
+    protected void activate() {
+
+        eventExecutor = newSingleThreadScheduledExecutor(groupedThreads("onos/onosEvents", "events-%d", log));
+        deviceService.addListener(deviceListener);
+        linkService.addListener(linkListener);
+
+        localNodeId = clusterService.getLocalNode().id();
+
+        leadershipService.runForLeadership(PUBLISHER_TOPIC);
+
+        log.info("Started");
+    }
+
+    @Deactivate
+    protected void deactivate() {
+        deviceService.removeListener(deviceListener);
+        linkService.removeListener(linkListener);
+
+        eventExecutor.shutdownNow();
+        eventExecutor = null;
+
+        log.info("Stopped");
+    }
+
+    private class InternalDeviceListener implements DeviceListener {
+
+        @Override
+        public void event(DeviceEvent event) {
+
+            // do not allow to proceed without leadership
+            NodeId leaderNodeId = leadershipService.getLeader(PUBLISHER_TOPIC);
+            if (!Objects.equals(localNodeId, leaderNodeId)) {
+                log.debug("Not a Leader, cannot publish!");
+                return;
+            }
+
+            if (!eventSubscriptionService.getEventSubscribers(DEVICE).isEmpty()) {
+                OnosEvent onosEvent = eventConversionService.convertEvent(event);
+                eventExecutor.execute(() -> {
+                    kafkaStoreService.publishEvent(onosEvent);
+                });
+                log.debug("Pushed event {} to kafka storage", onosEvent);
+            }
+
+        }
+    }
+
+    private class InternalLinkListener implements LinkListener {
+
+        @Override
+        public void event(LinkEvent event) {
+
+            // do not allow to proceed without leadership
+            NodeId leaderNodeId = leadershipService.getLeader(PUBLISHER_TOPIC);
+            if (!Objects.equals(localNodeId, leaderNodeId)) {
+                log.debug("Not a Leader, cannot publish!");
+                return;
+            }
+
+            if (!eventSubscriptionService.getEventSubscribers(LINK).isEmpty()) {
+                OnosEvent onosEvent = eventConversionService.convertEvent(event);
+                eventExecutor.execute(() -> {
+                    kafkaStoreService.publishEvent(onosEvent);
+                });
+                log.debug("Pushed event {} to kafka storage", onosEvent);
+            }
+
+        }
+    }
+}
\ No newline at end of file
diff --git a/apps/kafka-integration/app/src/main/java/org/onosproject/kafkaintegration/kafka/EventPublisher.java b/apps/kafka-integration/app/src/main/java/org/onosproject/kafkaintegration/kafka/EventPublisher.java
new file mode 100644
index 0000000..a42e56a
--- /dev/null
+++ b/apps/kafka-integration/app/src/main/java/org/onosproject/kafkaintegration/kafka/EventPublisher.java
@@ -0,0 +1,131 @@
+/**
+ * Copyright 2016-present Open Networking Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.onosproject.kafkaintegration.kafka;
+
+import org.apache.felix.scr.annotations.Activate;
+import org.apache.felix.scr.annotations.Component;
+import org.apache.felix.scr.annotations.Deactivate;
+import org.apache.felix.scr.annotations.Reference;
+import org.apache.felix.scr.annotations.ReferenceCardinality;
+import org.apache.kafka.clients.producer.ProducerRecord;
+import org.onosproject.cluster.ClusterService;
+import org.onosproject.cluster.LeadershipService;
+import org.onosproject.cluster.NodeId;
+import org.onosproject.kafkaintegration.api.KafkaConfigService;
+import org.onosproject.kafkaintegration.api.KafkaPublisherService;
+import org.onosproject.kafkaintegration.api.KafkaEventStorageService;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.onosproject.kafkaintegration.api.dto.OnosEvent;
+
+import java.util.Objects;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
+
+@Component(immediate = true)
+public class EventPublisher {
+
+    private final Logger log = LoggerFactory.getLogger(getClass());
+
+    @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
+    protected LeadershipService leadershipService;
+
+    @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
+    protected ClusterService clusterService;
+
+    @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
+    protected KafkaConfigService kafkaConfigService;
+
+    @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
+    protected KafkaEventStorageService kafkaStore;
+
+    @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
+    protected KafkaPublisherService kafkaPublisher;
+
+    protected ScheduledExecutorService exService;
+
+    private static final String SUBSCRIBER_TOPIC = "WORK_QUEUE_SUBSCRIBER";
+
+    private NodeId localNodeId;
+
+    // Thread Scheduler Parameters
+    private final long delay = 0;
+    private final long period = 1;
+
+    private EventCollector eventCollector;
+
+    @Activate
+    protected void activate() {
+
+        leadershipService.runForLeadership(SUBSCRIBER_TOPIC);
+
+        localNodeId = clusterService.getLocalNode().id();
+
+        startCollector();
+
+        log.info("Started");
+    }
+
+    private void startCollector() {
+        exService = Executors.newSingleThreadScheduledExecutor();
+        eventCollector = new EventCollector();
+        exService.scheduleAtFixedRate(eventCollector, delay, period, TimeUnit.SECONDS);
+    }
+
+    @Deactivate
+    protected void deactivate() {
+        stopCollector();
+        log.info("Stopped");
+    }
+
+    private void stopCollector() {
+        exService.shutdown();
+    }
+
+    private class EventCollector implements Runnable {
+
+        @Override
+        public void run() {
+
+            // do not allow to proceed without leadership
+            NodeId leaderNodeId = leadershipService.getLeader(SUBSCRIBER_TOPIC);
+            if (!Objects.equals(localNodeId, leaderNodeId)) {
+                log.debug("Not a Leader so cannot consume event");
+                return;
+            }
+
+            try {
+                OnosEvent onosEvent = kafkaStore.consumeEvent();
+
+                if (onosEvent != null) {
+                    kafkaPublisher.send(new ProducerRecord<>(onosEvent.type().toString(),
+                                                            onosEvent.subject())).get();
+
+                    log.info("Event Type - {}, Subject {} sent successfully.",
+                             onosEvent.type(), onosEvent.subject());
+                }
+            } catch (InterruptedException e1) {
+                log.error("Thread interupted");
+                Thread.currentThread().interrupt();
+            } catch (ExecutionException e2) {
+                log.error("Cannot publish data to Kafka - {}", e2);
+            }
+        }
+    }
+
+}
diff --git a/apps/kafka-integration/app/src/main/java/org/onosproject/kafkaintegration/kafka/KafkaConfigManager.java b/apps/kafka-integration/app/src/main/java/org/onosproject/kafkaintegration/kafka/KafkaConfigManager.java
new file mode 100644
index 0000000..79d7baf
--- /dev/null
+++ b/apps/kafka-integration/app/src/main/java/org/onosproject/kafkaintegration/kafka/KafkaConfigManager.java
@@ -0,0 +1,210 @@
+/**
+ * Copyright 2016-present Open Networking Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.onosproject.kafkaintegration.kafka;
+
+import static com.google.common.base.Strings.isNullOrEmpty;
+import static org.onlab.util.Tools.get;
+
+import java.util.Dictionary;
+
+import org.apache.felix.scr.annotations.Activate;
+import org.apache.felix.scr.annotations.Component;
+import org.apache.felix.scr.annotations.Deactivate;
+import org.apache.felix.scr.annotations.Modified;
+import org.apache.felix.scr.annotations.Property;
+import org.apache.felix.scr.annotations.Reference;
+import org.apache.felix.scr.annotations.ReferenceCardinality;
+import org.apache.felix.scr.annotations.Service;
+import org.onosproject.cfg.ComponentConfigService;
+import org.onosproject.kafkaintegration.api.KafkaConfigService;
+import org.onosproject.kafkaintegration.api.KafkaPublisherAdminService;
+import org.onosproject.kafkaintegration.api.dto.KafkaServerConfig;
+import org.osgi.service.component.ComponentContext;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+@Component(immediate = true)
+@Service
+public class KafkaConfigManager implements KafkaConfigService {
+
+    private final Logger log = LoggerFactory.getLogger(getClass());
+
+    @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
+    protected ComponentConfigService componentConfigService;
+
+    @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
+    protected KafkaPublisherAdminService kafkaPublisherAdminService;
+
+    public static final String BOOTSTRAP_SERVERS = "localhost:9092";
+    private String kafkaServerIp =
+            BOOTSTRAP_SERVERS.substring(0, BOOTSTRAP_SERVERS.indexOf(":"));
+    private String kafkaServerPortNum =
+            BOOTSTRAP_SERVERS.substring(BOOTSTRAP_SERVERS.indexOf(":") + 1,
+                                        BOOTSTRAP_SERVERS.length());
+
+    private static final int RETRIES = 1;
+    private static final int MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION = 5;
+    private static final int REQUEST_REQUIRED_ACKS = 1;
+    private static final String KEY_SERIALIZER =
+            "org.apache.kafka.common.serialization.StringSerializer";
+    private static final String VALUE_SERIALIZER =
+            "org.apache.kafka.common.serialization.ByteArraySerializer";
+
+    @Property(name = "bootstrap.servers", value = BOOTSTRAP_SERVERS,
+            label = "Default IP/Port pair to establish initial connection to Kafka cluster.")
+    protected String bootstrapServers = BOOTSTRAP_SERVERS;
+
+    @Property(name = "retries", intValue = RETRIES,
+            label = "Number of times the producer can retry to send after first failure")
+    protected int retries = RETRIES;
+
+    @Property(name = "max.in.flight.requests.per.connection",
+            intValue = MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION,
+            label = "The maximum number of unacknowledged requests the client will send before blocking")
+    protected int maxInFlightRequestsPerConnection =
+            MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION;
+
+    @Property(name = "request.required.acks", intValue = 1,
+            label = "Producer will get an acknowledgement after the leader has replicated the data")
+    protected int requestRequiredAcks = REQUEST_REQUIRED_ACKS;
+
+    @Property(name = "key.serializer", value = KEY_SERIALIZER,
+            label = "Serializer class for key that implements the Serializer interface.")
+    protected String keySerializer = KEY_SERIALIZER;
+
+    @Property(name = "value.serializer", value = VALUE_SERIALIZER,
+            label = "Serializer class for value that implements the Serializer interface.")
+    protected String valueSerializer = VALUE_SERIALIZER;
+
+    @Activate
+    protected void activate(ComponentContext context) {
+        componentConfigService.registerProperties(getClass());
+        kafkaPublisherAdminService.start(getConfigParams());
+        log.info("Started");
+    }
+
+    @Deactivate
+    protected void deactivate() {
+        componentConfigService.unregisterProperties(getClass(), false);
+        kafkaPublisherAdminService.stop();
+        log.info("Stopped");
+    }
+
+    @Modified
+    private void modified(ComponentContext context) {
+        if (context == null) {
+            bootstrapServers = BOOTSTRAP_SERVERS;
+            retries = RETRIES;
+            maxInFlightRequestsPerConnection =
+                    MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION;
+            requestRequiredAcks = REQUEST_REQUIRED_ACKS;
+            keySerializer = KEY_SERIALIZER;
+            valueSerializer = VALUE_SERIALIZER;
+            return;
+        }
+
+        Dictionary<?, ?> properties = context.getProperties();
+
+        String newBootstrapServers;
+        int newRetries;
+        int newMaxInFlightRequestsPerConnection;
+        int newRequestRequiredAcks;
+        try {
+            String s = get(properties, "bootstrap.servers");
+            newBootstrapServers =
+                    isNullOrEmpty(s) ? bootstrapServers : s.trim();
+
+            s = get(properties, "retries");
+            newRetries =
+                    isNullOrEmpty(s) ? retries : Integer.parseInt(s.trim());
+
+            s = get(properties, "max.in.flight.requests.per.connection");
+            newMaxInFlightRequestsPerConnection =
+                    isNullOrEmpty(s) ? maxInFlightRequestsPerConnection
+                                     : Integer.parseInt(s.trim());
+
+            s = get(properties, "request.required.acks");
+            newRequestRequiredAcks =
+                    isNullOrEmpty(s) ? requestRequiredAcks
+                                     : Integer.parseInt(s.trim());
+
+        } catch (NumberFormatException | ClassCastException e) {
+            return;
+        }
+
+        if (configModified(newBootstrapServers, newRetries,
+                           newMaxInFlightRequestsPerConnection,
+                           newRequestRequiredAcks)) {
+            bootstrapServers = newBootstrapServers;
+            kafkaServerIp = bootstrapServers
+                    .substring(0, bootstrapServers.indexOf(":"));
+            kafkaServerPortNum = bootstrapServers
+                    .substring(bootstrapServers.indexOf(":") + 1,
+                               bootstrapServers.length());
+
+            retries = newRetries;
+
+            maxInFlightRequestsPerConnection =
+                    newMaxInFlightRequestsPerConnection;
+
+            requestRequiredAcks = newRequestRequiredAcks;
+
+            kafkaPublisherAdminService.restart(KafkaServerConfig.builder()
+                    .ipAddress(kafkaServerIp).port(kafkaServerPortNum)
+                    .numOfRetries(retries)
+                    .maxInFlightRequestsPerConnection(maxInFlightRequestsPerConnection)
+                    .acksRequired(requestRequiredAcks)
+                    .keySerializer(keySerializer)
+                    .valueSerializer(valueSerializer).build());
+
+            log.info("Kafka Server Config has been Modified - "
+                    + "bootstrapServers {}, retries {}, "
+                    + "maxInFlightRequestsPerConnection {}, "
+                    + "requestRequiredAcks {}", bootstrapServers, retries,
+                     maxInFlightRequestsPerConnection, requestRequiredAcks);
+        } else {
+            return;
+        }
+    }
+
+    private boolean configModified(String newBootstrapServers, int newRetries,
+                                   int newMaxInFlightRequestsPerConnection,
+                                   int newRequestRequiredAcks) {
+
+        return !newBootstrapServers.equals(bootstrapServers)
+                || newRetries != retries
+                || newMaxInFlightRequestsPerConnection != maxInFlightRequestsPerConnection
+                || newRequestRequiredAcks != requestRequiredAcks;
+
+    }
+
+    @Override
+    public KafkaServerConfig getConfigParams() {
+        String ipAddr =
+                bootstrapServers.substring(0, bootstrapServers.indexOf(":"));
+        String port =
+                bootstrapServers.substring(bootstrapServers.indexOf(":") + 1,
+                                           bootstrapServers.length());
+
+        return KafkaServerConfig.builder().ipAddress(ipAddr).port(port)
+                .numOfRetries(retries)
+                .maxInFlightRequestsPerConnection(maxInFlightRequestsPerConnection)
+                .acksRequired(requestRequiredAcks).keySerializer(keySerializer)
+                .valueSerializer(valueSerializer).build();
+
+    }
+
+}
diff --git a/apps/kafka-integration/app/src/main/java/org/onosproject/kafkaintegration/kafka/PublishManager.java b/apps/kafka-integration/app/src/main/java/org/onosproject/kafkaintegration/kafka/PublishManager.java
new file mode 100644
index 0000000..b3064b1f
--- /dev/null
+++ b/apps/kafka-integration/app/src/main/java/org/onosproject/kafkaintegration/kafka/PublishManager.java
@@ -0,0 +1,102 @@
+/**
+ * Copyright 2016-present Open Networking Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.onosproject.kafkaintegration.kafka;
+
+import java.util.Properties;
+import java.util.concurrent.Future;
+
+import org.apache.felix.scr.annotations.Activate;
+import org.apache.felix.scr.annotations.Component;
+import org.apache.felix.scr.annotations.Deactivate;
+import org.apache.felix.scr.annotations.Service;
+import org.apache.kafka.clients.producer.KafkaProducer;
+import org.apache.kafka.clients.producer.ProducerRecord;
+import org.apache.kafka.clients.producer.RecordMetadata;
+import org.onosproject.kafkaintegration.api.KafkaPublisherService;
+import org.onosproject.kafkaintegration.api.KafkaPublisherAdminService;
+import org.onosproject.kafkaintegration.api.dto.KafkaServerConfig;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Implementation of a Kafka Producer.
+ */
+@Component
+@Service
+public class PublishManager implements KafkaPublisherService, KafkaPublisherAdminService {
+    private KafkaProducer<String, byte[]> kafkaProducer = null;
+
+    private final Logger log = LoggerFactory.getLogger(getClass());
+
+    @Activate
+    protected void activate() {
+        log.info("Started");
+    }
+
+    @Deactivate
+    protected void deactivate() {
+        stop();
+        log.info("Stopped");
+    }
+
+    @Override
+    public void start(KafkaServerConfig config) {
+
+        if (kafkaProducer != null) {
+            log.info("Producer has already started");
+            return;
+        }
+
+        String bootstrapServer =
+                new StringBuilder().append(config.getIpAddress()).append(":")
+                        .append(config.getPort()).toString();
+
+        // Set Server Properties
+        Properties prop = new Properties();
+        prop.put("bootstrap.servers", bootstrapServer);
+        prop.put("retries", config.getNumOfRetries());
+        prop.put("max.in.flight.requests.per.connection",
+                 config.getMaxInFlightRequestsPerConnection());
+        prop.put("request.required.acks", config.getAcksRequired());
+        prop.put("key.serializer", config.getKeySerializer());
+        prop.put("value.serializer", config.getValueSerializer());
+
+        kafkaProducer = new KafkaProducer<>(prop);
+        log.info("Kafka Producer has started.");
+    }
+
+    @Override
+    public void stop() {
+        if (kafkaProducer != null) {
+            kafkaProducer.close();
+            kafkaProducer = null;
+        }
+
+        log.info("Kafka Producer has Stopped");
+    }
+
+    @Override
+    public void restart(KafkaServerConfig config) {
+        stop();
+        start(config);
+    }
+
+    @Override
+    public Future<RecordMetadata> send(ProducerRecord<String, byte[]> record) {
+        return kafkaProducer.send(record);
+    }
+}
diff --git a/apps/kafka-integration/app/src/main/java/org/onosproject/kafkaintegration/kafka/package-info.java b/apps/kafka-integration/app/src/main/java/org/onosproject/kafkaintegration/kafka/package-info.java
new file mode 100644
index 0000000..1d4d62b
--- /dev/null
+++ b/apps/kafka-integration/app/src/main/java/org/onosproject/kafkaintegration/kafka/package-info.java
@@ -0,0 +1,20 @@
+/*
+ * Copyright 2016-present Open Networking Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/**
+ * API implementation classes.
+ */
+package org.onosproject.kafkaintegration.kafka;
diff --git a/apps/kafka-integration/app/src/main/java/org/onosproject/kafkaintegration/rest/EventExporterWebResource.java b/apps/kafka-integration/app/src/main/java/org/onosproject/kafkaintegration/rest/EventExporterWebResource.java
new file mode 100644
index 0000000..155a250
--- /dev/null
+++ b/apps/kafka-integration/app/src/main/java/org/onosproject/kafkaintegration/rest/EventExporterWebResource.java
@@ -0,0 +1,174 @@
+/*
+ * Copyright 2016-present Open Networking Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.onosproject.kafkaintegration.rest;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.fasterxml.jackson.databind.node.ObjectNode;
+import org.onosproject.kafkaintegration.api.EventSubscriptionService;
+import org.onosproject.kafkaintegration.api.dto.EventSubscriber;
+import org.onosproject.kafkaintegration.api.dto.RegistrationResponse;
+import org.onosproject.rest.AbstractWebResource;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.ws.rs.Consumes;
+import javax.ws.rs.DELETE;
+import javax.ws.rs.POST;
+import javax.ws.rs.Path;
+import javax.ws.rs.Produces;
+import javax.ws.rs.core.MediaType;
+import javax.ws.rs.core.Response;
+import java.io.IOException;
+import java.io.InputStream;
+
+import static com.google.common.base.Preconditions.checkNotNull;
+import static javax.ws.rs.core.Response.Status.BAD_REQUEST;
+
+/**
+ * Rest Interfaces for subscribing/unsubscribing to event notifications.
+ */
+@Path("kafkaService")
+public class EventExporterWebResource extends AbstractWebResource {
+
+    private final Logger log = LoggerFactory.getLogger(getClass());
+    public static final String JSON_NOT_NULL =
+            "Registration Data cannot be empty";
+    public static final String REGISTRATION_SUCCESSFUL =
+            "Registered Listener successfully";
+    public static final String DEREGISTRATION_SUCCESSFUL =
+            "De-Registered Listener successfully";
+    public static final String EVENT_SUBSCRIPTION_SUCCESSFUL =
+            "Event Registration successful";
+    public static final String EVENT_SUBSCRIPTION_UNSUCCESSFUL =
+            "Event subscription unsuccessful";
+    public static final String EVENT_SUBSCRIPTION_REMOVED =
+            "Event De-Registration successful";
+
+    /**
+     * Registers a listener for ONOS Events.
+     *
+     * @param appName The application trying to register
+     * @return 200 OK with UUID string which should be used as Kafka Consumer
+     *         Group Id and Kafka Server, port information.
+     * @onos.rsModel KafkaRegistration
+     */
+    @POST
+    @Produces(MediaType.APPLICATION_JSON)
+    @Consumes(MediaType.APPLICATION_JSON)
+    @Path("register")
+    public Response registerKafkaListener(String appName) {
+
+        EventSubscriptionService service = get(EventSubscriptionService.class);
+
+        RegistrationResponse response = service.registerListener(appName);
+
+        ObjectNode result = mapper().createObjectNode();
+        result.put("groupId", response.getGroupId().getId().toString());
+        result.put("ipAddress", response.getIpAddress());
+        result.put("port", response.getPort());
+
+        log.info("Registered app {}", appName);
+
+        return ok(result.toString()).build();
+    }
+
+    /**
+     * Unregisters a listener for ONOS Events.
+     *
+     * @param appName The application trying to unregister
+     * @return 200 OK
+     * @onos.rsModel KafkaRegistration
+     */
+    @DELETE
+    @Path("unregister")
+    public Response removeKafkaListener(String appName) {
+        EventSubscriptionService service = get(EventSubscriptionService.class);
+
+        service.unregisterListener(appName);
+        log.info("Unregistered app {}", appName);
+        return ok(DEREGISTRATION_SUCCESSFUL).build();
+    }
+
+    /**
+     * Creates subscription to a specific ONOS event.
+     *
+     * @param input Subscription Data in JSON format
+     * @return 200 OK if successful or 400 BAD REQUEST
+     * @onos.rsModel KafkaSubscription
+     */
+    @POST
+    @Consumes(MediaType.APPLICATION_JSON)
+    @Produces(MediaType.APPLICATION_JSON)
+    @Path("subscribe")
+    public Response subscribe(InputStream input) {
+
+        EventSubscriptionService service = get(EventSubscriptionService.class);
+
+        try {
+            EventSubscriber sub = parseSubscriptionData(input);
+            service.subscribe(sub);
+            // It will subscribe to all the topics. Not only the one that is sent by the consumer.
+        } catch (Exception e) {
+            log.error(e.getMessage());
+            return Response.status(BAD_REQUEST).entity(e.getMessage()).build();
+        }
+
+        return ok(EVENT_SUBSCRIPTION_SUCCESSFUL).build();
+    }
+
+    /**
+     * Parses JSON Subscription Data from the external application.
+     *
+     * @param input Subscription Data in JSON format
+     * @return parsed DTO object
+     * @throws IOException
+     */
+    private EventSubscriber parseSubscriptionData(InputStream input)
+            throws IOException {
+
+        ObjectMapper mapper = new ObjectMapper();
+        ObjectNode node = (ObjectNode) mapper.readTree(input);
+        checkNotNull(node, JSON_NOT_NULL);
+        EventSubscriber codec = codec(EventSubscriber.class).decode(node, this);
+        checkNotNull(codec, JSON_NOT_NULL);
+        return codec;
+    }
+
+    /**
+     * Deletes subscription from a specific ONOS event.
+     *
+     * @param input data in JSON format
+     * @return 200 OK if successful or 400 BAD REQUEST
+     * @onos.rsModel KafkaSubscription
+     */
+    @DELETE
+    @Consumes(MediaType.APPLICATION_JSON)
+    @Path("unsubscribe")
+    public Response unsubscribe(InputStream input) {
+
+        EventSubscriptionService service = get(EventSubscriptionService.class);
+
+        try {
+            EventSubscriber sub = parseSubscriptionData(input);
+            service.unsubscribe(sub);
+        } catch (Exception e) {
+            log.error(e.getMessage());
+            return Response.status(BAD_REQUEST).entity(e.getMessage()).build();
+        }
+
+        return ok(EVENT_SUBSCRIPTION_REMOVED).build();
+    }
+}
diff --git a/apps/kafka-integration/app/src/main/java/org/onosproject/kafkaintegration/rest/package-info.java b/apps/kafka-integration/app/src/main/java/org/onosproject/kafkaintegration/rest/package-info.java
new file mode 100644
index 0000000..dde0ebe
--- /dev/null
+++ b/apps/kafka-integration/app/src/main/java/org/onosproject/kafkaintegration/rest/package-info.java
@@ -0,0 +1,20 @@
+/*
+ * Copyright 2016-present Open Networking Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+/**
+ * REST API Definitions.
+ *
+ */
+package org.onosproject.kafkaintegration.rest;
diff --git a/apps/kafka-integration/app/src/main/resources/definitions/KafkaRegistration.json b/apps/kafka-integration/app/src/main/resources/definitions/KafkaRegistration.json
new file mode 100644
index 0000000..a81fd11
--- /dev/null
+++ b/apps/kafka-integration/app/src/main/resources/definitions/KafkaRegistration.json
@@ -0,0 +1,5 @@
+{
+  "type": "string",
+  "title": "KafkaRegistration",
+  "example": "forwardingApp"
+}
\ No newline at end of file
diff --git a/apps/kafka-integration/app/src/main/resources/definitions/KafkaSubscription.json b/apps/kafka-integration/app/src/main/resources/definitions/KafkaSubscription.json
new file mode 100644
index 0000000..819ca77
--- /dev/null
+++ b/apps/kafka-integration/app/src/main/resources/definitions/KafkaSubscription.json
@@ -0,0 +1,23 @@
+{
+  "type": "object",
+  "title": "KafkaSubscription",
+  "required": [
+    "appName",
+    "groupId",
+    "eventType"
+  ],
+  "properties": {
+     "appName": {
+        "type": "string",
+        "example": "forwardingApp"
+     },
+     "groupId": {
+        "type": "string",
+        "example": "18285435-2c62-4684-96dd-fb03b7cd0c83"
+     },
+     "eventType": {
+        "type": "string",
+        "example": "DEVICE"
+     }
+  }
+}
\ No newline at end of file
diff --git a/apps/kafka-integration/app/src/main/webapp/WEB-INF/web.xml b/apps/kafka-integration/app/src/main/webapp/WEB-INF/web.xml
new file mode 100644
index 0000000..3ad1602
--- /dev/null
+++ b/apps/kafka-integration/app/src/main/webapp/WEB-INF/web.xml
@@ -0,0 +1,38 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+  ~ Copyright 2016-present Open Networking Foundation
+  ~
+  ~ Licensed under the Apache License, Version 2.0 (the "License");
+  ~ you may not use this file except in compliance with the License.
+  ~ You may obtain a copy of the License at
+  ~
+  ~     http://www.apache.org/licenses/LICENSE-2.0
+  ~
+  ~ Unless required by applicable law or agreed to in writing, software
+  ~ distributed under the License is distributed on an "AS IS" BASIS,
+  ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  ~ See the License for the specific language governing permissions and
+  ~ limitations under the License.
+  -->
+<web-app xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns="http://java.sun.com/xml/ns/javaee"
+         xmlns:web="http://java.sun.com/xml/ns/javaee/web-app_2_5.xsd"
+         xsi:schemaLocation="http://java.sun.com/xml/ns/javaee http://java.sun.com/xml/ns/javaee/web-app_2_5.xsd"
+         id="ONOS" version="2.5">
+    <display-name>Event Exporter REST API</display-name>
+
+    <servlet>
+        <servlet-name>JAX-RS Service</servlet-name>
+        <servlet-class>org.glassfish.jersey.servlet.ServletContainer</servlet-class>
+        <init-param>
+        <param-name>jersey.config.server.provider.classnames</param-name>
+        <param-value>org.onosproject.kafkaintegration.rest.EventExporterWebResource</param-value>
+        </init-param>
+    <load-on-startup>10</load-on-startup>
+    </servlet>
+
+    <servlet-mapping>
+        <servlet-name>JAX-RS Service</servlet-name>
+        <url-pattern>/*</url-pattern>
+    </servlet-mapping>
+
+</web-app>