Made changes as per comments.
kafkaProducer is now non-static.
TODO: KafkaPublisherManager Service and not Singleton.
Kafka event publishing.
Change-Id: I5ec20a6e4950c38e822468d343521ab77475b7d3
diff --git a/apps/kafka-integration/api/pom.xml b/apps/kafka-integration/api/pom.xml
index a5aff84..d8386be 100644
--- a/apps/kafka-integration/api/pom.xml
+++ b/apps/kafka-integration/api/pom.xml
@@ -38,6 +38,7 @@
<artifactId>protobuf-java</artifactId>
<version>3.0.0-beta-2</version>
</dependency>
+
</dependencies>
<build>
diff --git a/apps/kafka-integration/api/src/main/java/org/onosproject/kafkaintegration/api/EventConversionService.java b/apps/kafka-integration/api/src/main/java/org/onosproject/kafkaintegration/api/EventConversionService.java
new file mode 100644
index 0000000..4c73d96
--- /dev/null
+++ b/apps/kafka-integration/api/src/main/java/org/onosproject/kafkaintegration/api/EventConversionService.java
@@ -0,0 +1,28 @@
+/**
+ * Copyright 2016-present Open Networking Laboratory
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.onosproject.kafkaintegration.api;
+
+import org.onosproject.event.Event;
+import org.onosproject.kafkaintegration.api.dto.OnosEvent;
+
+/**
+ * API for conversion of various ONOS events to Protobuf.
+ *
+ */
+public interface EventConversionService {
+ OnosEvent convertEvent(Event<?, ?> event);
+}
diff --git a/apps/kafka-integration/api/src/main/java/org/onosproject/kafkaintegration/api/EventExporterService.java b/apps/kafka-integration/api/src/main/java/org/onosproject/kafkaintegration/api/EventSubscriptionService.java
similarity index 85%
rename from apps/kafka-integration/api/src/main/java/org/onosproject/kafkaintegration/api/EventExporterService.java
rename to apps/kafka-integration/api/src/main/java/org/onosproject/kafkaintegration/api/EventSubscriptionService.java
index e203697..07b03de 100644
--- a/apps/kafka-integration/api/src/main/java/org/onosproject/kafkaintegration/api/EventExporterService.java
+++ b/apps/kafka-integration/api/src/main/java/org/onosproject/kafkaintegration/api/EventSubscriptionService.java
@@ -16,16 +16,19 @@
import org.onosproject.kafkaintegration.api.dto.EventSubscriber;
import org.onosproject.kafkaintegration.api.dto.EventSubscriberGroupId;
+import org.onosproject.kafkaintegration.api.dto.OnosEvent.Type;
import org.onosproject.kafkaintegration.errors.InvalidApplicationException;
import org.onosproject.kafkaintegration.errors.InvalidGroupIdException;
import com.google.common.annotations.Beta;
+import java.util.List;
+
/**
* APIs for subscribing to Onos Event Messages.
*/
@Beta
-public interface EventExporterService {
+public interface EventSubscriptionService {
/**
* Registers the external application to receive events generated in ONOS.
@@ -61,4 +64,12 @@
*/
void unsubscribe(EventSubscriber subscriber)
throws InvalidGroupIdException, InvalidApplicationException;
+
+ /**
+ * Returns the event subscriber for various event types.
+ *
+ * @param type ONOS event type.
+ * @return List of event subscribers
+ */
+ List<EventSubscriber> getEventSubscribers(Type type);
}
diff --git a/apps/kafka-integration/api/src/main/java/org/onosproject/kafkaintegration/api/KafkaPublisherService.java b/apps/kafka-integration/api/src/main/java/org/onosproject/kafkaintegration/api/KafkaPublisherService.java
new file mode 100644
index 0000000..b92b890
--- /dev/null
+++ b/apps/kafka-integration/api/src/main/java/org/onosproject/kafkaintegration/api/KafkaPublisherService.java
@@ -0,0 +1,34 @@
+/**
+ * Copyright 2016-present Open Networking Laboratory
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.onosproject.kafkaintegration.api;
+
+import com.google.protobuf.GeneratedMessage;
+import org.onosproject.kafkaintegration.api.dto.OnosEvent.Type;
+
+/**
+ * API for dispatching ONOS events.
+ */
+public interface KafkaPublisherService {
+
+ /**
+ * Publish the ONOS Event to all listeners.
+ *
+ * @param eventType the ONOS eventtype
+ * @param message generated Protocol buffer message from ONOS event data
+ */
+ void publish(Type eventType, GeneratedMessage message);
+}
diff --git a/apps/kafka-integration/api/src/main/java/org/onosproject/kafkaintegration/api/dto/OnosEvent.java b/apps/kafka-integration/api/src/main/java/org/onosproject/kafkaintegration/api/dto/OnosEvent.java
index ff1d27e..2d51c7c 100644
--- a/apps/kafka-integration/api/src/main/java/org/onosproject/kafkaintegration/api/dto/OnosEvent.java
+++ b/apps/kafka-integration/api/src/main/java/org/onosproject/kafkaintegration/api/dto/OnosEvent.java
@@ -19,7 +19,7 @@
import com.google.protobuf.GeneratedMessage;
/**
- * Represents the converted Onos Event data into GPB format.
+ * Represents the converted Onos Event data into protobuf format.
*
*/
public class OnosEvent extends AbstractEvent<OnosEvent.Type, GeneratedMessage> {
@@ -38,6 +38,19 @@
* List of Event Types supported.
*/
public enum Type {
- DEVICE, LINK;
+ /**
+ * Signifies Device events.
+ */
+ DEVICE("DEVICE"),
+
+ /**
+ * Signifies Link events.
+ */
+ LINK("LINK");
+ public String typeName;
+
+ Type(String name) {
+ typeName = name;
+ }
}
}
diff --git a/apps/kafka-integration/app/app.xml b/apps/kafka-integration/app/app.xml
index 5912d4b..a832936 100644
--- a/apps/kafka-integration/app/app.xml
+++ b/apps/kafka-integration/app/app.xml
@@ -19,6 +19,9 @@
featuresRepo="mvn:${project.groupId}/${project.artifactId}/${project.version}/xml/features"
features="${project.artifactId}" apps="org.onosproject.incubator.protobuf">
<description>${project.description}</description>
- <artifact>mvn:${project.groupId}/${project.artifactId}/${project.version}</artifact>
+
<artifact>mvn:${project.groupId}/onos-app-kafka-api/${project.version}</artifact>
+ <artifact>mvn:${project.groupId}/onos-app-kafka-core/${project.version}</artifact>
+ <artifact>mvn:${project.groupId}/onos-app-kafka-web/${project.version}</artifact>
+ <artifact>mvn:org.apache.servicemix.bundles/org.apache.servicemix.bundles.kafka-clients/0.8.2.2_1</artifact>
</app>
diff --git a/apps/kafka-integration/app/features.xml b/apps/kafka-integration/app/features.xml
index 1e219b0..2982029 100644
--- a/apps/kafka-integration/app/features.xml
+++ b/apps/kafka-integration/app/features.xml
@@ -19,6 +19,8 @@
description="${project.description}">
<feature>onos-api</feature>
<bundle>mvn:${project.groupId}/onos-app-kafka-api/${project.version}</bundle>
- <bundle>mvn:${project.groupId}/onos-app-kafka/${project.version}</bundle>
+ <bundle>mvn:${project.groupId}/onos-app-kafka-core/${project.version}</bundle>
+ <bundle>mvn:${project.groupId}/onos-app-kafka-web/${project.version}</bundle>
+ <bundle>mvn:org.apache.servicemix.bundles/org.apache.servicemix.bundles.kafka-clients/0.8.2.2_1</bundle>
</feature>
</features>
diff --git a/apps/kafka-integration/app/pom.xml b/apps/kafka-integration/app/pom.xml
index 1dafb0d..5a6e574 100644
--- a/apps/kafka-integration/app/pom.xml
+++ b/apps/kafka-integration/app/pom.xml
@@ -26,12 +26,6 @@
<artifactId>onos-app-kafka</artifactId>
- <packaging>bundle</packaging>
- <description>
- Kafka Integration Application.
- This will export ONOS Events to Northbound Kafka Server.
- </description>
-
<properties>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<onos.version>${project.version}</onos.version>
@@ -41,21 +35,18 @@
<web.context>/onos/kafka</web.context>
<api.version>1.0.0</api.version>
<api.package>org.onosproject.kafkaintegration.rest</api.package>
- <api.title>Kafka Integration Application REST API</api.title>
- <api.description>
- APIs for subscribing to Events generated by ONOS
- </api.description>
<onos.app.category>Utility</onos.app.category>
<onos.app.url>https://wiki.onosproject.org/display/ONOS/Kafka+Integration</onos.app.url>
- <onos.app.readme>Export ONOS events to a Northbound Kafka server</onos.app.readme>
<onos.app.requires>org.onosproject.incubator.protobuf</onos.app.requires>
</properties>
- <dependencies>
- <dependency>
- <groupId>org.onosproject</groupId>
- <artifactId>onos-api</artifactId>
- </dependency>
+ <packaging>pom</packaging>
+ <description>
+ Kafka Integration Application.
+ This will export ONOS Events to Northbound Kafka Server.
+ </description>
+
+ <dependencies>
<dependency>
<groupId>org.onosproject</groupId>
@@ -65,168 +56,22 @@
<dependency>
<groupId>org.onosproject</groupId>
- <artifactId>onos-incubator-protobuf</artifactId>
+ <artifactId>onos-app-kafka-core</artifactId>
<version>${project.version}</version>
</dependency>
<dependency>
<groupId>org.onosproject</groupId>
- <artifactId>onlab-osgi</artifactId>
- </dependency>
-
- <dependency>
- <groupId>org.onosproject</groupId>
- <artifactId>onos-rest</artifactId>
+ <artifactId>onos-app-kafka-web</artifactId>
<version>${project.version}</version>
</dependency>
+ <!--Also need to update the app.xml and the features.xml -->
<dependency>
- <groupId>junit</groupId>
- <artifactId>junit</artifactId>
- <scope>test</scope>
+ <groupId>org.apache.servicemix.bundles</groupId>
+ <artifactId>org.apache.servicemix.bundles.kafka-clients</artifactId>
+ <version>0.8.2.2_1</version>
</dependency>
- <dependency>
- <groupId>org.onosproject</groupId>
- <artifactId>onos-api</artifactId>
- <scope>test</scope>
- <classifier>tests</classifier>
- </dependency>
-
- <dependency>
- <groupId>javax.ws.rs</groupId>
- <artifactId>javax.ws.rs-api</artifactId>
- <version>2.0.1</version>
- </dependency>
-
- <dependency>
- <groupId>com.google.protobuf</groupId>
- <artifactId>protobuf-java</artifactId>
- <version>3.0.0-beta-2</version>
- </dependency>
-
- <dependency>
- <groupId>org.codehaus.jackson</groupId>
- <artifactId>jackson-core-asl</artifactId>
- <version>1.9.13</version>
- </dependency>
-
- <dependency>
- <groupId>org.codehaus.jackson</groupId>
- <artifactId>jackson-mapper-asl</artifactId>
- <version>1.9.13</version>
- </dependency>
-
- <dependency>
- <groupId>org.glassfish.jersey.containers</groupId>
- <artifactId>jersey-container-servlet</artifactId>
- </dependency>
- <dependency>
- <groupId>com.fasterxml.jackson.core</groupId>
- <artifactId>jackson-annotations</artifactId>
- </dependency>
-
- <dependency>
- <groupId>org.onosproject</groupId>
- <artifactId>onos-core-serializers</artifactId>
- <version>${project.version}</version>
- </dependency>
-
- <dependency>
- <groupId>org.apache.felix</groupId>
- <artifactId>org.apache.felix.scr.annotations</artifactId>
- <scope>provided</scope>
- </dependency>
</dependencies>
-
- <build>
- <plugins>
- <plugin>
- <groupId>org.apache.felix</groupId>
- <artifactId>maven-bundle-plugin</artifactId>
- <extensions>true</extensions>
- <configuration>
- <instructions>
- <Bundle-SymbolicName>
- ${project.groupId}.${project.artifactId}
- </Bundle-SymbolicName>
- <_wab>src/main/webapp/</_wab>
- <Include-Resource>
- WEB-INF/classes/apidoc/swagger.json=target/swagger.json,
- {maven-resources}
- </Include-Resource>
- <Import-Package>
- org.slf4j,
- org.osgi.framework,
- javax.ws.rs,
- javax.ws.rs.core,
- org.glassfish.jersey.servlet,
- com.fasterxml.jackson.databind,
- com.fasterxml.jackson.databind.node,
- com.fasterxml.jackson.core,
- org.onlab.packet.*,
- org.onosproject.*,
- org.onlab.util.*,
- com.google.common.*,
- com.google.protobuf.*
- </Import-Package>
- <Web-ContextPath>${web.context}</Web-ContextPath>
- </instructions>
- </configuration>
- </plugin>
- <plugin>
- <groupId>org.apache.maven.plugins</groupId>
- <artifactId>maven-compiler-plugin</artifactId>
- <configuration>
- <source>1.8</source>
- <target>1.8</target>
- </configuration>
- </plugin>
- <plugin>
- <groupId>org.apache.felix</groupId>
- <artifactId>maven-scr-plugin</artifactId>
- <executions>
- <execution>
- <id>generate-scr-srcdescriptor</id>
- <goals>
- <goal>scr</goal>
- </goals>
- </execution>
- </executions>
- <configuration>
- <supportedProjectTypes>
- <supportedProjectType>bundle</supportedProjectType>
- <supportedProjectType>war</supportedProjectType>
- </supportedProjectTypes>
- </configuration>
- </plugin>
- <plugin>
- <groupId>org.onosproject</groupId>
- <artifactId>onos-maven-plugin</artifactId>
- <executions>
- <execution>
- <id>cfg</id>
- <phase>generate-resources</phase>
- <goals>
- <goal>cfg</goal>
- </goals>
- </execution>
- <execution>
- <id>swagger</id>
- <goals>
- <goal>swagger</goal>
- </goals>
- </execution>
- <execution>
- <id>app</id>
- <phase>package</phase>
- <goals>
- <goal>app</goal>
- </goals>
- </execution>
- </executions>
- </plugin>
- </plugins>
- </build>
-
-</project>
+</project>
\ No newline at end of file
diff --git a/apps/kafka-integration/core/pom.xml b/apps/kafka-integration/core/pom.xml
new file mode 100644
index 0000000..44df376
--- /dev/null
+++ b/apps/kafka-integration/core/pom.xml
@@ -0,0 +1,119 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+ ~ Copyright 2016-present Open Networking Laboratory
+ ~
+ ~ Licensed under the Apache License, Version 2.0 (the "License");
+ ~ you may not use this file except in compliance with the License.
+ ~ You may obtain a copy of the License at
+ ~
+ ~ http://www.apache.org/licenses/LICENSE-2.0
+ ~
+ ~ Unless required by applicable law or agreed to in writing, software
+ ~ distributed under the License is distributed on an "AS IS" BASIS,
+ ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ ~ See the License for the specific language governing permissions and
+ ~ limitations under the License.
+ -->
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+ xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+ xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+
+ <parent>
+ <groupId>org.onosproject</groupId>
+ <artifactId>onos-kafka</artifactId>
+ <version>1.7.0-SNAPSHOT</version>
+ </parent>
+ <modelVersion>4.0.0</modelVersion>
+
+ <artifactId>onos-app-kafka-core</artifactId>
+
+ <packaging>bundle</packaging>
+ <description>
+ Kafka Integration Application.
+ This module is exclusive of REST calls and is only for the implementation of Apache Kafka.
+ </description>
+
+ <dependencies>
+
+ <dependency>
+ <groupId>org.onosproject</groupId>
+ <artifactId>onos-api</artifactId>
+ </dependency>
+
+ <dependency>
+ <groupId>org.onosproject</groupId>
+ <artifactId>onos-app-kafka-api</artifactId>
+ <version>${project.version}</version>
+ </dependency>
+
+ <dependency>
+ <groupId>org.osgi</groupId>
+ <artifactId>org.osgi.core</artifactId>
+ <version>4.3.1</version>
+ </dependency>
+
+ <dependency>
+ <groupId>org.apache.servicemix.bundles</groupId>
+ <artifactId>org.apache.servicemix.bundles.kafka-clients</artifactId>
+ <version>0.8.2.2_1</version>
+ </dependency>
+
+ <dependency>
+ <groupId>org.apache.felix</groupId>
+ <artifactId>org.apache.felix.scr.annotations</artifactId>
+ <scope>provided</scope>
+ </dependency>
+
+ <dependency>
+ <groupId>org.onosproject</groupId>
+ <artifactId>onos-incubator-protobuf</artifactId>
+ <version>${project.version}</version>
+ </dependency>
+
+ <dependency>
+ <groupId>com.google.protobuf</groupId>
+ <artifactId>protobuf-java</artifactId>
+ <version>3.0.0-beta-2</version>
+ </dependency>
+ <dependency>
+ <groupId>org.onosproject</groupId>
+ <artifactId>onos-app-kafka-web</artifactId>
+ <version>${project.version}</version>
+ </dependency>
+
+ <dependency>
+ <groupId>org.osgi</groupId>
+ <artifactId>org.osgi.compendium</artifactId>
+ </dependency>
+
+ </dependencies>
+
+ <build>
+ <plugins>
+ <plugin>
+ <groupId>org.apache.felix</groupId>
+ <artifactId>maven-bundle-plugin</artifactId>
+ <extensions>true</extensions>
+ </plugin>
+
+ <plugin>
+ <groupId>org.apache.felix</groupId>
+ <artifactId>maven-scr-plugin</artifactId>
+ <executions>
+ <execution>
+ <id>generate-scr-srcdescriptor</id>
+ <goals>
+ <goal>scr</goal>
+ </goals>
+ </execution>
+ </executions>
+ <configuration>
+ <supportedProjectTypes>
+ <supportedProjectType>bundle</supportedProjectType>
+ <supportedProjectType>war</supportedProjectType>
+ </supportedProjectTypes>
+ </configuration>
+ </plugin>
+ </plugins>
+ </build>
+</project>
\ No newline at end of file
diff --git a/apps/kafka-integration/app/src/main/java/org/onosproject/kafkaintegration/converter/ConversionFactory.java b/apps/kafka-integration/core/src/main/java/org/onosproject/kafkaintegration/converter/ConversionFactory.java
similarity index 96%
rename from apps/kafka-integration/app/src/main/java/org/onosproject/kafkaintegration/converter/ConversionFactory.java
rename to apps/kafka-integration/core/src/main/java/org/onosproject/kafkaintegration/converter/ConversionFactory.java
index 8f69a20..26b5060 100644
--- a/apps/kafka-integration/app/src/main/java/org/onosproject/kafkaintegration/converter/ConversionFactory.java
+++ b/apps/kafka-integration/core/src/main/java/org/onosproject/kafkaintegration/converter/ConversionFactory.java
@@ -14,13 +14,13 @@
*/
package org.onosproject.kafkaintegration.converter;
-import static org.onosproject.kafkaintegration.api.dto.OnosEvent.Type.DEVICE;
-import static org.onosproject.kafkaintegration.api.dto.OnosEvent.Type.LINK;
+import org.onosproject.kafkaintegration.api.dto.OnosEvent.Type;
import java.util.HashMap;
import java.util.Map;
-import org.onosproject.kafkaintegration.api.dto.OnosEvent.Type;;
+import static org.onosproject.kafkaintegration.api.dto.OnosEvent.Type.DEVICE;
+import static org.onosproject.kafkaintegration.api.dto.OnosEvent.Type.LINK;
/**
* Returns the appropriate converter object based on the ONOS event type.
diff --git a/apps/kafka-integration/app/src/main/java/org/onosproject/kafkaintegration/converter/DeviceEventConverter.java b/apps/kafka-integration/core/src/main/java/org/onosproject/kafkaintegration/converter/DeviceEventConverter.java
similarity index 90%
rename from apps/kafka-integration/app/src/main/java/org/onosproject/kafkaintegration/converter/DeviceEventConverter.java
rename to apps/kafka-integration/core/src/main/java/org/onosproject/kafkaintegration/converter/DeviceEventConverter.java
index ac04040..6b2ee24 100644
--- a/apps/kafka-integration/app/src/main/java/org/onosproject/kafkaintegration/converter/DeviceEventConverter.java
+++ b/apps/kafka-integration/core/src/main/java/org/onosproject/kafkaintegration/converter/DeviceEventConverter.java
@@ -14,6 +14,7 @@
*/
package org.onosproject.kafkaintegration.converter;
+import com.google.protobuf.GeneratedMessage;
import org.onosproject.event.Event;
import org.onosproject.grpc.net.Device.DeviceCore;
import org.onosproject.grpc.net.Device.DeviceType;
@@ -25,13 +26,10 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import com.google.protobuf.GeneratedMessage;
-
/**
- * Converts ONOS Device event message to GPB format.
- *
+ * Converts ONOS Device event message to protobuf format.
*/
-class DeviceEventConverter implements EventConverter {
+public class DeviceEventConverter implements EventConverter {
private final Logger log = LoggerFactory.getLogger(getClass());
@@ -73,14 +71,14 @@
DeviceCore deviceCore =
DeviceCore.newBuilder()
.setChassisId(deviceEvent.subject().chassisId().id()
- .toString())
+ .toString())
.setDeviceId(deviceEvent.subject().id().toString())
.setHwVersion(deviceEvent.subject().hwVersion())
.setManufacturer(deviceEvent.subject().manufacturer())
.setSerialNumber(deviceEvent.subject().serialNumber())
.setSwVersion(deviceEvent.subject().swVersion())
.setType(DeviceType
- .valueOf(deviceEvent.subject().type().name()))
+ .valueOf(deviceEvent.subject().type().name()))
.build();
PortCore portCore = null;
@@ -89,10 +87,10 @@
PortCore.newBuilder()
.setIsEnabled(deviceEvent.port().isEnabled())
.setPortNumber(deviceEvent.port().number()
- .toString())
+ .toString())
.setPortSpeed(deviceEvent.port().portSpeed())
.setType(PortType
- .valueOf(deviceEvent.port().type().name()))
+ .valueOf(deviceEvent.port().type().name()))
.build();
notificationBuilder.setPort(portCore);
diff --git a/apps/kafka-integration/app/src/main/java/org/onosproject/kafkaintegration/converter/EventConverter.java b/apps/kafka-integration/core/src/main/java/org/onosproject/kafkaintegration/converter/EventConverter.java
similarity index 83%
rename from apps/kafka-integration/app/src/main/java/org/onosproject/kafkaintegration/converter/EventConverter.java
rename to apps/kafka-integration/core/src/main/java/org/onosproject/kafkaintegration/converter/EventConverter.java
index e15ebdc..c1e7739 100644
--- a/apps/kafka-integration/app/src/main/java/org/onosproject/kafkaintegration/converter/EventConverter.java
+++ b/apps/kafka-integration/core/src/main/java/org/onosproject/kafkaintegration/converter/EventConverter.java
@@ -20,7 +20,7 @@
/**
*
- * APIs for converting between ONOS event objects and GPB data objects.
+ * APIs for converting between ONOS event objects and protobuf data objects.
*
*/
public interface EventConverter {
@@ -30,7 +30,7 @@
* to Kafka.
*
* @param event ONOS Event object
- * @return converted data in GPB format.
+ * @return converted data in protobuf format.
*/
- public GeneratedMessage convertToProtoMessage(Event<?, ?> event);
+ GeneratedMessage convertToProtoMessage(Event<?, ?> event);
}
diff --git a/apps/kafka-integration/app/src/main/java/org/onosproject/kafkaintegration/converter/LinkEventConverter.java b/apps/kafka-integration/core/src/main/java/org/onosproject/kafkaintegration/converter/LinkEventConverter.java
similarity index 93%
rename from apps/kafka-integration/app/src/main/java/org/onosproject/kafkaintegration/converter/LinkEventConverter.java
rename to apps/kafka-integration/core/src/main/java/org/onosproject/kafkaintegration/converter/LinkEventConverter.java
index a1db209..febe020 100644
--- a/apps/kafka-integration/app/src/main/java/org/onosproject/kafkaintegration/converter/LinkEventConverter.java
+++ b/apps/kafka-integration/core/src/main/java/org/onosproject/kafkaintegration/converter/LinkEventConverter.java
@@ -28,10 +28,9 @@
import com.google.protobuf.GeneratedMessage;
/**
- * Converts for ONOS Link event message to GPB format.
- *
+ * Converts for ONOS Link event message to protobuf format.
*/
-class LinkEventConverter implements EventConverter {
+public class LinkEventConverter implements EventConverter {
private final Logger log = LoggerFactory.getLogger(getClass());
@@ -41,7 +40,7 @@
LinkEvent linkEvent = (LinkEvent) event;
if (!linkEventTypeSupported(linkEvent)) {
- log.error("Unsupported Onos Event {}. There is no matching"
+ log.error("Unsupported Onos Event {}. There is no matching "
+ "proto Event type", linkEvent.type().toString());
return null;
}
@@ -56,7 +55,6 @@
return true;
}
}
-
return false;
}
@@ -66,8 +64,7 @@
.setLink(LinkCore.newBuilder()
.setState(LinkState
.valueOf(linkEvent.subject().state().name()))
- .setType(LinkType
- .valueOf(linkEvent.subject().type().name()))
+ .setType(LinkType.valueOf(linkEvent.subject().type().name()))
.setDst(ConnectPoint.newBuilder()
.setDeviceId(linkEvent.subject().dst()
.deviceId().toString())
diff --git a/apps/kafka-integration/app/src/main/java/org/onosproject/kafkaintegration/converter/package-info.java b/apps/kafka-integration/core/src/main/java/org/onosproject/kafkaintegration/converter/package-info.java
similarity index 100%
rename from apps/kafka-integration/app/src/main/java/org/onosproject/kafkaintegration/converter/package-info.java
rename to apps/kafka-integration/core/src/main/java/org/onosproject/kafkaintegration/converter/package-info.java
diff --git a/apps/kafka-integration/core/src/main/java/org/onosproject/kafkaintegration/impl/EventConversionManager.java b/apps/kafka-integration/core/src/main/java/org/onosproject/kafkaintegration/impl/EventConversionManager.java
new file mode 100644
index 0000000..f1d0c97
--- /dev/null
+++ b/apps/kafka-integration/core/src/main/java/org/onosproject/kafkaintegration/impl/EventConversionManager.java
@@ -0,0 +1,72 @@
+/**
+ * Copyright 2016-present Open Networking Laboratory
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.onosproject.kafkaintegration.impl;
+
+import org.apache.felix.scr.annotations.Activate;
+import org.apache.felix.scr.annotations.Component;
+import org.apache.felix.scr.annotations.Deactivate;
+import org.apache.felix.scr.annotations.Service;
+import org.onosproject.event.Event;
+import org.onosproject.kafkaintegration.api.EventConversionService;
+import org.onosproject.kafkaintegration.api.dto.OnosEvent;
+import org.onosproject.kafkaintegration.converter.DeviceEventConverter;
+import org.onosproject.kafkaintegration.converter.EventConverter;
+import org.onosproject.kafkaintegration.converter.LinkEventConverter;
+import org.onosproject.net.device.DeviceEvent;
+import org.onosproject.net.link.LinkEvent;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import static org.onosproject.kafkaintegration.api.dto.OnosEvent.Type.DEVICE;
+import static org.onosproject.kafkaintegration.api.dto.OnosEvent.Type.LINK;
+
+/**
+ * Implementation of Event Conversion Service.
+ *
+ */
+@Component(immediate = true)
+@Service
+public class EventConversionManager implements EventConversionService {
+
+ private final Logger log = LoggerFactory.getLogger(getClass());
+ private EventConverter deviceEventConverter;
+ private EventConverter linkEventConverter;
+
+ @Activate
+ protected void activate() {
+ deviceEventConverter = new DeviceEventConverter();
+ linkEventConverter = new LinkEventConverter();
+
+ log.info("Started");
+ }
+
+ @Deactivate
+ protected void deactivate() {
+ log.info("Stopped");
+ }
+
+ @Override
+ public OnosEvent convertEvent(Event<?, ?> event) {
+ if (event instanceof DeviceEvent) {
+ return new OnosEvent(DEVICE, deviceEventConverter.convertToProtoMessage(event));
+ } else if (event instanceof LinkEvent) {
+ return new OnosEvent(LINK, linkEventConverter.convertToProtoMessage(event));
+ } else {
+ throw new IllegalArgumentException("Unsupported event type");
+ }
+ }
+}
diff --git a/apps/kafka-integration/app/src/main/java/org/onosproject/kafkaintegration/impl/EventExporterManager.java b/apps/kafka-integration/core/src/main/java/org/onosproject/kafkaintegration/impl/EventSubscriptionManager.java
similarity index 94%
rename from apps/kafka-integration/app/src/main/java/org/onosproject/kafkaintegration/impl/EventExporterManager.java
rename to apps/kafka-integration/core/src/main/java/org/onosproject/kafkaintegration/impl/EventSubscriptionManager.java
index e2e8e57..ebbac6b 100644
--- a/apps/kafka-integration/app/src/main/java/org/onosproject/kafkaintegration/impl/EventExporterManager.java
+++ b/apps/kafka-integration/core/src/main/java/org/onosproject/kafkaintegration/impl/EventSubscriptionManager.java
@@ -1,4 +1,4 @@
-/*
+/**
* Copyright 2016-present Open Networking Laboratory
*
* Licensed under the Apache License, Version 2.0 (the "License");
@@ -15,18 +15,7 @@
*/
package org.onosproject.kafkaintegration.impl;
-import static com.google.common.base.Preconditions.checkNotNull;
-import static org.onosproject.kafkaintegration.api.dto.OnosEvent.Type.DEVICE;
-import static org.onosproject.kafkaintegration.api.dto.OnosEvent.Type.LINK;
-
-import org.onosproject.kafkaintegration.api.dto.DefaultEventSubscriber;
-import org.onosproject.kafkaintegration.api.dto.EventSubscriber;
-
-import java.util.ArrayList;
-import java.util.List;
-import java.util.Map;
-import java.util.UUID;
-
+import com.google.common.collect.ImmutableList;
import org.apache.felix.scr.annotations.Activate;
import org.apache.felix.scr.annotations.Component;
import org.apache.felix.scr.annotations.Deactivate;
@@ -35,7 +24,9 @@
import org.apache.felix.scr.annotations.Service;
import org.onosproject.core.ApplicationId;
import org.onosproject.core.CoreService;
-import org.onosproject.kafkaintegration.api.EventExporterService;
+import org.onosproject.kafkaintegration.api.EventSubscriptionService;
+import org.onosproject.kafkaintegration.api.dto.DefaultEventSubscriber;
+import org.onosproject.kafkaintegration.api.dto.EventSubscriber;
import org.onosproject.kafkaintegration.api.dto.EventSubscriberGroupId;
import org.onosproject.kafkaintegration.api.dto.OnosEvent;
import org.onosproject.kafkaintegration.api.dto.OnosEvent.Type;
@@ -51,13 +42,22 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.UUID;
+
+import static com.google.common.base.Preconditions.checkNotNull;
+import static org.onosproject.kafkaintegration.api.dto.OnosEvent.Type.DEVICE;
+import static org.onosproject.kafkaintegration.api.dto.OnosEvent.Type.LINK;
+
/**
- * Implementation of Event Exporter Service.
+ * Implementation of Event Subscription Manager.
*
*/
@Component(immediate = true)
@Service
-public class EventExporterManager implements EventExporterService {
+public class EventSubscriptionManager implements EventSubscriptionService {
private final Logger log = LoggerFactory.getLogger(getClass());
@@ -147,15 +147,12 @@
+ "registered to make this request.");
}
- if (!validGroupId(subscriber.subscriberGroupId(),
- subscriber.appName())) {
+ if (!validGroupId(subscriber.subscriberGroupId(), subscriber.appName())) {
throw new InvalidGroupIdException("Incorrect group id in the request");
}
OnosEventListener onosListener = getListener(subscriber.eventType());
- checkNotNull(onosListener,
- "No listener for the supported event type - {}",
- subscriber.eventType());
+ checkNotNull(onosListener, "No listener for the supported event type - {}", subscriber.eventType());
applyListenerAction(subscriber.eventType(), onosListener,
ListenerAction.START);
@@ -169,7 +166,7 @@
subscriptionList.add(subscriber);
subscriptions.put(subscriber.eventType(), subscriptionList);
- log.info("Subscription for {} event by {} successfull",
+ log.info("Subscription for {} event by {} successful",
subscriber.eventType(), subscriber.appName());
}
@@ -308,6 +305,12 @@
subscriber.eventType());
}
+ @Override
+ public List<EventSubscriber> getEventSubscribers(Type type) {
+ return subscriptions.getOrDefault(type, ImmutableList.of());
+
+ }
+
/**
* Checks if the subscriber has already subscribed to the requested event
* type.
diff --git a/apps/kafka-integration/app/src/main/java/org/onosproject/kafkaintegration/impl/KafkaCodecRegistrator.java b/apps/kafka-integration/core/src/main/java/org/onosproject/kafkaintegration/impl/KafkaCodecRegistrator.java
similarity index 96%
rename from apps/kafka-integration/app/src/main/java/org/onosproject/kafkaintegration/impl/KafkaCodecRegistrator.java
rename to apps/kafka-integration/core/src/main/java/org/onosproject/kafkaintegration/impl/KafkaCodecRegistrator.java
index 47098fa..22705d5 100644
--- a/apps/kafka-integration/app/src/main/java/org/onosproject/kafkaintegration/impl/KafkaCodecRegistrator.java
+++ b/apps/kafka-integration/core/src/main/java/org/onosproject/kafkaintegration/impl/KafkaCodecRegistrator.java
@@ -21,7 +21,6 @@
import org.apache.felix.scr.annotations.ReferenceCardinality;
import org.onosproject.codec.CodecService;
import org.onosproject.kafkaintegration.api.dto.EventSubscriber;
-import org.onosproject.kafkaintegration.rest.SubscriberCodec;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
diff --git a/apps/kafka-integration/app/src/main/java/org/onosproject/kafkaintegration/impl/Dispatcher.java b/apps/kafka-integration/core/src/main/java/org/onosproject/kafkaintegration/impl/KafkaPublisherManager.java
similarity index 79%
rename from apps/kafka-integration/app/src/main/java/org/onosproject/kafkaintegration/impl/Dispatcher.java
rename to apps/kafka-integration/core/src/main/java/org/onosproject/kafkaintegration/impl/KafkaPublisherManager.java
index d211458..104619a 100644
--- a/apps/kafka-integration/app/src/main/java/org/onosproject/kafkaintegration/impl/Dispatcher.java
+++ b/apps/kafka-integration/core/src/main/java/org/onosproject/kafkaintegration/impl/KafkaPublisherManager.java
@@ -16,6 +16,7 @@
import org.onosproject.event.AbstractListenerManager;
import org.onosproject.kafkaintegration.api.ExportableEventListener;
+import org.onosproject.kafkaintegration.api.KafkaPublisherService;
import org.onosproject.kafkaintegration.api.dto.OnosEvent;
import org.onosproject.kafkaintegration.api.dto.OnosEvent.Type;
import org.slf4j.Logger;
@@ -27,17 +28,19 @@
* Dispatch ONOS Events to all interested Listeners.
*
*/
-public final class Dispatcher
- extends AbstractListenerManager<OnosEvent, ExportableEventListener> {
+
+public final class KafkaPublisherManager
+ extends AbstractListenerManager<OnosEvent, ExportableEventListener> implements KafkaPublisherService {
private final Logger log = LoggerFactory.getLogger(getClass());
// Exists to defeat instantiation
- private Dispatcher() {
+ private KafkaPublisherManager() {
}
+ //TODO: If possible, get rid of Singleton implementation.
private static class SingletonHolder {
- private static final Dispatcher INSTANCE = new Dispatcher();
+ private static final KafkaPublisherManager INSTANCE = new KafkaPublisherManager();
}
/**
@@ -45,16 +48,11 @@
*
* @return singleton object
*/
- public static Dispatcher getInstance() {
+ public static KafkaPublisherManager getInstance() {
return SingletonHolder.INSTANCE;
}
- /**
- * Publish the ONOS Event to all listeners.
- *
- * @param eventType the ONOS eventtype
- * @param message generated Protocol buffer message from ONOS event data
- */
+ @Override
public void publish(Type eventType, GeneratedMessage message) {
log.debug("Dispatching ONOS Event {}", eventType);
post(new OnosEvent(eventType, message));
diff --git a/apps/kafka-integration/app/src/main/java/org/onosproject/kafkaintegration/impl/KafkaStorageManager.java b/apps/kafka-integration/core/src/main/java/org/onosproject/kafkaintegration/impl/KafkaStorageManager.java
similarity index 99%
rename from apps/kafka-integration/app/src/main/java/org/onosproject/kafkaintegration/impl/KafkaStorageManager.java
rename to apps/kafka-integration/core/src/main/java/org/onosproject/kafkaintegration/impl/KafkaStorageManager.java
index 4b9035e..64af4e8 100644
--- a/apps/kafka-integration/app/src/main/java/org/onosproject/kafkaintegration/impl/KafkaStorageManager.java
+++ b/apps/kafka-integration/core/src/main/java/org/onosproject/kafkaintegration/impl/KafkaStorageManager.java
@@ -14,11 +14,6 @@
*/
package org.onosproject.kafkaintegration.impl;
-import java.util.TreeMap;
-import java.util.concurrent.Executors;
-import java.util.concurrent.ScheduledExecutorService;
-import java.util.concurrent.TimeUnit;
-
import org.apache.felix.scr.annotations.Activate;
import org.apache.felix.scr.annotations.Component;
import org.apache.felix.scr.annotations.Deactivate;
@@ -31,6 +26,11 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import java.util.TreeMap;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+
@Component(immediate = true)
public class KafkaStorageManager implements KafkaEventStorageService {
diff --git a/apps/kafka-integration/app/src/main/java/org/onosproject/kafkaintegration/rest/SubscriberCodec.java b/apps/kafka-integration/core/src/main/java/org/onosproject/kafkaintegration/impl/SubscriberCodec.java
similarity index 97%
rename from apps/kafka-integration/app/src/main/java/org/onosproject/kafkaintegration/rest/SubscriberCodec.java
rename to apps/kafka-integration/core/src/main/java/org/onosproject/kafkaintegration/impl/SubscriberCodec.java
index 2876ca1..3489f6f 100644
--- a/apps/kafka-integration/app/src/main/java/org/onosproject/kafkaintegration/rest/SubscriberCodec.java
+++ b/apps/kafka-integration/core/src/main/java/org/onosproject/kafkaintegration/impl/SubscriberCodec.java
@@ -12,19 +12,19 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-package org.onosproject.kafkaintegration.rest;
+package org.onosproject.kafkaintegration.impl;
-import static com.google.common.base.Preconditions.checkNotNull;
-
-import java.util.UUID;
-
+import com.fasterxml.jackson.databind.node.ObjectNode;
import org.onosproject.codec.CodecContext;
import org.onosproject.codec.JsonCodec;
import org.onosproject.kafkaintegration.api.dto.DefaultEventSubscriber;
import org.onosproject.kafkaintegration.api.dto.EventSubscriber;
import org.onosproject.kafkaintegration.api.dto.EventSubscriberGroupId;
import org.onosproject.kafkaintegration.api.dto.OnosEvent.Type;
-import com.fasterxml.jackson.databind.node.ObjectNode;
+
+import java.util.UUID;
+
+import static com.google.common.base.Preconditions.checkNotNull;
/**
* Codec for encoding/decoding a Subscriber object to/from JSON.
diff --git a/apps/kafka-integration/app/src/main/java/org/onosproject/kafkaintegration/impl/package-info.java b/apps/kafka-integration/core/src/main/java/org/onosproject/kafkaintegration/impl/package-info.java
similarity index 100%
rename from apps/kafka-integration/app/src/main/java/org/onosproject/kafkaintegration/impl/package-info.java
rename to apps/kafka-integration/core/src/main/java/org/onosproject/kafkaintegration/impl/package-info.java
diff --git a/apps/kafka-integration/core/src/main/java/org/onosproject/kafkaintegration/kafka/EventMonitor.java b/apps/kafka-integration/core/src/main/java/org/onosproject/kafkaintegration/kafka/EventMonitor.java
new file mode 100644
index 0000000..6e7291b
--- /dev/null
+++ b/apps/kafka-integration/core/src/main/java/org/onosproject/kafkaintegration/kafka/EventMonitor.java
@@ -0,0 +1,242 @@
+/**
+ * Copyright 2016-present Open Networking Laboratory
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.onosproject.kafkaintegration.kafka;
+
+import org.apache.felix.scr.annotations.Activate;
+import org.apache.felix.scr.annotations.Component;
+import org.apache.felix.scr.annotations.Deactivate;
+import org.apache.felix.scr.annotations.Modified;
+import org.apache.felix.scr.annotations.Property;
+import org.apache.felix.scr.annotations.Reference;
+import org.apache.felix.scr.annotations.ReferenceCardinality;
+import org.apache.kafka.clients.producer.ProducerRecord;
+import org.onosproject.cfg.ComponentConfigService;
+import org.onosproject.kafkaintegration.api.EventConversionService;
+import org.onosproject.kafkaintegration.api.EventSubscriptionService;
+import org.onosproject.net.device.DeviceEvent;
+import org.onosproject.net.device.DeviceListener;
+import org.onosproject.net.device.DeviceService;
+import org.onosproject.net.link.LinkEvent;
+import org.onosproject.net.link.LinkListener;
+import org.onosproject.net.link.LinkService;
+import org.osgi.service.component.ComponentContext;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Dictionary;
+import java.util.UUID;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+
+import static com.google.common.base.Strings.isNullOrEmpty;
+import static java.util.concurrent.Executors.newSingleThreadScheduledExecutor;
+import static org.onlab.util.Tools.get;
+import static org.onlab.util.Tools.groupedThreads;
+import static org.onosproject.kafkaintegration.api.dto.OnosEvent.Type.DEVICE;
+import static org.onosproject.kafkaintegration.api.dto.OnosEvent.Type.LINK;
+
+/**
+ * Encapsulates the behavior of monitoring various ONOS events.
+ * */
+@Component(immediate = true)
+public class EventMonitor {
+ private final Logger log = LoggerFactory.getLogger(getClass());
+
+ @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
+ protected EventSubscriptionService eventSubscriptionService;
+
+ @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
+ protected EventConversionService eventConversionService;
+
+ @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
+ protected DeviceService deviceService;
+
+ @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
+ protected LinkService linkService;
+
+ @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
+ protected ComponentConfigService componentConfigService;
+
+ private final DeviceListener deviceListener = new InternalDeviceListener();
+ private final LinkListener linkListener = new InternalLinkListener();
+
+ protected ExecutorService eventExecutor;
+
+ private static final String BOOTSTRAP_SERVERS = "localhost:9092";
+ private static final int RETRIES = 1;
+ private static final int MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION = 5;
+ private static final int REQUEST_REQUIRED_ACKS = 1;
+ private static final String KEY_SERIALIZER = "org.apache.kafka.common.serialization.StringSerializer";
+ private static final String VALUE_SERIALIZER = "org.apache.kafka.common.serialization.ByteArraySerializer";
+
+ @Property(name = "bootstrap.servers", value = BOOTSTRAP_SERVERS,
+ label = "Default host/post pair to establish initial connection to Kafka cluster.")
+ private String bootstrapServers = BOOTSTRAP_SERVERS;
+
+ @Property(name = "retries", intValue = RETRIES,
+ label = "Number of times the producer can retry to send after first failure")
+ private int retries = RETRIES;
+
+ @Property(name = "max.in.flight.requests.per.connection", intValue = MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION,
+ label = "The maximum number of unacknowledged requests the client will send before blocking")
+ private int maxInFlightRequestsPerConnection = MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION;
+
+ @Property(name = "request.required.acks", intValue = 1,
+ label = "Producer will get an acknowledgement after the leader has replicated the data")
+ private int requestRequiredAcks = REQUEST_REQUIRED_ACKS;
+
+ @Property(name = "key.serializer", value = KEY_SERIALIZER,
+ label = "Serializer class for key that implements the Serializer interface.")
+ private String keySerializer = KEY_SERIALIZER;
+
+ @Property(name = "value.serializer", value = VALUE_SERIALIZER,
+ label = "Serializer class for value that implements the Serializer interface.")
+ private String valueSerializer = VALUE_SERIALIZER;
+
+ private Producer producer;
+
+ @Activate
+ protected void activate(ComponentContext context) {
+ componentConfigService.registerProperties(getClass());
+ eventExecutor = newSingleThreadScheduledExecutor(groupedThreads("onos/onosEvents", "events-%d", log));
+ deviceService.addListener(deviceListener);
+ linkService.addListener(linkListener);
+ producer = new Producer(bootstrapServers, retries, maxInFlightRequestsPerConnection,
+ requestRequiredAcks, keySerializer, valueSerializer);
+ producer.start();
+
+ log.info("Started");
+ }
+
+ @Deactivate
+ protected void deactivate() {
+ componentConfigService.unregisterProperties(getClass(), false);
+ deviceService.removeListener(deviceListener);
+ linkService.removeListener(linkListener);
+ producer.stop();
+ eventExecutor.shutdownNow();
+ eventExecutor = null;
+
+ log.info("Stopped");
+ }
+
+ @Modified
+ private void modified(ComponentContext context) {
+ if (context == null) {
+ bootstrapServers = BOOTSTRAP_SERVERS;
+ retries = RETRIES;
+ maxInFlightRequestsPerConnection = MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION;
+ requestRequiredAcks = REQUEST_REQUIRED_ACKS;
+ keySerializer = KEY_SERIALIZER;
+ valueSerializer = VALUE_SERIALIZER;
+ return;
+ }
+ Dictionary properties = context.getProperties();
+
+ String newBootstrapServers = BOOTSTRAP_SERVERS;
+ int newRetries = RETRIES;
+ int newMaxInFlightRequestsPerConnection = MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION;
+ int newRequestRequiredAcks = REQUEST_REQUIRED_ACKS;
+ try {
+ String s = get(properties, "bootstrapServers");
+ newBootstrapServers = isNullOrEmpty(s)
+ ? bootstrapServers : s.trim();
+
+ s = get(properties, "retries");
+ newRetries = isNullOrEmpty(s)
+ ? retries : Integer.parseInt(s.trim());
+
+ s = get(properties, "maxInFlightRequestsPerConnection");
+ newMaxInFlightRequestsPerConnection = isNullOrEmpty(s)
+ ? maxInFlightRequestsPerConnection : Integer.parseInt(s.trim());
+
+ s = get(properties, "requestRequiredAcks");
+ newRequestRequiredAcks = isNullOrEmpty(s)
+ ? requestRequiredAcks : Integer.parseInt(s.trim());
+
+ } catch (NumberFormatException | ClassCastException e) {
+ return;
+ }
+
+ boolean modified = newBootstrapServers != bootstrapServers ||
+ newRetries != retries ||
+ newMaxInFlightRequestsPerConnection != maxInFlightRequestsPerConnection ||
+ newRequestRequiredAcks != requestRequiredAcks;
+
+ if (modified) {
+ bootstrapServers = newBootstrapServers;
+ retries = newRetries;
+ maxInFlightRequestsPerConnection = newMaxInFlightRequestsPerConnection;
+ requestRequiredAcks = newRequestRequiredAcks;
+ if (producer != null) {
+ producer.stop();
+ }
+ producer = new Producer(bootstrapServers, retries, maxInFlightRequestsPerConnection,
+ requestRequiredAcks, keySerializer, valueSerializer);
+ producer.start();
+ log.info("Modified");
+ } else {
+ return;
+ }
+ }
+
+ private class InternalDeviceListener implements DeviceListener {
+
+ @Override
+ public void event(DeviceEvent event) {
+ if (!eventSubscriptionService.getEventSubscribers(DEVICE).isEmpty()) {
+ eventExecutor.execute(() -> {
+ try {
+ String id = UUID.randomUUID().toString();
+ producer.send(new ProducerRecord<>(DEVICE.toString(),
+ id, event.subject().toString().getBytes())).get();
+ log.debug("Device event sent successfully.");
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ } catch (ExecutionException e) {
+ log.error("Exception thrown {}", e);
+ }
+ });
+ } else {
+ log.debug("No device listeners");
+ }
+ }
+ }
+
+ private class InternalLinkListener implements LinkListener {
+
+ @Override
+ public void event(LinkEvent event) {
+ if (!eventSubscriptionService.getEventSubscribers(LINK).isEmpty()) {
+ eventExecutor.execute(() -> {
+ try {
+ String id = UUID.randomUUID().toString();
+ producer.send(new ProducerRecord<>(LINK.toString(),
+ id, event.subject().toString().getBytes())).get();
+ log.debug("Link event sent successfully.");
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ } catch (ExecutionException e) {
+ log.error("Exception thrown {}", e);
+ }
+ });
+ } else {
+ log.debug("No link listeners");
+ }
+ }
+ }
+}
diff --git a/apps/kafka-integration/core/src/main/java/org/onosproject/kafkaintegration/kafka/Producer.java b/apps/kafka-integration/core/src/main/java/org/onosproject/kafkaintegration/kafka/Producer.java
new file mode 100644
index 0000000..bbdff81
--- /dev/null
+++ b/apps/kafka-integration/core/src/main/java/org/onosproject/kafkaintegration/kafka/Producer.java
@@ -0,0 +1,66 @@
+/**
+ * Copyright 2016-present Open Networking Laboratory
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.onosproject.kafkaintegration.kafka;
+
+import org.apache.kafka.clients.producer.KafkaProducer;
+import org.apache.kafka.clients.producer.ProducerRecord;
+import org.apache.kafka.clients.producer.RecordMetadata;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Properties;
+import java.util.concurrent.Future;
+
+/**
+ * Implementation of Kafka Producer.
+ */
+public class Producer {
+ private KafkaProducer<String, byte[]> kafkaProducer = null;
+
+ private final Logger log = LoggerFactory.getLogger(getClass());
+
+ Producer(String bootstrapServers, int retries, int maxInFlightRequestsPerConnection,
+ int requestRequiredAcks, String keySerializer, String valueSerializer) {
+
+ Properties prop = new Properties();
+ prop.put("bootstrap.servers", bootstrapServers);
+ prop.put("retries", retries);
+ prop.put("max.in.flight.requests.per.connection", maxInFlightRequestsPerConnection);
+ prop.put("request.required.acks", requestRequiredAcks);
+ prop.put("key.serializer", keySerializer);
+ prop.put("value.serializer", valueSerializer);
+
+ kafkaProducer = new KafkaProducer<>(prop);
+ }
+
+ public void start() {
+ log.info("Started");
+ }
+
+ public void stop() {
+ if (kafkaProducer != null) {
+ kafkaProducer.close();
+ kafkaProducer = null;
+ }
+
+ log.info("Stopped");
+ }
+
+ public Future<RecordMetadata> send(ProducerRecord<String, byte[]> record) {
+ return kafkaProducer.send(record);
+ }
+}
diff --git a/apps/kafka-integration/core/src/main/java/org/onosproject/kafkaintegration/kafka/package-info.java b/apps/kafka-integration/core/src/main/java/org/onosproject/kafkaintegration/kafka/package-info.java
new file mode 100644
index 0000000..e222a64
--- /dev/null
+++ b/apps/kafka-integration/core/src/main/java/org/onosproject/kafkaintegration/kafka/package-info.java
@@ -0,0 +1,18 @@
+/**
+ * Copyright 2016-present Open Networking Laboratory Licensed under the Apache
+ * License, Version 2.0 (the "License"); you may not use this file except in
+ * compliance with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+/**
+ * API implementation classes.
+ */
+package org.onosproject.kafkaintegration.kafka;
diff --git a/apps/kafka-integration/app/src/main/java/org/onosproject/kafkaintegration/listener/DeviceEventsListener.java b/apps/kafka-integration/core/src/main/java/org/onosproject/kafkaintegration/listener/DeviceEventsListener.java
similarity index 87%
rename from apps/kafka-integration/app/src/main/java/org/onosproject/kafkaintegration/listener/DeviceEventsListener.java
rename to apps/kafka-integration/core/src/main/java/org/onosproject/kafkaintegration/listener/DeviceEventsListener.java
index 22c4bf2..e2f23c0 100644
--- a/apps/kafka-integration/app/src/main/java/org/onosproject/kafkaintegration/listener/DeviceEventsListener.java
+++ b/apps/kafka-integration/core/src/main/java/org/onosproject/kafkaintegration/listener/DeviceEventsListener.java
@@ -14,29 +14,31 @@
*/
package org.onosproject.kafkaintegration.listener;
-import static org.onosproject.kafkaintegration.api.dto.OnosEvent.Type.DEVICE;
-
+import com.google.protobuf.GeneratedMessage;
import org.onosproject.event.ListenerService;
-import org.onosproject.kafkaintegration.impl.Dispatcher;
import org.onosproject.kafkaintegration.api.dto.OnosEvent.Type;
import org.onosproject.kafkaintegration.converter.ConversionFactory;
import org.onosproject.kafkaintegration.converter.EventConverter;
+import org.onosproject.kafkaintegration.impl.KafkaPublisherManager;
import org.onosproject.net.device.DeviceEvent;
import org.onosproject.net.device.DeviceListener;
import org.onosproject.net.device.DeviceService;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
-import com.google.protobuf.GeneratedMessage;
+import static org.onosproject.kafkaintegration.api.dto.OnosEvent.Type.DEVICE;
/**
* Listens for ONOS Device events.
*
*/
-final class DeviceEventsListener implements OnosEventListener {
+public final class DeviceEventsListener implements OnosEventListener {
+
+ private final Logger log = LoggerFactory.getLogger(getClass());
private boolean listenerRunning = false;
private InnerListener listener = null;
-
// Exists to defeat instantiation
private DeviceEventsListener() {
}
@@ -70,14 +72,14 @@
@Override
public void event(DeviceEvent arg0) {
- // Convert the event to GPB format
+ // Convert the event to protobuf format
ConversionFactory conversionFactory =
ConversionFactory.getInstance();
EventConverter converter = conversionFactory.getConverter(DEVICE);
GeneratedMessage message = converter.convertToProtoMessage(arg0);
// Call Dispatcher and publish event
- Dispatcher.getInstance().publish(DEVICE, message);
+ KafkaPublisherManager.getInstance().publish(DEVICE, message);
}
}
@@ -91,4 +93,4 @@
}
}
-}
+}
\ No newline at end of file
diff --git a/apps/kafka-integration/app/src/main/java/org/onosproject/kafkaintegration/listener/LinkEventsListener.java b/apps/kafka-integration/core/src/main/java/org/onosproject/kafkaintegration/listener/LinkEventsListener.java
similarity index 91%
rename from apps/kafka-integration/app/src/main/java/org/onosproject/kafkaintegration/listener/LinkEventsListener.java
rename to apps/kafka-integration/core/src/main/java/org/onosproject/kafkaintegration/listener/LinkEventsListener.java
index 769d8b4..39ab078 100644
--- a/apps/kafka-integration/app/src/main/java/org/onosproject/kafkaintegration/listener/LinkEventsListener.java
+++ b/apps/kafka-integration/core/src/main/java/org/onosproject/kafkaintegration/listener/LinkEventsListener.java
@@ -14,29 +14,27 @@
*/
package org.onosproject.kafkaintegration.listener;
-import static org.onosproject.kafkaintegration.api.dto.OnosEvent.Type.LINK;
-
+import com.google.protobuf.GeneratedMessage;
import org.onosproject.event.ListenerService;
-import org.onosproject.kafkaintegration.impl.Dispatcher;
import org.onosproject.kafkaintegration.api.dto.OnosEvent.Type;
import org.onosproject.kafkaintegration.converter.ConversionFactory;
import org.onosproject.kafkaintegration.converter.EventConverter;
+import org.onosproject.kafkaintegration.impl.KafkaPublisherManager;
import org.onosproject.net.link.LinkEvent;
import org.onosproject.net.link.LinkListener;
import org.onosproject.net.link.LinkService;
-import com.google.protobuf.GeneratedMessage;
+import static org.onosproject.kafkaintegration.api.dto.OnosEvent.Type.LINK;
/**
* Listens for ONOS Link Events.
*
*/
-final class LinkEventsListener implements OnosEventListener {
+public final class LinkEventsListener implements OnosEventListener {
private boolean listenerRunning = false;
private InnerListener listener = null;
-
// Exists to defeat instantiation
private LinkEventsListener() {
}
@@ -70,14 +68,14 @@
@Override
public void event(LinkEvent arg0) {
- // Convert the event to GPB format
+ // Convert the event to protobuf format
ConversionFactory conversionFactory =
ConversionFactory.getInstance();
EventConverter converter = conversionFactory.getConverter(LINK);
GeneratedMessage message = converter.convertToProtoMessage(arg0);
// Call Dispatcher and publish event
- Dispatcher.getInstance().publish(LINK, message);
+ KafkaPublisherManager.getInstance().publish(LINK, message);
}
}
diff --git a/apps/kafka-integration/app/src/main/java/org/onosproject/kafkaintegration/listener/ListenerFactory.java b/apps/kafka-integration/core/src/main/java/org/onosproject/kafkaintegration/listener/ListenerFactory.java
similarity index 99%
rename from apps/kafka-integration/app/src/main/java/org/onosproject/kafkaintegration/listener/ListenerFactory.java
rename to apps/kafka-integration/core/src/main/java/org/onosproject/kafkaintegration/listener/ListenerFactory.java
index 496ba2b..65b7cf1 100644
--- a/apps/kafka-integration/app/src/main/java/org/onosproject/kafkaintegration/listener/ListenerFactory.java
+++ b/apps/kafka-integration/core/src/main/java/org/onosproject/kafkaintegration/listener/ListenerFactory.java
@@ -14,13 +14,13 @@
*/
package org.onosproject.kafkaintegration.listener;
-import static org.onosproject.kafkaintegration.api.dto.OnosEvent.Type.DEVICE;
-import static org.onosproject.kafkaintegration.api.dto.OnosEvent.Type.LINK;
+import org.onosproject.kafkaintegration.api.dto.OnosEvent.Type;
import java.util.HashMap;
import java.util.Map;
-import org.onosproject.kafkaintegration.api.dto.OnosEvent.Type;
+import static org.onosproject.kafkaintegration.api.dto.OnosEvent.Type.DEVICE;
+import static org.onosproject.kafkaintegration.api.dto.OnosEvent.Type.LINK;
/**
* Returns the appropriate listener object based on the ONOS event type.
diff --git a/apps/kafka-integration/app/src/main/java/org/onosproject/kafkaintegration/listener/OnosEventListener.java b/apps/kafka-integration/core/src/main/java/org/onosproject/kafkaintegration/listener/OnosEventListener.java
similarity index 99%
rename from apps/kafka-integration/app/src/main/java/org/onosproject/kafkaintegration/listener/OnosEventListener.java
rename to apps/kafka-integration/core/src/main/java/org/onosproject/kafkaintegration/listener/OnosEventListener.java
index 22cde21..ad14104 100644
--- a/apps/kafka-integration/app/src/main/java/org/onosproject/kafkaintegration/listener/OnosEventListener.java
+++ b/apps/kafka-integration/core/src/main/java/org/onosproject/kafkaintegration/listener/OnosEventListener.java
@@ -30,7 +30,6 @@
* @param service ONOS event listener for the specific event type
*/
void startListener(Type event, ListenerService<?, ?> service);
-
/**
* Stop the Listener for the specific ONOS event type.
*
diff --git a/apps/kafka-integration/app/src/main/java/org/onosproject/kafkaintegration/listener/package-info.java b/apps/kafka-integration/core/src/main/java/org/onosproject/kafkaintegration/listener/package-info.java
similarity index 100%
rename from apps/kafka-integration/app/src/main/java/org/onosproject/kafkaintegration/listener/package-info.java
rename to apps/kafka-integration/core/src/main/java/org/onosproject/kafkaintegration/listener/package-info.java
diff --git a/apps/kafka-integration/pom.xml b/apps/kafka-integration/pom.xml
index 03754ef..287700d 100644
--- a/apps/kafka-integration/pom.xml
+++ b/apps/kafka-integration/pom.xml
@@ -31,8 +31,9 @@
<modules>
<module>api</module>
+ <module>core</module>
+ <module>web</module>
<module>app</module>
</modules>
-</project>
-
+</project>
\ No newline at end of file
diff --git a/apps/kafka-integration/web/pom.xml b/apps/kafka-integration/web/pom.xml
new file mode 100644
index 0000000..bcf768e
--- /dev/null
+++ b/apps/kafka-integration/web/pom.xml
@@ -0,0 +1,219 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+ ~ Copyright 2016-present Open Networking Laboratory
+ ~
+ ~ Licensed under the Apache License, Version 2.0 (the "License");
+ ~ you may not use this file except in compliance with the License.
+ ~ You may obtain a copy of the License at
+ ~
+ ~ http://www.apache.org/licenses/LICENSE-2.0
+ ~
+ ~ Unless required by applicable law or agreed to in writing, software
+ ~ distributed under the License is distributed on an "AS IS" BASIS,
+ ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ ~ See the License for the specific language governing permissions and
+ ~ limitations under the License.
+ -->
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+ xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+ xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+ <parent>
+ <groupId>org.onosproject</groupId>
+ <artifactId>onos-kafka</artifactId>
+ <version>1.7.0-SNAPSHOT</version>
+ </parent>
+ <modelVersion>4.0.0</modelVersion>
+
+ <artifactId>onos-app-kafka-web</artifactId>
+
+ <properties>
+ <web.context>/onos/kafka</web.context>
+ <api.version>1.0.0</api.version>
+ <api.package>org.onosproject.kafkaintegration.rest</api.package>
+ <api.title>Kafka Integration Application REST API</api.title>
+ <api.description>
+ APIs for subscribing to Events generated by ONOS
+ </api.description>
+ </properties>
+
+ <packaging>bundle</packaging>
+
+ <dependencies>
+ <dependency>
+ <groupId>org.onosproject</groupId>
+ <artifactId>onos-api</artifactId>
+ </dependency>
+
+ <dependency>
+ <groupId>org.onosproject</groupId>
+ <artifactId>onos-app-kafka-api</artifactId>
+ <version>${project.version}</version>
+ </dependency>
+
+ <dependency>
+ <groupId>org.onosproject</groupId>
+ <artifactId>onos-incubator-protobuf</artifactId>
+ <version>${project.version}</version>
+ </dependency>
+
+ <dependency>
+ <groupId>org.onosproject</groupId>
+ <artifactId>onlab-osgi</artifactId>
+ </dependency>
+
+ <dependency>
+ <groupId>org.onosproject</groupId>
+ <artifactId>onos-rest</artifactId>
+ <version>${project.version}</version>
+ </dependency>
+
+ <dependency>
+ <groupId>junit</groupId>
+ <artifactId>junit</artifactId>
+ </dependency>
+
+ <dependency>
+ <groupId>org.onosproject</groupId>
+ <artifactId>onos-api</artifactId>
+ <classifier>tests</classifier>
+ </dependency>
+
+ <dependency>
+ <groupId>javax.ws.rs</groupId>
+ <artifactId>javax.ws.rs-api</artifactId>
+ <version>2.0.1</version>
+ </dependency>
+
+ <dependency>
+ <groupId>com.google.protobuf</groupId>
+ <artifactId>protobuf-java</artifactId>
+ <version>3.0.0-beta-2</version>
+ </dependency>
+
+ <dependency>
+ <groupId>org.codehaus.jackson</groupId>
+ <artifactId>jackson-core-asl</artifactId>
+ <version>1.9.13</version>
+ </dependency>
+
+ <dependency>
+ <groupId>org.codehaus.jackson</groupId>
+ <artifactId>jackson-mapper-asl</artifactId>
+ <version>1.9.13</version>
+ </dependency>
+
+ <dependency>
+ <groupId>org.glassfish.jersey.containers</groupId>
+ <artifactId>jersey-container-servlet</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>com.fasterxml.jackson.core</groupId>
+ <artifactId>jackson-annotations</artifactId>
+ </dependency>
+
+ <dependency>
+ <groupId>org.onosproject</groupId>
+ <artifactId>onos-core-serializers</artifactId>
+ <version>${project.version}</version>
+ </dependency>
+
+ <dependency>
+ <groupId>org.apache.felix</groupId>
+ <artifactId>org.apache.felix.scr.annotations</artifactId>
+ </dependency>
+
+ </dependencies>
+
+ <build>
+ <plugins>
+ <plugin>
+ <groupId>org.apache.felix</groupId>
+ <artifactId>maven-bundle-plugin</artifactId>
+ <extensions>true</extensions>
+ <configuration>
+ <instructions>
+ <Bundle-SymbolicName>
+ ${project.groupId}.${project.artifactId}
+ </Bundle-SymbolicName>
+ <_wab>src/main/webapp/</_wab>
+ <Include-Resource>
+ WEB-INF/classes/apidoc/swagger.json=target/swagger.json,
+ {maven-resources}
+ </Include-Resource>
+ <Import-Package>
+ org.slf4j,
+ org.osgi.framework,
+ javax.ws.rs,
+ javax.ws.rs.core,
+ org.glassfish.jersey.servlet,
+ com.fasterxml.jackson.databind,
+ com.fasterxml.jackson.databind.node,
+ com.fasterxml.jackson.core,
+ org.onlab.packet.*,
+ org.onosproject.*,
+ org.onlab.util.*,
+ com.google.common.*,
+ com.google.protobuf.*
+ </Import-Package>
+ <Web-ContextPath>${web.context}</Web-ContextPath>
+ </instructions>
+ </configuration>
+ </plugin>
+
+ <plugin>
+ <groupId>org.apache.maven.plugins</groupId>
+ <artifactId>maven-compiler-plugin</artifactId>
+ <configuration>
+ <source>1.8</source>
+ <target>1.8</target>
+ </configuration>
+ </plugin>
+ <plugin>
+ <groupId>org.apache.felix</groupId>
+ <artifactId>maven-scr-plugin</artifactId>
+ <executions>
+ <execution>
+ <id>generate-scr-srcdescriptor</id>
+ <goals>
+ <goal>scr</goal>
+ </goals>
+ </execution>
+ </executions>
+ <configuration>
+ <supportedProjectTypes>
+ <supportedProjectType>bundle</supportedProjectType>
+ <supportedProjectType>war</supportedProjectType>
+ </supportedProjectTypes>
+ </configuration>
+ </plugin>
+
+ <plugin>
+ <groupId>org.onosproject</groupId>
+ <artifactId>onos-maven-plugin</artifactId>
+ <executions>
+ <execution>
+ <id>cfg</id>
+ <phase>generate-resources</phase>
+ <goals>
+ <goal>cfg</goal>
+ </goals>
+ </execution>
+ <execution>
+ <id>swagger</id>
+ <goals>
+ <goal>swagger</goal>
+ </goals>
+ </execution>
+ <execution>
+ <id>app</id>
+ <phase>package</phase>
+ <goals>
+ <goal>app</goal>
+ </goals>
+ </execution>
+ </executions>
+ </plugin>
+ </plugins>
+ </build>
+
+</project>
diff --git a/apps/kafka-integration/app/src/main/java/org/onosproject/kafkaintegration/rest/EventExporterWebResource.java b/apps/kafka-integration/web/src/main/java/org/onosproject/kafkaintegration/rest/EventExporterWebResource.java
similarity index 90%
rename from apps/kafka-integration/app/src/main/java/org/onosproject/kafkaintegration/rest/EventExporterWebResource.java
rename to apps/kafka-integration/web/src/main/java/org/onosproject/kafkaintegration/rest/EventExporterWebResource.java
index 0456362..0a6e91d 100644
--- a/apps/kafka-integration/app/src/main/java/org/onosproject/kafkaintegration/rest/EventExporterWebResource.java
+++ b/apps/kafka-integration/web/src/main/java/org/onosproject/kafkaintegration/rest/EventExporterWebResource.java
@@ -14,11 +14,15 @@
*/
package org.onosproject.kafkaintegration.rest;
-import static com.google.common.base.Preconditions.checkNotNull;
-import static javax.ws.rs.core.Response.Status.BAD_REQUEST;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.fasterxml.jackson.databind.node.ObjectNode;
+import org.onosproject.kafkaintegration.api.EventSubscriptionService;
+import org.onosproject.kafkaintegration.api.dto.EventSubscriber;
+import org.onosproject.kafkaintegration.api.dto.EventSubscriberGroupId;
+import org.onosproject.rest.AbstractWebResource;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
-import java.io.IOException;
-import java.io.InputStream;
import javax.ws.rs.Consumes;
import javax.ws.rs.DELETE;
@@ -27,16 +31,11 @@
import javax.ws.rs.Produces;
import javax.ws.rs.core.MediaType;
import javax.ws.rs.core.Response;
+import java.io.IOException;
+import java.io.InputStream;
-import org.onosproject.kafkaintegration.api.EventExporterService;
-import org.onosproject.kafkaintegration.api.dto.EventSubscriber;
-import org.onosproject.kafkaintegration.api.dto.EventSubscriberGroupId;
-import org.onosproject.rest.AbstractWebResource;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import com.fasterxml.jackson.databind.ObjectMapper;
-import com.fasterxml.jackson.databind.node.ObjectNode;
+import static com.google.common.base.Preconditions.checkNotNull;
+import static javax.ws.rs.core.Response.Status.BAD_REQUEST;
/**
* Rest Interfaces for subscribing/unsubscribing to event notifications.
@@ -52,11 +51,12 @@
public static final String DEREGISTRATION_SUCCESSFUL =
"De-Registered Listener successfully";
public static final String EVENT_SUBSCRIPTION_SUCCESSFUL =
- "Event Registration successfull";
+ "Event Registration successful";
public static final String EVENT_SUBSCRIPTION_UNSUCCESSFUL =
"Event subscription unsuccessful";
public static final String EVENT_SUBSCRIPTION_REMOVED =
- "Event De-Registration successfull";
+ "Event De-Registration successful";
+
/**
* Registers a listener for ONOS Events.
*
@@ -71,12 +71,11 @@
@Path("register")
public Response registerKafkaListener(String appName) {
- EventExporterService service = get(EventExporterService.class);
+ EventSubscriptionService service = get(EventSubscriptionService.class);
EventSubscriberGroupId groupId = service.registerListener(appName);
log.info("Registered app {}", appName);
-
// TODO: Should also return Kafka server information.
// Will glue this in when we have the config and Kafka modules ready
return ok(groupId.getId().toString()).build();
@@ -92,7 +91,7 @@
@DELETE
@Path("unregister")
public Response removeKafkaListener(String appName) {
- EventExporterService service = get(EventExporterService.class);
+ EventSubscriptionService service = get(EventSubscriptionService.class);
service.unregisterListener(appName);
log.info("Unregistered app {}", appName);
@@ -112,11 +111,12 @@
@Path("subscribe")
public Response subscribe(InputStream input) {
- EventExporterService service = get(EventExporterService.class);
+ EventSubscriptionService service = get(EventSubscriptionService.class);
try {
EventSubscriber sub = parseSubscriptionData(input);
service.subscribe(sub);
+ // It will subscribe to all the topics. Not only the one that is sent by the consumer.
} catch (Exception e) {
log.error(e.getMessage());
return Response.status(BAD_REQUEST).entity(e.getMessage()).build();
@@ -155,7 +155,7 @@
@Path("unsubscribe")
public Response unsubscribe(InputStream input) {
- EventExporterService service = get(EventExporterService.class);
+ EventSubscriptionService service = get(EventSubscriptionService.class);
try {
EventSubscriber sub = parseSubscriptionData(input);
diff --git a/apps/kafka-integration/app/src/main/java/org/onosproject/kafkaintegration/rest/package-info.java b/apps/kafka-integration/web/src/main/java/org/onosproject/kafkaintegration/rest/package-info.java
similarity index 100%
rename from apps/kafka-integration/app/src/main/java/org/onosproject/kafkaintegration/rest/package-info.java
rename to apps/kafka-integration/web/src/main/java/org/onosproject/kafkaintegration/rest/package-info.java
diff --git a/apps/kafka-integration/app/src/main/resources/definitions/KafkaRegistration.json b/apps/kafka-integration/web/src/main/resources/definitions/KafkaRegistration.json
similarity index 100%
rename from apps/kafka-integration/app/src/main/resources/definitions/KafkaRegistration.json
rename to apps/kafka-integration/web/src/main/resources/definitions/KafkaRegistration.json
diff --git a/apps/kafka-integration/app/src/main/resources/definitions/KafkaSubscription.json b/apps/kafka-integration/web/src/main/resources/definitions/KafkaSubscription.json
similarity index 100%
rename from apps/kafka-integration/app/src/main/resources/definitions/KafkaSubscription.json
rename to apps/kafka-integration/web/src/main/resources/definitions/KafkaSubscription.json
diff --git a/apps/kafka-integration/app/src/main/webapp/WEB-INF/web.xml b/apps/kafka-integration/web/src/main/webapp/WEB-INF/web.xml
similarity index 100%
rename from apps/kafka-integration/app/src/main/webapp/WEB-INF/web.xml
rename to apps/kafka-integration/web/src/main/webapp/WEB-INF/web.xml