Made changes as per comments.
kafkaProducer is now non-static.
TODO: KafkaPublisherManager Service and not Singleton.
Kafka event publishing.
Change-Id: I5ec20a6e4950c38e822468d343521ab77475b7d3
diff --git a/apps/kafka-integration/app/app.xml b/apps/kafka-integration/app/app.xml
index 5912d4b..a832936 100644
--- a/apps/kafka-integration/app/app.xml
+++ b/apps/kafka-integration/app/app.xml
@@ -19,6 +19,9 @@
featuresRepo="mvn:${project.groupId}/${project.artifactId}/${project.version}/xml/features"
features="${project.artifactId}" apps="org.onosproject.incubator.protobuf">
<description>${project.description}</description>
- <artifact>mvn:${project.groupId}/${project.artifactId}/${project.version}</artifact>
+
<artifact>mvn:${project.groupId}/onos-app-kafka-api/${project.version}</artifact>
+ <artifact>mvn:${project.groupId}/onos-app-kafka-core/${project.version}</artifact>
+ <artifact>mvn:${project.groupId}/onos-app-kafka-web/${project.version}</artifact>
+ <artifact>mvn:org.apache.servicemix.bundles/org.apache.servicemix.bundles.kafka-clients/0.8.2.2_1</artifact>
</app>
diff --git a/apps/kafka-integration/app/features.xml b/apps/kafka-integration/app/features.xml
index 1e219b0..2982029 100644
--- a/apps/kafka-integration/app/features.xml
+++ b/apps/kafka-integration/app/features.xml
@@ -19,6 +19,8 @@
description="${project.description}">
<feature>onos-api</feature>
<bundle>mvn:${project.groupId}/onos-app-kafka-api/${project.version}</bundle>
- <bundle>mvn:${project.groupId}/onos-app-kafka/${project.version}</bundle>
+ <bundle>mvn:${project.groupId}/onos-app-kafka-core/${project.version}</bundle>
+ <bundle>mvn:${project.groupId}/onos-app-kafka-web/${project.version}</bundle>
+ <bundle>mvn:org.apache.servicemix.bundles/org.apache.servicemix.bundles.kafka-clients/0.8.2.2_1</bundle>
</feature>
</features>
diff --git a/apps/kafka-integration/app/pom.xml b/apps/kafka-integration/app/pom.xml
index 1dafb0d..5a6e574 100644
--- a/apps/kafka-integration/app/pom.xml
+++ b/apps/kafka-integration/app/pom.xml
@@ -26,12 +26,6 @@
<artifactId>onos-app-kafka</artifactId>
- <packaging>bundle</packaging>
- <description>
- Kafka Integration Application.
- This will export ONOS Events to Northbound Kafka Server.
- </description>
-
<properties>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<onos.version>${project.version}</onos.version>
@@ -41,21 +35,18 @@
<web.context>/onos/kafka</web.context>
<api.version>1.0.0</api.version>
<api.package>org.onosproject.kafkaintegration.rest</api.package>
- <api.title>Kafka Integration Application REST API</api.title>
- <api.description>
- APIs for subscribing to Events generated by ONOS
- </api.description>
<onos.app.category>Utility</onos.app.category>
<onos.app.url>https://wiki.onosproject.org/display/ONOS/Kafka+Integration</onos.app.url>
- <onos.app.readme>Export ONOS events to a Northbound Kafka server</onos.app.readme>
<onos.app.requires>org.onosproject.incubator.protobuf</onos.app.requires>
</properties>
- <dependencies>
- <dependency>
- <groupId>org.onosproject</groupId>
- <artifactId>onos-api</artifactId>
- </dependency>
+ <packaging>pom</packaging>
+ <description>
+ Kafka Integration Application.
+ This will export ONOS Events to Northbound Kafka Server.
+ </description>
+
+ <dependencies>
<dependency>
<groupId>org.onosproject</groupId>
@@ -65,168 +56,22 @@
<dependency>
<groupId>org.onosproject</groupId>
- <artifactId>onos-incubator-protobuf</artifactId>
+ <artifactId>onos-app-kafka-core</artifactId>
<version>${project.version}</version>
</dependency>
<dependency>
<groupId>org.onosproject</groupId>
- <artifactId>onlab-osgi</artifactId>
- </dependency>
-
- <dependency>
- <groupId>org.onosproject</groupId>
- <artifactId>onos-rest</artifactId>
+ <artifactId>onos-app-kafka-web</artifactId>
<version>${project.version}</version>
</dependency>
+ <!--Also need to update the app.xml and the features.xml -->
<dependency>
- <groupId>junit</groupId>
- <artifactId>junit</artifactId>
- <scope>test</scope>
+ <groupId>org.apache.servicemix.bundles</groupId>
+ <artifactId>org.apache.servicemix.bundles.kafka-clients</artifactId>
+ <version>0.8.2.2_1</version>
</dependency>
- <dependency>
- <groupId>org.onosproject</groupId>
- <artifactId>onos-api</artifactId>
- <scope>test</scope>
- <classifier>tests</classifier>
- </dependency>
-
- <dependency>
- <groupId>javax.ws.rs</groupId>
- <artifactId>javax.ws.rs-api</artifactId>
- <version>2.0.1</version>
- </dependency>
-
- <dependency>
- <groupId>com.google.protobuf</groupId>
- <artifactId>protobuf-java</artifactId>
- <version>3.0.0-beta-2</version>
- </dependency>
-
- <dependency>
- <groupId>org.codehaus.jackson</groupId>
- <artifactId>jackson-core-asl</artifactId>
- <version>1.9.13</version>
- </dependency>
-
- <dependency>
- <groupId>org.codehaus.jackson</groupId>
- <artifactId>jackson-mapper-asl</artifactId>
- <version>1.9.13</version>
- </dependency>
-
- <dependency>
- <groupId>org.glassfish.jersey.containers</groupId>
- <artifactId>jersey-container-servlet</artifactId>
- </dependency>
- <dependency>
- <groupId>com.fasterxml.jackson.core</groupId>
- <artifactId>jackson-annotations</artifactId>
- </dependency>
-
- <dependency>
- <groupId>org.onosproject</groupId>
- <artifactId>onos-core-serializers</artifactId>
- <version>${project.version}</version>
- </dependency>
-
- <dependency>
- <groupId>org.apache.felix</groupId>
- <artifactId>org.apache.felix.scr.annotations</artifactId>
- <scope>provided</scope>
- </dependency>
</dependencies>
-
- <build>
- <plugins>
- <plugin>
- <groupId>org.apache.felix</groupId>
- <artifactId>maven-bundle-plugin</artifactId>
- <extensions>true</extensions>
- <configuration>
- <instructions>
- <Bundle-SymbolicName>
- ${project.groupId}.${project.artifactId}
- </Bundle-SymbolicName>
- <_wab>src/main/webapp/</_wab>
- <Include-Resource>
- WEB-INF/classes/apidoc/swagger.json=target/swagger.json,
- {maven-resources}
- </Include-Resource>
- <Import-Package>
- org.slf4j,
- org.osgi.framework,
- javax.ws.rs,
- javax.ws.rs.core,
- org.glassfish.jersey.servlet,
- com.fasterxml.jackson.databind,
- com.fasterxml.jackson.databind.node,
- com.fasterxml.jackson.core,
- org.onlab.packet.*,
- org.onosproject.*,
- org.onlab.util.*,
- com.google.common.*,
- com.google.protobuf.*
- </Import-Package>
- <Web-ContextPath>${web.context}</Web-ContextPath>
- </instructions>
- </configuration>
- </plugin>
- <plugin>
- <groupId>org.apache.maven.plugins</groupId>
- <artifactId>maven-compiler-plugin</artifactId>
- <configuration>
- <source>1.8</source>
- <target>1.8</target>
- </configuration>
- </plugin>
- <plugin>
- <groupId>org.apache.felix</groupId>
- <artifactId>maven-scr-plugin</artifactId>
- <executions>
- <execution>
- <id>generate-scr-srcdescriptor</id>
- <goals>
- <goal>scr</goal>
- </goals>
- </execution>
- </executions>
- <configuration>
- <supportedProjectTypes>
- <supportedProjectType>bundle</supportedProjectType>
- <supportedProjectType>war</supportedProjectType>
- </supportedProjectTypes>
- </configuration>
- </plugin>
- <plugin>
- <groupId>org.onosproject</groupId>
- <artifactId>onos-maven-plugin</artifactId>
- <executions>
- <execution>
- <id>cfg</id>
- <phase>generate-resources</phase>
- <goals>
- <goal>cfg</goal>
- </goals>
- </execution>
- <execution>
- <id>swagger</id>
- <goals>
- <goal>swagger</goal>
- </goals>
- </execution>
- <execution>
- <id>app</id>
- <phase>package</phase>
- <goals>
- <goal>app</goal>
- </goals>
- </execution>
- </executions>
- </plugin>
- </plugins>
- </build>
-
-</project>
+</project>
\ No newline at end of file
diff --git a/apps/kafka-integration/app/src/main/java/org/onosproject/kafkaintegration/converter/ConversionFactory.java b/apps/kafka-integration/app/src/main/java/org/onosproject/kafkaintegration/converter/ConversionFactory.java
deleted file mode 100644
index 8f69a20..0000000
--- a/apps/kafka-integration/app/src/main/java/org/onosproject/kafkaintegration/converter/ConversionFactory.java
+++ /dev/null
@@ -1,68 +0,0 @@
-/**
- * Copyright 2016 Open Networking Laboratory
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
-
- * http://www.apache.org/licenses/LICENSE-2.0
-
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.onosproject.kafkaintegration.converter;
-
-import static org.onosproject.kafkaintegration.api.dto.OnosEvent.Type.DEVICE;
-import static org.onosproject.kafkaintegration.api.dto.OnosEvent.Type.LINK;
-
-import java.util.HashMap;
-import java.util.Map;
-
-import org.onosproject.kafkaintegration.api.dto.OnosEvent.Type;;
-
-/**
- * Returns the appropriate converter object based on the ONOS event type.
- *
- */
-public final class ConversionFactory {
-
- // Store converters for all supported events
- private Map<Type, EventConverter> converters =
- new HashMap<Type, EventConverter>() {
- {
- put(DEVICE, new DeviceEventConverter());
- put(LINK, new LinkEventConverter());
- }
- };
-
- // Exists to defeat instantiation
- private ConversionFactory() {
- }
-
- private static class SingletonHolder {
- private static final ConversionFactory INSTANCE =
- new ConversionFactory();
- }
-
- /**
- * Returns a static reference to the Conversion Factory.
- *
- * @return singleton object
- */
- public static ConversionFactory getInstance() {
- return SingletonHolder.INSTANCE;
- }
-
- /**
- * Returns an Event converter object for the specified ONOS event type.
- *
- * @param event ONOS event type
- * @return Event Converter object
- */
- public EventConverter getConverter(Type event) {
- return converters.get(event);
- }
-
-}
diff --git a/apps/kafka-integration/app/src/main/java/org/onosproject/kafkaintegration/converter/DeviceEventConverter.java b/apps/kafka-integration/app/src/main/java/org/onosproject/kafkaintegration/converter/DeviceEventConverter.java
deleted file mode 100644
index ac04040..0000000
--- a/apps/kafka-integration/app/src/main/java/org/onosproject/kafkaintegration/converter/DeviceEventConverter.java
+++ /dev/null
@@ -1,124 +0,0 @@
-/**
- * Copyright 2016 Open Networking Laboratory
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
-
- * http://www.apache.org/licenses/LICENSE-2.0
-
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.onosproject.kafkaintegration.converter;
-
-import org.onosproject.event.Event;
-import org.onosproject.grpc.net.Device.DeviceCore;
-import org.onosproject.grpc.net.Device.DeviceType;
-import org.onosproject.grpc.net.DeviceEvent.DeviceEventType;
-import org.onosproject.grpc.net.DeviceEvent.DeviceNotification;
-import org.onosproject.grpc.net.Port.PortCore;
-import org.onosproject.grpc.net.Port.PortType;
-import org.onosproject.net.device.DeviceEvent;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import com.google.protobuf.GeneratedMessage;
-
-/**
- * Converts ONOS Device event message to GPB format.
- *
- */
-class DeviceEventConverter implements EventConverter {
-
- private final Logger log = LoggerFactory.getLogger(getClass());
-
- @Override
- public GeneratedMessage convertToProtoMessage(Event<?, ?> event) {
-
- DeviceEvent deviceEvent = (DeviceEvent) event;
-
- if (!deviceEventTypeSupported(deviceEvent)) {
- log.error("Unsupported Onos Device Event {}. There is no matching"
- + "proto Device Event type", deviceEvent.type().toString());
- return null;
- }
-
- return buildDeviceProtoMessage(deviceEvent);
- }
-
- /**
- * Checks if the ONOS Device Event type is supported.
- *
- * @param event ONOS Device event
- * @return true if there is a match and false otherwise
- */
- private boolean deviceEventTypeSupported(DeviceEvent event) {
- DeviceEventType[] deviceEvents = DeviceEventType.values();
- for (DeviceEventType deviceEventType : deviceEvents) {
- if (deviceEventType.name().equals(event.type().name())) {
- return true;
- }
- }
-
- return false;
- }
-
- private DeviceNotification buildDeviceProtoMessage(DeviceEvent deviceEvent) {
- DeviceNotification.Builder notificationBuilder =
- DeviceNotification.newBuilder();
-
- DeviceCore deviceCore =
- DeviceCore.newBuilder()
- .setChassisId(deviceEvent.subject().chassisId().id()
- .toString())
- .setDeviceId(deviceEvent.subject().id().toString())
- .setHwVersion(deviceEvent.subject().hwVersion())
- .setManufacturer(deviceEvent.subject().manufacturer())
- .setSerialNumber(deviceEvent.subject().serialNumber())
- .setSwVersion(deviceEvent.subject().swVersion())
- .setType(DeviceType
- .valueOf(deviceEvent.subject().type().name()))
- .build();
-
- PortCore portCore = null;
- if (deviceEvent.port() != null) {
- portCore =
- PortCore.newBuilder()
- .setIsEnabled(deviceEvent.port().isEnabled())
- .setPortNumber(deviceEvent.port().number()
- .toString())
- .setPortSpeed(deviceEvent.port().portSpeed())
- .setType(PortType
- .valueOf(deviceEvent.port().type().name()))
- .build();
-
- notificationBuilder.setPort(portCore);
- }
-
- notificationBuilder.setDeviceEventType(getProtoType(deviceEvent))
- .setDevice(deviceCore);
-
- return notificationBuilder.build();
- }
-
- /**
- * Retrieves the protobuf generated device event type.
- *
- * @param event ONOS Device Event
- * @return generated Device Event Type
- */
- private DeviceEventType getProtoType(DeviceEvent event) {
- DeviceEventType protobufEventType = null;
- DeviceEventType[] deviceEvents = DeviceEventType.values();
- for (DeviceEventType deviceEventType : deviceEvents) {
- if (deviceEventType.name().equals(event.type().name())) {
- protobufEventType = deviceEventType;
- }
- }
-
- return protobufEventType;
- }
-}
diff --git a/apps/kafka-integration/app/src/main/java/org/onosproject/kafkaintegration/converter/EventConverter.java b/apps/kafka-integration/app/src/main/java/org/onosproject/kafkaintegration/converter/EventConverter.java
deleted file mode 100644
index e15ebdc..0000000
--- a/apps/kafka-integration/app/src/main/java/org/onosproject/kafkaintegration/converter/EventConverter.java
+++ /dev/null
@@ -1,36 +0,0 @@
-/**
- * Copyright 2016 Open Networking Laboratory
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
-
- * http://www.apache.org/licenses/LICENSE-2.0
-
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.onosproject.kafkaintegration.converter;
-
-import org.onosproject.event.Event;
-
-import com.google.protobuf.GeneratedMessage;
-
-/**
- *
- * APIs for converting between ONOS event objects and GPB data objects.
- *
- */
-public interface EventConverter {
-
- /**
- * Converts ONOS specific event data to a format that is suitable for export
- * to Kafka.
- *
- * @param event ONOS Event object
- * @return converted data in GPB format.
- */
- public GeneratedMessage convertToProtoMessage(Event<?, ?> event);
-}
diff --git a/apps/kafka-integration/app/src/main/java/org/onosproject/kafkaintegration/converter/LinkEventConverter.java b/apps/kafka-integration/app/src/main/java/org/onosproject/kafkaintegration/converter/LinkEventConverter.java
deleted file mode 100644
index a1db209..0000000
--- a/apps/kafka-integration/app/src/main/java/org/onosproject/kafkaintegration/converter/LinkEventConverter.java
+++ /dev/null
@@ -1,104 +0,0 @@
-/**
- * Copyright 2016 Open Networking Laboratory
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
-
- * http://www.apache.org/licenses/LICENSE-2.0
-
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.onosproject.kafkaintegration.converter;
-
-import org.onosproject.event.Event;
-import org.onosproject.grpc.net.Link.ConnectPoint;
-import org.onosproject.grpc.net.Link.LinkCore;
-import org.onosproject.grpc.net.Link.LinkState;
-import org.onosproject.grpc.net.Link.LinkType;
-import org.onosproject.grpc.net.LinkEvent.LinkEventType;
-import org.onosproject.grpc.net.LinkEvent.LinkNotification;
-import org.onosproject.net.link.LinkEvent;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import com.google.protobuf.GeneratedMessage;
-
-/**
- * Converts for ONOS Link event message to GPB format.
- *
- */
-class LinkEventConverter implements EventConverter {
-
- private final Logger log = LoggerFactory.getLogger(getClass());
-
- @Override
- public GeneratedMessage convertToProtoMessage(Event<?, ?> event) {
-
- LinkEvent linkEvent = (LinkEvent) event;
-
- if (!linkEventTypeSupported(linkEvent)) {
- log.error("Unsupported Onos Event {}. There is no matching"
- + "proto Event type", linkEvent.type().toString());
- return null;
- }
-
- return buildDeviceProtoMessage(linkEvent);
- }
-
- private boolean linkEventTypeSupported(LinkEvent event) {
- LinkEventType[] kafkaLinkEvents = LinkEventType.values();
- for (LinkEventType linkEventType : kafkaLinkEvents) {
- if (linkEventType.name().equals(event.type().name())) {
- return true;
- }
- }
-
- return false;
- }
-
- private LinkNotification buildDeviceProtoMessage(LinkEvent linkEvent) {
- LinkNotification notification = LinkNotification.newBuilder()
- .setLinkEventType(getProtoType(linkEvent))
- .setLink(LinkCore.newBuilder()
- .setState(LinkState
- .valueOf(linkEvent.subject().state().name()))
- .setType(LinkType
- .valueOf(linkEvent.subject().type().name()))
- .setDst(ConnectPoint.newBuilder()
- .setDeviceId(linkEvent.subject().dst()
- .deviceId().toString())
- .setPortNumber(linkEvent.subject().dst().port()
- .toString()))
- .setSrc(ConnectPoint.newBuilder()
- .setDeviceId(linkEvent.subject().src()
- .deviceId().toString())
- .setPortNumber(linkEvent.subject().src().port()
- .toString())))
- .build();
-
- return notification;
- }
-
- /**
- * Returns the specific Kafka Device Event Type for the corresponding ONOS
- * Device Event Type.
- *
- * @param event ONOS Device Event
- * @return Kafka Device Event Type
- */
- private LinkEventType getProtoType(LinkEvent event) {
- LinkEventType generatedEventType = null;
- LinkEventType[] kafkaEvents = LinkEventType.values();
- for (LinkEventType linkEventType : kafkaEvents) {
- if (linkEventType.name().equals(event.type().name())) {
- generatedEventType = linkEventType;
- }
- }
-
- return generatedEventType;
- }
-}
diff --git a/apps/kafka-integration/app/src/main/java/org/onosproject/kafkaintegration/converter/package-info.java b/apps/kafka-integration/app/src/main/java/org/onosproject/kafkaintegration/converter/package-info.java
deleted file mode 100644
index d33c0d7..0000000
--- a/apps/kafka-integration/app/src/main/java/org/onosproject/kafkaintegration/converter/package-info.java
+++ /dev/null
@@ -1,20 +0,0 @@
-/**
- * Copyright 2016-present Open Networking Laboratory Licensed under the Apache
- * License, Version 2.0 (the "License"); you may not use this file except in
- * compliance with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
- * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
- * License for the specific language governing permissions and limitations under
- * the License.
- */
-
-/**
- * Converters for converting various ONOS events to their corresponding Protocol
- * Buffer format.
- *
- */
-package org.onosproject.kafkaintegration.converter;
diff --git a/apps/kafka-integration/app/src/main/java/org/onosproject/kafkaintegration/impl/Dispatcher.java b/apps/kafka-integration/app/src/main/java/org/onosproject/kafkaintegration/impl/Dispatcher.java
deleted file mode 100644
index d211458..0000000
--- a/apps/kafka-integration/app/src/main/java/org/onosproject/kafkaintegration/impl/Dispatcher.java
+++ /dev/null
@@ -1,62 +0,0 @@
-/**
- * Copyright 2016 Open Networking Laboratory
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
-
- * http://www.apache.org/licenses/LICENSE-2.0
-
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.onosproject.kafkaintegration.impl;
-
-import org.onosproject.event.AbstractListenerManager;
-import org.onosproject.kafkaintegration.api.ExportableEventListener;
-import org.onosproject.kafkaintegration.api.dto.OnosEvent;
-import org.onosproject.kafkaintegration.api.dto.OnosEvent.Type;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import com.google.protobuf.GeneratedMessage;
-
-/**
- * Dispatch ONOS Events to all interested Listeners.
- *
- */
-public final class Dispatcher
- extends AbstractListenerManager<OnosEvent, ExportableEventListener> {
-
- private final Logger log = LoggerFactory.getLogger(getClass());
-
- // Exists to defeat instantiation
- private Dispatcher() {
- }
-
- private static class SingletonHolder {
- private static final Dispatcher INSTANCE = new Dispatcher();
- }
-
- /**
- * Returns a static reference to the Listener Factory.
- *
- * @return singleton object
- */
- public static Dispatcher getInstance() {
- return SingletonHolder.INSTANCE;
- }
-
- /**
- * Publish the ONOS Event to all listeners.
- *
- * @param eventType the ONOS eventtype
- * @param message generated Protocol buffer message from ONOS event data
- */
- public void publish(Type eventType, GeneratedMessage message) {
- log.debug("Dispatching ONOS Event {}", eventType);
- post(new OnosEvent(eventType, message));
- }
-}
diff --git a/apps/kafka-integration/app/src/main/java/org/onosproject/kafkaintegration/impl/EventExporterManager.java b/apps/kafka-integration/app/src/main/java/org/onosproject/kafkaintegration/impl/EventExporterManager.java
deleted file mode 100644
index e2e8e57..0000000
--- a/apps/kafka-integration/app/src/main/java/org/onosproject/kafkaintegration/impl/EventExporterManager.java
+++ /dev/null
@@ -1,330 +0,0 @@
-/*
- * Copyright 2016-present Open Networking Laboratory
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.onosproject.kafkaintegration.impl;
-
-import static com.google.common.base.Preconditions.checkNotNull;
-import static org.onosproject.kafkaintegration.api.dto.OnosEvent.Type.DEVICE;
-import static org.onosproject.kafkaintegration.api.dto.OnosEvent.Type.LINK;
-
-import org.onosproject.kafkaintegration.api.dto.DefaultEventSubscriber;
-import org.onosproject.kafkaintegration.api.dto.EventSubscriber;
-
-import java.util.ArrayList;
-import java.util.List;
-import java.util.Map;
-import java.util.UUID;
-
-import org.apache.felix.scr.annotations.Activate;
-import org.apache.felix.scr.annotations.Component;
-import org.apache.felix.scr.annotations.Deactivate;
-import org.apache.felix.scr.annotations.Reference;
-import org.apache.felix.scr.annotations.ReferenceCardinality;
-import org.apache.felix.scr.annotations.Service;
-import org.onosproject.core.ApplicationId;
-import org.onosproject.core.CoreService;
-import org.onosproject.kafkaintegration.api.EventExporterService;
-import org.onosproject.kafkaintegration.api.dto.EventSubscriberGroupId;
-import org.onosproject.kafkaintegration.api.dto.OnosEvent;
-import org.onosproject.kafkaintegration.api.dto.OnosEvent.Type;
-import org.onosproject.kafkaintegration.errors.InvalidApplicationException;
-import org.onosproject.kafkaintegration.errors.InvalidGroupIdException;
-import org.onosproject.kafkaintegration.listener.ListenerFactory;
-import org.onosproject.kafkaintegration.listener.OnosEventListener;
-import org.onosproject.net.device.DeviceService;
-import org.onosproject.net.link.LinkService;
-import org.onosproject.store.serializers.KryoNamespaces;
-import org.onosproject.store.service.Serializer;
-import org.onosproject.store.service.StorageService;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-/**
- * Implementation of Event Exporter Service.
- *
- */
-@Component(immediate = true)
-@Service
-public class EventExporterManager implements EventExporterService {
-
- private final Logger log = LoggerFactory.getLogger(getClass());
-
- // Stores the currently registered applications for event export service.
- private Map<ApplicationId, EventSubscriberGroupId> registeredApps;
-
- private Map<Type, List<EventSubscriber>> subscriptions;
-
- private static final String REGISTERED_APPS = "registered-applications";
-
- private static final String SUBSCRIBED_APPS = "event-subscriptions";
-
- @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
- protected DeviceService deviceService;
-
- @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
- protected LinkService linkService;
-
- @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
- protected CoreService coreService;
-
- @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
- protected StorageService storageService;
-
- private ApplicationId appId;
-
- @Activate
- protected void activate() {
- appId = coreService
- .registerApplication("org.onosproject.kafkaintegration");
-
- registeredApps = storageService
- .<ApplicationId, EventSubscriberGroupId>consistentMapBuilder()
- .withName(REGISTERED_APPS)
- .withSerializer(Serializer.using(KryoNamespaces.API,
- EventSubscriberGroupId.class,
- UUID.class))
- .build().asJavaMap();
-
- subscriptions = storageService
- .<Type, List<EventSubscriber>>consistentMapBuilder()
- .withName(SUBSCRIBED_APPS)
- .withSerializer(Serializer.using(KryoNamespaces.API,
- EventSubscriber.class,
- OnosEvent.class,
- OnosEvent.Type.class,
- DefaultEventSubscriber.class,
- EventSubscriberGroupId.class,
- UUID.class))
- .build().asJavaMap();
-
- log.info("Started");
- }
-
- @Deactivate
- protected void deactivate() {
- log.info("Stopped");
- }
-
- @Override
- public EventSubscriberGroupId registerListener(String appName) {
-
- // TODO: Remove it once ONOS provides a mechanism for external apps
- // to register with the core service. See Jira - 4409
- ApplicationId externalAppId = coreService.registerApplication(appName);
-
- return registeredApps.computeIfAbsent(externalAppId,
- (key) -> new EventSubscriberGroupId(UUID
- .randomUUID()));
- }
-
- @Override
- public void unregisterListener(String appName) {
- ApplicationId externalAppId =
- checkNotNull(coreService.getAppId(appName));
- registeredApps.remove(externalAppId);
- }
-
- @Override
- public void subscribe(EventSubscriber subscriber)
- throws InvalidGroupIdException, InvalidApplicationException {
-
- checkNotNull(subscriber);
-
- if (!registeredApplication(subscriber.appName())) {
- throw new InvalidApplicationException("Application is not "
- + "registered to make this request.");
- }
-
- if (!validGroupId(subscriber.subscriberGroupId(),
- subscriber.appName())) {
- throw new InvalidGroupIdException("Incorrect group id in the request");
- }
-
- OnosEventListener onosListener = getListener(subscriber.eventType());
- checkNotNull(onosListener,
- "No listener for the supported event type - {}",
- subscriber.eventType());
-
- applyListenerAction(subscriber.eventType(), onosListener,
- ListenerAction.START);
-
- // update internal state
- List<EventSubscriber> subscriptionList =
- subscriptions.get(subscriber.eventType());
- if (subscriptionList == null) {
- subscriptionList = new ArrayList<EventSubscriber>();
- }
- subscriptionList.add(subscriber);
- subscriptions.put(subscriber.eventType(), subscriptionList);
-
- log.info("Subscription for {} event by {} successfull",
- subscriber.eventType(), subscriber.appName());
- }
-
- /**
- * Checks if the application has registered.
- *
- * @param appName application name
- * @return true if application has registered
- */
- private boolean registeredApplication(String appName) {
-
- checkNotNull(appName);
- ApplicationId appId = checkNotNull(coreService.getAppId(appName));
- if (registeredApps.containsKey(appId)) {
- return true;
- }
-
- log.debug("{} is not registered", appName);
- return false;
- }
-
- /**
- * Actions that can be performed on the ONOS Event Listeners.
- *
- */
- private enum ListenerAction {
- START, STOP;
- }
-
- /**
- * Applies the specified action on the Listener.
- *
- * @param eventType the ONOS Event type registered by the application
- * @param onosListener ONOS event listener
- * @param action to be performed on the listener
- */
- private void applyListenerAction(Type eventType,
- OnosEventListener onosListener,
- ListenerAction action) {
- switch (eventType) {
- case DEVICE:
- if (action == ListenerAction.START) {
- onosListener.startListener(DEVICE, deviceService);
- } else {
- onosListener.stopListener(DEVICE, deviceService);
- }
- break;
- case LINK:
- if (action == ListenerAction.START) {
- onosListener.startListener(LINK, linkService);
- } else {
- onosListener.stopListener(LINK, linkService);
- }
- break;
- default:
- log.error("Cannot {} listener. Unsupported event type {} ",
- action.toString(), eventType.toString());
- }
- }
-
- /**
- * Returns the ONOS event listener corresponding to the ONOS Event type.
- *
- * @param eventType ONOS event type
- * @return ONOS event listener
- */
- private OnosEventListener getListener(Type eventType) {
- checkNotNull(eventType);
- ListenerFactory factory = ListenerFactory.getInstance();
- OnosEventListener onosListener = factory.getListener(eventType);
- return onosListener;
- }
-
- /**
- * Checks if the group id is valid for this registered application.
- *
- * @param groupId GroupId assigned to the subscriber
- * @param appName Registered Application name
- * @return true if valid groupId and false otherwise
- */
- private boolean validGroupId(EventSubscriberGroupId groupId,
- String appName) {
-
- checkNotNull(groupId);
-
- ApplicationId appId = coreService.getAppId(appName);
- EventSubscriberGroupId registeredGroupId = registeredApps.get(appId);
- if (registeredGroupId.equals(groupId)) {
- return true;
- }
-
- return false;
- }
-
- @Override
- public void unsubscribe(EventSubscriber subscriber)
- throws InvalidGroupIdException, InvalidApplicationException {
-
- checkNotNull(subscriber);
-
- if (!registeredApplication(subscriber.appName())) {
- throw new InvalidApplicationException("Application is not "
- + "registered to make this request.");
- }
-
- if (!validGroupId(subscriber.subscriberGroupId(),
- subscriber.appName())) {
- throw new InvalidGroupIdException("Incorrect group id in the request");
- }
-
- if (!eventSubscribed(subscriber)) {
- log.error("No subscription to {} was found",
- subscriber.eventType());
- return;
- }
-
- // If this is the only subscriber listening for this event,
- // stop the listener.
- List<EventSubscriber> subscribers =
- subscriptions.get(subscriber.eventType());
- if (subscribers.size() == 1) {
- OnosEventListener onosListener =
- getListener(subscriber.eventType());
- checkNotNull(onosListener,
- "No listener for the supported event type - {}",
- subscriber.eventType());
- applyListenerAction(subscriber.eventType(), onosListener,
- ListenerAction.STOP);
- }
-
- // update internal state.
- subscribers.remove(subscriber);
- subscriptions.put(subscriber.eventType(), subscribers);
-
- log.info("Unsubscribed {} for {} events", subscriber.appName(),
- subscriber.eventType());
- }
-
- /**
- * Checks if the subscriber has already subscribed to the requested event
- * type.
- *
- * @param subscriber the subscriber to a specific ONOS event
- * @return true if subscriber has subscribed to the ONOS event
- */
- private boolean eventSubscribed(EventSubscriber subscriber) {
-
- List<EventSubscriber> subscriberList =
- subscriptions.get(subscriber.eventType());
-
- if (subscriberList == null) {
- return false;
- }
-
- return subscriberList.contains(subscriber);
- }
-
-}
diff --git a/apps/kafka-integration/app/src/main/java/org/onosproject/kafkaintegration/impl/KafkaCodecRegistrator.java b/apps/kafka-integration/app/src/main/java/org/onosproject/kafkaintegration/impl/KafkaCodecRegistrator.java
deleted file mode 100644
index 47098fa..0000000
--- a/apps/kafka-integration/app/src/main/java/org/onosproject/kafkaintegration/impl/KafkaCodecRegistrator.java
+++ /dev/null
@@ -1,48 +0,0 @@
-/**
- * Copyright 2016-present Open Networking Laboratory
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
-
- * http://www.apache.org/licenses/LICENSE-2.0
-
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.onosproject.kafkaintegration.impl;
-
-import org.apache.felix.scr.annotations.Activate;
-import org.apache.felix.scr.annotations.Component;
-import org.apache.felix.scr.annotations.Deactivate;
-import org.apache.felix.scr.annotations.Reference;
-import org.apache.felix.scr.annotations.ReferenceCardinality;
-import org.onosproject.codec.CodecService;
-import org.onosproject.kafkaintegration.api.dto.EventSubscriber;
-import org.onosproject.kafkaintegration.rest.SubscriberCodec;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-/**
- * Implementation of the JSON codec brokering service for Kafka app.
- */
-@Component(immediate = true)
-public class KafkaCodecRegistrator {
- private static Logger log = LoggerFactory.getLogger(KafkaCodecRegistrator
- .class);
- @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
- protected CodecService codecService;
-
- @Activate
- public void activate() {
- codecService.registerCodec(EventSubscriber.class, new SubscriberCodec());
- log.info("Started");
- }
-
- @Deactivate
- public void deactivate() {
- log.info("Stopped");
- }
-}
diff --git a/apps/kafka-integration/app/src/main/java/org/onosproject/kafkaintegration/impl/KafkaStorageManager.java b/apps/kafka-integration/app/src/main/java/org/onosproject/kafkaintegration/impl/KafkaStorageManager.java
deleted file mode 100644
index 4b9035e..0000000
--- a/apps/kafka-integration/app/src/main/java/org/onosproject/kafkaintegration/impl/KafkaStorageManager.java
+++ /dev/null
@@ -1,108 +0,0 @@
-/**
- * Copyright 2016 Open Networking Laboratory
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
-
- * http://www.apache.org/licenses/LICENSE-2.0
-
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.onosproject.kafkaintegration.impl;
-
-import java.util.TreeMap;
-import java.util.concurrent.Executors;
-import java.util.concurrent.ScheduledExecutorService;
-import java.util.concurrent.TimeUnit;
-
-import org.apache.felix.scr.annotations.Activate;
-import org.apache.felix.scr.annotations.Component;
-import org.apache.felix.scr.annotations.Deactivate;
-import org.apache.felix.scr.annotations.Reference;
-import org.apache.felix.scr.annotations.ReferenceCardinality;
-import org.onosproject.kafkaintegration.api.KafkaEventStorageService;
-import org.onosproject.kafkaintegration.api.dto.OnosEvent;
-import org.onosproject.store.service.AtomicValue;
-import org.onosproject.store.service.StorageService;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-@Component(immediate = true)
-public class KafkaStorageManager implements KafkaEventStorageService {
-
- @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
- protected StorageService storageService;
-
- private TreeMap<Long, OnosEvent> kafkaEventStore;
-
- private AtomicValue<Long> lastPublishedEvent;
-
- private final Logger log = LoggerFactory.getLogger(getClass());
-
- private ScheduledExecutorService gcExService;
-
- private InternalGarbageCollector gcTask;
-
- // Thread scheduler parameters.
- private final long delay = 0;
- private final long period = 1;
-
- @Activate
- protected void activate() {
- kafkaEventStore = new TreeMap<Long, OnosEvent>();
- lastPublishedEvent = storageService.<Long>atomicValueBuilder()
- .withName("onos-app-kafka-published-seqNumber").build()
- .asAtomicValue();
-
- startGC();
-
- log.info("Started");
- }
-
- private void startGC() {
- log.info("Starting Garbage Collection Service");
- gcExService = Executors.newSingleThreadScheduledExecutor();
- gcTask = new InternalGarbageCollector();
- gcExService.scheduleAtFixedRate(gcTask, delay, period,
- TimeUnit.SECONDS);
- }
-
- @Deactivate
- protected void deactivate() {
- stopGC();
- log.info("Stopped");
- }
-
- private void stopGC() {
- log.info("Stopping Garbage Collection Service");
- gcExService.shutdown();
- }
-
- @Override
- public boolean insertCacheEntry(OnosEvent e) {
- // TODO: Fill in the code once the event carries timestamp info.
- return true;
- }
-
- @Override
- public void updateLastPublishedEntry(Long sequenceNumber) {
- this.lastPublishedEvent.set(sequenceNumber);
- }
-
- /**
- * Removes events from the Kafka Event Store which have been published.
- *
- */
- private class InternalGarbageCollector implements Runnable {
-
- @Override
- public void run() {
- kafkaEventStore.headMap(lastPublishedEvent.get(), true).clear();
- }
- }
-
-}
diff --git a/apps/kafka-integration/app/src/main/java/org/onosproject/kafkaintegration/impl/package-info.java b/apps/kafka-integration/app/src/main/java/org/onosproject/kafkaintegration/impl/package-info.java
deleted file mode 100644
index a2950fa..0000000
--- a/apps/kafka-integration/app/src/main/java/org/onosproject/kafkaintegration/impl/package-info.java
+++ /dev/null
@@ -1,18 +0,0 @@
-/**
- * Copyright 2016-present Open Networking Laboratory Licensed under the Apache
- * License, Version 2.0 (the "License"); you may not use this file except in
- * compliance with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
- * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
- * License for the specific language governing permissions and limitations under
- * the License.
- */
-
-/**
- * API implementation classes.
- */
-package org.onosproject.kafkaintegration.impl;
diff --git a/apps/kafka-integration/app/src/main/java/org/onosproject/kafkaintegration/listener/DeviceEventsListener.java b/apps/kafka-integration/app/src/main/java/org/onosproject/kafkaintegration/listener/DeviceEventsListener.java
deleted file mode 100644
index 22c4bf2..0000000
--- a/apps/kafka-integration/app/src/main/java/org/onosproject/kafkaintegration/listener/DeviceEventsListener.java
+++ /dev/null
@@ -1,94 +0,0 @@
-/**
- * Copyright 2016 Open Networking Laboratory
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
-
- * http://www.apache.org/licenses/LICENSE-2.0
-
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.onosproject.kafkaintegration.listener;
-
-import static org.onosproject.kafkaintegration.api.dto.OnosEvent.Type.DEVICE;
-
-import org.onosproject.event.ListenerService;
-import org.onosproject.kafkaintegration.impl.Dispatcher;
-import org.onosproject.kafkaintegration.api.dto.OnosEvent.Type;
-import org.onosproject.kafkaintegration.converter.ConversionFactory;
-import org.onosproject.kafkaintegration.converter.EventConverter;
-import org.onosproject.net.device.DeviceEvent;
-import org.onosproject.net.device.DeviceListener;
-import org.onosproject.net.device.DeviceService;
-
-import com.google.protobuf.GeneratedMessage;
-
-/**
- * Listens for ONOS Device events.
- *
- */
-final class DeviceEventsListener implements OnosEventListener {
-
- private boolean listenerRunning = false;
-
- private InnerListener listener = null;
-
- // Exists to defeat instantiation
- private DeviceEventsListener() {
- }
-
- private static class SingletonHolder {
- private static final DeviceEventsListener INSTANCE =
- new DeviceEventsListener();
- }
-
- /**
- * Returns a static reference to the Listener Factory.
- *
- * @return singleton object
- */
- public static DeviceEventsListener getInstance() {
- return SingletonHolder.INSTANCE;
- }
-
- @Override
- public void startListener(Type e, ListenerService<?, ?> service) {
- if (!listenerRunning) {
- listener = new InnerListener();
- DeviceService deviceService = (DeviceService) service;
- deviceService.addListener(listener);
- listenerRunning = true;
- }
- }
-
- private class InnerListener implements DeviceListener {
-
- @Override
- public void event(DeviceEvent arg0) {
-
- // Convert the event to GPB format
- ConversionFactory conversionFactory =
- ConversionFactory.getInstance();
- EventConverter converter = conversionFactory.getConverter(DEVICE);
- GeneratedMessage message = converter.convertToProtoMessage(arg0);
-
- // Call Dispatcher and publish event
- Dispatcher.getInstance().publish(DEVICE, message);
- }
- }
-
- @Override
- public void stopListener(Type e, ListenerService<?, ?> service) {
- if (listenerRunning) {
- DeviceService deviceService = (DeviceService) service;
- deviceService.removeListener(listener);
- listener = null;
- listenerRunning = false;
- }
- }
-
-}
diff --git a/apps/kafka-integration/app/src/main/java/org/onosproject/kafkaintegration/listener/LinkEventsListener.java b/apps/kafka-integration/app/src/main/java/org/onosproject/kafkaintegration/listener/LinkEventsListener.java
deleted file mode 100644
index 769d8b4..0000000
--- a/apps/kafka-integration/app/src/main/java/org/onosproject/kafkaintegration/listener/LinkEventsListener.java
+++ /dev/null
@@ -1,92 +0,0 @@
-/**
- * Copyright 2016 Open Networking Laboratory
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
-
- * http://www.apache.org/licenses/LICENSE-2.0
-
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.onosproject.kafkaintegration.listener;
-
-import static org.onosproject.kafkaintegration.api.dto.OnosEvent.Type.LINK;
-
-import org.onosproject.event.ListenerService;
-import org.onosproject.kafkaintegration.impl.Dispatcher;
-import org.onosproject.kafkaintegration.api.dto.OnosEvent.Type;
-import org.onosproject.kafkaintegration.converter.ConversionFactory;
-import org.onosproject.kafkaintegration.converter.EventConverter;
-import org.onosproject.net.link.LinkEvent;
-import org.onosproject.net.link.LinkListener;
-import org.onosproject.net.link.LinkService;
-
-import com.google.protobuf.GeneratedMessage;
-
-/**
- * Listens for ONOS Link Events.
- *
- */
-final class LinkEventsListener implements OnosEventListener {
-
- private boolean listenerRunning = false;
-
- private InnerListener listener = null;
-
- // Exists to defeat instantiation
- private LinkEventsListener() {
- }
-
- private static class SingletonHolder {
- private static final LinkEventsListener INSTANCE =
- new LinkEventsListener();
- }
-
- /**
- * Returns a static reference to the Listener Factory.
- *
- * @return singleton object
- */
- public static LinkEventsListener getInstance() {
- return SingletonHolder.INSTANCE;
- }
-
- @Override
- public void startListener(Type e, ListenerService<?, ?> service) {
- if (!listenerRunning) {
- listener = new InnerListener();
- LinkService linkService = (LinkService) service;
- linkService.addListener(listener);
- listenerRunning = true;
- }
- }
-
- private class InnerListener implements LinkListener {
-
- @Override
- public void event(LinkEvent arg0) {
-
- // Convert the event to GPB format
- ConversionFactory conversionFactory =
- ConversionFactory.getInstance();
- EventConverter converter = conversionFactory.getConverter(LINK);
- GeneratedMessage message = converter.convertToProtoMessage(arg0);
-
- // Call Dispatcher and publish event
- Dispatcher.getInstance().publish(LINK, message);
- }
- }
-
- @Override
- public void stopListener(Type e, ListenerService<?, ?> service) {
- if (listenerRunning) {
- LinkService linkService = (LinkService) service;
- linkService.removeListener(listener);
- listenerRunning = false;
- }
- }
-}
diff --git a/apps/kafka-integration/app/src/main/java/org/onosproject/kafkaintegration/listener/ListenerFactory.java b/apps/kafka-integration/app/src/main/java/org/onosproject/kafkaintegration/listener/ListenerFactory.java
deleted file mode 100644
index 496ba2b..0000000
--- a/apps/kafka-integration/app/src/main/java/org/onosproject/kafkaintegration/listener/ListenerFactory.java
+++ /dev/null
@@ -1,67 +0,0 @@
-/**
- * Copyright 2016 Open Networking Laboratory
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
-
- * http://www.apache.org/licenses/LICENSE-2.0
-
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.onosproject.kafkaintegration.listener;
-
-import static org.onosproject.kafkaintegration.api.dto.OnosEvent.Type.DEVICE;
-import static org.onosproject.kafkaintegration.api.dto.OnosEvent.Type.LINK;
-
-import java.util.HashMap;
-import java.util.Map;
-
-import org.onosproject.kafkaintegration.api.dto.OnosEvent.Type;
-
-/**
- * Returns the appropriate listener object based on the ONOS event type.
- *
- */
-public final class ListenerFactory {
-
- // Store listeners for all supported events
- private Map<Type, OnosEventListener> listeners =
- new HashMap<Type, OnosEventListener>() {
- {
- put(DEVICE, DeviceEventsListener.getInstance());
- put(LINK, LinkEventsListener.getInstance());
- }
- };
-
- // Exists to defeat instantiation
- private ListenerFactory() {
- }
-
- private static class SingletonHolder {
- private static final ListenerFactory INSTANCE = new ListenerFactory();
- }
-
- /**
- * Returns a static reference to the Listener Factory.
- *
- * @return singleton object
- */
- public static ListenerFactory getInstance() {
- return SingletonHolder.INSTANCE;
- }
-
- /**
- * Returns the listener object for the specified ONOS event type.
- *
- * @param event ONOS Event type
- * @return return listener object
- */
- public OnosEventListener getListener(Type event) {
- return listeners.get(event);
- }
-
-}
diff --git a/apps/kafka-integration/app/src/main/java/org/onosproject/kafkaintegration/listener/OnosEventListener.java b/apps/kafka-integration/app/src/main/java/org/onosproject/kafkaintegration/listener/OnosEventListener.java
deleted file mode 100644
index 22cde21..0000000
--- a/apps/kafka-integration/app/src/main/java/org/onosproject/kafkaintegration/listener/OnosEventListener.java
+++ /dev/null
@@ -1,42 +0,0 @@
-/**
- * Copyright 2016 Open Networking Laboratory
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
-
- * http://www.apache.org/licenses/LICENSE-2.0
-
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.onosproject.kafkaintegration.listener;
-
-import org.onosproject.event.ListenerService;
-import org.onosproject.kafkaintegration.api.dto.OnosEvent.Type;
-
-/**
- * APIs for starting and stopping a ONOS Event listener.
- *
- */
-public interface OnosEventListener {
-
- /**
- * Start the listener for the specific ONOS event type.
- *
- * @param event ONOS event type
- * @param service ONOS event listener for the specific event type
- */
- void startListener(Type event, ListenerService<?, ?> service);
-
- /**
- * Stop the Listener for the specific ONOS event type.
- *
- * @param event ONOS event type
- * @param service ONOS event listener for the specific event type
- */
- void stopListener(Type event, ListenerService<?, ?> service);
-
-}
diff --git a/apps/kafka-integration/app/src/main/java/org/onosproject/kafkaintegration/listener/package-info.java b/apps/kafka-integration/app/src/main/java/org/onosproject/kafkaintegration/listener/package-info.java
deleted file mode 100644
index f6f602f..0000000
--- a/apps/kafka-integration/app/src/main/java/org/onosproject/kafkaintegration/listener/package-info.java
+++ /dev/null
@@ -1,19 +0,0 @@
-/**
- * Copyright 2016-present Open Networking Laboratory Licensed under the Apache
- * License, Version 2.0 (the "License"); you may not use this file except in
- * compliance with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
- * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
- * License for the specific language governing permissions and limitations under
- * the License.
- */
-
-/**
- * Listeners for listening to various ONOS events.
- *
- */
-package org.onosproject.kafkaintegration.listener;
diff --git a/apps/kafka-integration/app/src/main/java/org/onosproject/kafkaintegration/rest/EventExporterWebResource.java b/apps/kafka-integration/app/src/main/java/org/onosproject/kafkaintegration/rest/EventExporterWebResource.java
deleted file mode 100644
index 0456362..0000000
--- a/apps/kafka-integration/app/src/main/java/org/onosproject/kafkaintegration/rest/EventExporterWebResource.java
+++ /dev/null
@@ -1,170 +0,0 @@
-/**
- * Copyright 2016-present Open Networking Laboratory
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
-
- * http://www.apache.org/licenses/LICENSE-2.0
-
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.onosproject.kafkaintegration.rest;
-
-import static com.google.common.base.Preconditions.checkNotNull;
-import static javax.ws.rs.core.Response.Status.BAD_REQUEST;
-
-import java.io.IOException;
-import java.io.InputStream;
-
-import javax.ws.rs.Consumes;
-import javax.ws.rs.DELETE;
-import javax.ws.rs.POST;
-import javax.ws.rs.Path;
-import javax.ws.rs.Produces;
-import javax.ws.rs.core.MediaType;
-import javax.ws.rs.core.Response;
-
-import org.onosproject.kafkaintegration.api.EventExporterService;
-import org.onosproject.kafkaintegration.api.dto.EventSubscriber;
-import org.onosproject.kafkaintegration.api.dto.EventSubscriberGroupId;
-import org.onosproject.rest.AbstractWebResource;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import com.fasterxml.jackson.databind.ObjectMapper;
-import com.fasterxml.jackson.databind.node.ObjectNode;
-
-/**
- * Rest Interfaces for subscribing/unsubscribing to event notifications.
- */
-@Path("kafkaService")
-public class EventExporterWebResource extends AbstractWebResource {
-
- private final Logger log = LoggerFactory.getLogger(getClass());
- public static final String JSON_NOT_NULL =
- "Registration Data cannot be empty";
- public static final String REGISTRATION_SUCCESSFUL =
- "Registered Listener successfully";
- public static final String DEREGISTRATION_SUCCESSFUL =
- "De-Registered Listener successfully";
- public static final String EVENT_SUBSCRIPTION_SUCCESSFUL =
- "Event Registration successfull";
- public static final String EVENT_SUBSCRIPTION_UNSUCCESSFUL =
- "Event subscription unsuccessful";
- public static final String EVENT_SUBSCRIPTION_REMOVED =
- "Event De-Registration successfull";
- /**
- * Registers a listener for ONOS Events.
- *
- * @param appName The application trying to register
- * @return 200 OK with UUID string which should be used as Kafka Consumer
- * Group Id
- * @onos.rsModel KafkaRegistration
- */
- @POST
- @Produces(MediaType.APPLICATION_JSON)
- @Consumes(MediaType.APPLICATION_JSON)
- @Path("register")
- public Response registerKafkaListener(String appName) {
-
- EventExporterService service = get(EventExporterService.class);
-
- EventSubscriberGroupId groupId = service.registerListener(appName);
-
- log.info("Registered app {}", appName);
-
- // TODO: Should also return Kafka server information.
- // Will glue this in when we have the config and Kafka modules ready
- return ok(groupId.getId().toString()).build();
- }
-
- /**
- * Unregisters a listener for ONOS Events.
- *
- * @param appName The application trying to unregister
- * @return 200 OK
- * @onos.rsModel KafkaRegistration
- */
- @DELETE
- @Path("unregister")
- public Response removeKafkaListener(String appName) {
- EventExporterService service = get(EventExporterService.class);
-
- service.unregisterListener(appName);
- log.info("Unregistered app {}", appName);
- return ok(DEREGISTRATION_SUCCESSFUL).build();
- }
-
- /**
- * Creates subscription to a specific ONOS event.
- *
- * @param input Subscription Data in JSON format
- * @return 200 OK if successful or 400 BAD REQUEST
- * @onos.rsModel KafkaSubscription
- */
- @POST
- @Consumes(MediaType.APPLICATION_JSON)
- @Produces(MediaType.APPLICATION_JSON)
- @Path("subscribe")
- public Response subscribe(InputStream input) {
-
- EventExporterService service = get(EventExporterService.class);
-
- try {
- EventSubscriber sub = parseSubscriptionData(input);
- service.subscribe(sub);
- } catch (Exception e) {
- log.error(e.getMessage());
- return Response.status(BAD_REQUEST).entity(e.getMessage()).build();
- }
-
- return ok(EVENT_SUBSCRIPTION_SUCCESSFUL).build();
- }
-
- /**
- * Parses JSON Subscription Data from the external application.
- *
- * @param input Subscription Data in JSON format
- * @return parsed DTO object
- * @throws IOException
- */
- private EventSubscriber parseSubscriptionData(InputStream input)
- throws IOException {
-
- ObjectMapper mapper = new ObjectMapper();
- ObjectNode node = (ObjectNode) mapper.readTree(input);
- checkNotNull(node, JSON_NOT_NULL);
- EventSubscriber codec = codec(EventSubscriber.class).decode(node, this);
- checkNotNull(codec, JSON_NOT_NULL);
- return codec;
- }
-
- /**
- * Deletes subscription from a specific ONOS event.
- *
- * @param input data in JSON format
- * @return 200 OK if successful or 400 BAD REQUEST
- * @onos.rsModel KafkaSubscription
- */
- @DELETE
- @Consumes(MediaType.APPLICATION_JSON)
- @Path("unsubscribe")
- public Response unsubscribe(InputStream input) {
-
- EventExporterService service = get(EventExporterService.class);
-
- try {
- EventSubscriber sub = parseSubscriptionData(input);
- service.unsubscribe(sub);
- } catch (Exception e) {
- log.error(e.getMessage());
- return Response.status(BAD_REQUEST).entity(e.getMessage()).build();
- }
-
- return ok(EVENT_SUBSCRIPTION_REMOVED).build();
- }
-}
diff --git a/apps/kafka-integration/app/src/main/java/org/onosproject/kafkaintegration/rest/SubscriberCodec.java b/apps/kafka-integration/app/src/main/java/org/onosproject/kafkaintegration/rest/SubscriberCodec.java
deleted file mode 100644
index 2876ca1..0000000
--- a/apps/kafka-integration/app/src/main/java/org/onosproject/kafkaintegration/rest/SubscriberCodec.java
+++ /dev/null
@@ -1,64 +0,0 @@
-/**
- * Copyright 2016-present Open Networking Laboratory
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
-
- * http://www.apache.org/licenses/LICENSE-2.0
-
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.onosproject.kafkaintegration.rest;
-
-import static com.google.common.base.Preconditions.checkNotNull;
-
-import java.util.UUID;
-
-import org.onosproject.codec.CodecContext;
-import org.onosproject.codec.JsonCodec;
-import org.onosproject.kafkaintegration.api.dto.DefaultEventSubscriber;
-import org.onosproject.kafkaintegration.api.dto.EventSubscriber;
-import org.onosproject.kafkaintegration.api.dto.EventSubscriberGroupId;
-import org.onosproject.kafkaintegration.api.dto.OnosEvent.Type;
-import com.fasterxml.jackson.databind.node.ObjectNode;
-
-/**
- * Codec for encoding/decoding a Subscriber object to/from JSON.
- *
- */
-public final class SubscriberCodec extends JsonCodec<EventSubscriber> {
-
- // JSON field names
- private static final String NAME = "appName";
- private static final String GROUP_ID = "groupId";
- private static final String EVENT_TYPE = "eventType";
-
- @Override
- public ObjectNode encode(EventSubscriber data, CodecContext context) {
- checkNotNull(data, "Subscriber cannot be null");
- return context.mapper().createObjectNode().put(NAME, data.appName())
- .put(GROUP_ID, data.subscriberGroupId().getId().toString())
- .put(EVENT_TYPE, data.eventType().toString());
- }
-
- @Override
- public EventSubscriber decode(ObjectNode json, CodecContext context) {
-
- EventSubscriber.Builder resultBuilder = new DefaultEventSubscriber
- .Builder();
- String appName = json.get(NAME).asText();
- resultBuilder.setAppName(appName);
-
- String subscriberGroupId = json.get(GROUP_ID).asText();
- resultBuilder.setSubscriberGroupId(new EventSubscriberGroupId(UUID.
- fromString(subscriberGroupId)));
-
- String eventType = json.get(EVENT_TYPE).asText();
- resultBuilder.setEventType(Type.valueOf(eventType));
- return resultBuilder.build();
- }
-}
\ No newline at end of file
diff --git a/apps/kafka-integration/app/src/main/java/org/onosproject/kafkaintegration/rest/package-info.java b/apps/kafka-integration/app/src/main/java/org/onosproject/kafkaintegration/rest/package-info.java
deleted file mode 100644
index 7885851..0000000
--- a/apps/kafka-integration/app/src/main/java/org/onosproject/kafkaintegration/rest/package-info.java
+++ /dev/null
@@ -1,19 +0,0 @@
-/**
- * Copyright 2016-present Open Networking Laboratory Licensed under the Apache
- * License, Version 2.0 (the "License"); you may not use this file except in
- * compliance with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
- * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
- * License for the specific language governing permissions and limitations under
- * the License.
- */
-
-/**
- * REST API Definitions.
- *
- */
-package org.onosproject.kafkaintegration.rest;
diff --git a/apps/kafka-integration/app/src/main/resources/definitions/KafkaRegistration.json b/apps/kafka-integration/app/src/main/resources/definitions/KafkaRegistration.json
deleted file mode 100644
index a81fd11..0000000
--- a/apps/kafka-integration/app/src/main/resources/definitions/KafkaRegistration.json
+++ /dev/null
@@ -1,5 +0,0 @@
-{
- "type": "string",
- "title": "KafkaRegistration",
- "example": "forwardingApp"
-}
\ No newline at end of file
diff --git a/apps/kafka-integration/app/src/main/resources/definitions/KafkaSubscription.json b/apps/kafka-integration/app/src/main/resources/definitions/KafkaSubscription.json
deleted file mode 100644
index 819ca77..0000000
--- a/apps/kafka-integration/app/src/main/resources/definitions/KafkaSubscription.json
+++ /dev/null
@@ -1,23 +0,0 @@
-{
- "type": "object",
- "title": "KafkaSubscription",
- "required": [
- "appName",
- "groupId",
- "eventType"
- ],
- "properties": {
- "appName": {
- "type": "string",
- "example": "forwardingApp"
- },
- "groupId": {
- "type": "string",
- "example": "18285435-2c62-4684-96dd-fb03b7cd0c83"
- },
- "eventType": {
- "type": "string",
- "example": "DEVICE"
- }
- }
-}
\ No newline at end of file
diff --git a/apps/kafka-integration/app/src/main/webapp/WEB-INF/web.xml b/apps/kafka-integration/app/src/main/webapp/WEB-INF/web.xml
deleted file mode 100644
index 32e5da7..0000000
--- a/apps/kafka-integration/app/src/main/webapp/WEB-INF/web.xml
+++ /dev/null
@@ -1,38 +0,0 @@
-<?xml version="1.0" encoding="UTF-8"?>
-<!--
- ~ Copyright 2016 Open Networking Laboratory
- ~
- ~ Licensed under the Apache License, Version 2.0 (the "License");
- ~ you may not use this file except in compliance with the License.
- ~ You may obtain a copy of the License at
- ~
- ~ http://www.apache.org/licenses/LICENSE-2.0
- ~
- ~ Unless required by applicable law or agreed to in writing, software
- ~ distributed under the License is distributed on an "AS IS" BASIS,
- ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- ~ See the License for the specific language governing permissions and
- ~ limitations under the License.
- -->
-<web-app xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns="http://java.sun.com/xml/ns/javaee"
- xmlns:web="http://java.sun.com/xml/ns/javaee/web-app_2_5.xsd"
- xsi:schemaLocation="http://java.sun.com/xml/ns/javaee http://java.sun.com/xml/ns/javaee/web-app_2_5.xsd"
- id="ONOS" version="2.5">
- <display-name>Event Exporter REST API</display-name>
-
- <servlet>
- <servlet-name>JAX-RS Service</servlet-name>
- <servlet-class>org.glassfish.jersey.servlet.ServletContainer</servlet-class>
- <init-param>
- <param-name>jersey.config.server.provider.classnames</param-name>
- <param-value>org.onosproject.kafkaintegration.rest.EventExporterWebResource</param-value>
- </init-param>
- <load-on-startup>10</load-on-startup>
- </servlet>
-
- <servlet-mapping>
- <servlet-name>JAX-RS Service</servlet-name>
- <url-pattern>/*</url-pattern>
- </servlet-mapping>
-
-</web-app>