Made changes as per comments.
kafkaProducer is now non-static.

TODO: KafkaPublisherManager Service and not Singleton.
Kafka event publishing.

Change-Id: I5ec20a6e4950c38e822468d343521ab77475b7d3
diff --git a/apps/kafka-integration/api/pom.xml b/apps/kafka-integration/api/pom.xml
index a5aff84..d8386be 100644
--- a/apps/kafka-integration/api/pom.xml
+++ b/apps/kafka-integration/api/pom.xml
@@ -38,6 +38,7 @@
             <artifactId>protobuf-java</artifactId>
             <version>3.0.0-beta-2</version>
         </dependency>
+
     </dependencies>
 
     <build>
diff --git a/apps/kafka-integration/api/src/main/java/org/onosproject/kafkaintegration/api/EventConversionService.java b/apps/kafka-integration/api/src/main/java/org/onosproject/kafkaintegration/api/EventConversionService.java
new file mode 100644
index 0000000..4c73d96
--- /dev/null
+++ b/apps/kafka-integration/api/src/main/java/org/onosproject/kafkaintegration/api/EventConversionService.java
@@ -0,0 +1,28 @@
+/**
+ * Copyright 2016-present Open Networking Laboratory
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.onosproject.kafkaintegration.api;
+
+import org.onosproject.event.Event;
+import org.onosproject.kafkaintegration.api.dto.OnosEvent;
+
+/**
+ * API for conversion of various ONOS events to Protobuf.
+ *
+ */
+public interface EventConversionService {
+    OnosEvent convertEvent(Event<?, ?> event);
+}
diff --git a/apps/kafka-integration/api/src/main/java/org/onosproject/kafkaintegration/api/EventExporterService.java b/apps/kafka-integration/api/src/main/java/org/onosproject/kafkaintegration/api/EventSubscriptionService.java
similarity index 85%
rename from apps/kafka-integration/api/src/main/java/org/onosproject/kafkaintegration/api/EventExporterService.java
rename to apps/kafka-integration/api/src/main/java/org/onosproject/kafkaintegration/api/EventSubscriptionService.java
index e203697..07b03de 100644
--- a/apps/kafka-integration/api/src/main/java/org/onosproject/kafkaintegration/api/EventExporterService.java
+++ b/apps/kafka-integration/api/src/main/java/org/onosproject/kafkaintegration/api/EventSubscriptionService.java
@@ -16,16 +16,19 @@
 
 import org.onosproject.kafkaintegration.api.dto.EventSubscriber;
 import org.onosproject.kafkaintegration.api.dto.EventSubscriberGroupId;
+import org.onosproject.kafkaintegration.api.dto.OnosEvent.Type;
 import org.onosproject.kafkaintegration.errors.InvalidApplicationException;
 import org.onosproject.kafkaintegration.errors.InvalidGroupIdException;
 
 import com.google.common.annotations.Beta;
 
+import java.util.List;
+
 /**
  * APIs for subscribing to Onos Event Messages.
  */
 @Beta
