Kafka Integration Application (Fix Javadoc errors + review comments)

1. Refactored the application into two java projects api and app as per convention
2. Deleted the onos-app-gpb project. The proto files are consolidated in the
   incubator-protobuf project as per suggestions.
3. Some code to translate ONOS Event pojo messages to GPB format.
4. Implementation of Subscribe and Unsubscribe APIs.
5. Minor changes due to review comments from 9212 and 9053
6. Refactored the proto fileso that its a 1:1 mapping between the core type to proto message.

Change-Id: I2bcc0de96150f838ccfe9e49293fe61d94062628
diff --git a/apps/kafka-integration/api/pom.xml b/apps/kafka-integration/api/pom.xml
new file mode 100644
index 0000000..56905f4
--- /dev/null
+++ b/apps/kafka-integration/api/pom.xml
@@ -0,0 +1,58 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+  ~ Copyright 2016-present Open Networking Laboratory
+  ~
+  ~ Licensed under the Apache License, Version 2.0 (the "License");
+  ~ you may not use this file except in compliance with the License.
+  ~ You may obtain a copy of the License at
+  ~
+  ~     http://www.apache.org/licenses/LICENSE-2.0
+  ~
+  ~ Unless required by applicable law or agreed to in writing, software
+  ~ distributed under the License is distributed on an "AS IS" BASIS,
+  ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  ~ See the License for the specific language governing permissions and
+  ~ limitations under the License.
+  -->
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+              xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+              xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+    <modelVersion>4.0.0</modelVersion>
+
+    <parent>
+        <artifactId>onos-kafka</artifactId>
+        <groupId>org.onosproject</groupId>
+        <version>1.7.0-SNAPSHOT</version>
+        <relativePath>../pom.xml</relativePath>
+    </parent>
+
+    <artifactId>onos-app-kafka-api</artifactId>
+    <packaging>bundle</packaging>
+
+    <url>http://onosproject.org</url>
+
+    <description>Kafka Integration Application</description>
+
+    <dependencies>
+        <dependency>
+            <groupId>com.google.protobuf</groupId>
+            <artifactId>protobuf-java</artifactId>
+            <version>3.0.0-beta-2</version>
+        </dependency>
+    </dependencies>
+
+    <build>
+       <plugins>
+          <plugin>
+              <groupId>org.apache.felix</groupId>
+              <artifactId>maven-bundle-plugin</artifactId>
+              <extensions>true</extensions>
+              <configuration>
+                  <instructions>
+                  </instructions>
+              </configuration>
+          </plugin>
+       </plugins>
+    </build>
+</project>
+
diff --git a/apps/kafka-integration/src/main/java/org/onosproject/kafkaintegration/api/EventExporterService.java b/apps/kafka-integration/api/src/main/java/org/onosproject/kafkaintegration/api/EventExporterService.java
similarity index 90%
rename from apps/kafka-integration/src/main/java/org/onosproject/kafkaintegration/api/EventExporterService.java
rename to apps/kafka-integration/api/src/main/java/org/onosproject/kafkaintegration/api/EventExporterService.java
index 3ca5414..e203697 100644
--- a/apps/kafka-integration/src/main/java/org/onosproject/kafkaintegration/api/EventExporterService.java
+++ b/apps/kafka-integration/api/src/main/java/org/onosproject/kafkaintegration/api/EventExporterService.java
@@ -18,11 +18,13 @@
 import org.onosproject.kafkaintegration.api.dto.EventSubscriberGroupId;
 import org.onosproject.kafkaintegration.errors.InvalidApplicationException;
 import org.onosproject.kafkaintegration.errors.InvalidGroupIdException;
-import org.onosproject.kafkaintegration.errors.UnsupportedEventException;
+
+import com.google.common.annotations.Beta;
 
 /**
  * APIs for subscribing to Onos Event Messages.
  */
