blob: aed8ba8f131bc6e98ed9fc70aae017cef41d2f1d [file] [log] [blame]
/**
 * Copyright 2016-present Open Networking Foundation
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
16package org.onosproject.kafkaintegration.kafka;
17
Shravan Ambati5a11e172016-07-21 15:55:28 -070018import org.onosproject.cfg.ComponentConfigService;
19import org.onosproject.kafkaintegration.api.KafkaConfigService;
Shravan Ambatia4875d82017-01-09 13:06:51 -080020import org.onosproject.kafkaintegration.api.KafkaPublisherAdminService;
Shravan Ambati5a11e172016-07-21 15:55:28 -070021import org.onosproject.kafkaintegration.api.dto.KafkaServerConfig;
22import org.osgi.service.component.ComponentContext;
Ray Milkeyd84f89b2018-08-17 14:54:17 -070023import org.osgi.service.component.annotations.Activate;
24import org.osgi.service.component.annotations.Component;
25import org.osgi.service.component.annotations.Deactivate;
26import org.osgi.service.component.annotations.Modified;
27import org.osgi.service.component.annotations.Property;
28import org.osgi.service.component.annotations.Reference;
29import org.osgi.service.component.annotations.ReferenceCardinality;
Shravan Ambati5a11e172016-07-21 15:55:28 -070030import org.slf4j.Logger;
31import org.slf4j.LoggerFactory;
32
Ray Milkeyd84f89b2018-08-17 14:54:17 -070033import java.util.Dictionary;
34
35import static com.google.common.base.Strings.isNullOrEmpty;
36import static org.onlab.util.Tools.get;
37
38@Component(immediate = true, service = KafkaConfigService.class)
Shravan Ambati5a11e172016-07-21 15:55:28 -070039public class KafkaConfigManager implements KafkaConfigService {
40
41 private final Logger log = LoggerFactory.getLogger(getClass());
42
Ray Milkeyd84f89b2018-08-17 14:54:17 -070043 @Reference(cardinality = ReferenceCardinality.MANDATORY)
Shravan Ambati5a11e172016-07-21 15:55:28 -070044 protected ComponentConfigService componentConfigService;
45
Ray Milkeyd84f89b2018-08-17 14:54:17 -070046 @Reference(cardinality = ReferenceCardinality.MANDATORY)
Shravan Ambatia4875d82017-01-09 13:06:51 -080047 protected KafkaPublisherAdminService kafkaPublisherAdminService;
Shravan Ambati5a11e172016-07-21 15:55:28 -070048
49 public static final String BOOTSTRAP_SERVERS = "localhost:9092";
50 private String kafkaServerIp =
51 BOOTSTRAP_SERVERS.substring(0, BOOTSTRAP_SERVERS.indexOf(":"));
52 private String kafkaServerPortNum =
53 BOOTSTRAP_SERVERS.substring(BOOTSTRAP_SERVERS.indexOf(":") + 1,
54 BOOTSTRAP_SERVERS.length());
55
56 private static final int RETRIES = 1;
57 private static final int MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION = 5;
58 private static final int REQUEST_REQUIRED_ACKS = 1;
59 private static final String KEY_SERIALIZER =
60 "org.apache.kafka.common.serialization.StringSerializer";
61 private static final String VALUE_SERIALIZER =
62 "org.apache.kafka.common.serialization.ByteArraySerializer";
63
64 @Property(name = "bootstrap.servers", value = BOOTSTRAP_SERVERS,
65 label = "Default IP/Port pair to establish initial connection to Kafka cluster.")
66 protected String bootstrapServers = BOOTSTRAP_SERVERS;
67
68 @Property(name = "retries", intValue = RETRIES,
69 label = "Number of times the producer can retry to send after first failure")
70 protected int retries = RETRIES;
71
72 @Property(name = "max.in.flight.requests.per.connection",
73 intValue = MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION,
74 label = "The maximum number of unacknowledged requests the client will send before blocking")
75 protected int maxInFlightRequestsPerConnection =
76 MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION;
77
78 @Property(name = "request.required.acks", intValue = 1,
79 label = "Producer will get an acknowledgement after the leader has replicated the data")
80 protected int requestRequiredAcks = REQUEST_REQUIRED_ACKS;
81
82 @Property(name = "key.serializer", value = KEY_SERIALIZER,
83 label = "Serializer class for key that implements the Serializer interface.")
84 protected String keySerializer = KEY_SERIALIZER;
85
86 @Property(name = "value.serializer", value = VALUE_SERIALIZER,
87 label = "Serializer class for value that implements the Serializer interface.")
88 protected String valueSerializer = VALUE_SERIALIZER;
89
90 @Activate
91 protected void activate(ComponentContext context) {
92 componentConfigService.registerProperties(getClass());
Shravan Ambatia4875d82017-01-09 13:06:51 -080093 kafkaPublisherAdminService.start(getConfigParams());
Shravan Ambati5a11e172016-07-21 15:55:28 -070094 log.info("Started");
95 }
96
97 @Deactivate
98 protected void deactivate() {
99 componentConfigService.unregisterProperties(getClass(), false);
Shravan Ambatia4875d82017-01-09 13:06:51 -0800100 kafkaPublisherAdminService.stop();
Shravan Ambati5a11e172016-07-21 15:55:28 -0700101 log.info("Stopped");
102 }
103
104 @Modified
105 private void modified(ComponentContext context) {
106 if (context == null) {
107 bootstrapServers = BOOTSTRAP_SERVERS;
108 retries = RETRIES;
109 maxInFlightRequestsPerConnection =
110 MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION;
111 requestRequiredAcks = REQUEST_REQUIRED_ACKS;
112 keySerializer = KEY_SERIALIZER;
113 valueSerializer = VALUE_SERIALIZER;
114 return;
115 }
116
117 Dictionary<?, ?> properties = context.getProperties();
118
119 String newBootstrapServers;
120 int newRetries;
121 int newMaxInFlightRequestsPerConnection;
122 int newRequestRequiredAcks;
123 try {
124 String s = get(properties, "bootstrap.servers");
125 newBootstrapServers =
126 isNullOrEmpty(s) ? bootstrapServers : s.trim();
127
128 s = get(properties, "retries");
129 newRetries =
130 isNullOrEmpty(s) ? retries : Integer.parseInt(s.trim());
131
132 s = get(properties, "max.in.flight.requests.per.connection");
133 newMaxInFlightRequestsPerConnection =
134 isNullOrEmpty(s) ? maxInFlightRequestsPerConnection
135 : Integer.parseInt(s.trim());
136
137 s = get(properties, "request.required.acks");
138 newRequestRequiredAcks =
139 isNullOrEmpty(s) ? requestRequiredAcks
140 : Integer.parseInt(s.trim());
141
142 } catch (NumberFormatException | ClassCastException e) {
143 return;
144 }
145
146 if (configModified(newBootstrapServers, newRetries,
147 newMaxInFlightRequestsPerConnection,
148 newRequestRequiredAcks)) {
149 bootstrapServers = newBootstrapServers;
150 kafkaServerIp = bootstrapServers
151 .substring(0, bootstrapServers.indexOf(":"));
152 kafkaServerPortNum = bootstrapServers
153 .substring(bootstrapServers.indexOf(":") + 1,
154 bootstrapServers.length());
155
156 retries = newRetries;
157
158 maxInFlightRequestsPerConnection =
159 newMaxInFlightRequestsPerConnection;
160
161 requestRequiredAcks = newRequestRequiredAcks;
162
Shravan Ambatia4875d82017-01-09 13:06:51 -0800163 kafkaPublisherAdminService.restart(KafkaServerConfig.builder()
Shravan Ambati5a11e172016-07-21 15:55:28 -0700164 .ipAddress(kafkaServerIp).port(kafkaServerPortNum)
165 .numOfRetries(retries)
166 .maxInFlightRequestsPerConnection(maxInFlightRequestsPerConnection)
167 .acksRequired(requestRequiredAcks)
168 .keySerializer(keySerializer)
169 .valueSerializer(valueSerializer).build());
170
171 log.info("Kafka Server Config has been Modified - "
172 + "bootstrapServers {}, retries {}, "
173 + "maxInFlightRequestsPerConnection {}, "
174 + "requestRequiredAcks {}", bootstrapServers, retries,
175 maxInFlightRequestsPerConnection, requestRequiredAcks);
176 } else {
177 return;
178 }
179 }
180
181 private boolean configModified(String newBootstrapServers, int newRetries,
182 int newMaxInFlightRequestsPerConnection,
183 int newRequestRequiredAcks) {
184
185 return !newBootstrapServers.equals(bootstrapServers)
186 || newRetries != retries
187 || newMaxInFlightRequestsPerConnection != maxInFlightRequestsPerConnection
188 || newRequestRequiredAcks != requestRequiredAcks;
189
190 }
191
192 @Override
193 public KafkaServerConfig getConfigParams() {
194 String ipAddr =
195 bootstrapServers.substring(0, bootstrapServers.indexOf(":"));
196 String port =
197 bootstrapServers.substring(bootstrapServers.indexOf(":") + 1,
198 bootstrapServers.length());
199
200 return KafkaServerConfig.builder().ipAddress(ipAddr).port(port)
201 .numOfRetries(retries)
202 .maxInFlightRequestsPerConnection(maxInFlightRequestsPerConnection)
203 .acksRequired(requestRequiredAcks).keySerializer(keySerializer)
204 .valueSerializer(valueSerializer).build();
205
206 }
207
208}