Made changes as per review comments.
kafkaProducer is now non-static.

TODO: Make KafkaPublisherManager a Service rather than a Singleton.
Kafka event publishing.

Change-Id: I5ec20a6e4950c38e822468d343521ab77475b7d3
diff --git a/apps/kafka-integration/app/app.xml b/apps/kafka-integration/app/app.xml
index 5912d4b..a832936 100644
--- a/apps/kafka-integration/app/app.xml
+++ b/apps/kafka-integration/app/app.xml
@@ -19,6 +19,9 @@
      featuresRepo="mvn:${project.groupId}/${project.artifactId}/${project.version}/xml/features"
      features="${project.artifactId}" apps="org.onosproject.incubator.protobuf">
     <description>${project.description}</description>
-    <artifact>mvn:${project.groupId}/${project.artifactId}/${project.version}</artifact>
+
     <artifact>mvn:${project.groupId}/onos-app-kafka-api/${project.version}</artifact>
+    <artifact>mvn:${project.groupId}/onos-app-kafka-core/${project.version}</artifact>
+    <artifact>mvn:${project.groupId}/onos-app-kafka-web/${project.version}</artifact>
+    <artifact>mvn:org.apache.servicemix.bundles/org.apache.servicemix.bundles.kafka-clients/0.8.2.2_1</artifact>
 </app>
diff --git a/apps/kafka-integration/app/features.xml b/apps/kafka-integration/app/features.xml
index 1e219b0..2982029 100644
--- a/apps/kafka-integration/app/features.xml
+++ b/apps/kafka-integration/app/features.xml
@@ -19,6 +19,8 @@
              description="${project.description}">
         <feature>onos-api</feature>
         <bundle>mvn:${project.groupId}/onos-app-kafka-api/${project.version}</bundle>
-        <bundle>mvn:${project.groupId}/onos-app-kafka/${project.version}</bundle>
+        <bundle>mvn:${project.groupId}/onos-app-kafka-core/${project.version}</bundle>
+        <bundle>mvn:${project.groupId}/onos-app-kafka-web/${project.version}</bundle>
+        <bundle>mvn:org.apache.servicemix.bundles/org.apache.servicemix.bundles.kafka-clients/0.8.2.2_1</bundle>
     </feature>
 </features>
diff --git a/apps/kafka-integration/app/pom.xml b/apps/kafka-integration/app/pom.xml
index 1dafb0d..5a6e574 100644
--- a/apps/kafka-integration/app/pom.xml
+++ b/apps/kafka-integration/app/pom.xml
@@ -26,12 +26,6 @@
 
     <artifactId>onos-app-kafka</artifactId>
 
-    <packaging>bundle</packaging>
-    <description>
-          Kafka Integration Application.
-          This will export ONOS Events to Northbound Kafka Server.
-    </description>
-
     <properties>
         <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
         <onos.version>${project.version}</onos.version>
@@ -41,21 +35,18 @@
         <web.context>/onos/kafka</web.context>
         <api.version>1.0.0</api.version>
         <api.package>org.onosproject.kafkaintegration.rest</api.package>
-        <api.title>Kafka Integration Application REST API</api.title>
-        <api.description>
-               APIs for subscribing to Events generated by ONOS
-        </api.description>
         <onos.app.category>Utility</onos.app.category>
         <onos.app.url>https://wiki.onosproject.org/display/ONOS/Kafka+Integration</onos.app.url>
-        <onos.app.readme>Export ONOS events to a Northbound Kafka server</onos.app.readme>
         <onos.app.requires>org.onosproject.incubator.protobuf</onos.app.requires>
     </properties>
 
-       <dependencies>
-        <dependency>
-            <groupId>org.onosproject</groupId>
-            <artifactId>onos-api</artifactId>
-        </dependency>
+    <packaging>pom</packaging>
+    <description>
+        Kafka Integration Application.
+        This will export ONOS Events to Northbound Kafka Server.
+    </description>
+
+    <dependencies>
 
         <dependency>
             <groupId>org.onosproject</groupId>
@@ -65,168 +56,22 @@
 
         <dependency>
             <groupId>org.onosproject</groupId>
-            <artifactId>onos-incubator-protobuf</artifactId>
+            <artifactId>onos-app-kafka-core</artifactId>
             <version>${project.version}</version>
         </dependency>
 
         <dependency>
             <groupId>org.onosproject</groupId>
-            <artifactId>onlab-osgi</artifactId>
-        </dependency>
-
-        <dependency>
-            <groupId>org.onosproject</groupId>
-            <artifactId>onos-rest</artifactId>
+            <artifactId>onos-app-kafka-web</artifactId>
             <version>${project.version}</version>
         </dependency>
 
+        <!-- When changing this dependency, also update app.xml and features.xml. -->
         <dependency>
-            <groupId>junit</groupId>
-            <artifactId>junit</artifactId>
-            <scope>test</scope>
+            <groupId>org.apache.servicemix.bundles</groupId>
+            <artifactId>org.apache.servicemix.bundles.kafka-clients</artifactId>
+            <version>0.8.2.2_1</version>
         </dependency>
 
-        <dependency>
-            <groupId>org.onosproject</groupId>
-            <artifactId>onos-api</artifactId>
-            <scope>test</scope>
-            <classifier>tests</classifier>
-        </dependency>
-
-        <dependency>
-            <groupId>javax.ws.rs</groupId>
-            <artifactId>javax.ws.rs-api</artifactId>
-            <version>2.0.1</version>
-        </dependency>
-
-        <dependency>
-            <groupId>com.google.protobuf</groupId>
-            <artifactId>protobuf-java</artifactId>
-            <version>3.0.0-beta-2</version>
-        </dependency>
-
-        <dependency>
-            <groupId>org.codehaus.jackson</groupId>
-            <artifactId>jackson-core-asl</artifactId>
-            <version>1.9.13</version>
-        </dependency>
-
-        <dependency>
-            <groupId>org.codehaus.jackson</groupId>
-            <artifactId>jackson-mapper-asl</artifactId>
-            <version>1.9.13</version>
-        </dependency>
-
-        <dependency>
-            <groupId>org.glassfish.jersey.containers</groupId>
-            <artifactId>jersey-container-servlet</artifactId>
-        </dependency>
-        <dependency>
-            <groupId>com.fasterxml.jackson.core</groupId>
-            <artifactId>jackson-annotations</artifactId>
-        </dependency>
-
-        <dependency>
-            <groupId>org.onosproject</groupId>
-            <artifactId>onos-core-serializers</artifactId>
-            <version>${project.version}</version>
-        </dependency>
-
-        <dependency>
-            <groupId>org.apache.felix</groupId>
-            <artifactId>org.apache.felix.scr.annotations</artifactId>
-            <scope>provided</scope>
-        </dependency>
     </dependencies>
