Fixing Kafka integration app to follow the new OSGi property pattern.
Property names had to be changed to follow Java field name conventions.
Change-Id: I5abdb259ca8ecff3bd50430aa749a2790022a372
diff --git a/apps/kafka-integration/app/src/main/java/org/onosproject/kafkaintegration/kafka/KafkaConfigManager.java b/apps/kafka-integration/app/src/main/java/org/onosproject/kafkaintegration/kafka/KafkaConfigManager.java
index aed8ba8..4b90b8e 100644
--- a/apps/kafka-integration/app/src/main/java/org/onosproject/kafkaintegration/kafka/KafkaConfigManager.java
+++ b/apps/kafka-integration/app/src/main/java/org/onosproject/kafkaintegration/kafka/KafkaConfigManager.java
@@ -24,7 +24,6 @@
import org.osgi.service.component.annotations.Component;
import org.osgi.service.component.annotations.Deactivate;
import org.osgi.service.component.annotations.Modified;
-import org.osgi.service.component.annotations.Property;
import org.osgi.service.component.annotations.Reference;
import org.osgi.service.component.annotations.ReferenceCardinality;
import org.slf4j.Logger;
@@ -34,8 +33,18 @@
import static com.google.common.base.Strings.isNullOrEmpty;
import static org.onlab.util.Tools.get;
+import static org.onosproject.kafkaintegration.kafka.OsgiPropertyConstants.*;
-@Component(immediate = true, service = KafkaConfigService.class)
+@Component(immediate = true, service = KafkaConfigService.class,
+ property = {
+ BOOTSTRAP_SERVERS + "=" + BOOTSTRAP_SERVERS_DEFAULT,
+ RETRIES + ":Integer=" + RETRIES_DEFAULT,
+ MAX_IN_FLIGHT + ":Integer=" + MAX_IN_FLIGHT_DEFAULT,
+ REQUIRED_ACKS + ":Integer=" + REQUIRED_ACKS_DEFAULT,
+ KEY_SERIALIZER + "=" + KEY_SERIALIZER_DEFAULT,
+ VALUE_SERIALIZER + "=" + VALUE_SERIALIZER_DEFAULT,
+ }
+)
public class KafkaConfigManager implements KafkaConfigService {
private final Logger log = LoggerFactory.getLogger(getClass());
@@ -46,45 +55,22 @@
@Reference(cardinality = ReferenceCardinality.MANDATORY)
protected KafkaPublisherAdminService kafkaPublisherAdminService;
- public static final String BOOTSTRAP_SERVERS = "localhost:9092";
- private String kafkaServerIp =
- BOOTSTRAP_SERVERS.substring(0, BOOTSTRAP_SERVERS.indexOf(":"));
- private String kafkaServerPortNum =
- BOOTSTRAP_SERVERS.substring(BOOTSTRAP_SERVERS.indexOf(":") + 1,
- BOOTSTRAP_SERVERS.length());
+ /** Default IP/Port pair to establish initial connection to Kafka cluster. */
+ protected String bootstrapServers = BOOTSTRAP_SERVERS_DEFAULT;
- private static final int RETRIES = 1;
- private static final int MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION = 5;
- private static final int REQUEST_REQUIRED_ACKS = 1;
- private static final String KEY_SERIALIZER =
- "org.apache.kafka.common.serialization.StringSerializer";
- private static final String VALUE_SERIALIZER =
- "org.apache.kafka.common.serialization.ByteArraySerializer";
+ /** Number of times the producer can retry to send after first failure. */
+ protected int retries = RETRIES_DEFAULT;
- @Property(name = "bootstrap.servers", value = BOOTSTRAP_SERVERS,
- label = "Default IP/Port pair to establish initial connection to Kafka cluster.")
- protected String bootstrapServers = BOOTSTRAP_SERVERS;
+ /** The maximum number of unacknowledged requests the client will send before blocking. */
+ protected int maxInFlightRequestsPerConnection = MAX_IN_FLIGHT_DEFAULT;
- @Property(name = "retries", intValue = RETRIES,
- label = "Number of times the producer can retry to send after first failure")
- protected int retries = RETRIES;
+ /** Producer will get an acknowledgement after the leader has replicated the data. */
+ protected int requestRequiredAcks = REQUIRED_ACKS_DEFAULT;
- @Property(name = "max.in.flight.requests.per.connection",
- intValue = MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION,
- label = "The maximum number of unacknowledged requests the client will send before blocking")
- protected int maxInFlightRequestsPerConnection =
- MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION;
+ /** Serializer class for key that implements the Serializer interface. */
+ protected String keySerializer = KEY_SERIALIZER_DEFAULT;
- @Property(name = "request.required.acks", intValue = 1,
- label = "Producer will get an acknowledgement after the leader has replicated the data")
- protected int requestRequiredAcks = REQUEST_REQUIRED_ACKS;
-
- @Property(name = "key.serializer", value = KEY_SERIALIZER,
- label = "Serializer class for key that implements the Serializer interface.")
- protected String keySerializer = KEY_SERIALIZER;
-
- @Property(name = "value.serializer", value = VALUE_SERIALIZER,
- label = "Serializer class for value that implements the Serializer interface.")
+ /** Serializer class for value that implements the Serializer interface. */
protected String valueSerializer = VALUE_SERIALIZER;
@Activate
@@ -104,13 +90,12 @@
@Modified
private void modified(ComponentContext context) {
if (context == null) {
- bootstrapServers = BOOTSTRAP_SERVERS;
- retries = RETRIES;
- maxInFlightRequestsPerConnection =
- MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION;
- requestRequiredAcks = REQUEST_REQUIRED_ACKS;
- keySerializer = KEY_SERIALIZER;
- valueSerializer = VALUE_SERIALIZER;
+ bootstrapServers = BOOTSTRAP_SERVERS_DEFAULT;
+ retries = RETRIES_DEFAULT;
+ maxInFlightRequestsPerConnection = MAX_IN_FLIGHT_DEFAULT;
+ requestRequiredAcks = REQUIRED_ACKS_DEFAULT;
+ keySerializer = KEY_SERIALIZER_DEFAULT;
+ valueSerializer = VALUE_SERIALIZER_DEFAULT;
return;
}
@@ -121,20 +106,20 @@
int newMaxInFlightRequestsPerConnection;
int newRequestRequiredAcks;
try {
- String s = get(properties, "bootstrap.servers");
+ String s = get(properties, BOOTSTRAP_SERVERS);
newBootstrapServers =
isNullOrEmpty(s) ? bootstrapServers : s.trim();
- s = get(properties, "retries");
+ s = get(properties, RETRIES);
newRetries =
isNullOrEmpty(s) ? retries : Integer.parseInt(s.trim());
- s = get(properties, "max.in.flight.requests.per.connection");
+ s = get(properties, MAX_IN_FLIGHT);
newMaxInFlightRequestsPerConnection =
isNullOrEmpty(s) ? maxInFlightRequestsPerConnection
: Integer.parseInt(s.trim());
- s = get(properties, "request.required.acks");
+ s = get(properties, REQUIRED_ACKS);
newRequestRequiredAcks =
isNullOrEmpty(s) ? requestRequiredAcks
: Integer.parseInt(s.trim());
@@ -147,11 +132,10 @@
newMaxInFlightRequestsPerConnection,
newRequestRequiredAcks)) {
bootstrapServers = newBootstrapServers;
- kafkaServerIp = bootstrapServers
+ String kafkaServerIp = bootstrapServers
.substring(0, bootstrapServers.indexOf(":"));
- kafkaServerPortNum = bootstrapServers
- .substring(bootstrapServers.indexOf(":") + 1,
- bootstrapServers.length());
+ String kafkaServerPortNum = bootstrapServers
+ .substring(bootstrapServers.indexOf(":") + 1);
retries = newRetries;
@@ -191,11 +175,8 @@
@Override
public KafkaServerConfig getConfigParams() {
- String ipAddr =
- bootstrapServers.substring(0, bootstrapServers.indexOf(":"));
- String port =
- bootstrapServers.substring(bootstrapServers.indexOf(":") + 1,
- bootstrapServers.length());
+ String ipAddr = bootstrapServers.substring(0, bootstrapServers.indexOf(":"));
+ String port = bootstrapServers.substring(bootstrapServers.indexOf(":") + 1);
return KafkaServerConfig.builder().ipAddress(ipAddr).port(port)
.numOfRetries(retries)
diff --git a/apps/kafka-integration/app/src/main/java/org/onosproject/kafkaintegration/kafka/OsgiPropertyConstants.java b/apps/kafka-integration/app/src/main/java/org/onosproject/kafkaintegration/kafka/OsgiPropertyConstants.java
new file mode 100644
index 0000000..5e207e3
--- /dev/null
+++ b/apps/kafka-integration/app/src/main/java/org/onosproject/kafkaintegration/kafka/OsgiPropertyConstants.java
@@ -0,0 +1,44 @@
+/*
+ * Copyright 2018-present Open Networking Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.onosproject.kafkaintegration.kafka;
+
+/**
+ * Constants for default values of configurable properties.
+ */
+public final class OsgiPropertyConstants {
+ private OsgiPropertyConstants() {
+ }
+
+ static final String BOOTSTRAP_SERVERS = "bootstrapServers";
+ static final String BOOTSTRAP_SERVERS_DEFAULT = "localhost:9092";
+
+ static final String RETRIES = "retries";
+ static final int RETRIES_DEFAULT = 1;
+
+ static final String MAX_IN_FLIGHT = "maxInFlightRequestsPerConnection";
+ static final int MAX_IN_FLIGHT_DEFAULT = 5;
+
+ static final String REQUIRED_ACKS = "requestRequiredAcks";
+ static final int REQUIRED_ACKS_DEFAULT = 1;
+
+ static final String KEY_SERIALIZER = "keySerializer";
+ static final String KEY_SERIALIZER_DEFAULT = "org.apache.kafka.common.serialization.StringSerializer";
+
+ static final String VALUE_SERIALIZER = "valueSerializer";
+ static final String VALUE_SERIALIZER_DEFAULT = "org.apache.kafka.common.serialization.ByteArraySerializer";
+
+}