blob: 4b90b8ed7e8eaed860f1714ff03f4f87a46841b0 [file] [log] [blame]
/**
 * Copyright 2016-present Open Networking Foundation
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
16package org.onosproject.kafkaintegration.kafka;
17
Shravan Ambati5a11e172016-07-21 15:55:28 -070018import org.onosproject.cfg.ComponentConfigService;
19import org.onosproject.kafkaintegration.api.KafkaConfigService;
Shravan Ambatia4875d82017-01-09 13:06:51 -080020import org.onosproject.kafkaintegration.api.KafkaPublisherAdminService;
Shravan Ambati5a11e172016-07-21 15:55:28 -070021import org.onosproject.kafkaintegration.api.dto.KafkaServerConfig;
22import org.osgi.service.component.ComponentContext;
Ray Milkeyd84f89b2018-08-17 14:54:17 -070023import org.osgi.service.component.annotations.Activate;
24import org.osgi.service.component.annotations.Component;
25import org.osgi.service.component.annotations.Deactivate;
26import org.osgi.service.component.annotations.Modified;
Ray Milkeyd84f89b2018-08-17 14:54:17 -070027import org.osgi.service.component.annotations.Reference;
28import org.osgi.service.component.annotations.ReferenceCardinality;
Shravan Ambati5a11e172016-07-21 15:55:28 -070029import org.slf4j.Logger;
30import org.slf4j.LoggerFactory;
31
Ray Milkeyd84f89b2018-08-17 14:54:17 -070032import java.util.Dictionary;
33
34import static com.google.common.base.Strings.isNullOrEmpty;
35import static org.onlab.util.Tools.get;
Thomas Vachuska5bd7b552018-11-08 15:41:27 -080036import static org.onosproject.kafkaintegration.kafka.OsgiPropertyConstants.*;
Ray Milkeyd84f89b2018-08-17 14:54:17 -070037
Thomas Vachuska5bd7b552018-11-08 15:41:27 -080038@Component(immediate = true, service = KafkaConfigService.class,
39 property = {
40 BOOTSTRAP_SERVERS + "=" + BOOTSTRAP_SERVERS_DEFAULT,
41 RETRIES + ":Integer=" + RETRIES_DEFAULT,
42 MAX_IN_FLIGHT + ":Integer=" + MAX_IN_FLIGHT_DEFAULT,
43 REQUIRED_ACKS + ":Integer=" + REQUIRED_ACKS_DEFAULT,
44 KEY_SERIALIZER + "=" + KEY_SERIALIZER_DEFAULT,
45 VALUE_SERIALIZER + "=" + VALUE_SERIALIZER_DEFAULT,
46 }
47)
Shravan Ambati5a11e172016-07-21 15:55:28 -070048public class KafkaConfigManager implements KafkaConfigService {
49
50 private final Logger log = LoggerFactory.getLogger(getClass());
51
Ray Milkeyd84f89b2018-08-17 14:54:17 -070052 @Reference(cardinality = ReferenceCardinality.MANDATORY)
Shravan Ambati5a11e172016-07-21 15:55:28 -070053 protected ComponentConfigService componentConfigService;
54
Ray Milkeyd84f89b2018-08-17 14:54:17 -070055 @Reference(cardinality = ReferenceCardinality.MANDATORY)
Shravan Ambatia4875d82017-01-09 13:06:51 -080056 protected KafkaPublisherAdminService kafkaPublisherAdminService;
Shravan Ambati5a11e172016-07-21 15:55:28 -070057
Thomas Vachuska5bd7b552018-11-08 15:41:27 -080058 /** Default IP/Port pair to establish initial connection to Kafka cluster. */
59 protected String bootstrapServers = BOOTSTRAP_SERVERS_DEFAULT;
Shravan Ambati5a11e172016-07-21 15:55:28 -070060
Thomas Vachuska5bd7b552018-11-08 15:41:27 -080061 /** Number of times the producer can retry to send after first failure. */
62 protected int retries = RETRIES_DEFAULT;
Shravan Ambati5a11e172016-07-21 15:55:28 -070063
Thomas Vachuska5bd7b552018-11-08 15:41:27 -080064 /** The maximum number of unacknowledged requests the client will send before blocking. */
65 protected int maxInFlightRequestsPerConnection = MAX_IN_FLIGHT_DEFAULT;
Shravan Ambati5a11e172016-07-21 15:55:28 -070066
Thomas Vachuska5bd7b552018-11-08 15:41:27 -080067 /** Producer will get an acknowledgement after the leader has replicated the data. */
68 protected int requestRequiredAcks = REQUIRED_ACKS_DEFAULT;
Shravan Ambati5a11e172016-07-21 15:55:28 -070069
Thomas Vachuska5bd7b552018-11-08 15:41:27 -080070 /** Serializer class for key that implements the Serializer interface. */
71 protected String keySerializer = KEY_SERIALIZER_DEFAULT;
Shravan Ambati5a11e172016-07-21 15:55:28 -070072
Thomas Vachuska5bd7b552018-11-08 15:41:27 -080073 /** Serializer class for value that implements the Serializer interface. */
Shravan Ambati5a11e172016-07-21 15:55:28 -070074 protected String valueSerializer = VALUE_SERIALIZER;
75
76 @Activate
77 protected void activate(ComponentContext context) {
78 componentConfigService.registerProperties(getClass());
Shravan Ambatia4875d82017-01-09 13:06:51 -080079 kafkaPublisherAdminService.start(getConfigParams());
Shravan Ambati5a11e172016-07-21 15:55:28 -070080 log.info("Started");
81 }
82
83 @Deactivate
84 protected void deactivate() {
85 componentConfigService.unregisterProperties(getClass(), false);
Shravan Ambatia4875d82017-01-09 13:06:51 -080086 kafkaPublisherAdminService.stop();
Shravan Ambati5a11e172016-07-21 15:55:28 -070087 log.info("Stopped");
88 }
89
90 @Modified
91 private void modified(ComponentContext context) {
92 if (context == null) {
Thomas Vachuska5bd7b552018-11-08 15:41:27 -080093 bootstrapServers = BOOTSTRAP_SERVERS_DEFAULT;
94 retries = RETRIES_DEFAULT;
95 maxInFlightRequestsPerConnection = MAX_IN_FLIGHT_DEFAULT;
96 requestRequiredAcks = REQUIRED_ACKS_DEFAULT;
97 keySerializer = KEY_SERIALIZER_DEFAULT;
98 valueSerializer = VALUE_SERIALIZER_DEFAULT;
Shravan Ambati5a11e172016-07-21 15:55:28 -070099 return;
100 }
101
102 Dictionary<?, ?> properties = context.getProperties();
103
104 String newBootstrapServers;
105 int newRetries;
106 int newMaxInFlightRequestsPerConnection;
107 int newRequestRequiredAcks;
108 try {
Thomas Vachuska5bd7b552018-11-08 15:41:27 -0800109 String s = get(properties, BOOTSTRAP_SERVERS);
Shravan Ambati5a11e172016-07-21 15:55:28 -0700110 newBootstrapServers =
111 isNullOrEmpty(s) ? bootstrapServers : s.trim();
112
Thomas Vachuska5bd7b552018-11-08 15:41:27 -0800113 s = get(properties, RETRIES);
Shravan Ambati5a11e172016-07-21 15:55:28 -0700114 newRetries =
115 isNullOrEmpty(s) ? retries : Integer.parseInt(s.trim());
116
Thomas Vachuska5bd7b552018-11-08 15:41:27 -0800117 s = get(properties, MAX_IN_FLIGHT);
Shravan Ambati5a11e172016-07-21 15:55:28 -0700118 newMaxInFlightRequestsPerConnection =
119 isNullOrEmpty(s) ? maxInFlightRequestsPerConnection
120 : Integer.parseInt(s.trim());
121
Thomas Vachuska5bd7b552018-11-08 15:41:27 -0800122 s = get(properties, REQUIRED_ACKS);
Shravan Ambati5a11e172016-07-21 15:55:28 -0700123 newRequestRequiredAcks =
124 isNullOrEmpty(s) ? requestRequiredAcks
125 : Integer.parseInt(s.trim());
126
127 } catch (NumberFormatException | ClassCastException e) {
128 return;
129 }
130
131 if (configModified(newBootstrapServers, newRetries,
132 newMaxInFlightRequestsPerConnection,
133 newRequestRequiredAcks)) {
134 bootstrapServers = newBootstrapServers;
Thomas Vachuska5bd7b552018-11-08 15:41:27 -0800135 String kafkaServerIp = bootstrapServers
Shravan Ambati5a11e172016-07-21 15:55:28 -0700136 .substring(0, bootstrapServers.indexOf(":"));
Thomas Vachuska5bd7b552018-11-08 15:41:27 -0800137 String kafkaServerPortNum = bootstrapServers
138 .substring(bootstrapServers.indexOf(":") + 1);
Shravan Ambati5a11e172016-07-21 15:55:28 -0700139
140 retries = newRetries;
141
142 maxInFlightRequestsPerConnection =
143 newMaxInFlightRequestsPerConnection;
144
145 requestRequiredAcks = newRequestRequiredAcks;
146
Shravan Ambatia4875d82017-01-09 13:06:51 -0800147 kafkaPublisherAdminService.restart(KafkaServerConfig.builder()
Shravan Ambati5a11e172016-07-21 15:55:28 -0700148 .ipAddress(kafkaServerIp).port(kafkaServerPortNum)
149 .numOfRetries(retries)
150 .maxInFlightRequestsPerConnection(maxInFlightRequestsPerConnection)
151 .acksRequired(requestRequiredAcks)
152 .keySerializer(keySerializer)
153 .valueSerializer(valueSerializer).build());
154
155 log.info("Kafka Server Config has been Modified - "
156 + "bootstrapServers {}, retries {}, "
157 + "maxInFlightRequestsPerConnection {}, "
158 + "requestRequiredAcks {}", bootstrapServers, retries,
159 maxInFlightRequestsPerConnection, requestRequiredAcks);
160 } else {
161 return;
162 }
163 }
164
165 private boolean configModified(String newBootstrapServers, int newRetries,
166 int newMaxInFlightRequestsPerConnection,
167 int newRequestRequiredAcks) {
168
169 return !newBootstrapServers.equals(bootstrapServers)
170 || newRetries != retries
171 || newMaxInFlightRequestsPerConnection != maxInFlightRequestsPerConnection
172 || newRequestRequiredAcks != requestRequiredAcks;
173
174 }
175
176 @Override
177 public KafkaServerConfig getConfigParams() {
Thomas Vachuska5bd7b552018-11-08 15:41:27 -0800178 String ipAddr = bootstrapServers.substring(0, bootstrapServers.indexOf(":"));
179 String port = bootstrapServers.substring(bootstrapServers.indexOf(":") + 1);
Shravan Ambati5a11e172016-07-21 15:55:28 -0700180
181 return KafkaServerConfig.builder().ipAddress(ipAddr).port(port)
182 .numOfRetries(retries)
183 .maxInFlightRequestsPerConnection(maxInFlightRequestsPerConnection)
184 .acksRequired(requestRequiredAcks).keySerializer(keySerializer)
185 .valueSerializer(valueSerializer).build();
186
187 }
188
189}