-public interface EventExporterService {
+public interface EventSubscriptionService {
 
     /**
      * Registers the external application to receive events generated in ONOS.
@@ -61,4 +64,12 @@
      */
     void unsubscribe(EventSubscriber subscriber)
             throws InvalidGroupIdException, InvalidApplicationException;
+
+    /**
+     * Returns the event subscriber for various event types.
+     *
+     * @param type ONOS event type.
+     * @return List of event subscribers
+     */
+    List<EventSubscriber> getEventSubscribers(Type type);
 }
diff --git a/apps/kafka-integration/api/src/main/java/org/onosproject/kafkaintegration/api/KafkaPublisherService.java b/apps/kafka-integration/api/src/main/java/org/onosproject/kafkaintegration/api/KafkaPublisherService.java
new file mode 100644
index 0000000..b92b890
--- /dev/null
+++ b/apps/kafka-integration/api/src/main/java/org/onosproject/kafkaintegration/api/KafkaPublisherService.java
@@ -0,0 +1,34 @@
+/**
+ * Copyright 2016-present Open Networking Laboratory
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.onosproject.kafkaintegration.api;
+
+import com.google.protobuf.GeneratedMessage;
+import org.onosproject.kafkaintegration.api.dto.OnosEvent.Type;
+
+/**
+ * API for dispatching ONOS events.
+ */
+public interface KafkaPublisherService {
+
+    /**
+     * Publish the ONOS Event to all listeners.
+     *
+     * @param eventType the ONOS eventtype
+     * @param message generated Protocol buffer message from ONOS event data
+     */
+    void publish(Type eventType, GeneratedMessage message);
+}
diff --git a/apps/kafka-integration/api/src/main/java/org/onosproject/kafkaintegration/api/dto/OnosEvent.java b/apps/kafka-integration/api/src/main/java/org/onosproject/kafkaintegration/api/dto/OnosEvent.java
index ff1d27e..2d51c7c 100644
--- a/apps/kafka-integration/api/src/main/java/org/onosproject/kafkaintegration/api/dto/OnosEvent.java
+++ b/apps/kafka-integration/api/src/main/java/org/onosproject/kafkaintegration/api/dto/OnosEvent.java
@@ -19,7 +19,7 @@
 import com.google.protobuf.GeneratedMessage;
 
 /**
- * Represents the converted Onos Event data into GPB format.
+ * Represents the converted Onos Event data into protobuf format.
  *
  */
 public class OnosEvent extends AbstractEvent<OnosEvent.Type, GeneratedMessage> {
@@ -38,6 +38,19 @@
      * List of Event Types supported.
      */
     public enum Type {
-        DEVICE, LINK;
+        /**
+         * Signifies Device events.
+         */
+        DEVICE("DEVICE"),
+
+        /**
+         * Signifies Link events.
+         */
+        LINK("LINK");
+        public String typeName;
+
+        Type(String name) {
+            typeName = name;
+        }
     }
 }
diff --git a/apps/kafka-integration/app/app.xml b/apps/kafka-integration/app/app.xml
index 5912d4b..a832936 100644
--- a/apps/kafka-integration/app/app.xml
+++ b/apps/kafka-integration/app/app.xml
@@ -19,6 +19,9 @@
      featuresRepo="mvn:${project.groupId}/${project.artifactId}/${project.version}/xml/features"
      features="${project.artifactId}" apps="org.onosproject.incubator.protobuf">
     <description>${project.description}</description>
-    <artifact>mvn:${project.groupId}/${project.artifactId}/${project.version}</artifact>
+
     <artifact>mvn:${project.groupId}/onos-app-kafka-api/${project.version}</artifact>
+    <artifact>mvn:${project.groupId}/onos-app-kafka-core/${project.version}</artifact>
+    <artifact>mvn:${project.groupId}/onos-app-kafka-web/${project.version}</artifact>
+    <artifact>mvn:org.apache.servicemix.bundles/org.apache.servicemix.bundles.kafka-clients/0.8.2.2_1</artifact>
 </app>
diff --git a/apps/kafka-integration/app/features.xml b/apps/kafka-integration/app/features.xml
index 1e219b0..2982029 100644
--- a/apps/kafka-integration/app/features.xml
+++ b/apps/kafka-integration/app/features.xml
@@ -19,6 +19,8 @@
              description="${project.description}">
         <feature>onos-api</feature>
         <bundle>mvn:${project.groupId}/onos-app-kafka-api/${project.version}</bundle>
-        <bundle>mvn:${project.groupId}/onos-app-kafka/${project.version}</bundle>
+        <bundle>mvn:${project.groupId}/onos-app-kafka-core/${project.version}</bundle>
+        <bundle>mvn:${project.groupId}/onos-app-kafka-web/${project.version}</bundle>
+        <bundle>mvn:org.apache.servicemix.bundles/org.apache.servicemix.bundles.kafka-clients/0.8.2.2_1</bundle>
     </feature>
 </features>
diff --git a/apps/kafka-integration/app/pom.xml b/apps/kafka-integration/app/pom.xml
index 1dafb0d..5a6e574 100644
--- a/apps/kafka-integration/app/pom.xml
+++ b/apps/kafka-integration/app/pom.xml
@@ -26,12 +26,6 @@
 
     <artifactId>onos-app-kafka</artifactId>
 
-    <packaging>bundle</packaging>
-    <description>
-          Kafka Integration Application.
-          This will export ONOS Events to Northbound Kafka Server.
-    </description>
-
     <properties>
         <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
         <onos.version>${project.version}</onos.version>
@@ -41,21 +35,18 @@
         <web.context>/onos/kafka</web.context>
         <api.version>1.0.0</api.version>
         <api.package>org.onosproject.kafkaintegration.rest</api.package>
-        <api.title>Kafka Integration Application REST API</api.title>
-        <api.description>
-               APIs for subscribing to Events generated by ONOS
-        </api.description>
         <onos.app.category>Utility</onos.app.category>
         <onos.app.url>https://wiki.onosproject.org/display/ONOS/Kafka+Integration</onos.app.url>
-        <onos.app.readme>Export ONOS events to a Northbound Kafka server</onos.app.readme>
         <onos.app.requires>org.onosproject.incubator.protobuf</onos.app.requires>
     </properties>
 
-       <dependencies>
-        <dependency>
-            <groupId>org.onosproject</groupId>
-            <artifactId>onos-api</artifactId>
-        </dependency>
+    <packaging>pom</packaging>
+    <description>
+        Kafka Integration Application.
+        This will export ONOS Events to Northbound Kafka Server.
+    </description>
+
+    <dependencies>
 
         <dependency>
             <groupId>org.onosproject</groupId>
@@ -65,168 +56,22 @@
 
         <dependency>
             <groupId>org.onosproject</groupId>
-            <artifactId>onos-incubator-protobuf</artifactId>
+            <artifactId>onos-app-kafka-core</artifactId>
             <version>${project.version}</version>
         </dependency>
 
         <dependency>
             <groupId>org.onosproject</groupId>
-            <artifactId>onlab-osgi</artifactId>
-        </dependency>
-
-        <dependency>
-            <groupId>org.onosproject</groupId>
-            <artifactId>onos-rest</artifactId>
+            <artifactId>onos-app-kafka-web</artifactId>
             <version>${project.version}</version>
         </dependency>
 
+        <!--Also need to update the app.xml and the features.xml -->
         <dependency>
-            <groupId>junit</groupId>
-            <artifactId>junit</artifactId>
-            <scope>test</scope>
+            <groupId>org.apache.servicemix.bundles</groupId>
+            <artifactId>org.apache.servicemix.bundles.kafka-clients</artifactId>
+            <version>0.8.2.2_1</version>
         </dependency>
 
-        <dependency>
-            <groupId>org.onosproject</groupId>
-            <artifactId>onos-api</artifactId>
-            <scope>test</scope>
-            <classifier>tests</classifier>
-        </dependency>
-
-        <dependency>
-            <groupId>javax.ws.rs</groupId>
-            <artifactId>javax.ws.rs-api</artifactId>
-            <version>2.0.1</version>
-        </dependency>
-
-        <dependency>
-            <groupId>com.google.protobuf</groupId>
-            <artifactId>protobuf-java</artifactId>
-            <version>3.0.0-beta-2</version>
-        </dependency>
-
-        <dependency>
-            <groupId>org.codehaus.jackson</groupId>
-            <artifactId>jackson-core-asl</artifactId>
-            <version>1.9.13</version>
-        </dependency>
-
-        <dependency>
-            <groupId>org.codehaus.jackson</groupId>
-            <artifactId>jackson-mapper-asl</artifactId>
-            <version>1.9.13</version>
-        </dependency>
-
-        <dependency>
-            <groupId>org.glassfish.jersey.containers</groupId>
-            <artifactId>jersey-container-servlet</artifactId>
-        </dependency>
-        <dependency>
-            <groupId>com.fasterxml.jackson.core</groupId>
-            <artifactId>jackson-annotations</artifactId>
-        </dependency>
-
-        <dependency>
-            <groupId>org.onosproject</groupId>
-            <artifactId>onos-core-serializers</artifactId>
-            <version>${project.version}</version>
-        </dependency>
-
-        <dependency>
-            <groupId>org.apache.felix</groupId>
-            <artifactId>org.apache.felix.scr.annotations</artifactId>
-            <scope>provided</scope>
-        </dependency>
     </dependencies>
-
-    <build>
-        <plugins>
-            <plugin>
-                <groupId>org.apache.felix</groupId>
-                <artifactId>maven-bundle-plugin</artifactId>
-                <extensions>true</extensions>
-                <configuration>
-                    <instructions>
-                    <Bundle-SymbolicName>
-                        ${project.groupId}.${project.artifactId}
-                    </Bundle-SymbolicName>
-                    <_wab>src/main/webapp/</_wab>
-                    <Include-Resource>
-                        WEB-INF/classes/apidoc/swagger.json=target/swagger.json,
-                                    {maven-resources}
-                    </Include-Resource>
-                    <Import-Package>
-                        org.slf4j,
-                        org.osgi.framework,
-                        javax.ws.rs,
-                        javax.ws.rs.core,
-                        org.glassfish.jersey.servlet,
-                        com.fasterxml.jackson.databind,
-                        com.fasterxml.jackson.databind.node,
-                        com.fasterxml.jackson.core,
-                        org.onlab.packet.*,
-                        org.onosproject.*,
-                        org.onlab.util.*,
-                        com.google.common.*,
-                        com.google.protobuf.*
-                    </Import-Package>
-                    <Web-ContextPath>${web.context}</Web-ContextPath>
-                </instructions>
-                </configuration>
-            </plugin>
-            <plugin>
-                <groupId>org.apache.maven.plugins</groupId>
-                <artifactId>maven-compiler-plugin</artifactId>
-                <configuration>
-                    <source>1.8</source>
-                    <target>1.8</target>
-                </configuration>
-            </plugin>
-            <plugin>
-                <groupId>org.apache.felix</groupId>
-                <artifactId>maven-scr-plugin</artifactId>
-                <executions>
-                    <execution>
-                        <id>generate-scr-srcdescriptor</id>
-                        <goals>
-                            <goal>scr</goal>
-                        </goals>
-                    </execution>
-                </executions>
-                <configuration>
-                    <supportedProjectTypes>
-                        <supportedProjectType>bundle</supportedProjectType>
-                        <supportedProjectType>war</supportedProjectType>
-                    </supportedProjectTypes>
-                </configuration>
-            </plugin>
-            <plugin>
-                <groupId>org.onosproject</groupId>
-                <artifactId>onos-maven-plugin</artifactId>
-                <executions>
-                    <execution>
-                        <id>cfg</id>
-                        <phase>generate-resources</phase>
-                        <goals>
-                            <goal>cfg</goal>
-                        </goals>
-                    </execution>
-                    <execution>
-                        <id>swagger</id>
-                        <goals>
-                            <goal>swagger</goal>
-                        </goals>
-                    </execution>
-                    <execution>
-                        <id>app</id>
-                        <phase>package</phase>
-                        <goals>
-                            <goal>app</goal>
-                        </goals>
-                    </execution>
-                </executions>
-            </plugin>
-        </plugins>
-    </build>
-
-</project>
+</project>
\ No newline at end of file
diff --git a/apps/kafka-integration/core/pom.xml b/apps/kafka-integration/core/pom.xml
new file mode 100644
index 0000000..44df376
--- /dev/null
+++ b/apps/kafka-integration/core/pom.xml
@@ -0,0 +1,119 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+  ~ Copyright 2016-present Open Networking Laboratory
+  ~
+  ~ Licensed under the Apache License, Version 2.0 (the "License");
+  ~ you may not use this file except in compliance with the License.
+  ~ You may obtain a copy of the License at
+  ~
+  ~     http://www.apache.org/licenses/LICENSE-2.0
+  ~
+  ~ Unless required by applicable law or agreed to in writing, software
+  ~ distributed under the License is distributed on an "AS IS" BASIS,
+  ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  ~ See the License for the specific language governing permissions and
+  ~ limitations under the License.
+  -->
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+
+    <parent>
+        <groupId>org.onosproject</groupId>
+        <artifactId>onos-kafka</artifactId>
+        <version>1.7.0-SNAPSHOT</version>
+    </parent>
+    <modelVersion>4.0.0</modelVersion>
+
+    <artifactId>onos-app-kafka-core</artifactId>
+
+    <packaging>bundle</packaging>
+    <description>
+        Kafka Integration Application.
+        This module is exclusive of REST calls and is only for the implementation of Apache Kafka.
+    </description>
+
+    <dependencies>
+
+        <dependency>
+            <groupId>org.onosproject</groupId>
+            <artifactId>onos-api</artifactId>
+        </dependency>
+
+        <dependency>
+            <groupId>org.onosproject</groupId>
+            <artifactId>onos-app-kafka-api</artifactId>
+            <version>${project.version}</version>
+        </dependency>
+
+        <dependency>
+            <groupId>org.osgi</groupId>
+            <artifactId>org.osgi.core</artifactId>
+            <version>4.3.1</version>
+        </dependency>
+
+        <dependency>
+            <groupId>org.apache.servicemix.bundles</groupId>
+            <artifactId>org.apache.servicemix.bundles.kafka-clients</artifactId>
+            <version>0.8.2.2_1</version>
+        </dependency>
+
+        <dependency>
+            <groupId>org.apache.felix</groupId>
+            <artifactId>org.apache.felix.scr.annotations</artifactId>
+            <scope>provided</scope>
+        </dependency>
+
+        <dependency>
+            <groupId>org.onosproject</groupId>
+            <artifactId>onos-incubator-protobuf</artifactId>
+            <version>${project.version}</version>
+        </dependency>
+
+        <dependency>
+            <groupId>com.google.protobuf</groupId>
+            <artifactId>protobuf-java</artifactId>
+            <version>3.0.0-beta-2</version>
+        </dependency>
+        <dependency>
+            <groupId>org.onosproject</groupId>
+            <artifactId>onos-app-kafka-web</artifactId>
+            <version>${project.version}</version>
+        </dependency>
+
+            <dependency>
+                <groupId>org.osgi</groupId>
+                <artifactId>org.osgi.compendium</artifactId>
+            </dependency>
+
+    </dependencies>
+
+    <build>
+        <plugins>
+            <plugin>
+                <groupId>org.apache.felix</groupId>
+                <artifactId>maven-bundle-plugin</artifactId>
+                <extensions>true</extensions>
+            </plugin>
+
+            <plugin>
+                <groupId>org.apache.felix</groupId>
+                <artifactId>maven-scr-plugin</artifactId>
+                <executions>
+                    <execution>
+                        <id>generate-scr-srcdescriptor</id>
+                        <goals>
+                            <goal>scr</goal>
+                        </goals>
+                    </execution>
+                </executions>
+                <configuration>
+                    <supportedProjectTypes>
+                        <supportedProjectType>bundle</supportedProjectType>
+                        <supportedProjectType>war</supportedProjectType>
+                    </supportedProjectTypes>
+                </configuration>
+            </plugin>
+        </plugins>
+    </build>
+</project>
\ No newline at end of file
diff --git a/apps/kafka-integration/app/src/main/java/org/onosproject/kafkaintegration/converter/ConversionFactory.java b/apps/kafka-integration/core/src/main/java/org/onosproject/kafkaintegration/converter/ConversionFactory.java
similarity index 96%
rename from apps/kafka-integration/app/src/main/java/org/onosproject/kafkaintegration/converter/ConversionFactory.java
rename to apps/kafka-integration/core/src/main/java/org/onosproject/kafkaintegration/converter/ConversionFactory.java
index 8f69a20..26b5060 100644
--- a/apps/kafka-integration/app/src/main/java/org/onosproject/kafkaintegration/converter/ConversionFactory.java
+++ b/apps/kafka-integration/core/src/main/java/org/onosproject/kafkaintegration/converter/ConversionFactory.java
@@ -14,13 +14,13 @@
  */
 package org.onosproject.kafkaintegration.converter;
 
-import static org.onosproject.kafkaintegration.api.dto.OnosEvent.Type.DEVICE;
-import static org.onosproject.kafkaintegration.api.dto.OnosEvent.Type.LINK;
+import org.onosproject.kafkaintegration.api.dto.OnosEvent.Type;
 
 import java.util.HashMap;
 import java.util.Map;
 
-import org.onosproject.kafkaintegration.api.dto.OnosEvent.Type;;
+import static org.onosproject.kafkaintegration.api.dto.OnosEvent.Type.DEVICE;
+import static org.onosproject.kafkaintegration.api.dto.OnosEvent.Type.LINK;
 
 /**
  * Returns the appropriate converter object based on the ONOS event type.
diff --git a/apps/kafka-integration/app/src/main/java/org/onosproject/kafkaintegration/converter/DeviceEventConverter.java b/apps/kafka-integration/core/src/main/java/org/onosproject/kafkaintegration/converter/DeviceEventConverter.java
similarity index 90%
rename from apps/kafka-integration/app/src/main/java/org/onosproject/kafkaintegration/converter/DeviceEventConverter.java
rename to apps/kafka-integration/core/src/main/java/org/onosproject/kafkaintegration/converter/DeviceEventConverter.java
index ac04040..6b2ee24 100644
--- a/apps/kafka-integration/app/src/main/java/org/onosproject/kafkaintegration/converter/DeviceEventConverter.java
+++ b/apps/kafka-integration/core/src/main/java/org/onosproject/kafkaintegration/converter/DeviceEventConverter.java
@@ -14,6 +14,7 @@
  */
 package org.onosproject.kafkaintegration.converter;
 
+import com.google.protobuf.GeneratedMessage;
 import org.onosproject.event.Event;
 import org.onosproject.grpc.net.Device.DeviceCore;
 import org.onosproject.grpc.net.Device.DeviceType;
@@ -25,13 +26,10 @@
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import com.google.protobuf.GeneratedMessage;
-
 /**
- * Converts ONOS Device event message to GPB format.
- *
+ * Converts ONOS Device event message to protobuf format.
  */
-class DeviceEventConverter implements EventConverter {
+public class DeviceEventConverter implements EventConverter {
 
     private final Logger log = LoggerFactory.getLogger(getClass());
 
@@ -73,14 +71,14 @@
         DeviceCore deviceCore =
                 DeviceCore.newBuilder()
                         .setChassisId(deviceEvent.subject().chassisId().id()
-                                .toString())
+                                              .toString())
                         .setDeviceId(deviceEvent.subject().id().toString())
                         .setHwVersion(deviceEvent.subject().hwVersion())
                         .setManufacturer(deviceEvent.subject().manufacturer())
                         .setSerialNumber(deviceEvent.subject().serialNumber())
                         .setSwVersion(deviceEvent.subject().swVersion())
                         .setType(DeviceType
-                                .valueOf(deviceEvent.subject().type().name()))
+                                         .valueOf(deviceEvent.subject().type().name()))
                         .build();
 
         PortCore portCore = null;
@@ -89,10 +87,10 @@
                     PortCore.newBuilder()
                             .setIsEnabled(deviceEvent.port().isEnabled())
                             .setPortNumber(deviceEvent.port().number()
-                                    .toString())
+                                                   .toString())
                             .setPortSpeed(deviceEvent.port().portSpeed())
                             .setType(PortType
-                                    .valueOf(deviceEvent.port().type().name()))
+                                             .valueOf(deviceEvent.port().type().name()))
                             .build();
 
             notificationBuilder.setPort(portCore);
