blob: 40d1d2d89867ff66d04d589b00505a813b56a4fe [file] [log] [blame]
Shravan Ambati5a11e172016-07-21 15:55:28 -07001/**
2 * Copyright 2016-present Open Networking Laboratory
3 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
16package org.onosproject.kafkaintegration.kafka;
17
18import static com.google.common.base.Strings.isNullOrEmpty;
19import static org.onlab.util.Tools.get;
20
21import java.util.Dictionary;
22
23import org.apache.felix.scr.annotations.Activate;
24import org.apache.felix.scr.annotations.Component;
25import org.apache.felix.scr.annotations.Deactivate;
26import org.apache.felix.scr.annotations.Modified;
27import org.apache.felix.scr.annotations.Property;
28import org.apache.felix.scr.annotations.Reference;
29import org.apache.felix.scr.annotations.ReferenceCardinality;
30import org.apache.felix.scr.annotations.Service;
31import org.onosproject.cfg.ComponentConfigService;
32import org.onosproject.kafkaintegration.api.KafkaConfigService;
Shravan Ambati4c6295e2016-12-20 14:53:06 -080033import org.onosproject.kafkaintegration.api.KafkaPublisherService;
Shravan Ambati5a11e172016-07-21 15:55:28 -070034import org.onosproject.kafkaintegration.api.dto.KafkaServerConfig;
35import org.osgi.service.component.ComponentContext;
36import org.slf4j.Logger;
37import org.slf4j.LoggerFactory;
38
39@Component(immediate = true)
40@Service
41public class KafkaConfigManager implements KafkaConfigService {
42
43 private final Logger log = LoggerFactory.getLogger(getClass());
44
45 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
46 protected ComponentConfigService componentConfigService;
47
48 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
Shravan Ambati4c6295e2016-12-20 14:53:06 -080049 protected KafkaPublisherService producer;
Shravan Ambati5a11e172016-07-21 15:55:28 -070050
51 public static final String BOOTSTRAP_SERVERS = "localhost:9092";
52 private String kafkaServerIp =
53 BOOTSTRAP_SERVERS.substring(0, BOOTSTRAP_SERVERS.indexOf(":"));
54 private String kafkaServerPortNum =
55 BOOTSTRAP_SERVERS.substring(BOOTSTRAP_SERVERS.indexOf(":") + 1,
56 BOOTSTRAP_SERVERS.length());
57
58 private static final int RETRIES = 1;
59 private static final int MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION = 5;
60 private static final int REQUEST_REQUIRED_ACKS = 1;
61 private static final String KEY_SERIALIZER =
62 "org.apache.kafka.common.serialization.StringSerializer";
63 private static final String VALUE_SERIALIZER =
64 "org.apache.kafka.common.serialization.ByteArraySerializer";
65
66 @Property(name = "bootstrap.servers", value = BOOTSTRAP_SERVERS,
67 label = "Default IP/Port pair to establish initial connection to Kafka cluster.")
68 protected String bootstrapServers = BOOTSTRAP_SERVERS;
69
70 @Property(name = "retries", intValue = RETRIES,
71 label = "Number of times the producer can retry to send after first failure")
72 protected int retries = RETRIES;
73
74 @Property(name = "max.in.flight.requests.per.connection",
75 intValue = MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION,
76 label = "The maximum number of unacknowledged requests the client will send before blocking")
77 protected int maxInFlightRequestsPerConnection =
78 MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION;
79
80 @Property(name = "request.required.acks", intValue = 1,
81 label = "Producer will get an acknowledgement after the leader has replicated the data")
82 protected int requestRequiredAcks = REQUEST_REQUIRED_ACKS;
83
84 @Property(name = "key.serializer", value = KEY_SERIALIZER,
85 label = "Serializer class for key that implements the Serializer interface.")
86 protected String keySerializer = KEY_SERIALIZER;
87
88 @Property(name = "value.serializer", value = VALUE_SERIALIZER,
89 label = "Serializer class for value that implements the Serializer interface.")
90 protected String valueSerializer = VALUE_SERIALIZER;
91
92 @Activate
93 protected void activate(ComponentContext context) {
94 componentConfigService.registerProperties(getClass());
95 log.info("Started");
96 }
97
98 @Deactivate
99 protected void deactivate() {
100 componentConfigService.unregisterProperties(getClass(), false);
101 log.info("Stopped");
102 }
103
104 @Modified
105 private void modified(ComponentContext context) {
106 if (context == null) {
107 bootstrapServers = BOOTSTRAP_SERVERS;
108 retries = RETRIES;
109 maxInFlightRequestsPerConnection =
110 MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION;
111 requestRequiredAcks = REQUEST_REQUIRED_ACKS;
112 keySerializer = KEY_SERIALIZER;
113 valueSerializer = VALUE_SERIALIZER;
114 return;
115 }
116
117 Dictionary<?, ?> properties = context.getProperties();
118
119 String newBootstrapServers;
120 int newRetries;
121 int newMaxInFlightRequestsPerConnection;
122 int newRequestRequiredAcks;
123 try {
124 String s = get(properties, "bootstrap.servers");
125 newBootstrapServers =
126 isNullOrEmpty(s) ? bootstrapServers : s.trim();
127
128 s = get(properties, "retries");
129 newRetries =
130 isNullOrEmpty(s) ? retries : Integer.parseInt(s.trim());
131
132 s = get(properties, "max.in.flight.requests.per.connection");
133 newMaxInFlightRequestsPerConnection =
134 isNullOrEmpty(s) ? maxInFlightRequestsPerConnection
135 : Integer.parseInt(s.trim());
136
137 s = get(properties, "request.required.acks");
138 newRequestRequiredAcks =
139 isNullOrEmpty(s) ? requestRequiredAcks
140 : Integer.parseInt(s.trim());
141
142 } catch (NumberFormatException | ClassCastException e) {
143 return;
144 }
145
146 if (configModified(newBootstrapServers, newRetries,
147 newMaxInFlightRequestsPerConnection,
148 newRequestRequiredAcks)) {
149 bootstrapServers = newBootstrapServers;
150 kafkaServerIp = bootstrapServers
151 .substring(0, bootstrapServers.indexOf(":"));
152 kafkaServerPortNum = bootstrapServers
153 .substring(bootstrapServers.indexOf(":") + 1,
154 bootstrapServers.length());
155
156 retries = newRetries;
157
158 maxInFlightRequestsPerConnection =
159 newMaxInFlightRequestsPerConnection;
160
161 requestRequiredAcks = newRequestRequiredAcks;
162
163 producer.restart(KafkaServerConfig.builder()
164 .ipAddress(kafkaServerIp).port(kafkaServerPortNum)
165 .numOfRetries(retries)
166 .maxInFlightRequestsPerConnection(maxInFlightRequestsPerConnection)
167 .acksRequired(requestRequiredAcks)
168 .keySerializer(keySerializer)
169 .valueSerializer(valueSerializer).build());
170
171 log.info("Kafka Server Config has been Modified - "
172 + "bootstrapServers {}, retries {}, "
173 + "maxInFlightRequestsPerConnection {}, "
174 + "requestRequiredAcks {}", bootstrapServers, retries,
175 maxInFlightRequestsPerConnection, requestRequiredAcks);
176 } else {
177 return;
178 }
179 }
180
181 private boolean configModified(String newBootstrapServers, int newRetries,
182 int newMaxInFlightRequestsPerConnection,
183 int newRequestRequiredAcks) {
184
185 return !newBootstrapServers.equals(bootstrapServers)
186 || newRetries != retries
187 || newMaxInFlightRequestsPerConnection != maxInFlightRequestsPerConnection
188 || newRequestRequiredAcks != requestRequiredAcks;
189
190 }
191
192 @Override
193 public KafkaServerConfig getConfigParams() {
194 String ipAddr =
195 bootstrapServers.substring(0, bootstrapServers.indexOf(":"));
196 String port =
197 bootstrapServers.substring(bootstrapServers.indexOf(":") + 1,
198 bootstrapServers.length());
199
200 return KafkaServerConfig.builder().ipAddress(ipAddr).port(port)
201 .numOfRetries(retries)
202 .maxInFlightRequestsPerConnection(maxInFlightRequestsPerConnection)
203 .acksRequired(requestRequiredAcks).keySerializer(keySerializer)
204 .valueSerializer(valueSerializer).build();
205
206 }
207
208}