Fixing Kafka integration app to follow the new OSGi property pattern.
Property names had to be changed to follow Java field name conventions.
Change-Id: I5abdb259ca8ecff3bd50430aa749a2790022a372
diff --git a/apps/kafka-integration/app/src/main/java/org/onosproject/kafkaintegration/kafka/KafkaConfigManager.java b/apps/kafka-integration/app/src/main/java/org/onosproject/kafkaintegration/kafka/KafkaConfigManager.java
index aed8ba8..4b90b8e 100644
--- a/apps/kafka-integration/app/src/main/java/org/onosproject/kafkaintegration/kafka/KafkaConfigManager.java
+++ b/apps/kafka-integration/app/src/main/java/org/onosproject/kafkaintegration/kafka/KafkaConfigManager.java
@@ -24,7 +24,6 @@
import org.osgi.service.component.annotations.Component;
import org.osgi.service.component.annotations.Deactivate;
import org.osgi.service.component.annotations.Modified;
-import org.osgi.service.component.annotations.Property;
import org.osgi.service.component.annotations.Reference;
import org.osgi.service.component.annotations.ReferenceCardinality;
import org.slf4j.Logger;
@@ -34,8 +33,18 @@
import static com.google.common.base.Strings.isNullOrEmpty;
import static org.onlab.util.Tools.get;
+import static org.onosproject.kafkaintegration.kafka.OsgiPropertyConstants.*;
-@Component(immediate = true, service = KafkaConfigService.class)
+@Component(immediate = true, service = KafkaConfigService.class,
+ property = {
+ BOOTSTRAP_SERVERS + "=" + BOOTSTRAP_SERVERS_DEFAULT,
+ RETRIES + ":Integer=" + RETRIES_DEFAULT,
+ MAX_IN_FLIGHT + ":Integer=" + MAX_IN_FLIGHT_DEFAULT,
+ REQUIRED_ACKS + ":Integer=" + REQUIRED_ACKS_DEFAULT,
+ KEY_SERIALIZER + "=" + KEY_SERIALIZER_DEFAULT,
+ VALUE_SERIALIZER + "=" + VALUE_SERIALIZER_DEFAULT,
+ }
+)
public class KafkaConfigManager implements KafkaConfigService {
private final Logger log = LoggerFactory.getLogger(getClass());
@@ -46,45 +55,22 @@
@Reference(cardinality = ReferenceCardinality.MANDATORY)
protected KafkaPublisherAdminService kafkaPublisherAdminService;
- public static final String BOOTSTRAP_SERVERS = "localhost:9092";
- private String kafkaServerIp =
- BOOTSTRAP_SERVERS.substring(0, BOOTSTRAP_SERVERS.indexOf(":"));
- private String kafkaServerPortNum =
- BOOTSTRAP_SERVERS.substring(BOOTSTRAP_SERVERS.indexOf(":") + 1,
- BOOTSTRAP_SERVERS.length());
+ /** Default IP/Port pair to establish initial connection to Kafka cluster. */
+ protected String bootstrapServers = BOOTSTRAP_SERVERS_DEFAULT;
- private static final int RETRIES = 1;
- private static final int MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION = 5;
- private static final int REQUEST_REQUIRED_ACKS = 1;
- private static final String KEY_SERIALIZER =
- "org.apache.kafka.common.serialization.StringSerializer";
- private static final String VALUE_SERIALIZER =
- "org.apache.kafka.common.serialization.ByteArraySerializer";
+ /** Number of times the producer can retry to send after first failure. */
+ protected int retries = RETRIES_DEFAULT;
- @Property(name = "bootstrap.servers", value = BOOTSTRAP_SERVERS,
- label = "Default IP/Port pair to establish initial connection to Kafka cluster.")
- protected String bootstrapServers = BOOTSTRAP_SERVERS;
+ /** The maximum number of unacknowledged requests the client will send before blocking. */
+ protected int maxInFlightRequestsPerConnection = MAX_IN_FLIGHT_DEFAULT;
- @Property(name = "retries", intValue = RETRIES,
- label = "Number of times the producer can retry to send after first failure")
- protected int retries = RETRIES;
+ /** Producer will get an acknowledgement after the leader has replicated the data. */
+ protected int requestRequiredAcks = REQUIRED_ACKS_DEFAULT;
- @Property(name = "max.in.flight.requests.per.connection",
- intValue = MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION,
- label = "The maximum number of unacknowledged requests the client will send before blocking")
- protected int maxInFlightRequestsPerConnection =
- MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION;
+ /** Serializer class for key that implements the Serializer interface. */
+ protected String keySerializer = KEY_SERIALIZER_DEFAULT;
- @Property(name = "request.required.acks", intValue = 1,
- label = "Producer will get an acknowledgement after the leader has replicated the data")
- protected int requestRequiredAcks = REQUEST_REQUIRED_ACKS;
-
- @Property(name = "key.serializer", value = KEY_SERIALIZER,
- label = "Serializer class for key that implements the Serializer interface.")
- protected String keySerializer = KEY_SERIALIZER;
-
- @Property(name = "value.serializer", value = VALUE_SERIALIZER,
- label = "Serializer class for value that implements the Serializer interface.")
+ /** Serializer class for value that implements the Serializer interface. */
protected String valueSerializer = VALUE_SERIALIZER;
@Activate
@@ -104,13 +90,12 @@
@Modified
private void modified(ComponentContext context) {
if (context == null) {
- bootstrapServers = BOOTSTRAP_SERVERS;
- retries = RETRIES;
- maxInFlightRequestsPerConnection =
- MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION;
- requestRequiredAcks = REQUEST_REQUIRED_ACKS;
- keySerializer = KEY_SERIALIZER;
- valueSerializer = VALUE_SERIALIZER;
+ bootstrapServers = BOOTSTRAP_SERVERS_DEFAULT;
+ retries = RETRIES_DEFAULT;
+ maxInFlightRequestsPerConnection = MAX_IN_FLIGHT_DEFAULT;
+ requestRequiredAcks = REQUIRED_ACKS_DEFAULT;
+ keySerializer = KEY_SERIALIZER_DEFAULT;
+ valueSerializer = VALUE_SERIALIZER_DEFAULT;
return;
}
@@ -121,20 +106,20 @@
int newMaxInFlightRequestsPerConnection;
int newRequestRequiredAcks;
try {
- String s = get(properties, "bootstrap.servers");
+ String s = get(properties, BOOTSTRAP_SERVERS);
newBootstrapServers =
isNullOrEmpty(s) ? bootstrapServers : s.trim();
- s = get(properties, "retries");
+ s = get(properties, RETRIES);
newRetries =
isNullOrEmpty(s) ? retries : Integer.parseInt(s.trim());
- s = get(properties, "max.in.flight.requests.per.connection");
+ s = get(properties, MAX_IN_FLIGHT);
newMaxInFlightRequestsPerConnection =
isNullOrEmpty(s) ? maxInFlightRequestsPerConnection
: Integer.parseInt(s.trim());
- s = get(properties, "request.required.acks");
+ s = get(properties, REQUIRED_ACKS);
newRequestRequiredAcks =
isNullOrEmpty(s) ? requestRequiredAcks
: Integer.parseInt(s.trim());
@@ -147,11 +132,10 @@
newMaxInFlightRequestsPerConnection,
newRequestRequiredAcks)) {
bootstrapServers = newBootstrapServers;
- kafkaServerIp = bootstrapServers
+ String kafkaServerIp = bootstrapServers
.substring(0, bootstrapServers.indexOf(":"));
- kafkaServerPortNum = bootstrapServers
- .substring(bootstrapServers.indexOf(":") + 1,
- bootstrapServers.length());
+ String kafkaServerPortNum = bootstrapServers
+ .substring(bootstrapServers.indexOf(":") + 1);
retries = newRetries;
@@ -191,11 +175,8 @@
@Override
public KafkaServerConfig getConfigParams() {
- String ipAddr =
- bootstrapServers.substring(0, bootstrapServers.indexOf(":"));
- String port =
- bootstrapServers.substring(bootstrapServers.indexOf(":") + 1,
- bootstrapServers.length());
+ String ipAddr = bootstrapServers.substring(0, bootstrapServers.indexOf(":"));
+ String port = bootstrapServers.substring(bootstrapServers.indexOf(":") + 1);
return KafkaServerConfig.builder().ipAddress(ipAddr).port(port)
.numOfRetries(retries)