blob: 79d7baff96840fb021e8608104539b398ce16b76 [file] [log] [blame]
Shravan Ambati5a11e172016-07-21 15:55:28 -07001/**
Brian O'Connora09fe5b2017-08-03 21:12:30 -07002 * Copyright 2016-present Open Networking Foundation
Shravan Ambati5a11e172016-07-21 15:55:28 -07003 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
16package org.onosproject.kafkaintegration.kafka;
17
18import static com.google.common.base.Strings.isNullOrEmpty;
19import static org.onlab.util.Tools.get;
20
21import java.util.Dictionary;
22
23import org.apache.felix.scr.annotations.Activate;
24import org.apache.felix.scr.annotations.Component;
25import org.apache.felix.scr.annotations.Deactivate;
26import org.apache.felix.scr.annotations.Modified;
27import org.apache.felix.scr.annotations.Property;
28import org.apache.felix.scr.annotations.Reference;
29import org.apache.felix.scr.annotations.ReferenceCardinality;
30import org.apache.felix.scr.annotations.Service;
31import org.onosproject.cfg.ComponentConfigService;
32import org.onosproject.kafkaintegration.api.KafkaConfigService;
Shravan Ambatia4875d82017-01-09 13:06:51 -080033import org.onosproject.kafkaintegration.api.KafkaPublisherAdminService;
Shravan Ambati5a11e172016-07-21 15:55:28 -070034import org.onosproject.kafkaintegration.api.dto.KafkaServerConfig;
35import org.osgi.service.component.ComponentContext;
36import org.slf4j.Logger;
37import org.slf4j.LoggerFactory;
38
39@Component(immediate = true)
40@Service
41public class KafkaConfigManager implements KafkaConfigService {
42
43 private final Logger log = LoggerFactory.getLogger(getClass());
44
45 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
46 protected ComponentConfigService componentConfigService;
47
48 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
Shravan Ambatia4875d82017-01-09 13:06:51 -080049 protected KafkaPublisherAdminService kafkaPublisherAdminService;
Shravan Ambati5a11e172016-07-21 15:55:28 -070050
51 public static final String BOOTSTRAP_SERVERS = "localhost:9092";
52 private String kafkaServerIp =
53 BOOTSTRAP_SERVERS.substring(0, BOOTSTRAP_SERVERS.indexOf(":"));
54 private String kafkaServerPortNum =
55 BOOTSTRAP_SERVERS.substring(BOOTSTRAP_SERVERS.indexOf(":") + 1,
56 BOOTSTRAP_SERVERS.length());
57
58 private static final int RETRIES = 1;
59 private static final int MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION = 5;
60 private static final int REQUEST_REQUIRED_ACKS = 1;
61 private static final String KEY_SERIALIZER =
62 "org.apache.kafka.common.serialization.StringSerializer";
63 private static final String VALUE_SERIALIZER =
64 "org.apache.kafka.common.serialization.ByteArraySerializer";
65
66 @Property(name = "bootstrap.servers", value = BOOTSTRAP_SERVERS,
67 label = "Default IP/Port pair to establish initial connection to Kafka cluster.")
68 protected String bootstrapServers = BOOTSTRAP_SERVERS;
69
70 @Property(name = "retries", intValue = RETRIES,
71 label = "Number of times the producer can retry to send after first failure")
72 protected int retries = RETRIES;
73
74 @Property(name = "max.in.flight.requests.per.connection",
75 intValue = MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION,
76 label = "The maximum number of unacknowledged requests the client will send before blocking")
77 protected int maxInFlightRequestsPerConnection =
78 MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION;
79
80 @Property(name = "request.required.acks", intValue = 1,
81 label = "Producer will get an acknowledgement after the leader has replicated the data")
82 protected int requestRequiredAcks = REQUEST_REQUIRED_ACKS;
83
84 @Property(name = "key.serializer", value = KEY_SERIALIZER,
85 label = "Serializer class for key that implements the Serializer interface.")
86 protected String keySerializer = KEY_SERIALIZER;
87
88 @Property(name = "value.serializer", value = VALUE_SERIALIZER,
89 label = "Serializer class for value that implements the Serializer interface.")
90 protected String valueSerializer = VALUE_SERIALIZER;
91
92 @Activate
93 protected void activate(ComponentContext context) {
94 componentConfigService.registerProperties(getClass());
Shravan Ambatia4875d82017-01-09 13:06:51 -080095 kafkaPublisherAdminService.start(getConfigParams());
Shravan Ambati5a11e172016-07-21 15:55:28 -070096 log.info("Started");
97 }
98
99 @Deactivate
100 protected void deactivate() {
101 componentConfigService.unregisterProperties(getClass(), false);
Shravan Ambatia4875d82017-01-09 13:06:51 -0800102 kafkaPublisherAdminService.stop();
Shravan Ambati5a11e172016-07-21 15:55:28 -0700103 log.info("Stopped");
104 }
105
106 @Modified
107 private void modified(ComponentContext context) {
108 if (context == null) {
109 bootstrapServers = BOOTSTRAP_SERVERS;
110 retries = RETRIES;
111 maxInFlightRequestsPerConnection =
112 MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION;
113 requestRequiredAcks = REQUEST_REQUIRED_ACKS;
114 keySerializer = KEY_SERIALIZER;
115 valueSerializer = VALUE_SERIALIZER;
116 return;
117 }
118
119 Dictionary<?, ?> properties = context.getProperties();
120
121 String newBootstrapServers;
122 int newRetries;
123 int newMaxInFlightRequestsPerConnection;
124 int newRequestRequiredAcks;
125 try {
126 String s = get(properties, "bootstrap.servers");
127 newBootstrapServers =
128 isNullOrEmpty(s) ? bootstrapServers : s.trim();
129
130 s = get(properties, "retries");
131 newRetries =
132 isNullOrEmpty(s) ? retries : Integer.parseInt(s.trim());
133
134 s = get(properties, "max.in.flight.requests.per.connection");
135 newMaxInFlightRequestsPerConnection =
136 isNullOrEmpty(s) ? maxInFlightRequestsPerConnection
137 : Integer.parseInt(s.trim());
138
139 s = get(properties, "request.required.acks");
140 newRequestRequiredAcks =
141 isNullOrEmpty(s) ? requestRequiredAcks
142 : Integer.parseInt(s.trim());
143
144 } catch (NumberFormatException | ClassCastException e) {
145 return;
146 }
147
148 if (configModified(newBootstrapServers, newRetries,
149 newMaxInFlightRequestsPerConnection,
150 newRequestRequiredAcks)) {
151 bootstrapServers = newBootstrapServers;
152 kafkaServerIp = bootstrapServers
153 .substring(0, bootstrapServers.indexOf(":"));
154 kafkaServerPortNum = bootstrapServers
155 .substring(bootstrapServers.indexOf(":") + 1,
156 bootstrapServers.length());
157
158 retries = newRetries;
159
160 maxInFlightRequestsPerConnection =
161 newMaxInFlightRequestsPerConnection;
162
163 requestRequiredAcks = newRequestRequiredAcks;
164
Shravan Ambatia4875d82017-01-09 13:06:51 -0800165 kafkaPublisherAdminService.restart(KafkaServerConfig.builder()
Shravan Ambati5a11e172016-07-21 15:55:28 -0700166 .ipAddress(kafkaServerIp).port(kafkaServerPortNum)
167 .numOfRetries(retries)
168 .maxInFlightRequestsPerConnection(maxInFlightRequestsPerConnection)
169 .acksRequired(requestRequiredAcks)
170 .keySerializer(keySerializer)
171 .valueSerializer(valueSerializer).build());
172
173 log.info("Kafka Server Config has been Modified - "
174 + "bootstrapServers {}, retries {}, "
175 + "maxInFlightRequestsPerConnection {}, "
176 + "requestRequiredAcks {}", bootstrapServers, retries,
177 maxInFlightRequestsPerConnection, requestRequiredAcks);
178 } else {
179 return;
180 }
181 }
182
183 private boolean configModified(String newBootstrapServers, int newRetries,
184 int newMaxInFlightRequestsPerConnection,
185 int newRequestRequiredAcks) {
186
187 return !newBootstrapServers.equals(bootstrapServers)
188 || newRetries != retries
189 || newMaxInFlightRequestsPerConnection != maxInFlightRequestsPerConnection
190 || newRequestRequiredAcks != requestRequiredAcks;
191
192 }
193
194 @Override
195 public KafkaServerConfig getConfigParams() {
196 String ipAddr =
197 bootstrapServers.substring(0, bootstrapServers.indexOf(":"));
198 String port =
199 bootstrapServers.substring(bootstrapServers.indexOf(":") + 1,
200 bootstrapServers.length());
201
202 return KafkaServerConfig.builder().ipAddress(ipAddr).port(port)
203 .numOfRetries(retries)
204 .maxInFlightRequestsPerConnection(maxInFlightRequestsPerConnection)
205 .acksRequired(requestRequiredAcks).keySerializer(keySerializer)
206 .valueSerializer(valueSerializer).build();
207
208 }
209
210}