-
-    <build>
-        <plugins>
-            <plugin>
-                <groupId>org.apache.felix</groupId>
-                <artifactId>maven-bundle-plugin</artifactId>
-                <extensions>true</extensions>
-                <configuration>
-                    <instructions>
-                    <Bundle-SymbolicName>
-                        ${project.groupId}.${project.artifactId}
-                    </Bundle-SymbolicName>
-                    <_wab>src/main/webapp/</_wab>
-                    <Include-Resource>
-                        WEB-INF/classes/apidoc/swagger.json=target/swagger.json,
-                                    {maven-resources}
-                    </Include-Resource>
-                    <Import-Package>
-                        org.slf4j,
-                        org.osgi.framework,
-                        javax.ws.rs,
-                        javax.ws.rs.core,
-                        org.glassfish.jersey.servlet,
-                        com.fasterxml.jackson.databind,
-                        com.fasterxml.jackson.databind.node,
-                        com.fasterxml.jackson.core,
-                        org.onlab.packet.*,
-                        org.onosproject.*,
-                        org.onlab.util.*,
-                        com.google.common.*,
-                        com.google.protobuf.*
-                    </Import-Package>
-                    <Web-ContextPath>${web.context}</Web-ContextPath>
-                </instructions>
-                </configuration>
-            </plugin>
-            <plugin>
-                <groupId>org.apache.maven.plugins</groupId>
-                <artifactId>maven-compiler-plugin</artifactId>
-                <configuration>
-                    <source>1.8</source>
-                    <target>1.8</target>
-                </configuration>
-            </plugin>
-            <plugin>
-                <groupId>org.apache.felix</groupId>
-                <artifactId>maven-scr-plugin</artifactId>
-                <executions>
-                    <execution>
-                        <id>generate-scr-srcdescriptor</id>
-                        <goals>
-                            <goal>scr</goal>
-                        </goals>
-                    </execution>
-                </executions>
-                <configuration>
-                    <supportedProjectTypes>
-                        <supportedProjectType>bundle</supportedProjectType>
-                        <supportedProjectType>war</supportedProjectType>
-                    </supportedProjectTypes>
-                </configuration>
-            </plugin>
-            <plugin>
-                <groupId>org.onosproject</groupId>
-                <artifactId>onos-maven-plugin</artifactId>
-                <executions>
-                    <execution>
-                        <id>cfg</id>
-                        <phase>generate-resources</phase>
-                        <goals>
-                            <goal>cfg</goal>
-                        </goals>
-                    </execution>
-                    <execution>
-                        <id>swagger</id>
-                        <goals>
-                            <goal>swagger</goal>
-                        </goals>
-                    </execution>
-                    <execution>
-                        <id>app</id>
-                        <phase>package</phase>
-                        <goals>
-                            <goal>app</goal>
-                        </goals>
-                    </execution>
-                </executions>
-            </plugin>
-        </plugins>
-    </build>
-
-</project>
+</project>
\ No newline at end of file
diff --git a/apps/kafka-integration/app/src/main/java/org/onosproject/kafkaintegration/converter/ConversionFactory.java b/apps/kafka-integration/app/src/main/java/org/onosproject/kafkaintegration/converter/ConversionFactory.java
deleted file mode 100644
index 8f69a20..0000000
--- a/apps/kafka-integration/app/src/main/java/org/onosproject/kafkaintegration/converter/ConversionFactory.java
+++ /dev/null
@@ -1,68 +0,0 @@
-/**
- * Copyright 2016 Open Networking Laboratory
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
-
- * http://www.apache.org/licenses/LICENSE-2.0
-
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.onosproject.kafkaintegration.converter;
-
-import static org.onosproject.kafkaintegration.api.dto.OnosEvent.Type.DEVICE;
-import static org.onosproject.kafkaintegration.api.dto.OnosEvent.Type.LINK;
-
-import java.util.HashMap;
-import java.util.Map;
-
-import org.onosproject.kafkaintegration.api.dto.OnosEvent.Type;;
-
-/**
- * Returns the appropriate converter object based on the ONOS event type.
- *
- */
-public final class ConversionFactory {
-
-    // Store converters for all supported events
-    private Map<Type, EventConverter> converters =
-            new HashMap<Type, EventConverter>() {
-                {
-                    put(DEVICE, new DeviceEventConverter());
-                    put(LINK, new LinkEventConverter());
-                }
-            };
-
-    // Exists to defeat instantiation
-    private ConversionFactory() {
-    }
-
-    private static class SingletonHolder {
-        private static final ConversionFactory INSTANCE =
-                new ConversionFactory();
-    }
-
-    /**
-     * Returns a static reference to the Conversion Factory.
-     *
-     * @return singleton object
-     */
-    public static ConversionFactory getInstance() {
-        return SingletonHolder.INSTANCE;
-    }
-
-    /**
-     * Returns an Event converter object for the specified ONOS event type.
-     *
-     * @param event ONOS event type
-     * @return Event Converter object
-     */
-    public EventConverter getConverter(Type event) {
-        return converters.get(event);
-    }
-
-}
diff --git a/apps/kafka-integration/app/src/main/java/org/onosproject/kafkaintegration/converter/DeviceEventConverter.java b/apps/kafka-integration/app/src/main/java/org/onosproject/kafkaintegration/converter/DeviceEventConverter.java
deleted file mode 100644
index ac04040..0000000
--- a/apps/kafka-integration/app/src/main/java/org/onosproject/kafkaintegration/converter/DeviceEventConverter.java
+++ /dev/null
@@ -1,124 +0,0 @@
-/**
- * Copyright 2016 Open Networking Laboratory
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
-
- * http://www.apache.org/licenses/LICENSE-2.0
-
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.onosproject.kafkaintegration.converter;
-
-import org.onosproject.event.Event;
-import org.onosproject.grpc.net.Device.DeviceCore;
-import org.onosproject.grpc.net.Device.DeviceType;
-import org.onosproject.grpc.net.DeviceEvent.DeviceEventType;
-import org.onosproject.grpc.net.DeviceEvent.DeviceNotification;
-import org.onosproject.grpc.net.Port.PortCore;
-import org.onosproject.grpc.net.Port.PortType;
-import org.onosproject.net.device.DeviceEvent;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import com.google.protobuf.GeneratedMessage;
-
-/**
- * Converts ONOS Device event message to GPB format.
- *
- */
-class DeviceEventConverter implements EventConverter {
-
-    private final Logger log = LoggerFactory.getLogger(getClass());
-
-    @Override
-    public GeneratedMessage convertToProtoMessage(Event<?, ?> event) {
-
-        DeviceEvent deviceEvent = (DeviceEvent) event;
-
-        if (!deviceEventTypeSupported(deviceEvent)) {
-            log.error("Unsupported Onos Device Event {}. There is no matching"
-                    + "proto Device Event type", deviceEvent.type().toString());
-            return null;
-        }
-
-        return buildDeviceProtoMessage(deviceEvent);
-    }
-
-    /**
-     * Checks if the ONOS Device Event type is supported.
-     *
-     * @param event ONOS Device event
-     * @return true if there is a match and false otherwise
-     */
-    private boolean deviceEventTypeSupported(DeviceEvent event) {
-        DeviceEventType[] deviceEvents = DeviceEventType.values();
-        for (DeviceEventType deviceEventType : deviceEvents) {
-            if (deviceEventType.name().equals(event.type().name())) {
-                return true;
-            }
-        }
-
-        return false;
-    }
-
-    private DeviceNotification buildDeviceProtoMessage(DeviceEvent deviceEvent) {
-        DeviceNotification.Builder notificationBuilder =
-                DeviceNotification.newBuilder();
-
-        DeviceCore deviceCore =
-                DeviceCore.newBuilder()
-                        .setChassisId(deviceEvent.subject().chassisId().id()
-                                .toString())
-                        .setDeviceId(deviceEvent.subject().id().toString())
-                        .setHwVersion(deviceEvent.subject().hwVersion())
-                        .setManufacturer(deviceEvent.subject().manufacturer())
-                        .setSerialNumber(deviceEvent.subject().serialNumber())
-                        .setSwVersion(deviceEvent.subject().swVersion())
-                        .setType(DeviceType
-                                .valueOf(deviceEvent.subject().type().name()))
-                        .build();
-
-        PortCore portCore = null;
-        if (deviceEvent.port() != null) {
-            portCore =
-                    PortCore.newBuilder()
-                            .setIsEnabled(deviceEvent.port().isEnabled())
-                            .setPortNumber(deviceEvent.port().number()
-                                    .toString())
-                            .setPortSpeed(deviceEvent.port().portSpeed())
-                            .setType(PortType
-                                    .valueOf(deviceEvent.port().type().name()))
-                            .build();
-
-            notificationBuilder.setPort(portCore);
-        }
-
-        notificationBuilder.setDeviceEventType(getProtoType(deviceEvent))
-                .setDevice(deviceCore);
-
-        return notificationBuilder.build();
-    }
-
-    /**
-     * Retrieves the protobuf generated device event type.
-     *
-     * @param event ONOS Device Event
-     * @return generated Device Event Type
-     */
-    private DeviceEventType getProtoType(DeviceEvent event) {
-        DeviceEventType protobufEventType = null;
-        DeviceEventType[] deviceEvents = DeviceEventType.values();
-        for (DeviceEventType deviceEventType : deviceEvents) {
-            if (deviceEventType.name().equals(event.type().name())) {
-                protobufEventType = deviceEventType;
-            }
-        }
-
-        return protobufEventType;
-    }
-}
diff --git a/apps/kafka-integration/app/src/main/java/org/onosproject/kafkaintegration/converter/EventConverter.java b/apps/kafka-integration/app/src/main/java/org/onosproject/kafkaintegration/converter/EventConverter.java
deleted file mode 100644
index e15ebdc..0000000
--- a/apps/kafka-integration/app/src/main/java/org/onosproject/kafkaintegration/converter/EventConverter.java
+++ /dev/null
@@ -1,36 +0,0 @@
-/**
- * Copyright 2016 Open Networking Laboratory
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
-
- * http://www.apache.org/licenses/LICENSE-2.0
-
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.onosproject.kafkaintegration.converter;
-
-import org.onosproject.event.Event;
-
-import com.google.protobuf.GeneratedMessage;
-
-/**
- *
- * APIs for converting between ONOS event objects and GPB data objects.
- *
- */
-public interface EventConverter {
-
-    /**
-     * Converts ONOS specific event data to a format that is suitable for export
-     * to Kafka.
-     *
-     * @param event ONOS Event object
-     * @return converted data in GPB format.
-     */
-    public GeneratedMessage convertToProtoMessage(Event<?, ?> event);
-}
diff --git a/apps/kafka-integration/app/src/main/java/org/onosproject/kafkaintegration/converter/LinkEventConverter.java b/apps/kafka-integration/app/src/main/java/org/onosproject/kafkaintegration/converter/LinkEventConverter.java
deleted file mode 100644
index a1db209..0000000
--- a/apps/kafka-integration/app/src/main/java/org/onosproject/kafkaintegration/converter/LinkEventConverter.java
+++ /dev/null
@@ -1,104 +0,0 @@
-/**
- * Copyright 2016 Open Networking Laboratory
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
-
- * http://www.apache.org/licenses/LICENSE-2.0
-
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.onosproject.kafkaintegration.converter;
-
-import org.onosproject.event.Event;
-import org.onosproject.grpc.net.Link.ConnectPoint;
-import org.onosproject.grpc.net.Link.LinkCore;
-import org.onosproject.grpc.net.Link.LinkState;
-import org.onosproject.grpc.net.Link.LinkType;
-import org.onosproject.grpc.net.LinkEvent.LinkEventType;
-import org.onosproject.grpc.net.LinkEvent.LinkNotification;
-import org.onosproject.net.link.LinkEvent;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import com.google.protobuf.GeneratedMessage;
-
-/**
- * Converts for ONOS Link event message to GPB format.
- *
- */
-class LinkEventConverter implements EventConverter {
-
-    private final Logger log = LoggerFactory.getLogger(getClass());
-
-    @Override
-    public GeneratedMessage convertToProtoMessage(Event<?, ?> event) {
-
-        LinkEvent linkEvent = (LinkEvent) event;
-
-        if (!linkEventTypeSupported(linkEvent)) {
-            log.error("Unsupported Onos Event {}. There is no matching"
-                    + "proto Event type", linkEvent.type().toString());
-            return null;
-        }
-
-        return buildDeviceProtoMessage(linkEvent);
-    }
-
-    private boolean linkEventTypeSupported(LinkEvent event) {
-        LinkEventType[] kafkaLinkEvents = LinkEventType.values();
-        for (LinkEventType linkEventType : kafkaLinkEvents) {
-            if (linkEventType.name().equals(event.type().name())) {
-                return true;
-            }
-        }
-
-        return false;
-    }
-
-    private LinkNotification buildDeviceProtoMessage(LinkEvent linkEvent) {
-        LinkNotification notification = LinkNotification.newBuilder()
-                .setLinkEventType(getProtoType(linkEvent))
-                .setLink(LinkCore.newBuilder()
-                        .setState(LinkState
-                                .valueOf(linkEvent.subject().state().name()))
-                        .setType(LinkType
-                                .valueOf(linkEvent.subject().type().name()))
-                        .setDst(ConnectPoint.newBuilder()
-                                .setDeviceId(linkEvent.subject().dst()
-                                        .deviceId().toString())
-                                .setPortNumber(linkEvent.subject().dst().port()
-                                        .toString()))
-                        .setSrc(ConnectPoint.newBuilder()
-                                .setDeviceId(linkEvent.subject().src()
-                                        .deviceId().toString())
-                                .setPortNumber(linkEvent.subject().src().port()
-                                        .toString())))
-                .build();
-
-        return notification;
-    }
-
-    /**
-     * Returns the specific Kafka Device Event Type for the corresponding ONOS
-     * Device Event Type.
-     *
-     * @param event ONOS Device Event
-     * @return Kafka Device Event Type
-     */
-    private LinkEventType getProtoType(LinkEvent event) {
-        LinkEventType generatedEventType = null;
-        LinkEventType[] kafkaEvents = LinkEventType.values();
-        for (LinkEventType linkEventType : kafkaEvents) {
-            if (linkEventType.name().equals(event.type().name())) {
-                generatedEventType = linkEventType;
-            }
-        }
-
-        return generatedEventType;
-    }
-}
diff --git a/apps/kafka-integration/app/src/main/java/org/onosproject/kafkaintegration/converter/package-info.java b/apps/kafka-integration/app/src/main/java/org/onosproject/kafkaintegration/converter/package-info.java
deleted file mode 100644
index d33c0d7..0000000
--- a/apps/kafka-integration/app/src/main/java/org/onosproject/kafkaintegration/converter/package-info.java
+++ /dev/null
@@ -1,20 +0,0 @@
-/**
- * Copyright 2016-present Open Networking Laboratory Licensed under the Apache
- * License, Version 2.0 (the "License"); you may not use this file except in
- * compliance with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
- * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
- * License for the specific language governing permissions and limitations under
- * the License.
- */
-
-/**
- * Converters for converting various ONOS events to their corresponding Protocol
- * Buffer format.
- *
- */
-package org.onosproject.kafkaintegration.converter;
diff --git a/apps/kafka-integration/app/src/main/java/org/onosproject/kafkaintegration/impl/Dispatcher.java b/apps/kafka-integration/app/src/main/java/org/onosproject/kafkaintegration/impl/Dispatcher.java
deleted file mode 100644
index d211458..0000000
--- a/apps/kafka-integration/app/src/main/java/org/onosproject/kafkaintegration/impl/Dispatcher.java
+++ /dev/null
@@ -1,62 +0,0 @@
-/**
- * Copyright 2016 Open Networking Laboratory
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
-
- * http://www.apache.org/licenses/LICENSE-2.0
-
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.onosproject.kafkaintegration.impl;
-
-import org.onosproject.event.AbstractListenerManager;
-import org.onosproject.kafkaintegration.api.ExportableEventListener;
-import org.onosproject.kafkaintegration.api.dto.OnosEvent;
-import org.onosproject.kafkaintegration.api.dto.OnosEvent.Type;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import com.google.protobuf.GeneratedMessage;
-
-/**
- * Dispatch ONOS Events to all interested Listeners.
- *
- */
-public final class Dispatcher
-        extends AbstractListenerManager<OnosEvent, ExportableEventListener> {
-
-    private final Logger log = LoggerFactory.getLogger(getClass());
-
-    // Exists to defeat instantiation
-    private Dispatcher() {
-    }
-
-    private static class SingletonHolder {
-        private static final Dispatcher INSTANCE = new Dispatcher();
-    }
-
-    /**
-     * Returns a static reference to the Listener Factory.
-     *
-     * @return singleton object
-     */
-    public static Dispatcher getInstance() {
-        return SingletonHolder.INSTANCE;
-    }
-
-    /**
-     * Publish the ONOS Event to all listeners.
-     *
-     * @param eventType the ONOS eventtype
-     * @param message generated Protocol buffer message from ONOS event data
-     */
-    public void publish(Type eventType, GeneratedMessage message) {
-        log.debug("Dispatching ONOS Event {}", eventType);
-        post(new OnosEvent(eventType, message));
-    }
-}
diff --git a/apps/kafka-integration/app/src/main/java/org/onosproject/kafkaintegration/impl/EventExporterManager.java b/apps/kafka-integration/app/src/main/java/org/onosproject/kafkaintegration/impl/EventExporterManager.java
deleted file mode 100644
index e2e8e57..0000000
--- a/apps/kafka-integration/app/src/main/java/org/onosproject/kafkaintegration/impl/EventExporterManager.java
+++ /dev/null
@@ -1,330 +0,0 @@
-/*
- * Copyright 2016-present Open Networking Laboratory
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.onosproject.kafkaintegration.impl;
-
-import static com.google.common.base.Preconditions.checkNotNull;
-import static org.onosproject.kafkaintegration.api.dto.OnosEvent.Type.DEVICE;
-import static org.onosproject.kafkaintegration.api.dto.OnosEvent.Type.LINK;
-
-import org.onosproject.kafkaintegration.api.dto.DefaultEventSubscriber;
-import org.onosproject.kafkaintegration.api.dto.EventSubscriber;
-
-import java.util.ArrayList;
-import java.util.List;
-import java.util.Map;
-import java.util.UUID;
-
-import org.apache.felix.scr.annotations.Activate;
-import org.apache.felix.scr.annotations.Component;
-import org.apache.felix.scr.annotations.Deactivate;
-import org.apache.felix.scr.annotations.Reference;
-import org.apache.felix.scr.annotations.ReferenceCardinality;
-import org.apache.felix.scr.annotations.Service;
-import org.onosproject.core.ApplicationId;
-import org.onosproject.core.CoreService;
-import org.onosproject.kafkaintegration.api.EventExporterService;
-import org.onosproject.kafkaintegration.api.dto.EventSubscriberGroupId;
-import org.onosproject.kafkaintegration.api.dto.OnosEvent;
-import org.onosproject.kafkaintegration.api.dto.OnosEvent.Type;
-import org.onosproject.kafkaintegration.errors.InvalidApplicationException;
-import org.onosproject.kafkaintegration.errors.InvalidGroupIdException;
-import org.onosproject.kafkaintegration.listener.ListenerFactory;
-import org.onosproject.kafkaintegration.listener.OnosEventListener;
-import org.onosproject.net.device.DeviceService;
-import org.onosproject.net.link.LinkService;
-import org.onosproject.store.serializers.KryoNamespaces;
-import org.onosproject.store.service.Serializer;
-import org.onosproject.store.service.StorageService;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-/**
- * Implementation of Event Exporter Service.
- *
- */
-@Component(immediate = true)
-@Service
-public class EventExporterManager implements EventExporterService {
-
-    private final Logger log = LoggerFactory.getLogger(getClass());
-
-    // Stores the currently registered applications for event export service.
-    private Map<ApplicationId, EventSubscriberGroupId> registeredApps;
-
-    private Map<Type, List<EventSubscriber>> subscriptions;
-
-    private static final String REGISTERED_APPS = "registered-applications";
-
-    private static final String SUBSCRIBED_APPS = "event-subscriptions";
-
-    @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
-    protected DeviceService deviceService;
-
-    @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
-    protected LinkService linkService;
-
-    @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
-    protected CoreService coreService;
-
-    @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
-    protected StorageService storageService;
-
-    private ApplicationId appId;
-
-    @Activate
-    protected void activate() {
-        appId = coreService
-                .registerApplication("org.onosproject.kafkaintegration");
-
-        registeredApps = storageService
-                .<ApplicationId, EventSubscriberGroupId>consistentMapBuilder()
-                .withName(REGISTERED_APPS)
-                .withSerializer(Serializer.using(KryoNamespaces.API,
-                                                 EventSubscriberGroupId.class,
-                                                 UUID.class))
-                .build().asJavaMap();
-
-        subscriptions = storageService
-                .<Type, List<EventSubscriber>>consistentMapBuilder()
-                .withName(SUBSCRIBED_APPS)
-                .withSerializer(Serializer.using(KryoNamespaces.API,
-                                                 EventSubscriber.class,
-                                                 OnosEvent.class,
-                                                 OnosEvent.Type.class,
-                                                 DefaultEventSubscriber.class,
-                                                 EventSubscriberGroupId.class,
-                                                 UUID.class))
-                .build().asJavaMap();
-
-        log.info("Started");
-    }
-
-    @Deactivate
-    protected void deactivate() {
-        log.info("Stopped");
-    }
-
-    @Override
-    public EventSubscriberGroupId registerListener(String appName) {
-
-        // TODO: Remove it once ONOS provides a mechanism for external apps
-        // to register with the core service. See Jira - 4409
-        ApplicationId externalAppId = coreService.registerApplication(appName);
-
-        return registeredApps.computeIfAbsent(externalAppId,
-                                              (key) -> new EventSubscriberGroupId(UUID
-                                                      .randomUUID()));
-    }
-
-    @Override
-    public void unregisterListener(String appName) {
-        ApplicationId externalAppId =
-                checkNotNull(coreService.getAppId(appName));
-        registeredApps.remove(externalAppId);
-    }
-
-    @Override
-    public void subscribe(EventSubscriber subscriber)
-            throws InvalidGroupIdException, InvalidApplicationException {
-
-        checkNotNull(subscriber);
-
-        if (!registeredApplication(subscriber.appName())) {
-            throw new InvalidApplicationException("Application is not "
-                    + "registered to make this request.");
-        }
-
-        if (!validGroupId(subscriber.subscriberGroupId(),
-                          subscriber.appName())) {
-            throw new InvalidGroupIdException("Incorrect group id in the request");
-        }
-
-        OnosEventListener onosListener = getListener(subscriber.eventType());
-        checkNotNull(onosListener,
-                     "No listener for the supported event type - {}",
-                     subscriber.eventType());
-
-        applyListenerAction(subscriber.eventType(), onosListener,
-                            ListenerAction.START);
-
-        // update internal state
-        List<EventSubscriber> subscriptionList =
-                subscriptions.get(subscriber.eventType());
-        if (subscriptionList == null) {
-            subscriptionList = new ArrayList<EventSubscriber>();
-        }
-        subscriptionList.add(subscriber);
-        subscriptions.put(subscriber.eventType(), subscriptionList);
-
-        log.info("Subscription for {} event by {} successfull",
-                 subscriber.eventType(), subscriber.appName());
-    }
-
-    /**
-     * Checks if the application has registered.
-     *
-     * @param appName application name
-     * @return true if application has registered
-     */
-    private boolean registeredApplication(String appName) {
-
-        checkNotNull(appName);
-        ApplicationId appId = checkNotNull(coreService.getAppId(appName));
-        if (registeredApps.containsKey(appId)) {
-            return true;
-        }
-
-        log.debug("{} is not registered", appName);
-        return false;
-    }
-
-    /**
-     * Actions that can be performed on the ONOS Event Listeners.
-     *
-     */
-    private enum ListenerAction {
-        START, STOP;
-    }
-
-    /**
-     * Applies the specified action on the Listener.
-     *
-     * @param eventType the ONOS Event type registered by the application
-     * @param onosListener ONOS event listener
-     * @param action to be performed on the listener
-     */
-    private void applyListenerAction(Type eventType,
-                                     OnosEventListener onosListener,
-                                     ListenerAction action) {
-        switch (eventType) {
-        case DEVICE:
-            if (action == ListenerAction.START) {
-                onosListener.startListener(DEVICE, deviceService);
-            } else {
-                onosListener.stopListener(DEVICE, deviceService);
-            }
-            break;
-        case LINK:
-            if (action == ListenerAction.START) {
-                onosListener.startListener(LINK, linkService);
-            } else {
-                onosListener.stopListener(LINK, linkService);
-            }
-            break;
-        default:
-            log.error("Cannot {} listener. Unsupported event type {} ",
-                      action.toString(), eventType.toString());
-        }
-    }
-
-    /**
-     * Returns the ONOS event listener corresponding to the ONOS Event type.
-     *
-     * @param eventType ONOS event type
-     * @return ONOS event listener
-     */
-    private OnosEventListener getListener(Type eventType) {
-        checkNotNull(eventType);
-        ListenerFactory factory = ListenerFactory.getInstance();
-        OnosEventListener onosListener = factory.getListener(eventType);
-        return onosListener;
-    }
-
-    /**
-     * Checks if the group id is valid for this registered application.
-     *
-     * @param groupId GroupId assigned to the subscriber
-     * @param appName Registered Application name
-     * @return true if valid groupId and false otherwise
-     */
-    private boolean validGroupId(EventSubscriberGroupId groupId,
-                                 String appName) {
-
-        checkNotNull(groupId);
-
-        ApplicationId appId = coreService.getAppId(appName);
-        EventSubscriberGroupId registeredGroupId = registeredApps.get(appId);
-        if (registeredGroupId.equals(groupId)) {
-            return true;
-        }
-
-        return false;
-    }
-
-    @Override
-    public void unsubscribe(EventSubscriber subscriber)
-            throws InvalidGroupIdException, InvalidApplicationException {
-
-        checkNotNull(subscriber);
-
-        if (!registeredApplication(subscriber.appName())) {
-            throw new InvalidApplicationException("Application is not "
-                    + "registered to make this request.");
-        }
-
-        if (!validGroupId(subscriber.subscriberGroupId(),
-                          subscriber.appName())) {
-            throw new InvalidGroupIdException("Incorrect group id in the request");
-        }
-
-        if (!eventSubscribed(subscriber)) {
-            log.error("No subscription to {} was found",
-                      subscriber.eventType());
-            return;
-        }
-
-        // If this is the only subscriber listening for this event,
-        // stop the listener.
-        List<EventSubscriber> subscribers =
-                subscriptions.get(subscriber.eventType());
-        if (subscribers.size() == 1) {
-            OnosEventListener onosListener =
-                    getListener(subscriber.eventType());
-            checkNotNull(onosListener,
-                         "No listener for the supported event type - {}",
-                         subscriber.eventType());
-            applyListenerAction(subscriber.eventType(), onosListener,
-                                ListenerAction.STOP);
-        }
-
-        // update internal state.
-        subscribers.remove(subscriber);
-        subscriptions.put(subscriber.eventType(), subscribers);
-
-        log.info("Unsubscribed {} for {} events", subscriber.appName(),
-                 subscriber.eventType());
-    }
-
-    /**
-     * Checks if the subscriber has already subscribed to the requested event
-     * type.
-     *
-     * @param subscriber the subscriber to a specific ONOS event
-     * @return true if subscriber has subscribed to the ONOS event
-     */
-    private boolean eventSubscribed(EventSubscriber subscriber) {
-
-        List<EventSubscriber> subscriberList =
-                subscriptions.get(subscriber.eventType());
-
-        if (subscriberList == null) {
-            return false;
-        }
-
-        return subscriberList.contains(subscriber);
-    }
-
-}
diff --git a/apps/kafka-integration/app/src/main/java/org/onosproject/kafkaintegration/impl/KafkaCodecRegistrator.java b/apps/kafka-integration/app/src/main/java/org/onosproject/kafkaintegration/impl/KafkaCodecRegistrator.java
deleted file mode 100644
index 47098fa..0000000
--- a/apps/kafka-integration/app/src/main/java/org/onosproject/kafkaintegration/impl/KafkaCodecRegistrator.java
+++ /dev/null
@@ -1,48 +0,0 @@
-/**
- * Copyright 2016-present Open Networking Laboratory
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
-
- * http://www.apache.org/licenses/LICENSE-2.0
-
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.onosproject.kafkaintegration.impl;
-
-import org.apache.felix.scr.annotations.Activate;
-import org.apache.felix.scr.annotations.Component;
-import org.apache.felix.scr.annotations.Deactivate;
-import org.apache.felix.scr.annotations.Reference;
-import org.apache.felix.scr.annotations.ReferenceCardinality;
-import org.onosproject.codec.CodecService;
-import org.onosproject.kafkaintegration.api.dto.EventSubscriber;
-import org.onosproject.kafkaintegration.rest.SubscriberCodec;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-/**
- * Implementation of the JSON codec brokering service for Kafka app.
- */
-@Component(immediate = true)
-public class KafkaCodecRegistrator {
-    private static Logger log = LoggerFactory.getLogger(KafkaCodecRegistrator
-                                                                .class);
-    @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
-    protected CodecService codecService;
-
-    @Activate
-    public void activate() {
-        codecService.registerCodec(EventSubscriber.class, new SubscriberCodec());
-        log.info("Started");
-    }
-
-    @Deactivate
-    public void deactivate() {
-        log.info("Stopped");
-    }
-}
diff --git a/apps/kafka-integration/app/src/main/java/org/onosproject/kafkaintegration/impl/KafkaStorageManager.java b/apps/kafka-integration/app/src/main/java/org/onosproject/kafkaintegration/impl/KafkaStorageManager.java
deleted file mode 100644
index 4b9035e..0000000
--- a/apps/kafka-integration/app/src/main/java/org/onosproject/kafkaintegration/impl/KafkaStorageManager.java
+++ /dev/null
@@ -1,108 +0,0 @@
-/**
- * Copyright 2016 Open Networking Laboratory
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
-
- * http://www.apache.org/licenses/LICENSE-2.0
-
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.onosproject.kafkaintegration.impl;
-
-import java.util.TreeMap;
-import java.util.concurrent.Executors;
-import java.util.concurrent.ScheduledExecutorService;
-import java.util.concurrent.TimeUnit;
-
-import org.apache.felix.scr.annotations.Activate;
-import org.apache.felix.scr.annotations.Component;
-import org.apache.felix.scr.annotations.Deactivate;
-import org.apache.felix.scr.annotations.Reference;
-import org.apache.felix.scr.annotations.ReferenceCardinality;
-import org.onosproject.kafkaintegration.api.KafkaEventStorageService;
-import org.onosproject.kafkaintegration.api.dto.OnosEvent;
-import org.onosproject.store.service.AtomicValue;
-import org.onosproject.store.service.StorageService;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-@Component(immediate = true)
-public class KafkaStorageManager implements KafkaEventStorageService {
-
-    @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
-    protected StorageService storageService;
-
-    private TreeMap<Long, OnosEvent> kafkaEventStore;
-
-    private AtomicValue<Long> lastPublishedEvent;
-
-    private final Logger log = LoggerFactory.getLogger(getClass());
-
-    private ScheduledExecutorService gcExService;
-
-    private InternalGarbageCollector gcTask;
-
-    // Thread scheduler parameters.
-    private final long delay = 0;
-    private final long period = 1;
-
-    @Activate
-    protected void activate() {
-        kafkaEventStore = new TreeMap<Long, OnosEvent>();
-        lastPublishedEvent = storageService.<Long>atomicValueBuilder()
-                .withName("onos-app-kafka-published-seqNumber").build()
-                .asAtomicValue();
-
-        startGC();
-
-        log.info("Started");
-    }
-
-    private void startGC() {
-        log.info("Starting Garbage Collection Service");
-        gcExService = Executors.newSingleThreadScheduledExecutor();
-        gcTask = new InternalGarbageCollector();
-        gcExService.scheduleAtFixedRate(gcTask, delay, period,
-                                        TimeUnit.SECONDS);
-    }
-
-    @Deactivate
-    protected void deactivate() {
-        stopGC();
-        log.info("Stopped");
-    }
-
-    private void stopGC() {
-        log.info("Stopping Garbage Collection Service");
-        gcExService.shutdown();
-    }
-
-    @Override
-    public boolean insertCacheEntry(OnosEvent e) {
-        // TODO: Fill in the code once the event carries timestamp info.
-        return true;
-    }
-
-    @Override
-    public void updateLastPublishedEntry(Long sequenceNumber) {
-        this.lastPublishedEvent.set(sequenceNumber);
-    }
-
-    /**
-     * Removes events from the Kafka Event Store which have been published.
-     *
-     */
-    private class InternalGarbageCollector implements Runnable {
-
-        @Override
-        public void run() {
-            kafkaEventStore.headMap(lastPublishedEvent.get(), true).clear();
-        }
-    }
-
-}
diff --git a/apps/kafka-integration/app/src/main/java/org/onosproject/kafkaintegration/impl/package-info.java b/apps/kafka-integration/app/src/main/java/org/onosproject/kafkaintegration/impl/package-info.java
deleted file mode 100644
index a2950fa..0000000
--- a/apps/kafka-integration/app/src/main/java/org/onosproject/kafkaintegration/impl/package-info.java
+++ /dev/null
@@ -1,18 +0,0 @@
-/**
- * Copyright 2016-present Open Networking Laboratory Licensed under the Apache
- * License, Version 2.0 (the "License"); you may not use this file except in
- * compliance with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
- * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
- * License for the specific language governing permissions and limitations under
- * the License.
- */
-
-/**
- * API implementation classes.
- */
-package org.onosproject.kafkaintegration.impl;
diff --git a/apps/kafka-integration/app/src/main/java/org/onosproject/kafkaintegration/listener/DeviceEventsListener.java b/apps/kafka-integration/app/src/main/java/org/onosproject/kafkaintegration/listener/DeviceEventsListener.java
deleted file mode 100644
index 22c4bf2..0000000
--- a/apps/kafka-integration/app/src/main/java/org/onosproject/kafkaintegration/listener/DeviceEventsListener.java
+++ /dev/null
@@ -1,94 +0,0 @@
-/**
- * Copyright 2016 Open Networking Laboratory
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
-
- * http://www.apache.org/licenses/LICENSE-2.0
-
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.onosproject.kafkaintegration.listener;
-
-import static org.onosproject.kafkaintegration.api.dto.OnosEvent.Type.DEVICE;
-
-import org.onosproject.event.ListenerService;
-import org.onosproject.kafkaintegration.impl.Dispatcher;
-import org.onosproject.kafkaintegration.api.dto.OnosEvent.Type;
-import org.onosproject.kafkaintegration.converter.ConversionFactory;
-import org.onosproject.kafkaintegration.converter.EventConverter;
-import org.onosproject.net.device.DeviceEvent;
-import org.onosproject.net.device.DeviceListener;
-import org.onosproject.net.device.DeviceService;
-
-import com.google.protobuf.GeneratedMessage;
-
-/**
- * Listens for ONOS Device events.
- *
- */
-final class DeviceEventsListener implements OnosEventListener {
-
-    private boolean listenerRunning = false;
-
-    private InnerListener listener = null;
-
-    // Exists to defeat instantiation
-    private DeviceEventsListener() {
-    }
-
-    private static class SingletonHolder {
-        private static final DeviceEventsListener INSTANCE =
-                new DeviceEventsListener();
-    }
-
-    /**
-     * Returns a static reference to the Listener Factory.
-     *
-     * @return singleton object
-     */
-    public static DeviceEventsListener getInstance() {
-        return SingletonHolder.INSTANCE;
-    }
-
-    @Override
-    public void startListener(Type e, ListenerService<?, ?> service) {
-        if (!listenerRunning) {
-            listener = new InnerListener();
-            DeviceService deviceService = (DeviceService) service;
-            deviceService.addListener(listener);
-            listenerRunning = true;
-        }
-    }
-
-    private class InnerListener implements DeviceListener {
-
-        @Override
-        public void event(DeviceEvent arg0) {
-
-            // Convert the event to GPB format
-            ConversionFactory conversionFactory =
-                    ConversionFactory.getInstance();
-            EventConverter converter = conversionFactory.getConverter(DEVICE);
-            GeneratedMessage message = converter.convertToProtoMessage(arg0);
-
-            // Call Dispatcher and publish event
-            Dispatcher.getInstance().publish(DEVICE, message);
-        }
-    }
-
-    @Override
-    public void stopListener(Type e, ListenerService<?, ?> service) {
-        if (listenerRunning) {
-            DeviceService deviceService = (DeviceService) service;
-            deviceService.removeListener(listener);
-            listener = null;
-            listenerRunning = false;
-        }
-    }
-
-}
diff --git a/apps/kafka-integration/app/src/main/java/org/onosproject/kafkaintegration/listener/LinkEventsListener.java b/apps/kafka-integration/app/src/main/java/org/onosproject/kafkaintegration/listener/LinkEventsListener.java
deleted file mode 100644
index 769d8b4..0000000
--- a/apps/kafka-integration/app/src/main/java/org/onosproject/kafkaintegration/listener/LinkEventsListener.java
+++ /dev/null
@@ -1,92 +0,0 @@
-/**
- * Copyright 2016 Open Networking Laboratory
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
-
- * http://www.apache.org/licenses/LICENSE-2.0
-
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.onosproject.kafkaintegration.listener;
-
-import static org.onosproject.kafkaintegration.api.dto.OnosEvent.Type.LINK;
-
-import org.onosproject.event.ListenerService;
-import org.onosproject.kafkaintegration.impl.Dispatcher;
-import org.onosproject.kafkaintegration.api.dto.OnosEvent.Type;
-import org.onosproject.kafkaintegration.converter.ConversionFactory;
-import org.onosproject.kafkaintegration.converter.EventConverter;
-import org.onosproject.net.link.LinkEvent;
-import org.onosproject.net.link.LinkListener;
-import org.onosproject.net.link.LinkService;
-
-import com.google.protobuf.GeneratedMessage;
-
-/**
- * Listens for ONOS Link Events.
- *
- */
-final class LinkEventsListener implements OnosEventListener {
-
-    private boolean listenerRunning = false;
-
-    private InnerListener listener = null;
-
-    // Exists to defeat instantiation
-    private LinkEventsListener() {
-    }
-
-    private static class SingletonHolder {
-        private static final LinkEventsListener INSTANCE =
-                new LinkEventsListener();
-    }
-
-    /**
-     * Returns a static reference to the Listener Factory.
-     *
-     * @return singleton object
-     */
-    public static LinkEventsListener getInstance() {
-        return SingletonHolder.INSTANCE;
-    }
-
-    @Override
-    public void startListener(Type e, ListenerService<?, ?> service) {
-        if (!listenerRunning) {
-            listener = new InnerListener();
-            LinkService linkService = (LinkService) service;
-            linkService.addListener(listener);
-            listenerRunning = true;
-        }
-    }
-
-    private class InnerListener implements LinkListener {
-
-        @Override
-        public void event(LinkEvent arg0) {
-
-            // Convert the event to GPB format
-            ConversionFactory conversionFactory =
-                    ConversionFactory.getInstance();
-            EventConverter converter = conversionFactory.getConverter(LINK);
-            GeneratedMessage message = converter.convertToProtoMessage(arg0);
-
-            // Call Dispatcher and publish event
-            Dispatcher.getInstance().publish(LINK, message);
-        }
-    }
-
-    @Override
-    public void stopListener(Type e, ListenerService<?, ?> service) {
-        if (listenerRunning) {
-            LinkService linkService = (LinkService) service;
-            linkService.removeListener(listener);
-            listenerRunning = false;
-        }
-    }
-}
diff --git a/apps/kafka-integration/app/src/main/java/org/onosproject/kafkaintegration/listener/ListenerFactory.java b/apps/kafka-integration/app/src/main/java/org/onosproject/kafkaintegration/listener/ListenerFactory.java
deleted file mode 100644
index 496ba2b..0000000
--- a/apps/kafka-integration/app/src/main/java/org/onosproject/kafkaintegration/listener/ListenerFactory.java
+++ /dev/null
@@ -1,67 +0,0 @@
-/**
- * Copyright 2016 Open Networking Laboratory
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
-
- * http://www.apache.org/licenses/LICENSE-2.0
-
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.onosproject.kafkaintegration.listener;
-
-import static org.onosproject.kafkaintegration.api.dto.OnosEvent.Type.DEVICE;
-import static org.onosproject.kafkaintegration.api.dto.OnosEvent.Type.LINK;
-
-import java.util.HashMap;
-import java.util.Map;
-
-import org.onosproject.kafkaintegration.api.dto.OnosEvent.Type;
-
-/**
- * Returns the appropriate listener object based on the ONOS event type.
- *
- */
-public final class ListenerFactory {
-
-    // Store listeners for all supported events
-    private Map<Type, OnosEventListener> listeners =
-            new HashMap<Type, OnosEventListener>() {
-                {
-                    put(DEVICE, DeviceEventsListener.getInstance());
-                    put(LINK, LinkEventsListener.getInstance());
-                }
-            };
-
-    // Exists to defeat instantiation
-    private ListenerFactory() {
-    }
-
-    private static class SingletonHolder {
-        private static final ListenerFactory INSTANCE = new ListenerFactory();
-    }
-
-    /**
-     * Returns a static reference to the Listener Factory.
-     *
-     * @return singleton object
-     */
-    public static ListenerFactory getInstance() {
-        return SingletonHolder.INSTANCE;
-    }
-
-    /**
-     * Returns the listener object for the specified ONOS event type.
-     *
-     * @param event ONOS Event type
-     * @return return listener object
-     */
-    public OnosEventListener getListener(Type event) {
-        return listeners.get(event);
-    }
-
-}
diff --git a/apps/kafka-integration/app/src/main/java/org/onosproject/kafkaintegration/listener/OnosEventListener.java b/apps/kafka-integration/app/src/main/java/org/onosproject/kafkaintegration/listener/OnosEventListener.java
deleted file mode 100644
index 22cde21..0000000
--- a/apps/kafka-integration/app/src/main/java/org/onosproject/kafkaintegration/listener/OnosEventListener.java
+++ /dev/null
@@ -1,42 +0,0 @@
-/**
- * Copyright 2016 Open Networking Laboratory
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
-
- * http://www.apache.org/licenses/LICENSE-2.0
-
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.onosproject.kafkaintegration.listener;
-
-import org.onosproject.event.ListenerService;
-import org.onosproject.kafkaintegration.api.dto.OnosEvent.Type;
-
-/**
- * APIs for starting and stopping a ONOS Event listener.
- *
- */
-public interface OnosEventListener {
-
-    /**
-     * Start the listener for the specific ONOS event type.
-     *
-     * @param event ONOS event type
-     * @param service ONOS event listener for the specific event type
-     */
-    void startListener(Type event, ListenerService<?, ?> service);
-
-    /**
-     * Stop the Listener for the specific ONOS event type.
-     *
-     * @param event ONOS event type
-     * @param service ONOS event listener for the specific event type
-     */
-    void stopListener(Type event, ListenerService<?, ?> service);
-
-}
diff --git a/apps/kafka-integration/app/src/main/java/org/onosproject/kafkaintegration/listener/package-info.java b/apps/kafka-integration/app/src/main/java/org/onosproject/kafkaintegration/listener/package-info.java
deleted file mode 100644
index f6f602f..0000000
--- a/apps/kafka-integration/app/src/main/java/org/onosproject/kafkaintegration/listener/package-info.java
+++ /dev/null
@@ -1,19 +0,0 @@
-/**
- * Copyright 2016-present Open Networking Laboratory Licensed under the Apache
- * License, Version 2.0 (the "License"); you may not use this file except in
- * compliance with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
- * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
- * License for the specific language governing permissions and limitations under
- * the License.
- */
-
-/**
- * Listeners for listening to various ONOS events.
- *
- */
-package org.onosproject.kafkaintegration.listener;
diff --git a/apps/kafka-integration/app/src/main/java/org/onosproject/kafkaintegration/rest/EventExporterWebResource.java b/apps/kafka-integration/app/src/main/java/org/onosproject/kafkaintegration/rest/EventExporterWebResource.java
deleted file mode 100644
index 0456362..0000000
--- a/apps/kafka-integration/app/src/main/java/org/onosproject/kafkaintegration/rest/EventExporterWebResource.java
+++ /dev/null
@@ -1,170 +0,0 @@
-/**
- * Copyright 2016-present Open Networking Laboratory
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
-
- * http://www.apache.org/licenses/LICENSE-2.0
-
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.onosproject.kafkaintegration.rest;
-
-import static com.google.common.base.Preconditions.checkNotNull;
-import static javax.ws.rs.core.Response.Status.BAD_REQUEST;
-
-import java.io.IOException;
-import java.io.InputStream;
-
-import javax.ws.rs.Consumes;
-import javax.ws.rs.DELETE;
-import javax.ws.rs.POST;
-import javax.ws.rs.Path;
-import javax.ws.rs.Produces;
-import javax.ws.rs.core.MediaType;
-import javax.ws.rs.core.Response;
-
-import org.onosproject.kafkaintegration.api.EventExporterService;
-import org.onosproject.kafkaintegration.api.dto.EventSubscriber;
-import org.onosproject.kafkaintegration.api.dto.EventSubscriberGroupId;
-import org.onosproject.rest.AbstractWebResource;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import com.fasterxml.jackson.databind.ObjectMapper;
-import com.fasterxml.jackson.databind.node.ObjectNode;
-
-/**
- * Rest Interfaces for subscribing/unsubscribing to event notifications.
- */
-@Path("kafkaService")
-public class EventExporterWebResource extends AbstractWebResource {
-
-    private final Logger log = LoggerFactory.getLogger(getClass());
-    public static final String JSON_NOT_NULL =
-            "Registration Data cannot be empty";
-    public static final String REGISTRATION_SUCCESSFUL =
-            "Registered Listener successfully";
-    public static final String DEREGISTRATION_SUCCESSFUL =
-            "De-Registered Listener successfully";
-    public static final String EVENT_SUBSCRIPTION_SUCCESSFUL =
-            "Event Registration successfull";
-    public static final String EVENT_SUBSCRIPTION_UNSUCCESSFUL =
-            "Event subscription unsuccessful";
-    public static final String EVENT_SUBSCRIPTION_REMOVED =
-            "Event De-Registration successfull";
-    /**
-     * Registers a listener for ONOS Events.
-     *
-     * @param appName The application trying to register
-     * @return 200 OK with UUID string which should be used as Kafka Consumer
-     *         Group Id
-     * @onos.rsModel KafkaRegistration
-     */
-    @POST
-    @Produces(MediaType.APPLICATION_JSON)
-    @Consumes(MediaType.APPLICATION_JSON)
-    @Path("register")
-    public Response registerKafkaListener(String appName) {
-
-        EventExporterService service = get(EventExporterService.class);
-
-        EventSubscriberGroupId groupId = service.registerListener(appName);
-
-        log.info("Registered app {}", appName);
-
-        // TODO: Should also return Kafka server information.
-        // Will glue this in when we have the config and Kafka modules ready
-        return ok(groupId.getId().toString()).build();
-    }
-
-    /**
-     * Unregisters a listener for ONOS Events.
-     *
-     * @param appName The application trying to unregister
-     * @return 200 OK
-     * @onos.rsModel KafkaRegistration
-     */
-    @DELETE
-    @Path("unregister")
-    public Response removeKafkaListener(String appName) {
-        EventExporterService service = get(EventExporterService.class);
-
-        service.unregisterListener(appName);
-        log.info("Unregistered app {}", appName);
-        return ok(DEREGISTRATION_SUCCESSFUL).build();
-    }
-
-    /**
-     * Creates subscription to a specific ONOS event.
-     *
-     * @param input Subscription Data in JSON format
-     * @return 200 OK if successful or 400 BAD REQUEST
-     * @onos.rsModel KafkaSubscription
-     */
-    @POST
-    @Consumes(MediaType.APPLICATION_JSON)
-    @Produces(MediaType.APPLICATION_JSON)
-    @Path("subscribe")
-    public Response subscribe(InputStream input) {
-
-        EventExporterService service = get(EventExporterService.class);
-
-        try {
-            EventSubscriber sub = parseSubscriptionData(input);
-            service.subscribe(sub);
-        } catch (Exception e) {
-            log.error(e.getMessage());
-            return Response.status(BAD_REQUEST).entity(e.getMessage()).build();
-        }
-
-        return ok(EVENT_SUBSCRIPTION_SUCCESSFUL).build();
-    }
-
-    /**
-     * Parses JSON Subscription Data from the external application.
-     *
-     * @param input Subscription Data in JSON format
-     * @return parsed DTO object
-     * @throws IOException
-     */
-    private EventSubscriber parseSubscriptionData(InputStream input)
-            throws IOException {
-
-        ObjectMapper mapper = new ObjectMapper();
-        ObjectNode node = (ObjectNode) mapper.readTree(input);
-        checkNotNull(node, JSON_NOT_NULL);
-        EventSubscriber codec = codec(EventSubscriber.class).decode(node, this);
-        checkNotNull(codec, JSON_NOT_NULL);
-        return codec;
-    }
-
-    /**
-     * Deletes subscription from a specific ONOS event.
-     *
-     * @param input data in JSON format
-     * @return 200 OK if successful or 400 BAD REQUEST
-     * @onos.rsModel KafkaSubscription
-     */
-    @DELETE
-    @Consumes(MediaType.APPLICATION_JSON)
-    @Path("unsubscribe")
-    public Response unsubscribe(InputStream input) {
-
-        EventExporterService service = get(EventExporterService.class);
-
-        try {
-            EventSubscriber sub = parseSubscriptionData(input);
-            service.unsubscribe(sub);
-        } catch (Exception e) {
-            log.error(e.getMessage());
-            return Response.status(BAD_REQUEST).entity(e.getMessage()).build();
-        }
-
-        return ok(EVENT_SUBSCRIPTION_REMOVED).build();
-    }
-}
diff --git a/apps/kafka-integration/app/src/main/java/org/onosproject/kafkaintegration/rest/SubscriberCodec.java b/apps/kafka-integration/app/src/main/java/org/onosproject/kafkaintegration/rest/SubscriberCodec.java
deleted file mode 100644
index 2876ca1..0000000
--- a/apps/kafka-integration/app/src/main/java/org/onosproject/kafkaintegration/rest/SubscriberCodec.java
+++ /dev/null
@@ -1,64 +0,0 @@
-/**
- * Copyright 2016-present Open Networking Laboratory
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
-
- * http://www.apache.org/licenses/LICENSE-2.0
-
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.onosproject.kafkaintegration.rest;
-
-import static com.google.common.base.Preconditions.checkNotNull;
-
-import java.util.UUID;
-
-import org.onosproject.codec.CodecContext;
-import org.onosproject.codec.JsonCodec;
-import org.onosproject.kafkaintegration.api.dto.DefaultEventSubscriber;
-import org.onosproject.kafkaintegration.api.dto.EventSubscriber;
-import org.onosproject.kafkaintegration.api.dto.EventSubscriberGroupId;
-import org.onosproject.kafkaintegration.api.dto.OnosEvent.Type;
-import com.fasterxml.jackson.databind.node.ObjectNode;
-
-/**
- * Codec for encoding/decoding a Subscriber object to/from JSON.
- *
- */
-public final class SubscriberCodec extends JsonCodec<EventSubscriber> {
-
-    // JSON field names
-    private static final String NAME = "appName";
-    private static final String GROUP_ID = "groupId";
-    private static final String EVENT_TYPE = "eventType";
-
-    @Override
-    public ObjectNode encode(EventSubscriber data, CodecContext context) {
-        checkNotNull(data, "Subscriber cannot be null");
-        return context.mapper().createObjectNode().put(NAME, data.appName())
-                .put(GROUP_ID, data.subscriberGroupId().getId().toString())
-                .put(EVENT_TYPE, data.eventType().toString());
-    }
-
-    @Override
-    public EventSubscriber decode(ObjectNode json, CodecContext context) {
-
-        EventSubscriber.Builder resultBuilder = new DefaultEventSubscriber
-                .Builder();
-        String appName = json.get(NAME).asText();
-        resultBuilder.setAppName(appName);
-
-        String subscriberGroupId = json.get(GROUP_ID).asText();
-        resultBuilder.setSubscriberGroupId(new EventSubscriberGroupId(UUID.
-                fromString(subscriberGroupId)));
-
-        String eventType = json.get(EVENT_TYPE).asText();
-        resultBuilder.setEventType(Type.valueOf(eventType));
-        return resultBuilder.build();
-    }
-}
\ No newline at end of file
diff --git a/apps/kafka-integration/app/src/main/java/org/onosproject/kafkaintegration/rest/package-info.java b/apps/kafka-integration/app/src/main/java/org/onosproject/kafkaintegration/rest/package-info.java
deleted file mode 100644
index 7885851..0000000
--- a/apps/kafka-integration/app/src/main/java/org/onosproject/kafkaintegration/rest/package-info.java
+++ /dev/null
@@ -1,19 +0,0 @@
-/**
- * Copyright 2016-present Open Networking Laboratory Licensed under the Apache
- * License, Version 2.0 (the "License"); you may not use this file except in
- * compliance with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
- * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
- * License for the specific language governing permissions and limitations under
- * the License.
- */
-
-/**
- * REST API Definitions.
- *
- */
-package org.onosproject.kafkaintegration.rest;
diff --git a/apps/kafka-integration/app/src/main/resources/definitions/KafkaRegistration.json b/apps/kafka-integration/app/src/main/resources/definitions/KafkaRegistration.json
deleted file mode 100644
index a81fd11..0000000
--- a/apps/kafka-integration/app/src/main/resources/definitions/KafkaRegistration.json
+++ /dev/null
@@ -1,5 +0,0 @@
-{
-  "type": "string",
-  "title": "KafkaRegistration",
-  "example": "forwardingApp"
-}
\ No newline at end of file
diff --git a/apps/kafka-integration/app/src/main/resources/definitions/KafkaSubscription.json b/apps/kafka-integration/app/src/main/resources/definitions/KafkaSubscription.json
deleted file mode 100644
index 819ca77..0000000
--- a/apps/kafka-integration/app/src/main/resources/definitions/KafkaSubscription.json
+++ /dev/null
@@ -1,23 +0,0 @@
-{
-  "type": "object",
-  "title": "KafkaSubscription",
-  "required": [
-    "appName",
-    "groupId",
-    "eventType"
-  ],
-  "properties": {
-     "appName": {
-        "type": "string",
-        "example": "forwardingApp"
-     },
-     "groupId": {
-        "type": "string",
-        "example": "18285435-2c62-4684-96dd-fb03b7cd0c83"
-     },
-     "eventType": {
-        "type": "string",
-        "example": "DEVICE"
-     }
-  }
-}
\ No newline at end of file
diff --git a/apps/kafka-integration/app/src/main/webapp/WEB-INF/web.xml b/apps/kafka-integration/app/src/main/webapp/WEB-INF/web.xml
deleted file mode 100644
index 32e5da7..0000000
--- a/apps/kafka-integration/app/src/main/webapp/WEB-INF/web.xml
+++ /dev/null
@@ -1,38 +0,0 @@
-<?xml version="1.0" encoding="UTF-8"?>
-<!--
-  ~ Copyright 2016 Open Networking Laboratory
-  ~
-  ~ Licensed under the Apache License, Version 2.0 (the "License");
-  ~ you may not use this file except in compliance with the License.
-  ~ You may obtain a copy of the License at
-  ~
-  ~     http://www.apache.org/licenses/LICENSE-2.0
-  ~
-  ~ Unless required by applicable law or agreed to in writing, software
-  ~ distributed under the License is distributed on an "AS IS" BASIS,
-  ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-  ~ See the License for the specific language governing permissions and
-  ~ limitations under the License.
-  -->
-<web-app xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns="http://java.sun.com/xml/ns/javaee"
-         xmlns:web="http://java.sun.com/xml/ns/javaee/web-app_2_5.xsd"
-         xsi:schemaLocation="http://java.sun.com/xml/ns/javaee http://java.sun.com/xml/ns/javaee/web-app_2_5.xsd"
-         id="ONOS" version="2.5">
-    <display-name>Event Exporter REST API</display-name>
-
-    <servlet>
-        <servlet-name>JAX-RS Service</servlet-name>
-        <servlet-class>org.glassfish.jersey.servlet.ServletContainer</servlet-class>
-        <init-param>
-        <param-name>jersey.config.server.provider.classnames</param-name>
-        <param-value>org.onosproject.kafkaintegration.rest.EventExporterWebResource</param-value>
-        </init-param>
-    <load-on-startup>10</load-on-startup>
-    </servlet>
-
-    <servlet-mapping>
-        <servlet-name>JAX-RS Service</servlet-name>
-        <url-pattern>/*</url-pattern>
-    </servlet-mapping>
-
-</web-app>