diff --git a/apps/kafka-integration/app/src/main/java/org/onosproject/kafkaintegration/converter/EventConverter.java b/apps/kafka-integration/core/src/main/java/org/onosproject/kafkaintegration/converter/EventConverter.java
similarity index 83%
rename from apps/kafka-integration/app/src/main/java/org/onosproject/kafkaintegration/converter/EventConverter.java
rename to apps/kafka-integration/core/src/main/java/org/onosproject/kafkaintegration/converter/EventConverter.java
index e15ebdc..c1e7739 100644
--- a/apps/kafka-integration/app/src/main/java/org/onosproject/kafkaintegration/converter/EventConverter.java
+++ b/apps/kafka-integration/core/src/main/java/org/onosproject/kafkaintegration/converter/EventConverter.java
@@ -20,7 +20,7 @@
 
 /**
  *
- * APIs for converting between ONOS event objects and GPB data objects.
+ * APIs for converting between ONOS event objects and protobuf data objects.
  *
  */
 public interface EventConverter {
@@ -30,7 +30,7 @@
      * to Kafka.
      *
      * @param event ONOS Event object
-     * @return converted data in GPB format.
+     * @return converted data in protobuf format.
      */
-    public GeneratedMessage convertToProtoMessage(Event<?, ?> event);
+    GeneratedMessage convertToProtoMessage(Event<?, ?> event);
 }
diff --git a/apps/kafka-integration/app/src/main/java/org/onosproject/kafkaintegration/converter/LinkEventConverter.java b/apps/kafka-integration/core/src/main/java/org/onosproject/kafkaintegration/converter/LinkEventConverter.java
similarity index 93%
rename from apps/kafka-integration/app/src/main/java/org/onosproject/kafkaintegration/converter/LinkEventConverter.java
rename to apps/kafka-integration/core/src/main/java/org/onosproject/kafkaintegration/converter/LinkEventConverter.java
index a1db209..febe020 100644
--- a/apps/kafka-integration/app/src/main/java/org/onosproject/kafkaintegration/converter/LinkEventConverter.java
+++ b/apps/kafka-integration/core/src/main/java/org/onosproject/kafkaintegration/converter/LinkEventConverter.java
@@ -28,10 +28,9 @@
 import com.google.protobuf.GeneratedMessage;
 
 /**
- * Converts for ONOS Link event message to GPB format.
- *
+ * Converts for ONOS Link event message to protobuf format.
  */
-class LinkEventConverter implements EventConverter {
+public class LinkEventConverter implements EventConverter {
 
     private final Logger log = LoggerFactory.getLogger(getClass());
 
@@ -41,7 +40,7 @@
         LinkEvent linkEvent = (LinkEvent) event;
 
         if (!linkEventTypeSupported(linkEvent)) {
-            log.error("Unsupported Onos Event {}. There is no matching"
+            log.error("Unsupported Onos Event {}. There is no matching "
                     + "proto Event type", linkEvent.type().toString());
             return null;
         }
@@ -56,7 +55,6 @@
                 return true;
             }
         }
-
         return false;
     }
 