+@Beta
 public interface EventExporterService {
 
     /**
@@ -44,13 +46,11 @@
      * Allows registered listener to subscribe for a specific event type.
      *
      * @param subscriber Subscription data containing the event type
-     * @throws UnsupportedEventException
      * @throws InvalidGroupIdException
      * @throws InvalidApplicationException
      */
     void subscribe(EventSubscriber subscriber)
-            throws UnsupportedEventException, InvalidGroupIdException,
-            InvalidApplicationException;
+            throws InvalidGroupIdException, InvalidApplicationException;
 
     /**
      * Allows the registered listener to unsubscribe for a specific event.
diff --git a/apps/kafka-integration/api/src/main/java/org/onosproject/kafkaintegration/api/ExportableEventListener.java b/apps/kafka-integration/api/src/main/java/org/onosproject/kafkaintegration/api/ExportableEventListener.java
new file mode 100644
index 0000000..8aae1b7
--- /dev/null
+++ b/apps/kafka-integration/api/src/main/java/org/onosproject/kafkaintegration/api/ExportableEventListener.java
@@ -0,0 +1,31 @@
+/**
+ * Copyright 2016 Open Networking Laboratory
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+
+ * http://www.apache.org/licenses/LICENSE-2.0
+
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.onosproject.kafkaintegration.api;
+
+import org.onosproject.event.EventListener;
+import org.onosproject.kafkaintegration.api.dto.OnosEvent;
+
+import com.google.common.annotations.Beta;
+
+/**
+ * API for listeners to listen for Events generated by the ONOS event listener.
+ * At present the only listener will be a Kafka Manager or Module whose sole
+ * purpose is to publish the received data to a Kafka message bus.
+ *
+ */
+@Beta
+public interface ExportableEventListener extends EventListener<OnosEvent> {
+
+}
diff --git a/apps/kafka-integration/src/main/java/org/onosproject/kafkaintegration/api/dto/EventSubscriber.java b/apps/kafka-integration/api/src/main/java/org/onosproject/kafkaintegration/api/dto/EventSubscriber.java
similarity index 100%
rename from apps/kafka-integration/src/main/java/org/onosproject/kafkaintegration/api/dto/EventSubscriber.java
rename to apps/kafka-integration/api/src/main/java/org/onosproject/kafkaintegration/api/dto/EventSubscriber.java
diff --git a/apps/kafka-integration/src/main/java/org/onosproject/kafkaintegration/api/dto/EventSubscriberGroupId.java b/apps/kafka-integration/api/src/main/java/org/onosproject/kafkaintegration/api/dto/EventSubscriberGroupId.java
similarity index 100%
rename from apps/kafka-integration/src/main/java/org/onosproject/kafkaintegration/api/dto/EventSubscriberGroupId.java
rename to apps/kafka-integration/api/src/main/java/org/onosproject/kafkaintegration/api/dto/EventSubscriberGroupId.java
diff --git a/apps/kafka-integration/src/main/java/org/onosproject/kafkaintegration/api/dto/OnosEvent.java b/apps/kafka-integration/api/src/main/java/org/onosproject/kafkaintegration/api/dto/OnosEvent.java
similarity index 100%
rename from apps/kafka-integration/src/main/java/org/onosproject/kafkaintegration/api/dto/OnosEvent.java
rename to apps/kafka-integration/api/src/main/java/org/onosproject/kafkaintegration/api/dto/OnosEvent.java
diff --git a/apps/kafka-integration/src/main/java/org/onosproject/kafkaintegration/api/dto/package-info.java b/apps/kafka-integration/api/src/main/java/org/onosproject/kafkaintegration/api/dto/package-info.java
similarity index 100%
rename from apps/kafka-integration/src/main/java/org/onosproject/kafkaintegration/api/dto/package-info.java
rename to apps/kafka-integration/api/src/main/java/org/onosproject/kafkaintegration/api/dto/package-info.java
diff --git a/apps/kafka-integration/src/main/java/org/onosproject/kafkaintegration/api/package-info.java b/apps/kafka-integration/api/src/main/java/org/onosproject/kafkaintegration/api/package-info.java
similarity index 100%
rename from apps/kafka-integration/src/main/java/org/onosproject/kafkaintegration/api/package-info.java
rename to apps/kafka-integration/api/src/main/java/org/onosproject/kafkaintegration/api/package-info.java
diff --git a/apps/kafka-integration/src/main/java/org/onosproject/kafkaintegration/errors/InvalidApplicationException.java b/apps/kafka-integration/api/src/main/java/org/onosproject/kafkaintegration/errors/InvalidApplicationException.java
similarity index 100%
rename from apps/kafka-integration/src/main/java/org/onosproject/kafkaintegration/errors/InvalidApplicationException.java
rename to apps/kafka-integration/api/src/main/java/org/onosproject/kafkaintegration/errors/InvalidApplicationException.java
diff --git a/apps/kafka-integration/src/main/java/org/onosproject/kafkaintegration/errors/InvalidGroupIdException.java b/apps/kafka-integration/api/src/main/java/org/onosproject/kafkaintegration/errors/InvalidGroupIdException.java
similarity index 100%
rename from apps/kafka-integration/src/main/java/org/onosproject/kafkaintegration/errors/InvalidGroupIdException.java
rename to apps/kafka-integration/api/src/main/java/org/onosproject/kafkaintegration/errors/InvalidGroupIdException.java
diff --git a/apps/kafka-integration/src/main/java/org/onosproject/kafkaintegration/errors/UnsupportedEventException.java b/apps/kafka-integration/api/src/main/java/org/onosproject/kafkaintegration/errors/UnsupportedEventException.java
similarity index 100%
rename from apps/kafka-integration/src/main/java/org/onosproject/kafkaintegration/errors/UnsupportedEventException.java
rename to apps/kafka-integration/api/src/main/java/org/onosproject/kafkaintegration/errors/UnsupportedEventException.java
diff --git a/apps/kafka-integration/src/main/java/org/onosproject/kafkaintegration/errors/package-info.java b/apps/kafka-integration/api/src/main/java/org/onosproject/kafkaintegration/errors/package-info.java
similarity index 99%
rename from apps/kafka-integration/src/main/java/org/onosproject/kafkaintegration/errors/package-info.java
rename to apps/kafka-integration/api/src/main/java/org/onosproject/kafkaintegration/errors/package-info.java
index 28427d4..35773da 100644
--- a/apps/kafka-integration/src/main/java/org/onosproject/kafkaintegration/errors/package-info.java
+++ b/apps/kafka-integration/api/src/main/java/org/onosproject/kafkaintegration/errors/package-info.java
@@ -14,5 +14,6 @@
 
 /**
  * Application specific Exception classes.
+ *
  */
 package org.onosproject.kafkaintegration.errors;
diff --git a/apps/kafka-integration/app/app.xml b/apps/kafka-integration/app/app.xml
new file mode 100644
index 0000000..5912d4b
--- /dev/null
+++ b/apps/kafka-integration/app/app.xml
@@ -0,0 +1,24 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+  ~ Copyright 2016-present Open Networking Laboratory
+  ~
+  ~ Licensed under the Apache License, Version 2.0 (the "License");
+  ~ you may not use this file except in compliance with the License.
+  ~ You may obtain a copy of the License at
+  ~
+  ~     http://www.apache.org/licenses/LICENSE-2.0
+  ~
+  ~ Unless required by applicable law or agreed to in writing, software
+  ~ distributed under the License is distributed on an "AS IS" BASIS,
+  ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  ~ See the License for the specific language governing permissions and
+  ~ limitations under the License.
+  -->
+<app name="org.onosproject.kafkaintegration" origin="Calix" version="${project.version}"
+     category="Utility" url="http://onosproject.org" title="Kafka Integration Application"
+     featuresRepo="mvn:${project.groupId}/${project.artifactId}/${project.version}/xml/features"
+     features="${project.artifactId}" apps="org.onosproject.incubator.protobuf">
+    <description>${project.description}</description>
+    <artifact>mvn:${project.groupId}/${project.artifactId}/${project.version}</artifact>
+    <artifact>mvn:${project.groupId}/onos-app-kafka-api/${project.version}</artifact>
+</app>
diff --git a/apps/kafka-integration/app/features.xml b/apps/kafka-integration/app/features.xml
new file mode 100644
index 0000000..1e219b0
--- /dev/null
+++ b/apps/kafka-integration/app/features.xml
@@ -0,0 +1,24 @@
+<?xml version="1.0" encoding="UTF-8" standalone="yes"?>
+<!--
+  ~ Copyright 2016-present Open Networking Laboratory
+  ~
+  ~ Licensed under the Apache License, Version 2.0 (the "License");
+  ~ you may not use this file except in compliance with the License.
+  ~ You may obtain a copy of the License at
+  ~
+  ~     http://www.apache.org/licenses/LICENSE-2.0
+  ~
+  ~ Unless required by applicable law or agreed to in writing, software
+  ~ distributed under the License is distributed on an "AS IS" BASIS,
+  ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  ~ See the License for the specific language governing permissions and
+  ~ limitations under the License.
+  -->
+<features xmlns="http://karaf.apache.org/xmlns/features/v1.2.0" name="${project.artifactId}-${project.version}">
+    <feature name="${project.artifactId}" version="${project.version}"
+             description="${project.description}">
+        <feature>onos-api</feature>
+        <bundle>mvn:${project.groupId}/onos-app-kafka-api/${project.version}</bundle>
+        <bundle>mvn:${project.groupId}/onos-app-kafka/${project.version}</bundle>
+    </feature>
+</features>
diff --git a/apps/kafka-integration/app/pom.xml b/apps/kafka-integration/app/pom.xml
new file mode 100644
index 0000000..026b410
--- /dev/null
+++ b/apps/kafka-integration/app/pom.xml
@@ -0,0 +1,231 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+  ~ Copyright 2016-present Open Networking Laboratory
+  ~
+  ~ Licensed under the Apache License, Version 2.0 (the "License");
+  ~ you may not use this file except in compliance with the License.
+  ~ You may obtain a copy of the License at
+  ~
+  ~     http://www.apache.org/licenses/LICENSE-2.0
+  ~
+  ~ Unless required by applicable law or agreed to in writing, software
+  ~ distributed under the License is distributed on an "AS IS" BASIS,
+  ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  ~ See the License for the specific language governing permissions and
+  ~ limitations under the License.
+  -->
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+    <parent>
+        <groupId>org.onosproject</groupId>
+        <artifactId>onos-kafka</artifactId>
+        <version>1.7.0-SNAPSHOT</version>
+        <relativePath>../pom.xml</relativePath>
+    </parent>
+    <modelVersion>4.0.0</modelVersion>
+
+    <artifactId>onos-app-kafka</artifactId>
+
+    <packaging>bundle</packaging>
+    <description>
+          Kafka Integration Application.
+          This will export ONOS Events to Northbound Kafka Server.
+    </description>
+
+    <properties>
+        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
+        <onos.version>${project.version}</onos.version>
+        <onos.app.name>org.onosproject.kafkaintegration</onos.app.name>
+        <onos.app.title>Kafka Integration Application</onos.app.title>
+        <onos.app.origin>Calix, Inc.</onos.app.origin>
+        <web.context>/onos/kafka</web.context>
+        <api.version>1.0.0</api.version>
+        <api.package>org.onosproject.kafkaintegration.rest</api.package>
+        <api.title>Kafka Integration Application REST API</api.title>
+        <api.description>
+               APIs for subscribing to Events generated by ONOS
+        </api.description>
+        <onos.app.category>Utility</onos.app.category>
+        <onos.app.url>https://wiki.onosproject.org/display/ONOS/Kafka+Integration</onos.app.url>
+        <onos.app.readme>Export ONOS events to a Northbound Kafka server</onos.app.readme>
+        <onos.app.requires>org.onosproject.incubator.protobuf</onos.app.requires>
+    </properties>
+
+       <dependencies>
+        <dependency>
+            <groupId>org.onosproject</groupId>
+            <artifactId>onos-api</artifactId>
+        </dependency>
+
+        <dependency>
+            <groupId>org.onosproject</groupId>
+            <artifactId>onos-app-kafka-api</artifactId>
+            <version>${project.version}</version>
+        </dependency>
+
+        <dependency>
+            <groupId>org.onosproject</groupId>
+            <artifactId>onos-incubator-protobuf</artifactId>
+            <version>${project.version}</version>
+        </dependency>
+
+        <dependency>
+            <groupId>org.onosproject</groupId>
+            <artifactId>onlab-osgi</artifactId>
+        </dependency>
+
+        <dependency>
+            <groupId>org.onosproject</groupId>
+            <artifactId>onos-rest</artifactId>
+            <version>${project.version}</version>
+        </dependency>
+
+        <dependency>
+            <groupId>junit</groupId>
+            <artifactId>junit</artifactId>
+            <scope>test</scope>
+        </dependency>
+
+        <dependency>
+            <groupId>org.onosproject</groupId>
+            <artifactId>onos-api</artifactId>
+            <scope>test</scope>
+            <classifier>tests</classifier>
+        </dependency>
+
+        <dependency>
+            <groupId>javax.ws.rs</groupId>
+            <artifactId>javax.ws.rs-api</artifactId>
+            <version>2.0.1</version>
+        </dependency>
+
+        <dependency>
+            <groupId>com.google.protobuf</groupId>
+            <artifactId>protobuf-java</artifactId>
+            <version>3.0.0-beta-2</version>
+        </dependency>
+
+        <dependency>
+            <groupId>org.codehaus.jackson</groupId>
+            <artifactId>jackson-core-asl</artifactId>
+            <version>1.9.13</version>
+        </dependency>
+
+        <dependency>
+            <groupId>org.codehaus.jackson</groupId>
+            <artifactId>jackson-mapper-asl</artifactId>
+            <version>1.9.13</version>
+        </dependency>
+
+        <dependency>
+            <groupId>org.glassfish.jersey.containers</groupId>
+            <artifactId>jersey-container-servlet</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>com.fasterxml.jackson.core</groupId>
+            <artifactId>jackson-annotations</artifactId>
+        </dependency>
+
+        <dependency>
+            <groupId>org.onosproject</groupId>
+            <artifactId>onos-core-serializers</artifactId>
+            <version>${project.version}</version>
+        </dependency>
+
+        <dependency>
+            <groupId>org.apache.felix</groupId>
+            <artifactId>org.apache.felix.scr.annotations</artifactId>
+            <scope>provided</scope>
+        </dependency>
+    </dependencies>
+
+    <build>
+        <plugins>
+            <plugin>
+                <groupId>org.apache.felix</groupId>
+                <artifactId>maven-bundle-plugin</artifactId>
+                <extensions>true</extensions>
+                <configuration>
+                    <instructions>
+                    <Bundle-SymbolicName>
+                        ${project.groupId}.${project.artifactId}
+                    </Bundle-SymbolicName>
+                    <_wab>src/main/webapp/</_wab>
+                    <Include-Resource>
+                        WEB-INF/classes/apidoc/swagger.json=target/swagger.json,
+                                    {maven-resources}
+                    </Include-Resource>
+                    <Import-Package>
+                        org.slf4j,
+                        org.osgi.framework,
+                        javax.ws.rs,
+                        javax.ws.rs.core,
+                        org.glassfish.jersey.servlet,
+                        com.fasterxml.jackson.databind,
+                        com.fasterxml.jackson.databind.node,
+                        com.fasterxml.jackson.core,
+                        org.onlab.packet.*,
+                        org.onosproject.*,
+                        com.google.common.*
+                    </Import-Package>
+                    <Web-ContextPath>${web.context}</Web-ContextPath>
+                </instructions>
+                </configuration>
+            </plugin>
+            <plugin>
+                <groupId>org.apache.maven.plugins</groupId>
+                <artifactId>maven-compiler-plugin</artifactId>
+                <configuration>
+                    <source>1.8</source>
+                    <target>1.8</target>
+                </configuration>
+            </plugin>
+            <plugin>
+                <groupId>org.apache.felix</groupId>
+                <artifactId>maven-scr-plugin</artifactId>
+                <executions>
+                    <execution>
+                        <id>generate-scr-srcdescriptor</id>
+                        <goals>
+                            <goal>scr</goal>
+                        </goals>
+                    </execution>
+                </executions>
+                <configuration>
+                    <supportedProjectTypes>
+                        <supportedProjectType>bundle</supportedProjectType>
+                        <supportedProjectType>war</supportedProjectType>
+                    </supportedProjectTypes>
+                </configuration>
+            </plugin>
+            <plugin>
+                <groupId>org.onosproject</groupId>
+                <artifactId>onos-maven-plugin</artifactId>
+                <executions>
+                    <execution>
+                        <id>cfg</id>
+                        <phase>generate-resources</phase>
+                        <goals>
+                            <goal>cfg</goal>
+                        </goals>
+                    </execution>
+                    <execution>
+                        <id>swagger</id>
+                        <goals>
+                            <goal>swagger</goal>
+                        </goals>
+                    </execution>
+                    <execution>
+                        <id>app</id>
+                        <phase>package</phase>
+                        <goals>
+                            <goal>app</goal>
+                        </goals>
+                    </execution>
+                </executions>
+            </plugin>
+        </plugins>
+    </build>
+
+</project>
diff --git a/apps/kafka-integration/app/src/main/java/org/onosproject/kafkaintegration/converter/ConversionFactory.java b/apps/kafka-integration/app/src/main/java/org/onosproject/kafkaintegration/converter/ConversionFactory.java
new file mode 100644
index 0000000..8f69a20
--- /dev/null
+++ b/apps/kafka-integration/app/src/main/java/org/onosproject/kafkaintegration/converter/ConversionFactory.java
@@ -0,0 +1,68 @@
+/**
+ * Copyright 2016 Open Networking Laboratory
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+
+ * http://www.apache.org/licenses/LICENSE-2.0
+
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.onosproject.kafkaintegration.converter;
+
+import static org.onosproject.kafkaintegration.api.dto.OnosEvent.Type.DEVICE;
+import static org.onosproject.kafkaintegration.api.dto.OnosEvent.Type.LINK;
+
+import java.util.HashMap;
+import java.util.Map;
+
+import org.onosproject.kafkaintegration.api.dto.OnosEvent.Type;;
+
+/**
+ * Returns the appropriate converter object based on the ONOS event type.
+ *
+ */
+public final class ConversionFactory {
+
+    // Store converters for all supported events
+    private Map<Type, EventConverter> converters =
+            new HashMap<Type, EventConverter>() {
+                {
+                    put(DEVICE, new DeviceEventConverter());
+                    put(LINK, new LinkEventConverter());
+                }
+            };
+
+    // Exists to defeat instantiation
+    private ConversionFactory() {
+    }
+
+    private static class SingletonHolder {
+        private static final ConversionFactory INSTANCE =
+                new ConversionFactory();
+    }
+
+    /**
+     * Returns a static reference to the Conversion Factory.
+     *
+     * @return singleton object
+     */
+    public static ConversionFactory getInstance() {
+        return SingletonHolder.INSTANCE;
+    }
+
+    /**
+     * Returns an Event converter object for the specified ONOS event type.
+     *
+     * @param event ONOS event type
+     * @return Event Converter object
+     */
+    public EventConverter getConverter(Type event) {
+        return converters.get(event);
+    }
+
+}
diff --git a/apps/kafka-integration/app/src/main/java/org/onosproject/kafkaintegration/converter/DeviceEventConverter.java b/apps/kafka-integration/app/src/main/java/org/onosproject/kafkaintegration/converter/DeviceEventConverter.java
new file mode 100644
index 0000000..e08d6e1
--- /dev/null
+++ b/apps/kafka-integration/app/src/main/java/org/onosproject/kafkaintegration/converter/DeviceEventConverter.java
@@ -0,0 +1,111 @@
+/**
+ * Copyright 2016 Open Networking Laboratory
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+
+ * http://www.apache.org/licenses/LICENSE-2.0
+
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.onosproject.kafkaintegration.converter;
+
+import org.onosproject.event.Event;
+import org.onosproject.grpc.net.Device.DeviceCore;
+import org.onosproject.grpc.net.Device.DeviceType;
+import org.onosproject.grpc.net.DeviceEvent.DeviceEventType;
+import org.onosproject.grpc.net.DeviceEvent.DeviceNotification;
+import org.onosproject.grpc.net.Port.PortCore;
+import org.onosproject.grpc.net.Port.PortType;
+import org.onosproject.net.device.DeviceEvent;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.protobuf.GeneratedMessage;
+
+/**
+ * Converts ONOS Device event message to GPB format.
+ *
+ */
+class DeviceEventConverter implements EventConverter {
+
+    private final Logger log = LoggerFactory.getLogger(getClass());
+
+    @Override
+    public GeneratedMessage convertToProtoMessage(Event<?, ?> event) {
+
+        DeviceEvent deviceEvent = (DeviceEvent) event;
+
+        if (!deviceEventSubtypeSupported(deviceEvent)) {
+            log.error("Unsupported Onos Device Event {}. There is no matching"
+                    + "proto Device Event type", deviceEvent.type().toString());
+            return null;
+        }
+
+        return buildDeviceProtoMessage(deviceEvent);
+    }
+
+    /**
+     * Checks if the ONOS Device Event type is supported.
+     *
+     * @param event ONOS Device event
+     * @return true if there is a match and false otherwise
+     */
+    private boolean deviceEventSubtypeSupported(DeviceEvent event) {
+        DeviceEventType[] deviceEvents = DeviceEventType.values();
+        for (DeviceEventType deviceEventType : deviceEvents) {
+            if (deviceEventType.name().equals(event.type().name())) {
+                return true;
+            }
+        }
+
+        return false;
+    }
+
+    private DeviceNotification buildDeviceProtoMessage(DeviceEvent deviceEvent) {
+        DeviceNotification notification = DeviceNotification.newBuilder()
+                .setDeviceEventType(getProtoType(deviceEvent))
+                .setDevice(DeviceCore.newBuilder()
+                        .setChassisId(deviceEvent.subject().chassisId().id()
+                                .toString())
+                        .setDeviceId(deviceEvent.subject().id().toString())
+                        .setHwVersion(deviceEvent.subject().hwVersion())
+                        .setManufacturer(deviceEvent.subject().manufacturer())
+                        .setSerialNumber(deviceEvent.subject().serialNumber())
+                        .setSwVersion(deviceEvent.subject().swVersion())
+                        .setType(DeviceType.valueOf(deviceEvent.type().name()))
+                        .build())
+                .setPort(PortCore.newBuilder()
+                        .setIsEnabled(deviceEvent.port().isEnabled())
+                        .setPortNumber(deviceEvent.port().number().toString())
+                        .setPortSpeed(deviceEvent.port().portSpeed())
+                        .setType(PortType
+                                .valueOf(deviceEvent.port().type().name()))
+                        .build())
+                .build();
+
+        return notification;
+    }
+
+    /**
+     * Retrieves the protobuf generated device event type.
+     *
+     * @param event ONOS Device Event
+     * @return generated Device Event Type
+     */
+    private DeviceEventType getProtoType(DeviceEvent event) {
+        DeviceEventType protobufEventType = null;
+        DeviceEventType[] deviceEvents = DeviceEventType.values();
+        for (DeviceEventType deviceEventType : deviceEvents) {
+            if (deviceEventType.name().equals(event.type().name())) {
+                protobufEventType = deviceEventType;
+            }
+        }
+
+        return protobufEventType;
+    }
+}
diff --git a/apps/kafka-integration/app/src/main/java/org/onosproject/kafkaintegration/converter/EventConverter.java b/apps/kafka-integration/app/src/main/java/org/onosproject/kafkaintegration/converter/EventConverter.java
new file mode 100644
index 0000000..e15ebdc
--- /dev/null
+++ b/apps/kafka-integration/app/src/main/java/org/onosproject/kafkaintegration/converter/EventConverter.java
@@ -0,0 +1,36 @@
+/**
+ * Copyright 2016 Open Networking Laboratory
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+
+ * http://www.apache.org/licenses/LICENSE-2.0
+
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.onosproject.kafkaintegration.converter;
+
+import org.onosproject.event.Event;
+
+import com.google.protobuf.GeneratedMessage;
+
+/**
+ *
+ * APIs for converting between ONOS event objects and GPB data objects.
+ *
+ */
+public interface EventConverter {
+
+    /**
+     * Converts ONOS specific event data to a format that is suitable for export
+     * to Kafka.
+     *
+     * @param event ONOS Event object
+     * @return converted data in GPB format.
+     */
+    public GeneratedMessage convertToProtoMessage(Event<?, ?> event);
+}
diff --git a/apps/kafka-integration/app/src/main/java/org/onosproject/kafkaintegration/converter/LinkEventConverter.java b/apps/kafka-integration/app/src/main/java/org/onosproject/kafkaintegration/converter/LinkEventConverter.java
new file mode 100644
index 0000000..2d65537
--- /dev/null
+++ b/apps/kafka-integration/app/src/main/java/org/onosproject/kafkaintegration/converter/LinkEventConverter.java
@@ -0,0 +1,104 @@
+/**
+ * Copyright 2016 Open Networking Laboratory
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+
+ * http://www.apache.org/licenses/LICENSE-2.0
+
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.onosproject.kafkaintegration.converter;
+
+import org.onosproject.event.Event;
+import org.onosproject.grpc.net.Link.ConnectPoint;
+import org.onosproject.grpc.net.Link.LinkCore;
+import org.onosproject.grpc.net.Link.LinkState;
+import org.onosproject.grpc.net.Link.LinkType;
+import org.onosproject.grpc.net.LinkEvent.LinkEventType;
+import org.onosproject.grpc.net.LinkEvent.LinkNotification;
+import org.onosproject.net.link.LinkEvent;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.protobuf.GeneratedMessage;
+
+/**
+ * Converts for ONOS Link event message to GPB format.
+ *
+ */
+class LinkEventConverter implements EventConverter {
+
+    private final Logger log = LoggerFactory.getLogger(getClass());
+
+    @Override
+    public GeneratedMessage convertToProtoMessage(Event<?, ?> event) {
+
+        LinkEvent linkEvent = (LinkEvent) event;
+
+        if (!linkEventSubtypeSupported(linkEvent)) {
+            log.error("Unsupported Onos Event {}. There is no matching"
+                    + "proto Event type", linkEvent.type().toString());
+            return null;
+        }
+
+        return buildDeviceProtoMessage(linkEvent);
+    }
+
+    private boolean linkEventSubtypeSupported(LinkEvent event) {
+        LinkType[] kafkaLinkEvents = LinkType.values();
+        for (LinkType linkEventType : kafkaLinkEvents) {
+            if (linkEventType.name().equals(event.type().name())) {
+                return true;
+            }
+        }
+
+        return false;
+    }
+
+    private LinkNotification buildDeviceProtoMessage(LinkEvent linkEvent) {
+        LinkNotification notification = LinkNotification.newBuilder()
+                .setLinkEventType(getProtoType(linkEvent))
+                .setLink(LinkCore.newBuilder()
+                        .setState(LinkState
+                                .valueOf(linkEvent.subject().state().name()))
+                        .setType(LinkType
+                                .valueOf(linkEvent.subject().type().name()))
+                        .setDst(ConnectPoint.newBuilder()
+                                .setDeviceId(linkEvent.subject().dst()
+                                        .deviceId().toString())
+                                .setPortNumber(linkEvent.subject().dst().port()
+                                        .toString()))
+                        .setSrc(ConnectPoint.newBuilder()
+                                .setDeviceId(linkEvent.subject().src()
+                                        .deviceId().toString())
+                                .setPortNumber(linkEvent.subject().src().port()
+                                        .toString())))
+                .build();
+
+        return notification;
+    }
+
+    /**
+     * Returns the specific Kafka Device Event Type for the corresponding ONOS
+     * Device Event Type.
+     *
+     * @param event ONOS Device Event
+     * @return Kafka Device Event Type
+     */
+    private LinkEventType getProtoType(LinkEvent event) {
+        LinkEventType generatedEventType = null;
+        LinkEventType[] kafkaEvents = LinkEventType.values();
+        for (LinkEventType linkEventType : kafkaEvents) {
+            if (linkEventType.name().equals(event.type().name())) {
+                generatedEventType = linkEventType;
+            }
+        }
+
+        return generatedEventType;
+    }
+}
diff --git a/apps/kafka-integration/src/main/java/org/onosproject/kafkaintegration/errors/package-info.java b/apps/kafka-integration/app/src/main/java/org/onosproject/kafkaintegration/converter/package-info.java
similarity index 79%
copy from apps/kafka-integration/src/main/java/org/onosproject/kafkaintegration/errors/package-info.java
copy to apps/kafka-integration/app/src/main/java/org/onosproject/kafkaintegration/converter/package-info.java
index 28427d4..d33c0d7 100644
--- a/apps/kafka-integration/src/main/java/org/onosproject/kafkaintegration/errors/package-info.java
+++ b/apps/kafka-integration/app/src/main/java/org/onosproject/kafkaintegration/converter/package-info.java
@@ -13,6 +13,8 @@
  */
 
 /**
- * Application specific Exception classes.
+ * Converters for converting various ONOS events to their corresponding Protocol
+ * Buffer format.
+ *
  */
-package org.onosproject.kafkaintegration.errors;
+package org.onosproject.kafkaintegration.converter;
diff --git a/apps/kafka-integration/app/src/main/java/org/onosproject/kafkaintegration/impl/Dispatcher.java b/apps/kafka-integration/app/src/main/java/org/onosproject/kafkaintegration/impl/Dispatcher.java
new file mode 100644
index 0000000..d211458
--- /dev/null
+++ b/apps/kafka-integration/app/src/main/java/org/onosproject/kafkaintegration/impl/Dispatcher.java
@@ -0,0 +1,62 @@
+/**
+ * Copyright 2016 Open Networking Laboratory
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+
+ * http://www.apache.org/licenses/LICENSE-2.0
+
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.onosproject.kafkaintegration.impl;
+
+import org.onosproject.event.AbstractListenerManager;
+import org.onosproject.kafkaintegration.api.ExportableEventListener;
+import org.onosproject.kafkaintegration.api.dto.OnosEvent;
+import org.onosproject.kafkaintegration.api.dto.OnosEvent.Type;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.protobuf.GeneratedMessage;
+
+/**
+ * Dispatch ONOS Events to all interested Listeners.
+ *
+ */
+public final class Dispatcher
+        extends AbstractListenerManager<OnosEvent, ExportableEventListener> {
+
+    private final Logger log = LoggerFactory.getLogger(getClass());
+
+    // Exists to defeat instantiation
+    private Dispatcher() {
+    }
+
+    private static class SingletonHolder {
+        private static final Dispatcher INSTANCE = new Dispatcher();
+    }
+
+    /**
+     * Returns a static reference to the Listener Factory.
+     *
+     * @return singleton object
+     */
+    public static Dispatcher getInstance() {
+        return SingletonHolder.INSTANCE;
+    }
+
+    /**
+     * Publish the ONOS Event to all listeners.
+     *
+     * @param eventType the ONOS eventtype
+     * @param message generated Protocol buffer message from ONOS event data
+     */
+    public void publish(Type eventType, GeneratedMessage message) {
+        log.debug("Dispatching ONOS Event {}", eventType);
+        post(new OnosEvent(eventType, message));
+    }
+}
diff --git a/apps/kafka-integration/app/src/main/java/org/onosproject/kafkaintegration/impl/EventExporterManager.java b/apps/kafka-integration/app/src/main/java/org/onosproject/kafkaintegration/impl/EventExporterManager.java
new file mode 100644
index 0000000..6e1a0e8
--- /dev/null
+++ b/apps/kafka-integration/app/src/main/java/org/onosproject/kafkaintegration/impl/EventExporterManager.java
@@ -0,0 +1,323 @@
+/*
+ * Copyright 2016-present Open Networking Laboratory
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.onosproject.kafkaintegration.impl;
+
+import static com.google.common.base.Preconditions.checkNotNull;
+import static org.onosproject.kafkaintegration.api.dto.OnosEvent.Type.DEVICE;
+import static org.onosproject.kafkaintegration.api.dto.OnosEvent.Type.LINK;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.UUID;
+
+import org.apache.felix.scr.annotations.Activate;
+import org.apache.felix.scr.annotations.Component;
+import org.apache.felix.scr.annotations.Deactivate;
+import org.apache.felix.scr.annotations.Reference;
+import org.apache.felix.scr.annotations.ReferenceCardinality;
+import org.apache.felix.scr.annotations.Service;
+import org.onosproject.core.ApplicationId;
+import org.onosproject.core.CoreService;
+import org.onosproject.kafkaintegration.api.EventExporterService;
+import org.onosproject.kafkaintegration.api.dto.EventSubscriber;
+import org.onosproject.kafkaintegration.api.dto.EventSubscriberGroupId;
+import org.onosproject.kafkaintegration.api.dto.OnosEvent.Type;
+import org.onosproject.kafkaintegration.errors.InvalidApplicationException;
+import org.onosproject.kafkaintegration.errors.InvalidGroupIdException;
+import org.onosproject.kafkaintegration.listener.ListenerFactory;
+import org.onosproject.kafkaintegration.listener.OnosEventListener;
+import org.onosproject.net.device.DeviceService;
+import org.onosproject.net.link.LinkService;
+import org.onosproject.store.serializers.KryoNamespaces;
+import org.onosproject.store.service.Serializer;
+import org.onosproject.store.service.StorageService;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Implementation of Event Exporter Service.
+ *
+ */
+@Component(immediate = true)
+@Service
+public class EventExporterManager implements EventExporterService {
+
+    private final Logger log = LoggerFactory.getLogger(getClass());
+
+    // Stores the currently registered applications for event export service.
+    private Map<ApplicationId, EventSubscriberGroupId> registeredApps;
+
+    private Map<Type, List<EventSubscriber>> subscriptions;
+
+    private static final String REGISTERED_APPS = "registered-applications";
+
+    private static final String SUBSCRIBED_APPS = "event-subscriptions";
+
+    @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
+    protected DeviceService deviceService;
+
+    @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
+    protected LinkService linkService;
+
+    @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
+    protected CoreService coreService;
+
+    @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
+    protected StorageService storageService;
+
+    private ApplicationId appId;
+
+    @Activate
+    protected void activate() {
+        appId = coreService
+                .registerApplication("org.onosproject.kafkaintegration");
+
+        registeredApps = storageService
+                .<ApplicationId, EventSubscriberGroupId>consistentMapBuilder()
+                .withName(REGISTERED_APPS)
+                .withSerializer(Serializer.using(KryoNamespaces.API,
+                                                 EventSubscriberGroupId.class,
+                                                 UUID.class))
+                .build().asJavaMap();
+
+        subscriptions = storageService
+                .<Type, List<EventSubscriber>>consistentMapBuilder()
+                .withName(SUBSCRIBED_APPS)
+                .withSerializer(Serializer.using(KryoNamespaces.API,
+                                                 EventSubscriber.class))
+                .build().asJavaMap();
+
+        log.info("Started");
+    }
+
+    @Deactivate
+    protected void deactivate() {
+        log.info("Stopped");
+    }
+
+    @Override
+    public EventSubscriberGroupId registerListener(String appName) {
+
+        // TODO: Remove it once ONOS provides a mechanism for external apps
+        // to register with the core service. See Jira - 4409
+        ApplicationId externalAppId = coreService.registerApplication(appName);
+
+        return registeredApps.computeIfAbsent(externalAppId,
+                                              (key) -> new EventSubscriberGroupId(UUID
+                                                      .randomUUID()));
+
+    }
+
+    @Override
+    public void unregisterListener(String appName) {
+        ApplicationId externalAppId =
+                checkNotNull(coreService.getAppId(appName));
+        registeredApps.remove(externalAppId);
+    }
+
+    @Override
+    public void subscribe(EventSubscriber subscriber)
+            throws InvalidGroupIdException, InvalidApplicationException {
+
+        checkNotNull(subscriber);
+
+        if (!registeredApplication(subscriber.appName())) {
+            throw new InvalidApplicationException("Application is not "
+                    + "registered to make this request.");
+        }
+
+        if (!validGroupId(subscriber.subscriberGroupId(),
+                          subscriber.appName())) {
+            throw new InvalidGroupIdException("Incorrect group id in the request");
+        }
+
+        OnosEventListener onosListener = getListener(subscriber.eventType());
+        checkNotNull(onosListener,
+                     "No listener for the supported event type - {}",
+                     subscriber.eventType());
+
+        applyListenerAction(subscriber.eventType(), onosListener,
+                            ListenerAction.START);
+
+        // update internal state
+        List<EventSubscriber> subscriptionList =
+                subscriptions.get(subscriber.eventType());
+        if (subscriptionList == null) {
+            subscriptionList = new ArrayList<EventSubscriber>();
+        }
+        subscriptionList.add(subscriber);
+        subscriptions.put(subscriber.eventType(), subscriptionList);
+
+        log.info("Subscription for {} event by {} successfull",
+                 subscriber.eventType(), subscriber.appName());
+    }
+
+    /**
+     * Checks if the application has registered.
+     *
+     * @param appName application name
+     * @return true if application has registered
+     */
+    private boolean registeredApplication(String appName) {
+
+        checkNotNull(appName);
+        ApplicationId appId = checkNotNull(coreService.getAppId(appName));
+        if (registeredApps.containsKey(appId)) {
+            return true;
+        }
+
+        log.debug("{} is not registered", appName);
+        return false;
+    }
+
+    /**
+     * Actions that can be performed on the ONOS Event Listeners.
+     *
+     */
+    private enum ListenerAction {
+        START, STOP;
+    }
+
+    /**
+     * Applies the specified action on the Listener.
+     *
+     * @param eventType the ONOS Event type registered by the application
+     * @param onosListener ONOS event listener
+     * @param action to be performed on the listener
+     */
+    private void applyListenerAction(Type eventType,
+                                     OnosEventListener onosListener,
+                                     ListenerAction action) {
+        switch (eventType) {
+        case DEVICE:
+            if (action == ListenerAction.START) {
+                onosListener.startListener(DEVICE, deviceService);
+            } else {
+                onosListener.stopListener(DEVICE, deviceService);
+            }
+            break;
+        case LINK:
+            if (action == ListenerAction.START) {
+                onosListener.startListener(LINK, linkService);
+            } else {
+                onosListener.stopListener(LINK, linkService);
+            }
+            break;
+        default:
+            log.error("Cannot {} listener. Unsupported event type {} ",
+                      action.toString(), eventType.toString());
+        }
+    }
+
+    /**
+     * Returns the ONOS event listener corresponding to the ONOS Event type.
+     *
+     * @param eventType ONOS event type
+     * @return ONOS event listener
+     */
+    private OnosEventListener getListener(Type eventType) {
+        checkNotNull(eventType);
+        ListenerFactory factory = ListenerFactory.getInstance();
+        OnosEventListener onosListener = factory.getListener(eventType);
+        return onosListener;
+    }
+
+    /**
+     * Checks if the group id is valid for this registered application.
+     *
+     * @param groupId GroupId assigned to the subscriber
+     * @param appName Registered Application name
+     * @return true if valid groupId and false otherwise
+     */
+    private boolean validGroupId(EventSubscriberGroupId groupId,
+                                 String appName) {
+
+        checkNotNull(groupId);
+
+        ApplicationId appId = coreService.getAppId(appName);
+        EventSubscriberGroupId registeredGroupId = registeredApps.get(appId);
+        if (registeredGroupId.equals(groupId)) {
+            return true;
+        }
+
+        return false;
+    }
+
+    @Override
+    public void unsubscribe(EventSubscriber subscriber)
+            throws InvalidGroupIdException, InvalidApplicationException {
+
+        checkNotNull(subscriber);
+
+        if (!registeredApplication(subscriber.appName())) {
+            throw new InvalidApplicationException("Application is not "
+                    + "registered to make this request.");
+        }
+
+        if (!validGroupId(subscriber.subscriberGroupId(),
+                          subscriber.appName())) {
+            throw new InvalidGroupIdException("Incorrect group id in the request");
+        }
+
+        if (!eventSubscribed(subscriber)) {
+            log.error("No subscription to {} was found",
+                      subscriber.eventType());
+            return;
+        }
+
+        // If this is the only subscriber listening for this event,
+        // stop the listener.
+        List<EventSubscriber> subscribers =
+                subscriptions.get(subscriber.eventType());
+        if (subscribers.size() == 1) {
+            OnosEventListener onosListener =
+                    getListener(subscriber.eventType());
+            checkNotNull(onosListener,
+                         "No listener for the supported event type - {}",
+                         subscriber.eventType());
+            applyListenerAction(subscriber.eventType(), onosListener,
+                                ListenerAction.STOP);
+        }
+
+        // update internal state.
+        subscribers.remove(subscriber);
+        subscriptions.put(subscriber.eventType(), subscribers);
+
+        log.info("Unsubscribed {} for {} events", subscriber.appName(),
+                 subscriber.eventType());
+    }
+
+    /**
+     * Checks if the subscriber has already subscribed to the requested event
+     * type.
+     *
+     * @param subscriber the subscriber to a specific ONOS event
+     * @return true if subscriber has subscribed to the ONOS event
+     */
+    private boolean eventSubscribed(EventSubscriber subscriber) {
+
+        List<EventSubscriber> subscriberList =
+                subscriptions.get(subscriber.eventType());
+
+        if (subscriberList == null) {
+            return false;
+        }
+
+        return subscriberList.contains(subscriber);
+    }
+
+}
diff --git a/apps/kafka-integration/src/main/java/org/onosproject/kafkaintegration/package-info.java b/apps/kafka-integration/app/src/main/java/org/onosproject/kafkaintegration/impl/package-info.java
similarity index 92%
rename from apps/kafka-integration/src/main/java/org/onosproject/kafkaintegration/package-info.java
rename to apps/kafka-integration/app/src/main/java/org/onosproject/kafkaintegration/impl/package-info.java
index 72f4843..a2950fa 100644
--- a/apps/kafka-integration/src/main/java/org/onosproject/kafkaintegration/package-info.java
+++ b/apps/kafka-integration/app/src/main/java/org/onosproject/kafkaintegration/impl/package-info.java
@@ -15,4 +15,4 @@
 /**
  * API implementation classes.
  */
-package org.onosproject.kafkaintegration;
+package org.onosproject.kafkaintegration.impl;
diff --git a/apps/kafka-integration/app/src/main/java/org/onosproject/kafkaintegration/listener/DeviceEventsListener.java b/apps/kafka-integration/app/src/main/java/org/onosproject/kafkaintegration/listener/DeviceEventsListener.java
new file mode 100644
index 0000000..22c4bf2
--- /dev/null
+++ b/apps/kafka-integration/app/src/main/java/org/onosproject/kafkaintegration/listener/DeviceEventsListener.java
@@ -0,0 +1,94 @@
+/**
+ * Copyright 2016 Open Networking Laboratory
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+
+ * http://www.apache.org/licenses/LICENSE-2.0
+
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.onosproject.kafkaintegration.listener;
+
+import static org.onosproject.kafkaintegration.api.dto.OnosEvent.Type.DEVICE;
+
+import org.onosproject.event.ListenerService;
+import org.onosproject.kafkaintegration.impl.Dispatcher;
+import org.onosproject.kafkaintegration.api.dto.OnosEvent.Type;
+import org.onosproject.kafkaintegration.converter.ConversionFactory;
+import org.onosproject.kafkaintegration.converter.EventConverter;
+import org.onosproject.net.device.DeviceEvent;
+import org.onosproject.net.device.DeviceListener;
+import org.onosproject.net.device.DeviceService;
+
+import com.google.protobuf.GeneratedMessage;
+
+/**
+ * Listens for ONOS Device events.
+ *
+ */
+final class DeviceEventsListener implements OnosEventListener {
+
+    private boolean listenerRunning = false;
+
+    private InnerListener listener = null;
+
+    // Exists to defeat instantiation
+    private DeviceEventsListener() {
+    }
+
+    private static class SingletonHolder {
+        private static final DeviceEventsListener INSTANCE =
+                new DeviceEventsListener();
+    }
+
+    /**
+     * Returns a static reference to the Listener Factory.
+     *
+     * @return singleton object
+     */
+    public static DeviceEventsListener getInstance() {
+        return SingletonHolder.INSTANCE;
+    }
+
+    @Override
+    public void startListener(Type e, ListenerService<?, ?> service) {
+        if (!listenerRunning) {
+            listener = new InnerListener();
+            DeviceService deviceService = (DeviceService) service;
+            deviceService.addListener(listener);
+            listenerRunning = true;
+        }
+    }
+
+    private class InnerListener implements DeviceListener {
+
+        @Override
+        public void event(DeviceEvent arg0) {
+
+            // Convert the event to GPB format
+            ConversionFactory conversionFactory =
+                    ConversionFactory.getInstance();
+            EventConverter converter = conversionFactory.getConverter(DEVICE);
+            GeneratedMessage message = converter.convertToProtoMessage(arg0);
+
+            // Call Dispatcher and publish event
+            Dispatcher.getInstance().publish(DEVICE, message);
+        }
+    }
+
+    @Override
+    public void stopListener(Type e, ListenerService<?, ?> service) {
+        if (listenerRunning) {
+            DeviceService deviceService = (DeviceService) service;
+            deviceService.removeListener(listener);
+            listener = null;
+            listenerRunning = false;
+        }
+    }
+
+}
diff --git a/apps/kafka-integration/app/src/main/java/org/onosproject/kafkaintegration/listener/LinkEventsListener.java b/apps/kafka-integration/app/src/main/java/org/onosproject/kafkaintegration/listener/LinkEventsListener.java
new file mode 100644
index 0000000..769d8b4
--- /dev/null
+++ b/apps/kafka-integration/app/src/main/java/org/onosproject/kafkaintegration/listener/LinkEventsListener.java
@@ -0,0 +1,92 @@
+/**
+ * Copyright 2016 Open Networking Laboratory
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+
+ * http://www.apache.org/licenses/LICENSE-2.0
+
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.onosproject.kafkaintegration.listener;
+
+import static org.onosproject.kafkaintegration.api.dto.OnosEvent.Type.LINK;
+
+import org.onosproject.event.ListenerService;
+import org.onosproject.kafkaintegration.impl.Dispatcher;
+import org.onosproject.kafkaintegration.api.dto.OnosEvent.Type;
+import org.onosproject.kafkaintegration.converter.ConversionFactory;
+import org.onosproject.kafkaintegration.converter.EventConverter;
+import org.onosproject.net.link.LinkEvent;
+import org.onosproject.net.link.LinkListener;
+import org.onosproject.net.link.LinkService;
+
+import com.google.protobuf.GeneratedMessage;
+
+/**
+ * Listens for ONOS Link Events.
+ *
+ */
+final class LinkEventsListener implements OnosEventListener {
+
+    private boolean listenerRunning = false;
+
+    private InnerListener listener = null;
+
+    // Exists to defeat instantiation
+    private LinkEventsListener() {
+    }
+
+    private static class SingletonHolder {
+        private static final LinkEventsListener INSTANCE =
+                new LinkEventsListener();
+    }
+
+    /**
+     * Returns a static reference to the Listener Factory.
+     *
+     * @return singleton object
+     */
+    public static LinkEventsListener getInstance() {
+        return SingletonHolder.INSTANCE;
+    }
+
+    @Override
+    public void startListener(Type e, ListenerService<?, ?> service) {
+        if (!listenerRunning) {
+            listener = new InnerListener();
+            LinkService linkService = (LinkService) service;
+            linkService.addListener(listener);
+            listenerRunning = true;
+        }
+    }
+
+    private class InnerListener implements LinkListener {
+
+        @Override
+        public void event(LinkEvent arg0) {
+
+            // Convert the event to GPB format
+            ConversionFactory conversionFactory =
+                    ConversionFactory.getInstance();
+            EventConverter converter = conversionFactory.getConverter(LINK);
+            GeneratedMessage message = converter.convertToProtoMessage(arg0);
+
+            // Call Dispatcher and publish event
+            Dispatcher.getInstance().publish(LINK, message);
+        }
+    }
+
+    @Override
+    public void stopListener(Type e, ListenerService<?, ?> service) {
+        if (listenerRunning) {
+            LinkService linkService = (LinkService) service;
+            linkService.removeListener(listener);
+            listenerRunning = false;
+        }
+    }
+}
diff --git a/apps/kafka-integration/app/src/main/java/org/onosproject/kafkaintegration/listener/ListenerFactory.java b/apps/kafka-integration/app/src/main/java/org/onosproject/kafkaintegration/listener/ListenerFactory.java
new file mode 100644
index 0000000..496ba2b
--- /dev/null
+++ b/apps/kafka-integration/app/src/main/java/org/onosproject/kafkaintegration/listener/ListenerFactory.java
@@ -0,0 +1,67 @@
+/**
+ * Copyright 2016 Open Networking Laboratory
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+
+ * http://www.apache.org/licenses/LICENSE-2.0
+
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.onosproject.kafkaintegration.listener;
+
+import static org.onosproject.kafkaintegration.api.dto.OnosEvent.Type.DEVICE;
+import static org.onosproject.kafkaintegration.api.dto.OnosEvent.Type.LINK;
+
+import java.util.HashMap;
+import java.util.Map;
+
+import org.onosproject.kafkaintegration.api.dto.OnosEvent.Type;
+
+/**
+ * Returns the appropriate listener object based on the ONOS event type.
+ *
+ */
+public final class ListenerFactory {
+
+    // Store listeners for all supported events
+    private Map<Type, OnosEventListener> listeners =
+            new HashMap<Type, OnosEventListener>() {
+                {
+                    put(DEVICE, DeviceEventsListener.getInstance());
+                    put(LINK, LinkEventsListener.getInstance());
+                }
+            };
+
+    // Exists to defeat instantiation
+    private ListenerFactory() {
+    }
+
+    private static class SingletonHolder {
+        private static final ListenerFactory INSTANCE = new ListenerFactory();
+    }
+
+    /**
+     * Returns a static reference to the Listener Factory.
+     *
+     * @return singleton object
+     */
+    public static ListenerFactory getInstance() {
+        return SingletonHolder.INSTANCE;
+    }
+
+    /**
+     * Returns the listener object for the specified ONOS event type.
+     *
+     * @param event ONOS Event type
+     * @return return listener object
+     */
+    public OnosEventListener getListener(Type event) {
+        return listeners.get(event);
+    }
+
+}
diff --git a/apps/kafka-integration/app/src/main/java/org/onosproject/kafkaintegration/listener/OnosEventListener.java b/apps/kafka-integration/app/src/main/java/org/onosproject/kafkaintegration/listener/OnosEventListener.java
new file mode 100644
index 0000000..22cde21
--- /dev/null
+++ b/apps/kafka-integration/app/src/main/java/org/onosproject/kafkaintegration/listener/OnosEventListener.java
@@ -0,0 +1,42 @@
+/**
+ * Copyright 2016 Open Networking Laboratory
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+
+ * http://www.apache.org/licenses/LICENSE-2.0
+
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.onosproject.kafkaintegration.listener;
+
+import org.onosproject.event.ListenerService;
+import org.onosproject.kafkaintegration.api.dto.OnosEvent.Type;
+
+/**
+ * APIs for starting and stopping a ONOS Event listener.
+ *
+ */
+public interface OnosEventListener {
+
+    /**
+     * Start the listener for the specific ONOS event type.
+     *
+     * @param event ONOS event type
+     * @param service ONOS event listener for the specific event type
+     */
+    void startListener(Type event, ListenerService<?, ?> service);
+
+    /**
+     * Stop the Listener for the specific ONOS event type.
+     *
+     * @param event ONOS event type
+     * @param service ONOS event listener for the specific event type
+     */
+    void stopListener(Type event, ListenerService<?, ?> service);
+
+}
diff --git a/apps/kafka-integration/src/main/java/org/onosproject/kafkaintegration/errors/package-info.java b/apps/kafka-integration/app/src/main/java/org/onosproject/kafkaintegration/listener/package-info.java
similarity index 85%
copy from apps/kafka-integration/src/main/java/org/onosproject/kafkaintegration/errors/package-info.java
copy to apps/kafka-integration/app/src/main/java/org/onosproject/kafkaintegration/listener/package-info.java
index 28427d4..f6f602f 100644
--- a/apps/kafka-integration/src/main/java/org/onosproject/kafkaintegration/errors/package-info.java
+++ b/apps/kafka-integration/app/src/main/java/org/onosproject/kafkaintegration/listener/package-info.java
@@ -13,6 +13,7 @@
  */
 
 /**
- * Application specific Exception classes.
+ * Listeners for listening to various ONOS events.
+ *
  */
-package org.onosproject.kafkaintegration.errors;
+package org.onosproject.kafkaintegration.listener;
diff --git a/apps/kafka-integration/src/main/java/org/onosproject/kafkaintegration/rest/EventExporterWebResource.java b/apps/kafka-integration/app/src/main/java/org/onosproject/kafkaintegration/rest/EventExporterWebResource.java
similarity index 82%
rename from apps/kafka-integration/src/main/java/org/onosproject/kafkaintegration/rest/EventExporterWebResource.java
rename to apps/kafka-integration/app/src/main/java/org/onosproject/kafkaintegration/rest/EventExporterWebResource.java
index 9677265..520976a 100644
--- a/apps/kafka-integration/src/main/java/org/onosproject/kafkaintegration/rest/EventExporterWebResource.java
+++ b/apps/kafka-integration/app/src/main/java/org/onosproject/kafkaintegration/rest/EventExporterWebResource.java
@@ -14,15 +14,11 @@
  */
 package org.onosproject.kafkaintegration.rest;
 
-import com.fasterxml.jackson.databind.ObjectMapper;
-import com.fasterxml.jackson.databind.node.ObjectNode;
-import org.onosproject.codec.JsonCodec;
-import org.onosproject.kafkaintegration.api.EventExporterService;
-import org.onosproject.kafkaintegration.api.dto.EventSubscriber;
-import org.onosproject.kafkaintegration.api.dto.EventSubscriberGroupId;
-import org.onosproject.rest.AbstractWebResource;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
+import static com.google.common.base.Preconditions.checkNotNull;
+import static javax.ws.rs.core.Response.Status.BAD_REQUEST;
+
+import java.io.IOException;
+import java.io.InputStream;
 
 import javax.ws.rs.Consumes;
 import javax.ws.rs.DELETE;
@@ -31,11 +27,17 @@
 import javax.ws.rs.Produces;
 import javax.ws.rs.core.MediaType;
 import javax.ws.rs.core.Response;
-import java.io.IOException;
-import java.io.InputStream;
 
-import static com.google.common.base.Preconditions.checkNotNull;
-import static javax.ws.rs.core.Response.Status.BAD_REQUEST;
+import org.onosproject.codec.JsonCodec;
+import org.onosproject.kafkaintegration.api.EventExporterService;
+import org.onosproject.kafkaintegration.api.dto.EventSubscriber;
+import org.onosproject.kafkaintegration.api.dto.EventSubscriberGroupId;
+import org.onosproject.rest.AbstractWebResource;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.fasterxml.jackson.databind.node.ObjectNode;
 
 /**
  * Rest Interfaces for subscribing/unsubscribing to event notifications.
@@ -44,14 +46,19 @@
 public class EventExporterWebResource extends AbstractWebResource {
 
     private final Logger log = LoggerFactory.getLogger(getClass());
-    public static final String JSON_NOT_NULL = "Registration Data cannot be empty";
-    public static final String REGISTRATION_SUCCESSFUL = "Registered Listener successfully";
-    public static final String DEREGISTRATION_SUCCESSFUL = "De-Registered Listener successfully";
-    public static final String EVENT_SUBSCRIPTION_SUCCESSFUL = "Event Registration successfull";
-    public static final String EVENT_SUBSCRIPTION_REMOVED = "Event De-Registration successfull";
+    public static final String JSON_NOT_NULL =
+            "Registration Data cannot be empty";
+    public static final String REGISTRATION_SUCCESSFUL =
+            "Registered Listener successfully";
+    public static final String DEREGISTRATION_SUCCESSFUL =
+            "De-Registered Listener successfully";
+    public static final String EVENT_SUBSCRIPTION_SUCCESSFUL =
+            "Event Registration successfull";
+    public static final String EVENT_SUBSCRIPTION_REMOVED =
+            "Event De-Registration successfull";
 
     /**
-     * Registers a listener for Onos Events.
+     * Registers a listener for ONOS Events.
      *
      * @param appName The application trying to register
      * @return 200 OK with UUID string which should be used as Kafka Consumer
@@ -76,7 +83,7 @@
     }
 
     /**
-     * Unregisters a listener for Onos Events.
+     * Unregisters a listener for ONOS Events.
      *
      * @param appName The application trying to unregister
      * @return 200 OK
@@ -93,9 +100,9 @@
     }
 
     /**
-     * Creates subscription to a specific Onos event.
+     * Creates subscription to a specific ONOS event.
      *
-     * @param input Subscription Data in Json format
+     * @param input Subscription Data in JSON format
      * @return 200 OK if successful or 400 BAD REQUEST
      * @onos.rsModel KafkaSubscription
      */
@@ -118,9 +125,9 @@
     }
 
     /**
-     * Parses Json Subscription Data from the external application.
+     * Parses JSON Subscription Data from the external application.
      *
-     * @param node node within the parsed json tree.
+     * @param input Subscription Data in JSON format
      * @return parsed DTO object
      * @throws IOException
      */
