Fixing Kafka integration app to follow the new OSGi property pattern.

Property names had to be changed to follow Java field name conventions.

Change-Id: I5abdb259ca8ecff3bd50430aa749a2790022a372
diff --git a/apps/kafka-integration/app/src/main/java/org/onosproject/kafkaintegration/kafka/KafkaConfigManager.java b/apps/kafka-integration/app/src/main/java/org/onosproject/kafkaintegration/kafka/KafkaConfigManager.java
index aed8ba8..4b90b8e 100644
--- a/apps/kafka-integration/app/src/main/java/org/onosproject/kafkaintegration/kafka/KafkaConfigManager.java
+++ b/apps/kafka-integration/app/src/main/java/org/onosproject/kafkaintegration/kafka/KafkaConfigManager.java
@@ -24,7 +24,6 @@
 import org.osgi.service.component.annotations.Component;
 import org.osgi.service.component.annotations.Deactivate;
 import org.osgi.service.component.annotations.Modified;
-import org.osgi.service.component.annotations.Property;
 import org.osgi.service.component.annotations.Reference;
 import org.osgi.service.component.annotations.ReferenceCardinality;
 import org.slf4j.Logger;
@@ -34,8 +33,18 @@
 
 import static com.google.common.base.Strings.isNullOrEmpty;
 import static org.onlab.util.Tools.get;
+import static org.onosproject.kafkaintegration.kafka.OsgiPropertyConstants.*;
 