@@ -66,8 +64,7 @@
                 .setLink(LinkCore.newBuilder()
                         .setState(LinkState
                                 .valueOf(linkEvent.subject().state().name()))
-                        .setType(LinkType
-                                .valueOf(linkEvent.subject().type().name()))
+                        .setType(LinkType.valueOf(linkEvent.subject().type().name()))
                         .setDst(ConnectPoint.newBuilder()
                                 .setDeviceId(linkEvent.subject().dst()
                                         .deviceId().toString())
diff --git a/apps/kafka-integration/app/src/main/java/org/onosproject/kafkaintegration/converter/package-info.java b/apps/kafka-integration/core/src/main/java/org/onosproject/kafkaintegration/converter/package-info.java
similarity index 100%
rename from apps/kafka-integration/app/src/main/java/org/onosproject/kafkaintegration/converter/package-info.java
rename to apps/kafka-integration/core/src/main/java/org/onosproject/kafkaintegration/converter/package-info.java
diff --git a/apps/kafka-integration/core/src/main/java/org/onosproject/kafkaintegration/impl/EventConversionManager.java b/apps/kafka-integration/core/src/main/java/org/onosproject/kafkaintegration/impl/EventConversionManager.java
new file mode 100644
index 0000000..f1d0c97
--- /dev/null
+++ b/apps/kafka-integration/core/src/main/java/org/onosproject/kafkaintegration/impl/EventConversionManager.java
@@ -0,0 +1,72 @@
+/**
+ * Copyright 2016-present Open Networking Laboratory
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.onosproject.kafkaintegration.impl;
+
+import org.apache.felix.scr.annotations.Activate;
+import org.apache.felix.scr.annotations.Component;
+import org.apache.felix.scr.annotations.Deactivate;
+import org.apache.felix.scr.annotations.Service;
+import org.onosproject.event.Event;
+import org.onosproject.kafkaintegration.api.EventConversionService;
+import org.onosproject.kafkaintegration.api.dto.OnosEvent;
+import org.onosproject.kafkaintegration.converter.DeviceEventConverter;
+import org.onosproject.kafkaintegration.converter.EventConverter;
+import org.onosproject.kafkaintegration.converter.LinkEventConverter;
+import org.onosproject.net.device.DeviceEvent;
+import org.onosproject.net.link.LinkEvent;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import static org.onosproject.kafkaintegration.api.dto.OnosEvent.Type.DEVICE;
+import static org.onosproject.kafkaintegration.api.dto.OnosEvent.Type.LINK;
+
+/**
+ * Implementation of Event Conversion Service.
+ *
+ */
+@Component(immediate = true)
+@Service
+public class EventConversionManager implements EventConversionService {
+
+    private final Logger log = LoggerFactory.getLogger(getClass());
+    private EventConverter deviceEventConverter;
+    private EventConverter linkEventConverter;
+
+    @Activate
+    protected void activate() {
+        deviceEventConverter = new DeviceEventConverter();
+        linkEventConverter = new LinkEventConverter();
+
+        log.info("Started");
+    }
+
+    @Deactivate
+    protected void deactivate() {
+        log.info("Stopped");
+    }
+
+    @Override
+    public OnosEvent convertEvent(Event<?, ?> event) {
+        if (event instanceof DeviceEvent) {
+            return new OnosEvent(DEVICE, deviceEventConverter.convertToProtoMessage(event));
+        } else if (event instanceof LinkEvent) {
+            return new OnosEvent(LINK, linkEventConverter.convertToProtoMessage(event));
+        } else {
+            throw new IllegalArgumentException("Unsupported event type");
+        }
+    }
+}
diff --git a/apps/kafka-integration/app/src/main/java/org/onosproject/kafkaintegration/impl/EventExporterManager.java b/apps/kafka-integration/core/src/main/java/org/onosproject/kafkaintegration/impl/EventSubscriptionManager.java
similarity index 94%
rename from apps/kafka-integration/app/src/main/java/org/onosproject/kafkaintegration/impl/EventExporterManager.java
rename to apps/kafka-integration/core/src/main/java/org/onosproject/kafkaintegration/impl/EventSubscriptionManager.java
index e2e8e57..ebbac6b 100644
--- a/apps/kafka-integration/app/src/main/java/org/onosproject/kafkaintegration/impl/EventExporterManager.java
+++ b/apps/kafka-integration/core/src/main/java/org/onosproject/kafkaintegration/impl/EventSubscriptionManager.java
@@ -1,4 +1,4 @@
-/*
+/**
  * Copyright 2016-present Open Networking Laboratory
  *
  * Licensed under the Apache License, Version 2.0 (the "License");
@@ -15,18 +15,7 @@
  */
 package org.onosproject.kafkaintegration.impl;
 
-import static com.google.common.base.Preconditions.checkNotNull;
-import static org.onosproject.kafkaintegration.api.dto.OnosEvent.Type.DEVICE;
-import static org.onosproject.kafkaintegration.api.dto.OnosEvent.Type.LINK;
-
-import org.onosproject.kafkaintegration.api.dto.DefaultEventSubscriber;
-import org.onosproject.kafkaintegration.api.dto.EventSubscriber;
-
-import java.util.ArrayList;
-import java.util.List;
-import java.util.Map;
-import java.util.UUID;
-
+import com.google.common.collect.ImmutableList;
 import org.apache.felix.scr.annotations.Activate;
 import org.apache.felix.scr.annotations.Component;
 import org.apache.felix.scr.annotations.Deactivate;
@@ -35,7 +24,9 @@
 import org.apache.felix.scr.annotations.Service;
 import org.onosproject.core.ApplicationId;
 import org.onosproject.core.CoreService;
-import org.onosproject.kafkaintegration.api.EventExporterService;
+import org.onosproject.kafkaintegration.api.EventSubscriptionService;
+import org.onosproject.kafkaintegration.api.dto.DefaultEventSubscriber;
+import org.onosproject.kafkaintegration.api.dto.EventSubscriber;
 import org.onosproject.kafkaintegration.api.dto.EventSubscriberGroupId;
 import org.onosproject.kafkaintegration.api.dto.OnosEvent;
 import org.onosproject.kafkaintegration.api.dto.OnosEvent.Type;
@@ -51,13 +42,22 @@
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.UUID;
+
+import static com.google.common.base.Preconditions.checkNotNull;
+import static org.onosproject.kafkaintegration.api.dto.OnosEvent.Type.DEVICE;
+import static org.onosproject.kafkaintegration.api.dto.OnosEvent.Type.LINK;
+
 /**
- * Implementation of Event Exporter Service.
+ * Implementation of Event Subscription Manager.
  *
  */
 @Component(immediate = true)
 @Service
-public class EventExporterManager implements EventExporterService {
+public class EventSubscriptionManager implements EventSubscriptionService {
 
     private final Logger log = LoggerFactory.getLogger(getClass());
 
@@ -147,15 +147,12 @@
                     + "registered to make this request.");
         }
 
-        if (!validGroupId(subscriber.subscriberGroupId(),
-                          subscriber.appName())) {
+        if (!validGroupId(subscriber.subscriberGroupId(), subscriber.appName())) {
             throw new InvalidGroupIdException("Incorrect group id in the request");
         }
 
         OnosEventListener onosListener = getListener(subscriber.eventType());
-        checkNotNull(onosListener,
-                     "No listener for the supported event type - {}",
-                     subscriber.eventType());
+        checkNotNull(onosListener, "No listener for the supported event type - {}", subscriber.eventType());
 
         applyListenerAction(subscriber.eventType(), onosListener,
                             ListenerAction.START);
@@ -169,7 +166,7 @@
         subscriptionList.add(subscriber);
         subscriptions.put(subscriber.eventType(), subscriptionList);
 
-        log.info("Subscription for {} event by {} successfull",
+        log.info("Subscription for {} event by {} successful",
                  subscriber.eventType(), subscriber.appName());
     }
 
@@ -308,6 +305,12 @@
                  subscriber.eventType());
     }
 
+    @Override
+    public List<EventSubscriber> getEventSubscribers(Type type) {
+        return subscriptions.getOrDefault(type, ImmutableList.of());
+
+    }
+
     /**
      * Checks if the subscriber has already subscribed to the requested event
      * type.
diff --git a/apps/kafka-integration/app/src/main/java/org/onosproject/kafkaintegration/impl/KafkaCodecRegistrator.java b/apps/kafka-integration/core/src/main/java/org/onosproject/kafkaintegration/impl/KafkaCodecRegistrator.java
similarity index 96%
rename from apps/kafka-integration/app/src/main/java/org/onosproject/kafkaintegration/impl/KafkaCodecRegistrator.java
rename to apps/kafka-integration/core/src/main/java/org/onosproject/kafkaintegration/impl/KafkaCodecRegistrator.java
index 47098fa..22705d5 100644
--- a/apps/kafka-integration/app/src/main/java/org/onosproject/kafkaintegration/impl/KafkaCodecRegistrator.java
+++ b/apps/kafka-integration/core/src/main/java/org/onosproject/kafkaintegration/impl/KafkaCodecRegistrator.java
@@ -21,7 +21,6 @@
 import org.apache.felix.scr.annotations.ReferenceCardinality;
 import org.onosproject.codec.CodecService;
 import org.onosproject.kafkaintegration.api.dto.EventSubscriber;
-import org.onosproject.kafkaintegration.rest.SubscriberCodec;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
diff --git a/apps/kafka-integration/app/src/main/java/org/onosproject/kafkaintegration/impl/Dispatcher.java b/apps/kafka-integration/core/src/main/java/org/onosproject/kafkaintegration/impl/KafkaPublisherManager.java
similarity index 79%
rename from apps/kafka-integration/app/src/main/java/org/onosproject/kafkaintegration/impl/Dispatcher.java
rename to apps/kafka-integration/core/src/main/java/org/onosproject/kafkaintegration/impl/KafkaPublisherManager.java
index d211458..104619a 100644
--- a/apps/kafka-integration/app/src/main/java/org/onosproject/kafkaintegration/impl/Dispatcher.java
+++ b/apps/kafka-integration/core/src/main/java/org/onosproject/kafkaintegration/impl/KafkaPublisherManager.java
@@ -16,6 +16,7 @@
 
 import org.onosproject.event.AbstractListenerManager;
 import org.onosproject.kafkaintegration.api.ExportableEventListener;
+import org.onosproject.kafkaintegration.api.KafkaPublisherService;
 import org.onosproject.kafkaintegration.api.dto.OnosEvent;
 import org.onosproject.kafkaintegration.api.dto.OnosEvent.Type;
 import org.slf4j.Logger;
@@ -27,17 +28,19 @@
  * Dispatch ONOS Events to all interested Listeners.
  *
  */
-public final class Dispatcher
-        extends AbstractListenerManager<OnosEvent, ExportableEventListener> {
+
+public final class KafkaPublisherManager
+        extends AbstractListenerManager<OnosEvent, ExportableEventListener> implements KafkaPublisherService {
 
     private final Logger log = LoggerFactory.getLogger(getClass());
 
     // Exists to defeat instantiation
-    private Dispatcher() {
+    private KafkaPublisherManager() {
     }
 
+    //TODO: If possible, get rid of Singleton implementation.
     private static class SingletonHolder {
-        private static final Dispatcher INSTANCE = new Dispatcher();
+        private static final KafkaPublisherManager INSTANCE = new KafkaPublisherManager();
     }
 
     /**
@@ -45,16 +48,11 @@
      *
      * @return singleton object
      */
-    public static Dispatcher getInstance() {
+    public static KafkaPublisherManager getInstance() {
         return SingletonHolder.INSTANCE;
     }
 
-    /**
-     * Publish the ONOS Event to all listeners.
-     *
-     * @param eventType the ONOS eventtype
-     * @param message generated Protocol buffer message from ONOS event data
-     */
+    @Override
     public void publish(Type eventType, GeneratedMessage message) {
         log.debug("Dispatching ONOS Event {}", eventType);
         post(new OnosEvent(eventType, message));
diff --git a/apps/kafka-integration/app/src/main/java/org/onosproject/kafkaintegration/impl/KafkaStorageManager.java b/apps/kafka-integration/core/src/main/java/org/onosproject/kafkaintegration/impl/KafkaStorageManager.java
similarity index 99%
rename from apps/kafka-integration/app/src/main/java/org/onosproject/kafkaintegration/impl/KafkaStorageManager.java
rename to apps/kafka-integration/core/src/main/java/org/onosproject/kafkaintegration/impl/KafkaStorageManager.java
index 4b9035e..64af4e8 100644
--- a/apps/kafka-integration/app/src/main/java/org/onosproject/kafkaintegration/impl/KafkaStorageManager.java
+++ b/apps/kafka-integration/core/src/main/java/org/onosproject/kafkaintegration/impl/KafkaStorageManager.java
@@ -14,11 +14,6 @@
  */
 package org.onosproject.kafkaintegration.impl;
 
-import java.util.TreeMap;
-import java.util.concurrent.Executors;
-import java.util.concurrent.ScheduledExecutorService;
-import java.util.concurrent.TimeUnit;
-
 import org.apache.felix.scr.annotations.Activate;
 import org.apache.felix.scr.annotations.Component;
 import org.apache.felix.scr.annotations.Deactivate;
@@ -31,6 +26,11 @@
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import java.util.TreeMap;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+
 @Component(immediate = true)
 public class KafkaStorageManager implements KafkaEventStorageService {
 
diff --git a/apps/kafka-integration/app/src/main/java/org/onosproject/kafkaintegration/rest/SubscriberCodec.java b/apps/kafka-integration/core/src/main/java/org/onosproject/kafkaintegration/impl/SubscriberCodec.java
similarity index 97%
rename from apps/kafka-integration/app/src/main/java/org/onosproject/kafkaintegration/rest/SubscriberCodec.java
rename to apps/kafka-integration/core/src/main/java/org/onosproject/kafkaintegration/impl/SubscriberCodec.java
index 2876ca1..3489f6f 100644
--- a/apps/kafka-integration/app/src/main/java/org/onosproject/kafkaintegration/rest/SubscriberCodec.java
+++ b/apps/kafka-integration/core/src/main/java/org/onosproject/kafkaintegration/impl/SubscriberCodec.java
@@ -12,19 +12,19 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package org.onosproject.kafkaintegration.rest;
+package org.onosproject.kafkaintegration.impl;
 
-import static com.google.common.base.Preconditions.checkNotNull;
-
-import java.util.UUID;
-
+import com.fasterxml.jackson.databind.node.ObjectNode;
 import org.onosproject.codec.CodecContext;
 import org.onosproject.codec.JsonCodec;
 import org.onosproject.kafkaintegration.api.dto.DefaultEventSubscriber;
 import org.onosproject.kafkaintegration.api.dto.EventSubscriber;
 import org.onosproject.kafkaintegration.api.dto.EventSubscriberGroupId;
 import org.onosproject.kafkaintegration.api.dto.OnosEvent.Type;
-import com.fasterxml.jackson.databind.node.ObjectNode;
+
+import java.util.UUID;
+
+import static com.google.common.base.Preconditions.checkNotNull;
 
 /**
  * Codec for encoding/decoding a Subscriber object to/from JSON.
diff --git a/apps/kafka-integration/app/src/main/java/org/onosproject/kafkaintegration/impl/package-info.java b/apps/kafka-integration/core/src/main/java/org/onosproject/kafkaintegration/impl/package-info.java
similarity index 100%
rename from apps/kafka-integration/app/src/main/java/org/onosproject/kafkaintegration/impl/package-info.java
rename to apps/kafka-integration/core/src/main/java/org/onosproject/kafkaintegration/impl/package-info.java
diff --git a/apps/kafka-integration/core/src/main/java/org/onosproject/kafkaintegration/kafka/EventMonitor.java b/apps/kafka-integration/core/src/main/java/org/onosproject/kafkaintegration/kafka/EventMonitor.java
new file mode 100644
index 0000000..6e7291b
--- /dev/null
+++ b/apps/kafka-integration/core/src/main/java/org/onosproject/kafkaintegration/kafka/EventMonitor.java
@@ -0,0 +1,242 @@
+/**
+ * Copyright 2016-present Open Networking Laboratory
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.onosproject.kafkaintegration.kafka;
+
+import org.apache.felix.scr.annotations.Activate;
+import org.apache.felix.scr.annotations.Component;
+import org.apache.felix.scr.annotations.Deactivate;
+import org.apache.felix.scr.annotations.Modified;
+import org.apache.felix.scr.annotations.Property;
+import org.apache.felix.scr.annotations.Reference;
+import org.apache.felix.scr.annotations.ReferenceCardinality;
+import org.apache.kafka.clients.producer.ProducerRecord;
+import org.onosproject.cfg.ComponentConfigService;
+import org.onosproject.kafkaintegration.api.EventConversionService;
+import org.onosproject.kafkaintegration.api.EventSubscriptionService;
+import org.onosproject.net.device.DeviceEvent;
+import org.onosproject.net.device.DeviceListener;
+import org.onosproject.net.device.DeviceService;
+import org.onosproject.net.link.LinkEvent;
+import org.onosproject.net.link.LinkListener;
+import org.onosproject.net.link.LinkService;
+import org.osgi.service.component.ComponentContext;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Dictionary;
+import java.util.UUID;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+
+import static com.google.common.base.Strings.isNullOrEmpty;
+import static java.util.concurrent.Executors.newSingleThreadScheduledExecutor;
+import static org.onlab.util.Tools.get;
+import static org.onlab.util.Tools.groupedThreads;
+import static org.onosproject.kafkaintegration.api.dto.OnosEvent.Type.DEVICE;
+import static org.onosproject.kafkaintegration.api.dto.OnosEvent.Type.LINK;
+
+/**
+ * Encapsulates the behavior of monitoring various ONOS events.
+ * */
+@Component(immediate = true)
+public class EventMonitor {
+    private final Logger log = LoggerFactory.getLogger(getClass());
+
+    @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
+    protected EventSubscriptionService eventSubscriptionService;
+
+    @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
+    protected EventConversionService eventConversionService;
+
+    @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
+    protected DeviceService deviceService;
+
+    @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
+    protected LinkService linkService;
+
+    @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
+    protected ComponentConfigService componentConfigService;
+
+    private final DeviceListener deviceListener = new InternalDeviceListener();
+    private final LinkListener linkListener = new InternalLinkListener();
+
+    protected ExecutorService eventExecutor;
+
+    private static final String BOOTSTRAP_SERVERS = "localhost:9092";
+    private static final int RETRIES = 1;
+    private static final int MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION = 5;
+    private static final int REQUEST_REQUIRED_ACKS = 1;
+    private static final String KEY_SERIALIZER = "org.apache.kafka.common.serialization.StringSerializer";
+    private static final String VALUE_SERIALIZER = "org.apache.kafka.common.serialization.ByteArraySerializer";
+
+    @Property(name = "bootstrap.servers", value = BOOTSTRAP_SERVERS,
+            label = "Default host/post pair to establish initial connection to Kafka cluster.")
+    private String bootstrapServers = BOOTSTRAP_SERVERS;
+
+    @Property(name = "retries", intValue = RETRIES,
+            label = "Number of times the producer can retry to send after first failure")
+    private int retries = RETRIES;
+
+    @Property(name = "max.in.flight.requests.per.connection", intValue = MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION,
+            label = "The maximum number of unacknowledged requests the client will send before blocking")
+    private int maxInFlightRequestsPerConnection = MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION;
+
+    @Property(name = "request.required.acks", intValue = 1,
+            label = "Producer will get an acknowledgement after the leader has replicated the data")
+    private int requestRequiredAcks = REQUEST_REQUIRED_ACKS;
+
+    @Property(name = "key.serializer", value = KEY_SERIALIZER,
+            label = "Serializer class for key that implements the Serializer interface.")
+    private String keySerializer = KEY_SERIALIZER;
+
+    @Property(name = "value.serializer", value = VALUE_SERIALIZER,
+            label = "Serializer class for value that implements the Serializer interface.")
+    private String valueSerializer = VALUE_SERIALIZER;
+
+    private Producer producer;
+
+    @Activate
+    protected void activate(ComponentContext context) {
+        componentConfigService.registerProperties(getClass());
+        eventExecutor = newSingleThreadScheduledExecutor(groupedThreads("onos/onosEvents", "events-%d", log));
+        deviceService.addListener(deviceListener);
+        linkService.addListener(linkListener);
+        producer = new Producer(bootstrapServers, retries, maxInFlightRequestsPerConnection,
+                                requestRequiredAcks, keySerializer, valueSerializer);
+        producer.start();
+
+        log.info("Started");
+    }
+
+    @Deactivate
+    protected void deactivate() {
+        componentConfigService.unregisterProperties(getClass(), false);
+        deviceService.removeListener(deviceListener);
+        linkService.removeListener(linkListener);
+        producer.stop();
+        eventExecutor.shutdownNow();
+        eventExecutor = null;
+
+        log.info("Stopped");
+    }
+
+    @Modified
+    private void modified(ComponentContext context) {
+        if (context == null) {
+            bootstrapServers = BOOTSTRAP_SERVERS;
+            retries = RETRIES;
+            maxInFlightRequestsPerConnection = MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION;
+            requestRequiredAcks = REQUEST_REQUIRED_ACKS;
+            keySerializer = KEY_SERIALIZER;
+            valueSerializer = VALUE_SERIALIZER;
+            return;
+        }
+        Dictionary properties = context.getProperties();
+
+        String newBootstrapServers = BOOTSTRAP_SERVERS;
+        int newRetries = RETRIES;
+        int newMaxInFlightRequestsPerConnection = MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION;
+        int newRequestRequiredAcks = REQUEST_REQUIRED_ACKS;
+        try {
+            String s = get(properties, "bootstrapServers");
+            newBootstrapServers = isNullOrEmpty(s)
+                    ? bootstrapServers : s.trim();
+
+            s = get(properties, "retries");
+            newRetries = isNullOrEmpty(s)
+                    ? retries : Integer.parseInt(s.trim());
+
+            s = get(properties, "maxInFlightRequestsPerConnection");
+            newMaxInFlightRequestsPerConnection = isNullOrEmpty(s)
+                    ? maxInFlightRequestsPerConnection : Integer.parseInt(s.trim());
+
+            s = get(properties, "requestRequiredAcks");
+            newRequestRequiredAcks = isNullOrEmpty(s)
+                    ? requestRequiredAcks : Integer.parseInt(s.trim());
+
+        } catch (NumberFormatException | ClassCastException e) {
+            return;
+        }
+
+        boolean modified = newBootstrapServers != bootstrapServers ||
+                newRetries != retries ||
+                newMaxInFlightRequestsPerConnection != maxInFlightRequestsPerConnection ||
+                newRequestRequiredAcks != requestRequiredAcks;
+
+        if (modified) {
+            bootstrapServers = newBootstrapServers;
+            retries = newRetries;
+            maxInFlightRequestsPerConnection = newMaxInFlightRequestsPerConnection;
+            requestRequiredAcks = newRequestRequiredAcks;
+            if (producer != null) {
+                producer.stop();
+            }
+            producer = new Producer(bootstrapServers, retries, maxInFlightRequestsPerConnection,
+                                        requestRequiredAcks, keySerializer, valueSerializer);
+            producer.start();
+            log.info("Modified");
+        } else {
+            return;
+        }
+    }
+
+    private class InternalDeviceListener implements DeviceListener {
+
+        @Override
+        public void event(DeviceEvent event) {
+            if (!eventSubscriptionService.getEventSubscribers(DEVICE).isEmpty()) {
+                eventExecutor.execute(() -> {
+                    try {
+                        String id = UUID.randomUUID().toString();
+                        producer.send(new ProducerRecord<>(DEVICE.toString(),
+                                                                id, event.subject().toString().getBytes())).get();
+                        log.debug("Device event sent successfully.");
+                    } catch (InterruptedException e) {
+                        Thread.currentThread().interrupt();
+                    } catch (ExecutionException e) {
+                        log.error("Exception thrown {}", e);
+                    }
+                });
+            } else {
+                log.debug("No device listeners");
+            }
+        }
+    }
+
+    private class InternalLinkListener implements LinkListener {
+
+        @Override
+        public void event(LinkEvent event) {
+            if (!eventSubscriptionService.getEventSubscribers(LINK).isEmpty()) {
+                eventExecutor.execute(() -> {
+                    try {
+                        String id = UUID.randomUUID().toString();
+                        producer.send(new ProducerRecord<>(LINK.toString(),
+                                                                id, event.subject().toString().getBytes())).get();
+                        log.debug("Link event sent successfully.");
+                    } catch (InterruptedException e) {
+                        Thread.currentThread().interrupt();
+                    } catch (ExecutionException e) {
+                        log.error("Exception thrown {}", e);
+                    }
+                });
+            } else {
+                log.debug("No link listeners");
+            }
+        }
+    }
+}
diff --git a/apps/kafka-integration/core/src/main/java/org/onosproject/kafkaintegration/kafka/Producer.java b/apps/kafka-integration/core/src/main/java/org/onosproject/kafkaintegration/kafka/Producer.java
new file mode 100644
index 0000000..bbdff81
--- /dev/null
+++ b/apps/kafka-integration/core/src/main/java/org/onosproject/kafkaintegration/kafka/Producer.java
@@ -0,0 +1,66 @@
+/**
+ * Copyright 2016-present Open Networking Laboratory
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.onosproject.kafkaintegration.kafka;
+
+import org.apache.kafka.clients.producer.KafkaProducer;
+import org.apache.kafka.clients.producer.ProducerRecord;
+import org.apache.kafka.clients.producer.RecordMetadata;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Properties;
+import java.util.concurrent.Future;
+
+/**
+ * Implementation of Kafka Producer.
+ */
+public class Producer {
+    private KafkaProducer<String, byte[]> kafkaProducer = null;
+
+    private final Logger log = LoggerFactory.getLogger(getClass());
+
+    Producer(String bootstrapServers, int retries, int maxInFlightRequestsPerConnection,
+             int requestRequiredAcks, String keySerializer, String valueSerializer) {
+
+        Properties prop = new Properties();
+        prop.put("bootstrap.servers", bootstrapServers);
+        prop.put("retries", retries);
+        prop.put("max.in.flight.requests.per.connection", maxInFlightRequestsPerConnection);
+        prop.put("request.required.acks", requestRequiredAcks);
+        prop.put("key.serializer", keySerializer);
+        prop.put("value.serializer", valueSerializer);
+
+        kafkaProducer = new KafkaProducer<>(prop);
+    }
+
+    public void start() {
+        log.info("Started");
+    }
+
+    public void stop() {
+        if (kafkaProducer != null) {
+            kafkaProducer.close();
+            kafkaProducer = null;
+        }
+
+        log.info("Stopped");
+    }
+
+    public Future<RecordMetadata> send(ProducerRecord<String, byte[]> record) {
+        return kafkaProducer.send(record);
+    }
+}
diff --git a/apps/kafka-integration/core/src/main/java/org/onosproject/kafkaintegration/kafka/package-info.java b/apps/kafka-integration/core/src/main/java/org/onosproject/kafkaintegration/kafka/package-info.java
new file mode 100644
index 0000000..e222a64
--- /dev/null
+++ b/apps/kafka-integration/core/src/main/java/org/onosproject/kafkaintegration/kafka/package-info.java
@@ -0,0 +1,18 @@
+/**
+ * Copyright 2016-present Open Networking Laboratory Licensed under the Apache
+ * License, Version 2.0 (the "License"); you may not use this file except in
+ * compliance with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+/**
+ * API implementation classes.
+ */
+package org.onosproject.kafkaintegration.kafka;
diff --git a/apps/kafka-integration/app/src/main/java/org/onosproject/kafkaintegration/listener/DeviceEventsListener.java b/apps/kafka-integration/core/src/main/java/org/onosproject/kafkaintegration/listener/DeviceEventsListener.java
similarity index 87%
rename from apps/kafka-integration/app/src/main/java/org/onosproject/kafkaintegration/listener/DeviceEventsListener.java
rename to apps/kafka-integration/core/src/main/java/org/onosproject/kafkaintegration/listener/DeviceEventsListener.java
index 22c4bf2..e2f23c0 100644
--- a/apps/kafka-integration/app/src/main/java/org/onosproject/kafkaintegration/listener/DeviceEventsListener.java
+++ b/apps/kafka-integration/core/src/main/java/org/onosproject/kafkaintegration/listener/DeviceEventsListener.java
@@ -14,29 +14,31 @@
  */
 package org.onosproject.kafkaintegration.listener;
 
-import static org.onosproject.kafkaintegration.api.dto.OnosEvent.Type.DEVICE;
-
+import com.google.protobuf.GeneratedMessage;
 import org.onosproject.event.ListenerService;
-import org.onosproject.kafkaintegration.impl.Dispatcher;
 import org.onosproject.kafkaintegration.api.dto.OnosEvent.Type;
 import org.onosproject.kafkaintegration.converter.ConversionFactory;
 import org.onosproject.kafkaintegration.converter.EventConverter;
+import org.onosproject.kafkaintegration.impl.KafkaPublisherManager;
 import org.onosproject.net.device.DeviceEvent;
 import org.onosproject.net.device.DeviceListener;
 import org.onosproject.net.device.DeviceService;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
-import com.google.protobuf.GeneratedMessage;
+import static org.onosproject.kafkaintegration.api.dto.OnosEvent.Type.DEVICE;
 
 /**
  * Listens for ONOS Device events.
  *
  */
-final class DeviceEventsListener implements OnosEventListener {
+public final class DeviceEventsListener implements OnosEventListener {
+
+    private final Logger log = LoggerFactory.getLogger(getClass());
 
     private boolean listenerRunning = false;
 
     private InnerListener listener = null;
-
     // Exists to defeat instantiation
     private DeviceEventsListener() {
     }
@@ -70,14 +72,14 @@
         @Override
         public void event(DeviceEvent arg0) {
 
-            // Convert the event to GPB format
+            // Convert the event to protobuf format
             ConversionFactory conversionFactory =
                     ConversionFactory.getInstance();
             EventConverter converter = conversionFactory.getConverter(DEVICE);
             GeneratedMessage message = converter.convertToProtoMessage(arg0);
 
             // Call Dispatcher and publish event
-            Dispatcher.getInstance().publish(DEVICE, message);
+            KafkaPublisherManager.getInstance().publish(DEVICE, message);
         }
     }
 
@@ -91,4 +93,4 @@
         }
     }
 
-}
+}
\ No newline at end of file
diff --git a/apps/kafka-integration/app/src/main/java/org/onosproject/kafkaintegration/listener/LinkEventsListener.java b/apps/kafka-integration/core/src/main/java/org/onosproject/kafkaintegration/listener/LinkEventsListener.java
similarity index 91%
rename from apps/kafka-integration/app/src/main/java/org/onosproject/kafkaintegration/listener/LinkEventsListener.java
rename to apps/kafka-integration/core/src/main/java/org/onosproject/kafkaintegration/listener/LinkEventsListener.java
index 769d8b4..39ab078 100644
--- a/apps/kafka-integration/app/src/main/java/org/onosproject/kafkaintegration/listener/LinkEventsListener.java
+++ b/apps/kafka-integration/core/src/main/java/org/onosproject/kafkaintegration/listener/LinkEventsListener.java
@@ -14,29 +14,27 @@
  */
 package org.onosproject.kafkaintegration.listener;
 
-import static org.onosproject.kafkaintegration.api.dto.OnosEvent.Type.LINK;
-
+import com.google.protobuf.GeneratedMessage;
 import org.onosproject.event.ListenerService;
-import org.onosproject.kafkaintegration.impl.Dispatcher;
 import org.onosproject.kafkaintegration.api.dto.OnosEvent.Type;
 import org.onosproject.kafkaintegration.converter.ConversionFactory;
 import org.onosproject.kafkaintegration.converter.EventConverter;
+import org.onosproject.kafkaintegration.impl.KafkaPublisherManager;
 import org.onosproject.net.link.LinkEvent;
 import org.onosproject.net.link.LinkListener;
 import org.onosproject.net.link.LinkService;
 
-import com.google.protobuf.GeneratedMessage;
+import static org.onosproject.kafkaintegration.api.dto.OnosEvent.Type.LINK;
 
 /**
  * Listens for ONOS Link Events.
  *
  */
-final class LinkEventsListener implements OnosEventListener {
+public final class LinkEventsListener implements OnosEventListener {
 
     private boolean listenerRunning = false;
 
     private InnerListener listener = null;
-
     // Exists to defeat instantiation
     private LinkEventsListener() {
     }
@@ -70,14 +68,14 @@
         @Override
         public void event(LinkEvent arg0) {
 
-            // Convert the event to GPB format
+            // Convert the event to protobuf format
             ConversionFactory conversionFactory =
                     ConversionFactory.getInstance();
             EventConverter converter = conversionFactory.getConverter(LINK);
             GeneratedMessage message = converter.convertToProtoMessage(arg0);
 
             // Call Dispatcher and publish event
-            Dispatcher.getInstance().publish(LINK, message);
+            KafkaPublisherManager.getInstance().publish(LINK, message);
         }
     }
 
diff --git a/apps/kafka-integration/app/src/main/java/org/onosproject/kafkaintegration/listener/ListenerFactory.java b/apps/kafka-integration/core/src/main/java/org/onosproject/kafkaintegration/listener/ListenerFactory.java
similarity index 99%
rename from apps/kafka-integration/app/src/main/java/org/onosproject/kafkaintegration/listener/ListenerFactory.java
rename to apps/kafka-integration/core/src/main/java/org/onosproject/kafkaintegration/listener/ListenerFactory.java
index 496ba2b..65b7cf1 100644
--- a/apps/kafka-integration/app/src/main/java/org/onosproject/kafkaintegration/listener/ListenerFactory.java
+++ b/apps/kafka-integration/core/src/main/java/org/onosproject/kafkaintegration/listener/ListenerFactory.java
@@ -14,13 +14,13 @@
  */
 package org.onosproject.kafkaintegration.listener;
 
-import static org.onosproject.kafkaintegration.api.dto.OnosEvent.Type.DEVICE;
-import static org.onosproject.kafkaintegration.api.dto.OnosEvent.Type.LINK;
+import org.onosproject.kafkaintegration.api.dto.OnosEvent.Type;
 
 import java.util.HashMap;
 import java.util.Map;
 
-import org.onosproject.kafkaintegration.api.dto.OnosEvent.Type;
+import static org.onosproject.kafkaintegration.api.dto.OnosEvent.Type.DEVICE;
+import static org.onosproject.kafkaintegration.api.dto.OnosEvent.Type.LINK;
 
 /**
  * Returns the appropriate listener object based on the ONOS event type.
diff --git a/apps/kafka-integration/app/src/main/java/org/onosproject/kafkaintegration/listener/OnosEventListener.java b/apps/kafka-integration/core/src/main/java/org/onosproject/kafkaintegration/listener/OnosEventListener.java
similarity index 99%
rename from apps/kafka-integration/app/src/main/java/org/onosproject/kafkaintegration/listener/OnosEventListener.java
rename to apps/kafka-integration/core/src/main/java/org/onosproject/kafkaintegration/listener/OnosEventListener.java
index 22cde21..ad14104 100644
--- a/apps/kafka-integration/app/src/main/java/org/onosproject/kafkaintegration/listener/OnosEventListener.java
+++ b/apps/kafka-integration/core/src/main/java/org/onosproject/kafkaintegration/listener/OnosEventListener.java
@@ -30,7 +30,6 @@
      * @param service ONOS event listener for the specific event type
      */
     void startListener(Type event, ListenerService<?, ?> service);
-
     /**
      * Stop the Listener for the specific ONOS event type.
      *
diff --git a/apps/kafka-integration/app/src/main/java/org/onosproject/kafkaintegration/listener/package-info.java b/apps/kafka-integration/core/src/main/java/org/onosproject/kafkaintegration/listener/package-info.java
similarity index 100%
rename from apps/kafka-integration/app/src/main/java/org/onosproject/kafkaintegration/listener/package-info.java
rename to apps/kafka-integration/core/src/main/java/org/onosproject/kafkaintegration/listener/package-info.java
diff --git a/apps/kafka-integration/pom.xml b/apps/kafka-integration/pom.xml
index 03754ef..287700d 100644
--- a/apps/kafka-integration/pom.xml
+++ b/apps/kafka-integration/pom.xml
@@ -31,8 +31,9 @@
 
     <modules>
         <module>api</module>
+        <module>core</module>
+        <module>web</module>
         <module>app</module>
     </modules>
 
-</project>
-
+</project>
\ No newline at end of file
diff --git a/apps/kafka-integration/web/pom.xml b/apps/kafka-integration/web/pom.xml
new file mode 100644
index 0000000..bcf768e
--- /dev/null
+++ b/apps/kafka-integration/web/pom.xml
@@ -0,0 +1,219 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+  ~ Copyright 2016-present Open Networking Laboratory
+  ~
+  ~ Licensed under the Apache License, Version 2.0 (the "License");
+  ~ you may not use this file except in compliance with the License.
+  ~ You may obtain a copy of the License at
+  ~
+  ~     http://www.apache.org/licenses/LICENSE-2.0
+  ~
+  ~ Unless required by applicable law or agreed to in writing, software
+  ~ distributed under the License is distributed on an "AS IS" BASIS,
+  ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  ~ See the License for the specific language governing permissions and
+  ~ limitations under the License.
+  -->
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+    <parent>
+        <groupId>org.onosproject</groupId>
+        <artifactId>onos-kafka</artifactId>
+        <version>1.7.0-SNAPSHOT</version>
+    </parent>
+    <modelVersion>4.0.0</modelVersion>
+
+    <artifactId>onos-app-kafka-web</artifactId>
+
+    <properties>
+        <web.context>/onos/kafka</web.context>
+        <api.version>1.0.0</api.version>
+        <api.package>org.onosproject.kafkaintegration.rest</api.package>
+        <api.title>Kafka Integration Application REST API</api.title>
+        <api.description>
+            APIs for subscribing to Events generated by ONOS
+        </api.description>
+    </properties>
+
+    <packaging>bundle</packaging>
+
+    <dependencies>
+        <dependency>
+            <groupId>org.onosproject</groupId>
+            <artifactId>onos-api</artifactId>
+        </dependency>
+
+        <dependency>
+            <groupId>org.onosproject</groupId>
+            <artifactId>onos-app-kafka-api</artifactId>
+            <version>${project.version}</version>
+        </dependency>
+
+        <dependency>
+            <groupId>org.onosproject</groupId>
+            <artifactId>onos-incubator-protobuf</artifactId>
+            <version>${project.version}</version>
+        </dependency>
+
+        <dependency>
+            <groupId>org.onosproject</groupId>
+            <artifactId>onlab-osgi</artifactId>
+        </dependency>
+
+        <dependency>
+            <groupId>org.onosproject</groupId>
+            <artifactId>onos-rest</artifactId>
+            <version>${project.version}</version>
+        </dependency>
+
+        <dependency>
+            <groupId>junit</groupId>
+            <artifactId>junit</artifactId>
+        </dependency>
+
+        <dependency>
+            <groupId>org.onosproject</groupId>
+            <artifactId>onos-api</artifactId>
+            <classifier>tests</classifier>
+        </dependency>
+
+        <dependency>
+            <groupId>javax.ws.rs</groupId>
+            <artifactId>javax.ws.rs-api</artifactId>
+            <version>2.0.1</version>
+        </dependency>
+
+        <dependency>
+            <groupId>com.google.protobuf</groupId>
+            <artifactId>protobuf-java</artifactId>
+            <version>3.0.0-beta-2</version>
+        </dependency>
+
+        <dependency>
+            <groupId>org.codehaus.jackson</groupId>
+            <artifactId>jackson-core-asl</artifactId>
+            <version>1.9.13</version>
+        </dependency>
+
+        <dependency>
+            <groupId>org.codehaus.jackson</groupId>
+            <artifactId>jackson-mapper-asl</artifactId>
+            <version>1.9.13</version>
+        </dependency>
+
+        <dependency>
+            <groupId>org.glassfish.jersey.containers</groupId>
+            <artifactId>jersey-container-servlet</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>com.fasterxml.jackson.core</groupId>
+            <artifactId>jackson-annotations</artifactId>
+        </dependency>
+
+        <dependency>
+            <groupId>org.onosproject</groupId>
+            <artifactId>onos-core-serializers</artifactId>
+            <version>${project.version}</version>
+        </dependency>
+
+        <dependency>
+            <groupId>org.apache.felix</groupId>
+            <artifactId>org.apache.felix.scr.annotations</artifactId>
+        </dependency>
+
+    </dependencies>
+
+    <build>
+        <plugins>
+            <plugin>
+                <groupId>org.apache.felix</groupId>
+                <artifactId>maven-bundle-plugin</artifactId>
+                <extensions>true</extensions>
+                <configuration>
+                    <instructions>
+                        <Bundle-SymbolicName>
+                            ${project.groupId}.${project.artifactId}
+                        </Bundle-SymbolicName>
+                        <_wab>src/main/webapp/</_wab>
+                        <Include-Resource>
+                            WEB-INF/classes/apidoc/swagger.json=target/swagger.json,
+                            {maven-resources}
+                        </Include-Resource>
+                        <Import-Package>
+                            org.slf4j,
+                            org.osgi.framework,
+                            javax.ws.rs,
+                            javax.ws.rs.core,
+                            org.glassfish.jersey.servlet,
+                            com.fasterxml.jackson.databind,
+                            com.fasterxml.jackson.databind.node,
+                            com.fasterxml.jackson.core,
+                            org.onlab.packet.*,
+                            org.onosproject.*,
+                            org.onlab.util.*,
+                            com.google.common.*,
+                            com.google.protobuf.*
+                        </Import-Package>
+                        <Web-ContextPath>${web.context}</Web-ContextPath>
+                    </instructions>
+                </configuration>
+            </plugin>
+
+            <plugin>
+                <groupId>org.apache.maven.plugins</groupId>
+                <artifactId>maven-compiler-plugin</artifactId>
+                <configuration>
+                    <source>1.8</source>
+                    <target>1.8</target>
+                </configuration>
+            </plugin>
+            <plugin>
+                <groupId>org.apache.felix</groupId>
+                <artifactId>maven-scr-plugin</artifactId>
+                <executions>
+                    <execution>
+                        <id>generate-scr-srcdescriptor</id>
+                        <goals>
+                            <goal>scr</goal>
+                        </goals>
+                    </execution>
+                </executions>
+                <configuration>
+                    <supportedProjectTypes>
+                        <supportedProjectType>bundle</supportedProjectType>
+                        <supportedProjectType>war</supportedProjectType>
+                    </supportedProjectTypes>
+                </configuration>
+            </plugin>
+
+            <plugin>
+                <groupId>org.onosproject</groupId>
+                <artifactId>onos-maven-plugin</artifactId>
+                <executions>
+                    <execution>
+                        <id>cfg</id>
+                        <phase>generate-resources</phase>
+                        <goals>
+                            <goal>cfg</goal>
+                        </goals>
+                    </execution>
+                    <execution>
+                        <id>swagger</id>
+                        <goals>
+                            <goal>swagger</goal>
+                        </goals>
+                    </execution>
+                    <execution>
+                        <id>app</id>
+                        <phase>package</phase>
+                        <goals>
+                            <goal>app</goal>
+                        </goals>
+                    </execution>
+                </executions>
+            </plugin>
+        </plugins>
+    </build>
+
+</project>
diff --git a/apps/kafka-integration/app/src/main/java/org/onosproject/kafkaintegration/rest/EventExporterWebResource.java b/apps/kafka-integration/web/src/main/java/org/onosproject/kafkaintegration/rest/EventExporterWebResource.java
similarity index 90%
rename from apps/kafka-integration/app/src/main/java/org/onosproject/kafkaintegration/rest/EventExporterWebResource.java
rename to apps/kafka-integration/web/src/main/java/org/onosproject/kafkaintegration/rest/EventExporterWebResource.java
index 0456362..0a6e91d 100644
--- a/apps/kafka-integration/app/src/main/java/org/onosproject/kafkaintegration/rest/EventExporterWebResource.java
+++ b/apps/kafka-integration/web/src/main/java/org/onosproject/kafkaintegration/rest/EventExporterWebResource.java
@@ -14,11 +14,15 @@
  */
 package org.onosproject.kafkaintegration.rest;
 
-import static com.google.common.base.Preconditions.checkNotNull;
-import static javax.ws.rs.core.Response.Status.BAD_REQUEST;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.fasterxml.jackson.databind.node.ObjectNode;
+import org.onosproject.kafkaintegration.api.EventSubscriptionService;
+import org.onosproject.kafkaintegration.api.dto.EventSubscriber;
+import org.onosproject.kafkaintegration.api.dto.EventSubscriberGroupId;
+import org.onosproject.rest.AbstractWebResource;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
-import java.io.IOException;
-import java.io.InputStream;
 
 import javax.ws.rs.Consumes;
 import javax.ws.rs.DELETE;
@@ -27,16 +31,11 @@
 import javax.ws.rs.Produces;
 import javax.ws.rs.core.MediaType;
 import javax.ws.rs.core.Response;
+import java.io.IOException;
+import java.io.InputStream;
 
-import org.onosproject.kafkaintegration.api.EventExporterService;
-import org.onosproject.kafkaintegration.api.dto.EventSubscriber;
-import org.onosproject.kafkaintegration.api.dto.EventSubscriberGroupId;
-import org.onosproject.rest.AbstractWebResource;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import com.fasterxml.jackson.databind.ObjectMapper;
-import com.fasterxml.jackson.databind.node.ObjectNode;
+import static com.google.common.base.Preconditions.checkNotNull;
+import static javax.ws.rs.core.Response.Status.BAD_REQUEST;
 
 /**
  * Rest Interfaces for subscribing/unsubscribing to event notifications.
@@ -52,11 +51,12 @@
     public static final String DEREGISTRATION_SUCCESSFUL =
             "De-Registered Listener successfully";
     public static final String EVENT_SUBSCRIPTION_SUCCESSFUL =
-            "Event Registration successfull";
+            "Event Registration successful";
     public static final String EVENT_SUBSCRIPTION_UNSUCCESSFUL =
             "Event subscription unsuccessful";
     public static final String EVENT_SUBSCRIPTION_REMOVED =
-            "Event De-Registration successfull";
+            "Event De-Registration successful";
+
     /**
      * Registers a listener for ONOS Events.
      *
@@ -71,12 +71,11 @@
     @Path("register")
     public Response registerKafkaListener(String appName) {
 
-        EventExporterService service = get(EventExporterService.class);
+        EventSubscriptionService service = get(EventSubscriptionService.class);
 
         EventSubscriberGroupId groupId = service.registerListener(appName);
 
         log.info("Registered app {}", appName);
-
         // TODO: Should also return Kafka server information.
         // Will glue this in when we have the config and Kafka modules ready
         return ok(groupId.getId().toString()).build();
@@ -92,7 +91,7 @@
     @DELETE
     @Path("unregister")
     public Response removeKafkaListener(String appName) {
-        EventExporterService service = get(EventExporterService.class);
+        EventSubscriptionService service = get(EventSubscriptionService.class);
 
         service.unregisterListener(appName);
         log.info("Unregistered app {}", appName);
@@ -112,11 +111,12 @@
     @Path("subscribe")
     public Response subscribe(InputStream input) {
 
-        EventExporterService service = get(EventExporterService.class);
+        EventSubscriptionService service = get(EventSubscriptionService.class);
 
         try {
             EventSubscriber sub = parseSubscriptionData(input);
             service.subscribe(sub);
+            // It will subscribe to all the topics. Not only the one that is sent by the consumer.
         } catch (Exception e) {
             log.error(e.getMessage());
             return Response.status(BAD_REQUEST).entity(e.getMessage()).build();
@@ -155,7 +155,7 @@
     @Path("unsubscribe")
     public Response unsubscribe(InputStream input) {
 
-        EventExporterService service = get(EventExporterService.class);
+        EventSubscriptionService service = get(EventSubscriptionService.class);
 
         try {
             EventSubscriber sub = parseSubscriptionData(input);
diff --git a/apps/kafka-integration/app/src/main/java/org/onosproject/kafkaintegration/rest/package-info.java b/apps/kafka-integration/web/src/main/java/org/onosproject/kafkaintegration/rest/package-info.java
similarity index 100%
rename from apps/kafka-integration/app/src/main/java/org/onosproject/kafkaintegration/rest/package-info.java
rename to apps/kafka-integration/web/src/main/java/org/onosproject/kafkaintegration/rest/package-info.java
diff --git a/apps/kafka-integration/app/src/main/resources/definitions/KafkaRegistration.json b/apps/kafka-integration/web/src/main/resources/definitions/KafkaRegistration.json
similarity index 100%
rename from apps/kafka-integration/app/src/main/resources/definitions/KafkaRegistration.json
rename to apps/kafka-integration/web/src/main/resources/definitions/KafkaRegistration.json
diff --git a/apps/kafka-integration/app/src/main/resources/definitions/KafkaSubscription.json b/apps/kafka-integration/web/src/main/resources/definitions/KafkaSubscription.json
similarity index 100%
rename from apps/kafka-integration/app/src/main/resources/definitions/KafkaSubscription.json
rename to apps/kafka-integration/web/src/main/resources/definitions/KafkaSubscription.json
diff --git a/apps/kafka-integration/app/src/main/webapp/WEB-INF/web.xml b/apps/kafka-integration/web/src/main/webapp/WEB-INF/web.xml
similarity index 100%
rename from apps/kafka-integration/app/src/main/webapp/WEB-INF/web.xml
rename to apps/kafka-integration/web/src/main/webapp/WEB-INF/web.xml