/*
 * Copyright 2018-present Open Networking Foundation
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
16package org.onosproject.openstacktelemetry.config;
17
18import com.google.common.collect.ImmutableMap;
19import com.google.common.collect.Maps;
20import org.onosproject.openstacktelemetry.api.config.KafkaTelemetryConfig;
Jian Li52c11222018-06-07 11:39:17 +090021import org.onosproject.openstacktelemetry.api.config.TelemetryConfig;
Jian Li2cf1c0b2018-06-07 11:28:56 +090022
23import java.util.Map;
24import java.util.Objects;
25
Jian Li3db90852018-06-10 22:29:16 +090026import static com.google.common.base.MoreObjects.toStringHelper;
Jian Li2cf1c0b2018-06-07 11:28:56 +090027import static com.google.common.base.Preconditions.checkNotNull;
28
29/**
30 * A configuration file contains Kafka telemetry parameters.
31 */
32public final class DefaultKafkaTelemetryConfig implements KafkaTelemetryConfig {
33
34 private final String address;
35 private final int port;
36 private final int retries;
37 private final String requiredAcks;
38 private final int batchSize;
39 private final int lingerMs;
40 private final int memoryBuffer;
41 private final String keySerializer;
42 private final String valueSerializer;
43 private final Map<String, Object> configMap;
44
45 private DefaultKafkaTelemetryConfig(String address, int port, int retries,
46 String requiredAcks, int batchSize,
47 int lingerMs, int memoryBuffer,
48 String keySerializer,
49 String valueSerializer,
50 Map<String, Object> configMap) {
51 this.address = address;
52 this.port = port;
53 this.retries = retries;
54 this.requiredAcks = requiredAcks;
55 this.batchSize = batchSize;
56 this.lingerMs = lingerMs;
57 this.memoryBuffer = memoryBuffer;
58 this.keySerializer = keySerializer;
59 this.valueSerializer = valueSerializer;
60 this.configMap = configMap;
61 }
62
63 @Override
64 public String address() {
65 return address;
66 }
67
68 @Override
69 public int port() {
70 return port;
71 }
72
73 @Override
74 public int retries() {
75 return retries;
76 }
77
78 @Override
79 public String requiredAcks() {
80 return requiredAcks;
81 }
82
83 @Override
84 public int batchSize() {
85 return batchSize;
86 }
87
88 @Override
89 public int lingerMs() {
90 return lingerMs;
91 }
92
93 @Override
94 public int memoryBuffer() {
95 return memoryBuffer;
96 }
97
98 @Override
99 public String keySerializer() {
100 return keySerializer;
101 }
102
103 @Override
104 public String valueSerializer() {
105 return valueSerializer;
106 }
107
108 @Override
109 public Map<String, Object> configMap() {
110 if (configMap != null) {
111 return ImmutableMap.copyOf(configMap);
112 } else {
113 return Maps.newConcurrentMap();
114 }
115 }
116
117 @Override
118 public boolean equals(Object obj) {
119 if (this == obj) {
120 return true;
121 }
122
123 if (obj instanceof DefaultKafkaTelemetryConfig) {
124 final DefaultKafkaTelemetryConfig other = (DefaultKafkaTelemetryConfig) obj;
125 return Objects.equals(this.address, other.address) &&
126 Objects.equals(this.port, other.port) &&
127 Objects.equals(this.retries, other.retries) &&
128 Objects.equals(this.requiredAcks, other.requiredAcks) &&
129 Objects.equals(this.batchSize, other.batchSize) &&
130 Objects.equals(this.lingerMs, other.lingerMs) &&
131 Objects.equals(this.memoryBuffer, other.memoryBuffer) &&
132 Objects.equals(this.keySerializer, other.keySerializer) &&
133 Objects.equals(this.valueSerializer, other.valueSerializer) &&
134 Objects.equals(this.configMap, other.configMap);
135 }
136 return false;
137 }
138
139 @Override
140 public int hashCode() {
141 return Objects.hash(address, port, retries, requiredAcks, batchSize,
142 lingerMs, memoryBuffer, keySerializer, valueSerializer, configMap);
143 }
144
Jian Li52c11222018-06-07 11:39:17 +0900145 @Override
Jian Li3db90852018-06-10 22:29:16 +0900146 public String toString() {
147 return toStringHelper(this)
148 .add("address", address)
149 .add("port", port)
150 .add("retries", retries)
151 .add("requiredAcks", requiredAcks)
152 .add("batchSize", batchSize)
153 .add("lingerMs", lingerMs)
154 .add("memoryBuffer", memoryBuffer)
155 .add("keySerializer", keySerializer)
156 .add("valueSerializer", valueSerializer)
157 .add("configMap", configMap)
158 .toString();
159 }
160
161 @Override
Jian Li52c11222018-06-07 11:39:17 +0900162 public TelemetryConfig.Builder createBuilder() {
163 return new DefaultBuilder();
164 }
165
Jian Li2cf1c0b2018-06-07 11:28:56 +0900166 /**
167 * Builder class of DefaultKafkaTelemetryConfig.
168 */
Jian Li52c11222018-06-07 11:39:17 +0900169 public static final class DefaultBuilder implements Builder {
Jian Li2cf1c0b2018-06-07 11:28:56 +0900170 private String address;
171 private int port;
172 private int retries;
173 private String requiredAcks;
174 private int batchSize;
175 private int lingerMs;
176 private int memoryBuffer;
177 private String keySerializer;
178 private String valueSerializer;
179 private Map<String, Object> configMap;
180
181 @Override
182 public Builder withAddress(String address) {
183 this.address = address;
184 return this;
185 }
186
187 @Override
188 public Builder withPort(int port) {
189 this.port = port;
190 return this;
191 }
192
193 @Override
194 public Builder withRetries(int retries) {
195 this.retries = retries;
196 return this;
197 }
198
199 @Override
200 public Builder withRequiredAcks(String requiredAcks) {
201 this.requiredAcks = requiredAcks;
202 return this;
203 }
204
205 @Override
206 public Builder withBatchSize(int batchSize) {
207 this.batchSize = batchSize;
208 return this;
209 }
210
211 @Override
212 public Builder withLingerMs(int lingerMs) {
213 this.lingerMs = lingerMs;
214 return this;
215 }
216
217 @Override
218 public Builder withMemoryBuffer(int memoryBuffer) {
219 this.memoryBuffer = memoryBuffer;
220 return this;
221 }
222
223 @Override
224 public Builder withKeySerializer(String keySerializer) {
225 this.keySerializer = keySerializer;
226 return this;
227 }
228
229 @Override
230 public Builder withValueSerializer(String valueSerializer) {
231 this.valueSerializer = valueSerializer;
232 return this;
233 }
234
235 @Override
236 public Builder withConfigMap(Map<String, Object> configMap) {
237 this.configMap = configMap;
238 return this;
239 }
240
241 @Override
242 public KafkaTelemetryConfig build() {
243 checkNotNull(address, "Kafka server address cannot be null");
244 checkNotNull(keySerializer, "Kafka key serializer cannot be null");
245 checkNotNull(valueSerializer, "Kafka value serializer cannot be null");
246
247 return new DefaultKafkaTelemetryConfig(address, port, retries,
248 requiredAcks, batchSize, lingerMs, memoryBuffer,
249 keySerializer, valueSerializer, configMap);
250 }
251 }
252}