@@ -137,9 +144,9 @@
     }
 
     /**
-     * Deletes subscription from a specific Onos event.
+     * Deletes subscription from a specific ONOS event.
      *
-     * @param input data in json format
+     * @param input data in JSON format
      * @return 200 OK if successful or 400 BAD REQUEST
      * @onos.rsModel KafkaSubscription
      */
@@ -152,7 +159,7 @@
 
         try {
             EventSubscriber sub = parseSubscriptionData(input);
-            service.subscribe(sub);
+            service.unsubscribe(sub);
         } catch (Exception e) {
             log.error(e.getMessage());
             return Response.status(BAD_REQUEST).entity(e.getMessage()).build();
diff --git a/apps/kafka-integration/src/main/java/org/onosproject/kafkaintegration/rest/SubscriberCodec.java b/apps/kafka-integration/app/src/main/java/org/onosproject/kafkaintegration/rest/SubscriberCodec.java
similarity index 100%
rename from apps/kafka-integration/src/main/java/org/onosproject/kafkaintegration/rest/SubscriberCodec.java
rename to apps/kafka-integration/app/src/main/java/org/onosproject/kafkaintegration/rest/SubscriberCodec.java
diff --git a/apps/kafka-integration/src/main/java/org/onosproject/kafkaintegration/rest/package-info.java b/apps/kafka-integration/app/src/main/java/org/onosproject/kafkaintegration/rest/package-info.java
similarity index 99%
rename from apps/kafka-integration/src/main/java/org/onosproject/kafkaintegration/rest/package-info.java
rename to apps/kafka-integration/app/src/main/java/org/onosproject/kafkaintegration/rest/package-info.java
index 424c4a5..7885851 100644
--- a/apps/kafka-integration/src/main/java/org/onosproject/kafkaintegration/rest/package-info.java
+++ b/apps/kafka-integration/app/src/main/java/org/onosproject/kafkaintegration/rest/package-info.java
@@ -14,5 +14,6 @@
 
 /**
  * REST API Definitions.
+ *
  */
 package org.onosproject.kafkaintegration.rest;
diff --git a/apps/kafka-integration/src/main/resources/definitions/KafkaRegistration.json b/apps/kafka-integration/app/src/main/resources/definitions/KafkaRegistration.json
similarity index 100%
rename from apps/kafka-integration/src/main/resources/definitions/KafkaRegistration.json
rename to apps/kafka-integration/app/src/main/resources/definitions/KafkaRegistration.json
diff --git a/apps/kafka-integration/src/main/resources/definitions/KafkaSubscription.json b/apps/kafka-integration/app/src/main/resources/definitions/KafkaSubscription.json
similarity index 100%
rename from apps/kafka-integration/src/main/resources/definitions/KafkaSubscription.json
rename to apps/kafka-integration/app/src/main/resources/definitions/KafkaSubscription.json
diff --git a/apps/kafka-integration/src/main/webapp/WEB-INF/web.xml b/apps/kafka-integration/app/src/main/webapp/WEB-INF/web.xml
similarity index 100%
rename from apps/kafka-integration/src/main/webapp/WEB-INF/web.xml
rename to apps/kafka-integration/app/src/main/webapp/WEB-INF/web.xml
diff --git a/apps/kafka-integration/pom.xml b/apps/kafka-integration/pom.xml
index 72f1c77..6f806a7 100644
--- a/apps/kafka-integration/pom.xml
+++ b/apps/kafka-integration/pom.xml
@@ -1,6 +1,6 @@
-<?sxml version="1.0" encoding="UTF-8"?>
+<?xml version="1.0" encoding="UTF-8"?>
 <!--
-  ~ Copyright 2016 Open Networking Laboratory
+  ~ Copyright 2016-present Open Networking Laboratory
   ~
   ~ Licensed under the Apache License, Version 2.0 (the "License");
   ~ you may not use this file except in compliance with the License.
@@ -14,218 +14,26 @@
   ~ See the License for the specific language governing permissions and
   ~ limitations under the License.
   -->
-<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
-    <modelVersion>4.0.0</modelVersion>
-
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
     <parent>
         <groupId>org.onosproject</groupId>
         <artifactId>onos-apps</artifactId>
         <version>1.7.0-SNAPSHOT</version>
         <relativePath>../pom.xml</relativePath>
     </parent>
+    <modelVersion>4.0.0</modelVersion>
 
-    <artifactId>onos-app-kafka</artifactId>
-    <packaging>bundle</packaging>
+    <artifactId>onos-kafka</artifactId>
 
-    <description>
-        ONOS Kafka Integration Application.
-        This will export ONOS events to an external Kafka Server
-    </description>
-    <url>http://onosproject.org</url>
+    <packaging>pom</packaging>
+    <description>Kafka Integration Application</description>
 
-    <properties>
-        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
-        <onos.version>1.6.0-SNAPSHOT</onos.version>
-        <onos.app.name>org.onosproject.kafkaintegration</onos.app.name>
-        <onos.app.title>Kafka Integration Application</onos.app.title>
-        <onos.app.origin>Calix, Inc.</onos.app.origin>
-        <web.context>/onos/kafka</web.context>
-        <api.version>1.0.0</api.version>
-        <api.package>org.onosproject.kafkaintegration.rest</api.package>
-        <api.title>Kafka Integration Application REST API</api.title>
-        <api.description>
-               APIs for subscribing to Events generated by onos
-        </api.description>
-        <onos.app.category>Utility</onos.app.category>
-        <onos.app.url>https://wiki.onosproject.org/display/ONOS/Kafka+Integration</onos.app.url>
-        <onos.app.readme>Export onos events to a Northbound Kafka server</onos.app.readme>
-    </properties>
-
-    <dependencies>
-        <dependency>
-            <groupId>org.onosproject</groupId>
-            <artifactId>onos-api</artifactId>
-            <version>${onos.version}</version>
-        </dependency>
-
-        <dependency>
-            <groupId>org.onosproject</groupId>
-            <artifactId>onlab-osgi</artifactId>
-            <version>${onos.version}</version>
-        </dependency>
-
-        <dependency>
-            <groupId>org.onosproject</groupId>
-            <artifactId>onos-rest</artifactId>
-            <version>${onos.version}</version>
-        </dependency>
-
-        <dependency>
-            <groupId>junit</groupId>
-            <artifactId>junit</artifactId>
-            <version>4.12</version>
-            <scope>test</scope>
-        </dependency>
-
-        <dependency>
-            <groupId>org.onosproject</groupId>
-            <artifactId>onos-api</artifactId>
-            <version>${onos.version}</version>
-            <scope>test</scope>
-            <classifier>tests</classifier>
-        </dependency>
-
-        <dependency>
-            <groupId>javax.ws.rs</groupId>
-            <artifactId>javax.ws.rs-api</artifactId>
-            <version>2.0.1</version>
-        </dependency>
-
-        <dependency>
-            <groupId>com.google.protobuf</groupId>
-            <artifactId>protobuf-java</artifactId>
-            <version>2.5.0</version>
-        </dependency>
-
-        <dependency>
-            <groupId>org.codehaus.jackson</groupId>
-            <artifactId>jackson-core-asl</artifactId>
-            <version>1.9.13</version>
-        </dependency>
-
-        <dependency>
-            <groupId>org.codehaus.jackson</groupId>
-            <artifactId>jackson-mapper-asl</artifactId>
-            <version>1.9.13</version>
-        </dependency>
-
-        <dependency>
-            <groupId>org.glassfish.jersey.containers</groupId>
-            <artifactId>jersey-container-servlet</artifactId>
-            <version>2.22.2</version>
-        </dependency>
-
-        <dependency>
-            <groupId>com.fasterxml.jackson.core</groupId>
-            <artifactId>jackson-annotations</artifactId>
-            <version>2.6.4</version>
-        </dependency>
-
-        <dependency>
-            <groupId>org.onosproject</groupId>
-            <artifactId>onos-core-serializers</artifactId>
-            <version>${onos.version}</version>
-        </dependency>
-
-        <dependency>
-            <groupId>org.apache.felix</groupId>
-            <artifactId>org.apache.felix.scr.annotations</artifactId>
-            <version>1.9.12</version>
-            <scope>provided</scope>
-        </dependency>
-    </dependencies>
-
-    <build>
-        <plugins>
-            <plugin>
-                <groupId>org.apache.felix</groupId>
-                <artifactId>maven-bundle-plugin</artifactId>
-                <version>3.0.1</version>
-                <extensions>true</extensions>
-                <configuration>
-                    <instructions>
-                    <Bundle-SymbolicName>
-                        ${project.groupId}.${project.artifactId}
-                    </Bundle-SymbolicName>
-                    <_wab>src/main/webapp/</_wab>
-                    <Include-Resource>
-                        WEB-INF/classes/apidoc/swagger.json=target/swagger.json,
-                                    {maven-resources}
-                    </Include-Resource>
-                    <Import-Package>
-                        org.slf4j,
-                        org.osgi.framework,
-                        javax.ws.rs,
-                        javax.ws.rs.core,
-                        org.glassfish.jersey.servlet,
-                        com.fasterxml.jackson.databind,
-                        com.fasterxml.jackson.databind.node,
-                        com.fasterxml.jackson.core,
-                        org.onlab.packet.*,
-                        org.onosproject.*,
-                        com.google.common.*
-                    </Import-Package>
-                    <Web-ContextPath>${web.context}</Web-ContextPath>
-                </instructions>
-                </configuration>
-            </plugin>
-            <plugin>
-                <groupId>org.apache.maven.plugins</groupId>
-                <artifactId>maven-compiler-plugin</artifactId>
-                <version>2.5.1</version>
-                <configuration>
-                    <source>1.8</source>
-                    <target>1.8</target>
-                </configuration>
-            </plugin>
-            <plugin>
-                <groupId>org.apache.felix</groupId>
-                <artifactId>maven-scr-plugin</artifactId>
-                <version>1.21.0</version>
-                <executions>
-                    <execution>
-                        <id>generate-scr-srcdescriptor</id>
-                        <goals>
-                            <goal>scr</goal>
-                        </goals>
-                    </execution>
-                </executions>
-                <configuration>
-                    <supportedProjectTypes>
-                        <supportedProjectType>bundle</supportedProjectType>
-                        <supportedProjectType>war</supportedProjectType>
-                    </supportedProjectTypes>
-                </configuration>
-            </plugin>
-            <plugin>
-                <groupId>org.onosproject</groupId>
-                <artifactId>onos-maven-plugin</artifactId>
-                <version>1.9</version>
-                <executions>
-                    <execution>
-                        <id>cfg</id>
-                        <phase>generate-resources</phase>
-                        <goals>
-                            <goal>cfg</goal>
-                        </goals>
-                    </execution>
-                    <execution>
-                        <id>swagger</id>
-                        <phase>generate-sources</phase>
-                        <goals>
-                            <goal>swagger</goal>
-                        </goals>
-                    </execution>
-                    <execution>
-                        <id>app</id>
-                        <phase>package</phase>
-                        <goals>
-                            <goal>app</goal>
-                        </goals>
-                    </execution>
-                </executions>
-            </plugin>
-        </plugins>
-    </build>
+    <modules>
+        <module>api</module>
+        <module>app</module>
+    </modules>
 
 </project>
+
diff --git a/apps/kafka-integration/src/main/java/org/onosproject/kafkaintegration/EventExporterManager.java b/apps/kafka-integration/src/main/java/org/onosproject/kafkaintegration/EventExporterManager.java
deleted file mode 100644
index 53d9173..0000000
--- a/apps/kafka-integration/src/main/java/org/onosproject/kafkaintegration/EventExporterManager.java
+++ /dev/null
@@ -1,132 +0,0 @@
-/*
- * Copyright 2016-present Open Networking Laboratory
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.onosproject.kafkaintegration;
-
-import static com.google.common.base.Preconditions.checkNotNull;
-
-import java.util.Map;
-import java.util.UUID;
-
-import org.apache.felix.scr.annotations.Activate;
-import org.apache.felix.scr.annotations.Component;
-import org.apache.felix.scr.annotations.Deactivate;
-import org.apache.felix.scr.annotations.Reference;
-import org.apache.felix.scr.annotations.ReferenceCardinality;
-import org.apache.felix.scr.annotations.Service;
-import org.onosproject.core.ApplicationId;
-import org.onosproject.core.CoreService;
-import org.onosproject.kafkaintegration.api.EventExporterService;
-import org.onosproject.kafkaintegration.api.dto.EventSubscriber;
-import org.onosproject.kafkaintegration.api.dto.EventSubscriberGroupId;
-import org.onosproject.kafkaintegration.errors.InvalidApplicationException;
-import org.onosproject.kafkaintegration.errors.InvalidGroupIdException;
-import org.onosproject.kafkaintegration.errors.UnsupportedEventException;
-import org.onosproject.net.device.DeviceService;
-import org.onosproject.net.link.LinkService;
-import org.onosproject.store.serializers.KryoNamespaces;
-import org.onosproject.store.service.Serializer;
-import org.onosproject.store.service.StorageService;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-/**
- * Implementation of Event Exporter Service.
- *
- */
-@Component(immediate = true)
-@Service
-public class EventExporterManager implements EventExporterService {
-
-    private final Logger log = LoggerFactory.getLogger(getClass());
-
-    // Stores the currently registered applications for event export service.
-    // Map of Appname to groupId
-    private Map<ApplicationId, EventSubscriberGroupId> registeredApps;
-
-    private static final String REGISTERED_APPS = "registered-applications";
-
-    @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
-    protected DeviceService deviceService;
-
-    @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
-    protected LinkService linkService;
-
-    @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
-    protected CoreService coreService;
-
-    @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
-    protected StorageService storageService;
-
-    private static final String NOT_YET_SUPPORTED = "Not yet supported.";
-
-    private ApplicationId appId;
-
-    @Activate
-    protected void activate() {
-        appId = coreService
-                .registerApplication("org.onosproject.kafkaintegration");
-
-        registeredApps = storageService
-                .<ApplicationId, EventSubscriberGroupId>consistentMapBuilder()
-                .withName(REGISTERED_APPS)
-                .withSerializer(Serializer.using(KryoNamespaces.API,
-                                                 EventSubscriberGroupId.class,
-                                                 UUID.class))
-                .build().asJavaMap();
-
-        log.info("Started");
-    }
-
-    @Deactivate
-    protected void deactivate() {
-        log.info("Stopped");
-    }
-
-    @Override
-    public EventSubscriberGroupId registerListener(String appName) {
-
-        // TODO: Remove it once ONOS provides a mechanism for external apps
-        // to register with the core service. See Jira - 4409
-        ApplicationId externalAppId = coreService.registerApplication(appName);
-
-        return registeredApps.computeIfAbsent(externalAppId,
-                                              (key) -> new EventSubscriberGroupId(UUID
-                                                      .randomUUID()));
-
-    }
-
-    @Override
-    public void unregisterListener(String appName) {
-        ApplicationId externalAppId =
-                checkNotNull(coreService.getAppId(appName));
-        registeredApps.remove(externalAppId);
-    }
-
-    @Override
-    public void subscribe(EventSubscriber subscriber)
-            throws UnsupportedEventException, InvalidGroupIdException,
-            InvalidApplicationException {
-
-        throw new UnsupportedOperationException(NOT_YET_SUPPORTED);
-    }
-
-    @Override
-    public void unsubscribe(EventSubscriber subscriber)
-            throws InvalidGroupIdException, InvalidApplicationException {
-
-        throw new UnsupportedOperationException(NOT_YET_SUPPORTED);
-    }
-}
diff --git a/incubator/protobuf/src/main/proto/Device.proto b/incubator/protobuf/src/main/proto/Device.proto
index ec138e0..a01790e 100644
--- a/incubator/protobuf/src/main/proto/Device.proto
+++ b/incubator/protobuf/src/main/proto/Device.proto
@@ -3,6 +3,23 @@
 
 package Device;
 
+message DeviceDescription {
+  string device_Uri = 1;
+  DeviceType type = 2;
+  string manufacturer = 3;
+  string hw_version = 4;
+  string sw_version = 5;
+  string serial_number = 6;
+  string chassis_id = 7;
+  map<string, string> annotations = 8;
+}
+
+enum MastershipRole {
+  NONE = 0;
+  MASTER = 1;
+  STANDBY = 2;
+}
+
 enum DeviceType {
   OTHER = 0;
   SWITCH = 1;
@@ -20,8 +37,9 @@
   MICROWAVE = 13;
 }
 
-message DeviceDescription {
-  string device_Uri = 1;
+// Corresponds to org.onosproject.net.Device.
+message DeviceCore {
+  string deviceId = 1;
   DeviceType type = 2;
   string manufacturer = 3;
   string hw_version = 4;
@@ -29,10 +47,4 @@
   string serial_number = 6;
   string chassis_id = 7;
   map<string, string> annotations = 8;
-}
-
-enum MastershipRole {
-  NONE = 0;
-  MASTER = 1;
-  STANDBY = 2;
-}
+} 
\ No newline at end of file
diff --git a/incubator/protobuf/src/main/proto/DeviceEvent.proto b/incubator/protobuf/src/main/proto/DeviceEvent.proto
new file mode 100644
index 0000000..9bf7a2f
--- /dev/null
+++ b/incubator/protobuf/src/main/proto/DeviceEvent.proto
@@ -0,0 +1,30 @@
+syntax = "proto3";
+option java_package = "org.onosproject.grpc.net";
+
+
+import "Device.proto";
+import "Port.proto";
+
+package DeviceEvent;
+
+// Corresponds to org.onosproject.net.device.DeviceEvent.
+message DeviceNotification {
+    Device.DeviceCore device = 1;
+    DeviceEventType deviceEventType = 2;
+    Port.PortCore port = 3;
+}
+
+enum DeviceEventType {
+    DEVICE_ADDED = 0;
+    DEVICE_UPDATED = 1;
+    DEVICE_REMOVED = 2;
+    DEVICE_SUSPENDED = 3;
+    DEVICE_AVAILABILITY_CHANGED = 4;
+    PORT_ADDED = 5;
+    PORT_UPDATED = 6;
+    PORT_REMOVED = 7;
+    PORT_STATS_UPDATED = 8;
+}
+
+
+
diff --git a/incubator/protobuf/src/main/proto/Link.proto b/incubator/protobuf/src/main/proto/Link.proto
index 45b9e90..b5fcbfc 100644
--- a/incubator/protobuf/src/main/proto/Link.proto
+++ b/incubator/protobuf/src/main/proto/Link.proto
@@ -38,6 +38,20 @@
   string port_number = 2;
 }
 
