Refactored Kafka Application to simplify dependencies
1. Fixed a Bug in KafkaProducer. Without this fix the App will not send data in GPB format.
2. Added two new services - KafkaProducerService and KafkaConfigService.
3. Fixed a TODO in the register API to return Kafka server information.
4. Removed the use of LeadershipService and ClusterService, since we are not ready for clustering yet.
Change-Id: If20ef5238bb4629af0c6769129494eb44abf1d3c
diff --git a/apps/kafka-integration/api/src/main/java/org/onosproject/kafkaintegration/api/EventSubscriptionService.java b/apps/kafka-integration/api/src/main/java/org/onosproject/kafkaintegration/api/EventSubscriptionService.java
index 07b03de..4c6d50a 100644
--- a/apps/kafka-integration/api/src/main/java/org/onosproject/kafkaintegration/api/EventSubscriptionService.java
+++ b/apps/kafka-integration/api/src/main/java/org/onosproject/kafkaintegration/api/EventSubscriptionService.java
@@ -14,16 +14,16 @@
*/
package org.onosproject.kafkaintegration.api;
+import java.util.List;
+
import org.onosproject.kafkaintegration.api.dto.EventSubscriber;
-import org.onosproject.kafkaintegration.api.dto.EventSubscriberGroupId;
import org.onosproject.kafkaintegration.api.dto.OnosEvent.Type;
+import org.onosproject.kafkaintegration.api.dto.RegistrationResponse;
import org.onosproject.kafkaintegration.errors.InvalidApplicationException;
import org.onosproject.kafkaintegration.errors.InvalidGroupIdException;
import com.google.common.annotations.Beta;
-import java.util.List;
-
/**
* APIs for subscribing to Onos Event Messages.
*/
@@ -34,9 +34,9 @@
* Registers the external application to receive events generated in ONOS.
*
* @param appName Application Name
- * @return unique consumer group identifier
+ * @return Registration Response DTO.
*/
- EventSubscriberGroupId registerListener(String appName);
+ RegistrationResponse registerListener(String appName);
/**
* Removes the Registered Listener.
diff --git a/apps/kafka-integration/api/src/main/java/org/onosproject/kafkaintegration/api/KafkaConfigService.java b/apps/kafka-integration/api/src/main/java/org/onosproject/kafkaintegration/api/KafkaConfigService.java
new file mode 100644
index 0000000..f0d0f6c
--- /dev/null
+++ b/apps/kafka-integration/api/src/main/java/org/onosproject/kafkaintegration/api/KafkaConfigService.java
@@ -0,0 +1,30 @@
+/**
+ * Copyright 2016-present Open Networking Laboratory
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.onosproject.kafkaintegration.api;
+
+import org.onosproject.kafkaintegration.api.dto.KafkaServerConfig;
+
+public interface KafkaConfigService {
+
+ /**
+ * Returns the Kafka Server Configuration Information.
+ *
+ * @return KafkaServerConfig DTO Object.
+ */
+ KafkaServerConfig getConfigParams();
+
+}
+
diff --git a/apps/kafka-integration/api/src/main/java/org/onosproject/kafkaintegration/api/KafkaProducerService.java b/apps/kafka-integration/api/src/main/java/org/onosproject/kafkaintegration/api/KafkaProducerService.java
new file mode 100644
index 0000000..d29cd47
--- /dev/null
+++ b/apps/kafka-integration/api/src/main/java/org/onosproject/kafkaintegration/api/KafkaProducerService.java
@@ -0,0 +1,56 @@
+/**
+ * Copyright 2016-present Open Networking Laboratory
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.onosproject.kafkaintegration.api;
+
+import java.util.concurrent.Future;
+
+import org.onosproject.kafkaintegration.api.dto.KafkaServerConfig;
+import org.apache.kafka.clients.producer.ProducerRecord;
+import org.apache.kafka.clients.producer.RecordMetadata;
+/**
+ * APIs for controlling the Kafka Producer.
+ *
+ */
+public interface KafkaProducerService {
+
+ /**
+ * Starts the Kafka Producer.
+ *
+ * @param config the Kafka Server Config
+ */
+ void start(KafkaServerConfig config);
+
+ /**
+ * Stops the Kafka Producer.
+ *
+ */
+ void stop();
+
+ /**
+ * Restarts the Kafka Producer.
+ *
+ * @param config the Kafka Server Config
+ */
+ void restart(KafkaServerConfig config);
+
+ /**
+ * Sends message to Kafka Server.
+ *
+ * @param record a message to be sent
+ * @return metadata for a record that as been acknowledged
+ */
+ public Future<RecordMetadata> send(ProducerRecord<String, byte[]> record);
+}
diff --git a/apps/kafka-integration/api/src/main/java/org/onosproject/kafkaintegration/api/dto/DefaultEventSubscriber.java b/apps/kafka-integration/api/src/main/java/org/onosproject/kafkaintegration/api/dto/DefaultEventSubscriber.java
index bf1fbf8..04f4e0b 100644
--- a/apps/kafka-integration/api/src/main/java/org/onosproject/kafkaintegration/api/dto/DefaultEventSubscriber.java
+++ b/apps/kafka-integration/api/src/main/java/org/onosproject/kafkaintegration/api/dto/DefaultEventSubscriber.java
@@ -84,6 +84,7 @@
.addValue(subscriberGroupId.toString())
.add("eventType", eventType).toString();
}
+
/**
* To create an instance of the builder.
*
@@ -92,6 +93,7 @@
public static Builder builder() {
return new Builder();
}
+
/**
* Builder class for Event subscriber.
*/
@@ -107,8 +109,7 @@
}
@Override
- public Builder setSubscriberGroupId(EventSubscriberGroupId
- subscriberGroupId) {
+ public Builder setSubscriberGroupId(EventSubscriberGroupId subscriberGroupId) {
this.subscriberGroupId = subscriberGroupId;
return this;
}
@@ -122,13 +123,11 @@
@Override
public EventSubscriber build() {
checkNotNull(appName, "App name cannot be null");
- checkNotNull(subscriberGroupId, "Subscriber group ID cannot " +
- "be " +
- "null");
+ checkNotNull(subscriberGroupId,
+ "Subscriber group ID cannot " + "be " + "null");
checkNotNull(eventType, "Event type cannot be null");
- return new DefaultEventSubscriber(appName,
- subscriberGroupId,
+ return new DefaultEventSubscriber(appName, subscriberGroupId,
eventType);
}
}
diff --git a/apps/kafka-integration/api/src/main/java/org/onosproject/kafkaintegration/api/dto/KafkaServerConfig.java b/apps/kafka-integration/api/src/main/java/org/onosproject/kafkaintegration/api/dto/KafkaServerConfig.java
new file mode 100644
index 0000000..25beda0
--- /dev/null
+++ b/apps/kafka-integration/api/src/main/java/org/onosproject/kafkaintegration/api/dto/KafkaServerConfig.java
@@ -0,0 +1,156 @@
+/**
+ * Copyright 2016-present Open Networking Laboratory
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.onosproject.kafkaintegration.api.dto;
+
+import static com.google.common.base.Preconditions.checkNotNull;
+
+/**
+ * DTO to hold Kafka Server Configuration information.
+ *
+ */
+public final class KafkaServerConfig {
+
+ private final String ipAddress;
+
+ private final String port;
+
+ private final int numOfRetries;
+
+ private final int maxInFlightRequestsPerConnection;
+
+ private final int acksRequired;
+
+ private final String keySerializer;
+
+ private final String valueSerializer;
+
+ private KafkaServerConfig(String ipAddress, String port, int numOfRetries,
+ int maxInFlightRequestsPerConnection,
+ int requestRequiredAcks, String keySerializer,
+ String valueSerializer) {
+
+ this.ipAddress = checkNotNull(ipAddress, "Ip Address Cannot be null");
+ this.port = checkNotNull(port, "Port Number cannot be null");
+ this.numOfRetries = numOfRetries;
+ this.maxInFlightRequestsPerConnection =
+ maxInFlightRequestsPerConnection;
+ this.acksRequired = requestRequiredAcks;
+ this.keySerializer = keySerializer;
+ this.valueSerializer = valueSerializer;
+ }
+
+ public final String getIpAddress() {
+ return ipAddress;
+ }
+
+ public final String getPort() {
+ return port;
+ }
+
+ public final int getNumOfRetries() {
+ return numOfRetries;
+ }
+
+ public final int getMaxInFlightRequestsPerConnection() {
+ return maxInFlightRequestsPerConnection;
+ }
+
+ public final int getAcksRequired() {
+ return acksRequired;
+ }
+
+ public final String getKeySerializer() {
+ return keySerializer;
+ }
+
+ public final String getValueSerializer() {
+ return valueSerializer;
+ }
+
+ /**
+ * To create an instance of the builder.
+ *
+ * @return instance of builder
+ */
+ public static Builder builder() {
+ return new Builder();
+ }
+
+ /**
+ * Builder class for KafkaServerConfig.
+ */
+ public static final class Builder {
+ private String ipAddress;
+
+ private String port;
+
+ private int numOfRetries;
+
+ private int maxInFlightRequestsPerConnection;
+
+ private int acksRequired;
+
+ private String keySerializer;
+
+ private String valueSerializer;
+
+ public Builder ipAddress(String ipAddress) {
+ this.ipAddress = ipAddress;
+ return this;
+ }
+
+ public Builder port(String port) {
+ this.port = port;
+ return this;
+ }
+
+ public Builder numOfRetries(int numOfRetries) {
+ this.numOfRetries = numOfRetries;
+ return this;
+ }
+
+ public Builder maxInFlightRequestsPerConnection(int maxInFlightRequestsPerConnection) {
+ this.maxInFlightRequestsPerConnection =
+ maxInFlightRequestsPerConnection;
+ return this;
+ }
+
+ public Builder acksRequired(int acksRequired) {
+ this.acksRequired = acksRequired;
+ return this;
+ }
+
+ public Builder keySerializer(String keySerializer) {
+ this.keySerializer = keySerializer;
+ return this;
+ }
+
+ public Builder valueSerializer(String valueSerializer) {
+ this.valueSerializer = valueSerializer;
+ return this;
+ }
+
+ public KafkaServerConfig build() {
+ checkNotNull(ipAddress, "App name cannot be null");
+ checkNotNull(port, "Subscriber group ID cannot " + "be " + "null");
+
+ return new KafkaServerConfig(ipAddress, port, numOfRetries,
+ maxInFlightRequestsPerConnection,
+ acksRequired, keySerializer,
+ valueSerializer);
+ }
+ }
+}
diff --git a/apps/kafka-integration/api/src/main/java/org/onosproject/kafkaintegration/api/dto/RegistrationResponse.java b/apps/kafka-integration/api/src/main/java/org/onosproject/kafkaintegration/api/dto/RegistrationResponse.java
new file mode 100644
index 0000000..e0a02b7
--- /dev/null
+++ b/apps/kafka-integration/api/src/main/java/org/onosproject/kafkaintegration/api/dto/RegistrationResponse.java
@@ -0,0 +1,75 @@
+/**
+ * Copyright 2016-present Open Networking Laboratory
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.onosproject.kafkaintegration.api.dto;
+
+import static com.google.common.base.MoreObjects.toStringHelper;
+
+import java.util.Objects;
+
+/**
+ * DTO to hold Registration Response for requests from external apps.
+ */
+public final class RegistrationResponse {
+
+ private EventSubscriberGroupId groupId;
+
+ private String ipAddress;
+
+ private String port;
+
+ public RegistrationResponse(EventSubscriberGroupId groupId,
+ String ipAddress, String port) {
+ this.groupId = groupId;
+ this.ipAddress = ipAddress;
+ this.port = port;
+ }
+
+ public final EventSubscriberGroupId getGroupId() {
+ return groupId;
+ }
+
+ public final String getIpAddress() {
+ return ipAddress;
+ }
+
+ public final String getPort() {
+ return port;
+ }
+
+ @Override
+ public boolean equals(Object o) {
+ if (o instanceof RegistrationResponse) {
+ RegistrationResponse sub = (RegistrationResponse) o;
+ if (sub.groupId.equals(groupId) && sub.ipAddress.equals(ipAddress)
+ && sub.port.equals(port)) {
+ return true;
+ }
+ }
+
+ return false;
+ }
+
+ @Override
+ public int hashCode() {
+ return Objects.hash(groupId, ipAddress, port);
+ }
+
+ @Override
+ public String toString() {
+ return toStringHelper(this).add("subscriberGroupId", groupId)
+ .add("ipAddress", ipAddress).add("port", port).toString();
+ }
+}