/*
 * Copyright 2018-present Open Networking Foundation
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
16package org.onosproject.openstacktelemetry.config;
17
Jian Li69600e02018-12-24 13:21:18 +090018import com.google.common.base.Strings;
Jian Li2cf1c0b2018-06-07 11:28:56 +090019import com.google.common.collect.ImmutableMap;
20import com.google.common.collect.Maps;
21import org.onosproject.openstacktelemetry.api.config.KafkaTelemetryConfig;
Jian Li52c11222018-06-07 11:39:17 +090022import org.onosproject.openstacktelemetry.api.config.TelemetryConfig;
Jian Li69600e02018-12-24 13:21:18 +090023import org.onosproject.openstacktelemetry.api.config.TelemetryConfigProperties;
Jian Li2cf1c0b2018-06-07 11:28:56 +090024
25import java.util.Map;
26import java.util.Objects;
27
Jian Li3db90852018-06-10 22:29:16 +090028import static com.google.common.base.MoreObjects.toStringHelper;
Jian Li2cf1c0b2018-06-07 11:28:56 +090029import static com.google.common.base.Preconditions.checkNotNull;
Jian Li69600e02018-12-24 13:21:18 +090030import static org.onosproject.openstacktelemetry.api.config.TelemetryConfig.ConfigType.KAFKA;
Jian Li2cf1c0b2018-06-07 11:28:56 +090031
32/**
33 * A configuration file contains Kafka telemetry parameters.
34 */
35public final class DefaultKafkaTelemetryConfig implements KafkaTelemetryConfig {
36
Jian Li69600e02018-12-24 13:21:18 +090037 protected static final String ADDRESS = "address";
38 protected static final String PORT = "port";
39 protected static final String RETRIES = "retries";
40 protected static final String REQUIRED_ACKS = "requiredAcks";
41 protected static final String BATCH_SIZE = "batchSize";
42 protected static final String LINGER_MS = "lingerMs";
43 protected static final String MEMORY_BUFFER = "memoryBuffer";
44 protected static final String KEY_SERIALIZER = "keySerializer";
45 protected static final String VALUE_SERIALIZER = "valueSerializer";
46 protected static final String KEY = "key";
47 protected static final String TOPIC = "topic";
48 protected static final String CODEC = "codec";
49 protected static final String CONFIG_MAP = "configMap";
50
Jian Li2cf1c0b2018-06-07 11:28:56 +090051 private final String address;
52 private final int port;
53 private final int retries;
54 private final String requiredAcks;
55 private final int batchSize;
56 private final int lingerMs;
57 private final int memoryBuffer;
58 private final String keySerializer;
59 private final String valueSerializer;
Jian Li69600e02018-12-24 13:21:18 +090060 private final String key;
61 private final String topic;
62 private final String codec;
Jian Li2cf1c0b2018-06-07 11:28:56 +090063 private final Map<String, Object> configMap;
64
65 private DefaultKafkaTelemetryConfig(String address, int port, int retries,
66 String requiredAcks, int batchSize,
67 int lingerMs, int memoryBuffer,
68 String keySerializer,
69 String valueSerializer,
Jian Li69600e02018-12-24 13:21:18 +090070 String key, String topic, String codec,
Jian Li2cf1c0b2018-06-07 11:28:56 +090071 Map<String, Object> configMap) {
72 this.address = address;
73 this.port = port;
74 this.retries = retries;
75 this.requiredAcks = requiredAcks;
76 this.batchSize = batchSize;
77 this.lingerMs = lingerMs;
78 this.memoryBuffer = memoryBuffer;
79 this.keySerializer = keySerializer;
80 this.valueSerializer = valueSerializer;
Jian Li69600e02018-12-24 13:21:18 +090081 this.key = key;
82 this.topic = topic;
83 this.codec = codec;
Jian Li2cf1c0b2018-06-07 11:28:56 +090084 this.configMap = configMap;
85 }
86
87 @Override
88 public String address() {
89 return address;
90 }
91
92 @Override
93 public int port() {
94 return port;
95 }
96
97 @Override
98 public int retries() {
99 return retries;
100 }
101
102 @Override
103 public String requiredAcks() {
104 return requiredAcks;
105 }
106
107 @Override
108 public int batchSize() {
109 return batchSize;
110 }
111
112 @Override
113 public int lingerMs() {
114 return lingerMs;
115 }
116
117 @Override
118 public int memoryBuffer() {
119 return memoryBuffer;
120 }
121
122 @Override
123 public String keySerializer() {
124 return keySerializer;
125 }
126
127 @Override
128 public String valueSerializer() {
129 return valueSerializer;
130 }
131
132 @Override
Jian Li69600e02018-12-24 13:21:18 +0900133 public String key() {
134 return key;
135 }
136
137 @Override
138 public String topic() {
139 return topic;
140 }
141
142 @Override
143 public String codec() {
144 return codec;
145 }
146
147 @Override
Jian Li2cf1c0b2018-06-07 11:28:56 +0900148 public Map<String, Object> configMap() {
149 if (configMap != null) {
150 return ImmutableMap.copyOf(configMap);
151 } else {
152 return Maps.newConcurrentMap();
153 }
154 }
155
156 @Override
157 public boolean equals(Object obj) {
158 if (this == obj) {
159 return true;
160 }
161
162 if (obj instanceof DefaultKafkaTelemetryConfig) {
163 final DefaultKafkaTelemetryConfig other = (DefaultKafkaTelemetryConfig) obj;
164 return Objects.equals(this.address, other.address) &&
165 Objects.equals(this.port, other.port) &&
166 Objects.equals(this.retries, other.retries) &&
167 Objects.equals(this.requiredAcks, other.requiredAcks) &&
168 Objects.equals(this.batchSize, other.batchSize) &&
169 Objects.equals(this.lingerMs, other.lingerMs) &&
170 Objects.equals(this.memoryBuffer, other.memoryBuffer) &&
171 Objects.equals(this.keySerializer, other.keySerializer) &&
172 Objects.equals(this.valueSerializer, other.valueSerializer) &&
Jian Li69600e02018-12-24 13:21:18 +0900173 Objects.equals(this.key, other.key) &&
174 Objects.equals(this.topic, other.topic) &&
175 Objects.equals(this.codec, other.codec) &&
Jian Li2cf1c0b2018-06-07 11:28:56 +0900176 Objects.equals(this.configMap, other.configMap);
177 }
178 return false;
179 }
180
181 @Override
182 public int hashCode() {
183 return Objects.hash(address, port, retries, requiredAcks, batchSize,
Jian Li69600e02018-12-24 13:21:18 +0900184 lingerMs, memoryBuffer, keySerializer, valueSerializer,
185 key, topic, codec, configMap);
Jian Li2cf1c0b2018-06-07 11:28:56 +0900186 }
187
Jian Li52c11222018-06-07 11:39:17 +0900188 @Override
Jian Li3db90852018-06-10 22:29:16 +0900189 public String toString() {
190 return toStringHelper(this)
Jian Li69600e02018-12-24 13:21:18 +0900191 .add(ADDRESS, address)
192 .add(PORT, port)
193 .add(RETRIES, retries)
194 .add(REQUIRED_ACKS, requiredAcks)
195 .add(BATCH_SIZE, batchSize)
196 .add(LINGER_MS, lingerMs)
197 .add(MEMORY_BUFFER, memoryBuffer)
198 .add(KEY_SERIALIZER, keySerializer)
199 .add(VALUE_SERIALIZER, valueSerializer)
200 .add(KEY, key)
201 .add(TOPIC, topic)
202 .add(CODEC, codec)
203 .add(CONFIG_MAP, configMap)
Jian Li3db90852018-06-10 22:29:16 +0900204 .toString();
205 }
206
207 @Override
Jian Li69600e02018-12-24 13:21:18 +0900208 public TelemetryConfigProperties.Builder createBuilder() {
Jian Li52c11222018-06-07 11:39:17 +0900209 return new DefaultBuilder();
210 }
211
Jian Li2cf1c0b2018-06-07 11:28:56 +0900212 /**
Jian Li69600e02018-12-24 13:21:18 +0900213 * Builds a kafka telemetry config from telemetry config instance.
214 *
215 * @param config telemetry config
216 * @return kafka telemetry config
217 */
218 public static KafkaTelemetryConfig fromTelemetryConfig(TelemetryConfig config) {
219 if (config.type() != KAFKA) {
220 return null;
221 }
222
223 int retries = Strings.isNullOrEmpty(config.getProperty(RETRIES)) ? 0 :
224 Integer.valueOf(config.getProperty(RETRIES));
225 int batchSize = Strings.isNullOrEmpty(config.getProperty(BATCH_SIZE)) ? 0 :
226 Integer.valueOf(config.getProperty(BATCH_SIZE));
227 int lingerMs = Strings.isNullOrEmpty(config.getProperty(LINGER_MS)) ? 0 :
228 Integer.valueOf(config.getProperty(LINGER_MS));
229 int memoryBuffer = Strings.isNullOrEmpty(config.getProperty(MEMORY_BUFFER)) ? 0 :
230 Integer.valueOf(config.getProperty(MEMORY_BUFFER));
231
232 return new DefaultBuilder()
233 .withAddress(config.getProperty(ADDRESS))
234 .withPort(Integer.valueOf(config.getProperty(PORT)))
235 .withRetries(retries)
236 .withRequiredAcks(config.getProperty(REQUIRED_ACKS))
237 .withBatchSize(batchSize)
238 .withLingerMs(lingerMs)
239 .withMemoryBuffer(memoryBuffer)
240 .withKeySerializer(config.getProperty(KEY_SERIALIZER))
241 .withValueSerializer(config.getProperty(VALUE_SERIALIZER))
242 .withKey(config.getProperty(KEY))
243 .withTopic(config.getProperty(TOPIC))
244 .withCodec(config.getProperty(CODEC))
245 .build();
246 }
247
248 /**
Jian Li2cf1c0b2018-06-07 11:28:56 +0900249 * Builder class of DefaultKafkaTelemetryConfig.
250 */
Jian Li52c11222018-06-07 11:39:17 +0900251 public static final class DefaultBuilder implements Builder {
Jian Li2cf1c0b2018-06-07 11:28:56 +0900252 private String address;
253 private int port;
254 private int retries;
255 private String requiredAcks;
256 private int batchSize;
257 private int lingerMs;
258 private int memoryBuffer;
259 private String keySerializer;
260 private String valueSerializer;
Jian Li69600e02018-12-24 13:21:18 +0900261 private String key;
262 private String topic;
263 private String codec;
Jian Li2cf1c0b2018-06-07 11:28:56 +0900264 private Map<String, Object> configMap;
265
266 @Override
267 public Builder withAddress(String address) {
268 this.address = address;
269 return this;
270 }
271
272 @Override
273 public Builder withPort(int port) {
274 this.port = port;
275 return this;
276 }
277
278 @Override
279 public Builder withRetries(int retries) {
280 this.retries = retries;
281 return this;
282 }
283
284 @Override
285 public Builder withRequiredAcks(String requiredAcks) {
286 this.requiredAcks = requiredAcks;
287 return this;
288 }
289
290 @Override
291 public Builder withBatchSize(int batchSize) {
292 this.batchSize = batchSize;
293 return this;
294 }
295
296 @Override
297 public Builder withLingerMs(int lingerMs) {
298 this.lingerMs = lingerMs;
299 return this;
300 }
301
302 @Override
303 public Builder withMemoryBuffer(int memoryBuffer) {
304 this.memoryBuffer = memoryBuffer;
305 return this;
306 }
307
308 @Override
309 public Builder withKeySerializer(String keySerializer) {
310 this.keySerializer = keySerializer;
311 return this;
312 }
313
314 @Override
315 public Builder withValueSerializer(String valueSerializer) {
316 this.valueSerializer = valueSerializer;
317 return this;
318 }
319
320 @Override
Jian Li69600e02018-12-24 13:21:18 +0900321 public Builder withKey(String key) {
322 this.key = key;
323 return this;
324 }
325
326 @Override
327 public Builder withTopic(String topic) {
328 this.topic = topic;
329 return this;
330 }
331
332 @Override
333 public Builder withCodec(String codec) {
334 this.codec = codec;
335 return this;
336 }
337
338 @Override
Jian Li2cf1c0b2018-06-07 11:28:56 +0900339 public Builder withConfigMap(Map<String, Object> configMap) {
340 this.configMap = configMap;
341 return this;
342 }
343
344 @Override
345 public KafkaTelemetryConfig build() {
346 checkNotNull(address, "Kafka server address cannot be null");
Jian Li2cf1c0b2018-06-07 11:28:56 +0900347
348 return new DefaultKafkaTelemetryConfig(address, port, retries,
Jian Li69600e02018-12-24 13:21:18 +0900349 requiredAcks, batchSize, lingerMs, memoryBuffer, keySerializer,
350 valueSerializer, key, topic, codec, configMap);
Jian Li2cf1c0b2018-06-07 11:28:56 +0900351 }
352 }
Jian Li69600e02018-12-24 13:21:18 +0900353}