-@Component(immediate = true, service = KafkaConfigService.class)
+@Component(immediate = true, service = KafkaConfigService.class,
+        property = {
+                BOOTSTRAP_SERVERS + "=" + BOOTSTRAP_SERVERS_DEFAULT,
+                RETRIES + ":Integer=" + RETRIES_DEFAULT,
+                MAX_IN_FLIGHT + ":Integer=" + MAX_IN_FLIGHT_DEFAULT,
+                REQUIRED_ACKS + ":Integer=" + REQUIRED_ACKS_DEFAULT,
+                KEY_SERIALIZER + "=" + KEY_SERIALIZER_DEFAULT,
+                VALUE_SERIALIZER + "=" + VALUE_SERIALIZER_DEFAULT,
+        }
+)
 public class KafkaConfigManager implements KafkaConfigService {
 
     private final Logger log = LoggerFactory.getLogger(getClass());
@@ -46,45 +55,22 @@
     @Reference(cardinality = ReferenceCardinality.MANDATORY)
     protected KafkaPublisherAdminService kafkaPublisherAdminService;
 
-    public static final String BOOTSTRAP_SERVERS = "localhost:9092";
-    private String kafkaServerIp =
-            BOOTSTRAP_SERVERS.substring(0, BOOTSTRAP_SERVERS.indexOf(":"));
-    private String kafkaServerPortNum =
-            BOOTSTRAP_SERVERS.substring(BOOTSTRAP_SERVERS.indexOf(":") + 1,
-                                        BOOTSTRAP_SERVERS.length());
+    /** Default IP/Port pair to establish initial connection to Kafka cluster. */
+    protected String bootstrapServers = BOOTSTRAP_SERVERS_DEFAULT;
 
-    private static final int RETRIES = 1;
-    private static final int MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION = 5;
-    private static final int REQUEST_REQUIRED_ACKS = 1;
-    private static final String KEY_SERIALIZER =
-            "org.apache.kafka.common.serialization.StringSerializer";
-    private static final String VALUE_SERIALIZER =
-            "org.apache.kafka.common.serialization.ByteArraySerializer";
+    /** Number of times the producer can retry to send after first failure. */
+    protected int retries = RETRIES_DEFAULT;
 
-    @Property(name = "bootstrap.servers", value = BOOTSTRAP_SERVERS,
-            label = "Default IP/Port pair to establish initial connection to Kafka cluster.")
-    protected String bootstrapServers = BOOTSTRAP_SERVERS;
+    /** The maximum number of unacknowledged requests the client will send before blocking. */
+    protected int maxInFlightRequestsPerConnection = MAX_IN_FLIGHT_DEFAULT;
 
-    @Property(name = "retries", intValue = RETRIES,
-            label = "Number of times the producer can retry to send after first failure")
-    protected int retries = RETRIES;
+    /** Producer will get an acknowledgement after the leader has replicated the data. */
+    protected int requestRequiredAcks = REQUIRED_ACKS_DEFAULT;
 
-    @Property(name = "max.in.flight.requests.per.connection",
-            intValue = MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION,
-            label = "The maximum number of unacknowledged requests the client will send before blocking")
-    protected int maxInFlightRequestsPerConnection =
-            MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION;
+    /** Serializer class for key that implements the Serializer interface. */
+    protected String keySerializer = KEY_SERIALIZER_DEFAULT;
 
-    @Property(name = "request.required.acks", intValue = 1,
-            label = "Producer will get an acknowledgement after the leader has replicated the data")
-    protected int requestRequiredAcks = REQUEST_REQUIRED_ACKS;
-
-    @Property(name = "key.serializer", value = KEY_SERIALIZER,
-            label = "Serializer class for key that implements the Serializer interface.")
-    protected String keySerializer = KEY_SERIALIZER;
-
-    @Property(name = "value.serializer", value = VALUE_SERIALIZER,
-            label = "Serializer class for value that implements the Serializer interface.")
+    /** Serializer class for value that implements the Serializer interface. */
-    protected String valueSerializer = VALUE_SERIALIZER;
+    protected String valueSerializer = VALUE_SERIALIZER_DEFAULT;
 
     @Activate
@@ -104,13 +90,12 @@
     @Modified
     private void modified(ComponentContext context) {
         if (context == null) {
-            bootstrapServers = BOOTSTRAP_SERVERS;
-            retries = RETRIES;
-            maxInFlightRequestsPerConnection =
-                    MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION;
-            requestRequiredAcks = REQUEST_REQUIRED_ACKS;
-            keySerializer = KEY_SERIALIZER;
-            valueSerializer = VALUE_SERIALIZER;
+            bootstrapServers = BOOTSTRAP_SERVERS_DEFAULT;
+            retries = RETRIES_DEFAULT;
+            maxInFlightRequestsPerConnection = MAX_IN_FLIGHT_DEFAULT;
+            requestRequiredAcks = REQUIRED_ACKS_DEFAULT;
+            keySerializer = KEY_SERIALIZER_DEFAULT;
+            valueSerializer = VALUE_SERIALIZER_DEFAULT;
             return;
         }
 
@@ -121,20 +106,20 @@
         int newMaxInFlightRequestsPerConnection;
         int newRequestRequiredAcks;
         try {
-            String s = get(properties, "bootstrap.servers");
+            String s = get(properties, BOOTSTRAP_SERVERS);
             newBootstrapServers =
                     isNullOrEmpty(s) ? bootstrapServers : s.trim();
 
-            s = get(properties, "retries");
+            s = get(properties, RETRIES);
             newRetries =
                     isNullOrEmpty(s) ? retries : Integer.parseInt(s.trim());
 
-            s = get(properties, "max.in.flight.requests.per.connection");
+            s = get(properties, MAX_IN_FLIGHT);
             newMaxInFlightRequestsPerConnection =
                     isNullOrEmpty(s) ? maxInFlightRequestsPerConnection
                                      : Integer.parseInt(s.trim());
 
-            s = get(properties, "request.required.acks");
+            s = get(properties, REQUIRED_ACKS);
             newRequestRequiredAcks =
                     isNullOrEmpty(s) ? requestRequiredAcks
                                      : Integer.parseInt(s.trim());
@@ -147,11 +132,10 @@
                            newMaxInFlightRequestsPerConnection,
                            newRequestRequiredAcks)) {
             bootstrapServers = newBootstrapServers;
-            kafkaServerIp = bootstrapServers
+            String kafkaServerIp = bootstrapServers
                     .substring(0, bootstrapServers.indexOf(":"));
-            kafkaServerPortNum = bootstrapServers
-                    .substring(bootstrapServers.indexOf(":") + 1,
-                               bootstrapServers.length());
+            String kafkaServerPortNum = bootstrapServers
+                    .substring(bootstrapServers.indexOf(":") + 1);
 
             retries = newRetries;
 
@@ -191,11 +175,8 @@
 
     @Override
     public KafkaServerConfig getConfigParams() {
-        String ipAddr =
-                bootstrapServers.substring(0, bootstrapServers.indexOf(":"));
-        String port =
-                bootstrapServers.substring(bootstrapServers.indexOf(":") + 1,
-                                           bootstrapServers.length());
+        String ipAddr = bootstrapServers.substring(0, bootstrapServers.indexOf(":"));
+        String port = bootstrapServers.substring(bootstrapServers.indexOf(":") + 1);
 
         return KafkaServerConfig.builder().ipAddress(ipAddr).port(port)
                 .numOfRetries(retries)