+enum LinkState {
+    ACTIVE = 0;
+    INACTIVE = 1;
+}
+
+// Corresponds to org.onosproject.net.Link.
+message LinkCore {
+  LinkState state = 1;
+  ConnectPoint src = 2;
+  ConnectPoint dst = 3;
+  LinkType type = 4;
+  map<string, string> annotations = 5;
+}
+
 message LinkDescription {
   ConnectPoint src = 1;
   ConnectPoint dst = 2;
diff --git a/incubator/protobuf/src/main/proto/LinkEvent.proto b/incubator/protobuf/src/main/proto/LinkEvent.proto
new file mode 100644
index 0000000..fdd6580
--- /dev/null
+++ b/incubator/protobuf/src/main/proto/LinkEvent.proto
@@ -0,0 +1,19 @@
+syntax = "proto3";
+option java_package = "org.onosproject.grpc.net";
+
+package LinkEvent;
+
+import "Link.proto";
+
+// Corresponds to org.onosproject.net.link.LinkEvent.
+message LinkNotification {
+    LinkEventType linkEventType = 2;
+    Link.LinkCore link = 3;
+}
+
+// Link Event Types
+enum LinkEventType {
+    LINK_ADDED = 0;
+    LINK_UPDATED = 1;
+    LINK_REMOVED = 2;
+}
diff --git a/incubator/protobuf/src/main/proto/Port.proto b/incubator/protobuf/src/main/proto/Port.proto
index 0d9ed2e..a3fd34d 100644
--- a/incubator/protobuf/src/main/proto/Port.proto
+++ b/incubator/protobuf/src/main/proto/Port.proto
@@ -31,6 +31,15 @@
   map<string, string> annotations = 8;
 }
 
+// Corresponds to org.onosproject.net.Port.
+message PortCore {
+  string port_number = 1;
+  bool is_enabled = 2;
+  PortType type = 3;
+  int64 port_speed = 4;
+  map<string, string> annotations = 5;
+}
+
 message PortStatistics {
   int32 port = 1;
   int64 packets_received = 2;