blob: 9309c75b5121470835789f5406c66b4a8a486c9a [file] [log] [blame]
Jian Lib9fe3492018-06-07 17:19:07 +09001/*
2 * Copyright 2018-present Open Networking Foundation
3 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
16package org.onosproject.openstacktelemetry.impl;
17
Jian Li4df75b12018-06-07 22:11:04 +090018import org.apache.felix.scr.annotations.Activate;
Jian Lid1ce10a2018-06-12 13:47:23 +090019import org.apache.felix.scr.annotations.Component;
Jian Li4df75b12018-06-07 22:11:04 +090020import org.apache.felix.scr.annotations.Deactivate;
21import org.apache.felix.scr.annotations.Modified;
22import org.apache.felix.scr.annotations.Property;
23import org.apache.felix.scr.annotations.Reference;
24import org.apache.felix.scr.annotations.ReferenceCardinality;
Jian Lid1ce10a2018-06-12 13:47:23 +090025import org.apache.felix.scr.annotations.Service;
Jian Li4df75b12018-06-07 22:11:04 +090026import org.onlab.util.Tools;
27import org.onosproject.cfg.ComponentConfigService;
28import org.onosproject.openstacktelemetry.api.KafkaTelemetryAdminService;
Jian Lib9fe3492018-06-07 17:19:07 +090029import org.onosproject.openstacktelemetry.api.KafkaTelemetryConfigService;
30import org.onosproject.openstacktelemetry.api.config.TelemetryConfig;
Jian Li4df75b12018-06-07 22:11:04 +090031import org.onosproject.openstacktelemetry.config.DefaultKafkaTelemetryConfig;
32import org.osgi.service.component.ComponentContext;
33import org.slf4j.Logger;
34import org.slf4j.LoggerFactory;
35
36import java.util.Dictionary;
37
Jian Lid1ce10a2018-06-12 13:47:23 +090038import static org.onosproject.openstacktelemetry.api.Constants.DEFAULT_DISABLE;
Jian Li4df75b12018-06-07 22:11:04 +090039import static org.onosproject.openstacktelemetry.api.Constants.DEFAULT_KAFKA_BATCH_SIZE;
40import static org.onosproject.openstacktelemetry.api.Constants.DEFAULT_KAFKA_KEY_SERIALIZER;
41import static org.onosproject.openstacktelemetry.api.Constants.DEFAULT_KAFKA_LINGER_MS;
42import static org.onosproject.openstacktelemetry.api.Constants.DEFAULT_KAFKA_MEMORY_BUFFER;
43import static org.onosproject.openstacktelemetry.api.Constants.DEFAULT_KAFKA_REQUIRED_ACKS;
44import static org.onosproject.openstacktelemetry.api.Constants.DEFAULT_KAFKA_RETRIES;
45import static org.onosproject.openstacktelemetry.api.Constants.DEFAULT_KAFKA_SERVER_IP;
46import static org.onosproject.openstacktelemetry.api.Constants.DEFAULT_KAFKA_SERVER_PORT;
47import static org.onosproject.openstacktelemetry.api.Constants.DEFAULT_KAFKA_VALUE_SERIALIZER;
Jian Lid1ce10a2018-06-12 13:47:23 +090048import static org.onosproject.openstacktelemetry.util.OpenstackTelemetryUtil.getBooleanProperty;
Jian Lib9fe3492018-06-07 17:19:07 +090049
50/**
51 * Kafka server configuration manager for publishing openstack telemetry.
52 */
Jian Lid1ce10a2018-06-12 13:47:23 +090053@Component(immediate = true)
54@Service
Jian Lib9fe3492018-06-07 17:19:07 +090055public class KafkaTelemetryConfigManager implements KafkaTelemetryConfigService {
56
Jian Li4df75b12018-06-07 22:11:04 +090057 private final Logger log = LoggerFactory.getLogger(getClass());
58
Jian Lid1ce10a2018-06-12 13:47:23 +090059 private static final String ENABLE_SERVICE = "enableService";
Jian Li4df75b12018-06-07 22:11:04 +090060 private static final String ADDRESS = "address";
61 private static final String PORT = "port";
62 private static final String RETRIES = "retries";
63 private static final String REQUIRED_ACKS = "requiredAcks";
64 private static final String BATCH_SIZE = "batchSize";
65 private static final String LINGER_MS = "lingerMs";
66 private static final String MEMORY_BUFFER = "memoryBuffer";
67 private static final String KEY_SERIALIZER = "keySerializer";
68 private static final String VALUE_SERIALIZER = "valueSerializer";
69
70 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
71 protected ComponentConfigService componentConfigService;
72
73 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
74 protected KafkaTelemetryAdminService kafkaTelemetryAdminService;
75
76 @Property(name = ADDRESS, value = DEFAULT_KAFKA_SERVER_IP,
77 label = "Default IP address to establish initial connection to Kafka server")
78 protected String address = DEFAULT_KAFKA_SERVER_IP;
79
80 @Property(name = PORT, intValue = DEFAULT_KAFKA_SERVER_PORT,
81 label = "Default port number to establish initial connection to Kafka server")
82 protected Integer port = DEFAULT_KAFKA_SERVER_PORT;
83
84 @Property(name = RETRIES, intValue = DEFAULT_KAFKA_RETRIES,
85 label = "Number of times the producer can retry to send after first failure")
86 protected int retries = DEFAULT_KAFKA_RETRIES;
87
88 @Property(name = REQUIRED_ACKS, value = DEFAULT_KAFKA_REQUIRED_ACKS,
89 label = "Producer will get an acknowledgement after the leader has replicated the data")
90 protected String requiredAcks = DEFAULT_KAFKA_REQUIRED_ACKS;
91
92 @Property(name = BATCH_SIZE, intValue = DEFAULT_KAFKA_BATCH_SIZE,
93 label = "The largest record batch size allowed by Kafka")
94 protected Integer batchSize = DEFAULT_KAFKA_BATCH_SIZE;
95
96 @Property(name = LINGER_MS, intValue = DEFAULT_KAFKA_LINGER_MS,
97 label = "The producer groups together any records that arrive in " +
98 "between request transmissions into a single batched request")
99 protected Integer lingerMs = DEFAULT_KAFKA_LINGER_MS;
100
101 @Property(name = MEMORY_BUFFER, intValue = DEFAULT_KAFKA_MEMORY_BUFFER,
102 label = "The total memory used for log cleaner I/O buffers across all cleaner threads")
103 protected Integer memoryBuffer = DEFAULT_KAFKA_MEMORY_BUFFER;
104
105 @Property(name = KEY_SERIALIZER, value = DEFAULT_KAFKA_KEY_SERIALIZER,
106 label = "Serializer class for key that implements the Serializer interface")
107 protected String keySerializer = DEFAULT_KAFKA_KEY_SERIALIZER;
108
109 @Property(name = VALUE_SERIALIZER, value = DEFAULT_KAFKA_VALUE_SERIALIZER,
110 label = "Serializer class for value that implements the Serializer interface")
111 protected String valueSerializer = DEFAULT_KAFKA_VALUE_SERIALIZER;
112
Jian Lid1ce10a2018-06-12 13:47:23 +0900113 @Property(name = ENABLE_SERVICE, boolValue = DEFAULT_DISABLE,
114 label = "Specify the default behavior of telemetry service")
115 protected Boolean enableService = DEFAULT_DISABLE;
116
Jian Li4df75b12018-06-07 22:11:04 +0900117 @Activate
118 protected void activate(ComponentContext context) {
119 componentConfigService.registerProperties(getClass());
Jian Lid1ce10a2018-06-12 13:47:23 +0900120
121 if (enableService) {
122 kafkaTelemetryAdminService.start(getConfig());
123 }
Jian Li4df75b12018-06-07 22:11:04 +0900124 log.info("Started");
125 }
126
127 @Deactivate
128 protected void deactivate() {
129 componentConfigService.unregisterProperties(getClass(), false);
Jian Lid1ce10a2018-06-12 13:47:23 +0900130
131 if (enableService) {
132 kafkaTelemetryAdminService.stop();
133 }
Jian Li4df75b12018-06-07 22:11:04 +0900134 log.info("Stopped");
135 }
136
137 @Modified
138 private void modified(ComponentContext context) {
139 readComponentConfiguration(context);
Jian Lid1ce10a2018-06-12 13:47:23 +0900140
141 if (enableService) {
142 if (kafkaTelemetryAdminService.isRunning()) {
143 kafkaTelemetryAdminService.restart(getConfig());
144 } else {
145 kafkaTelemetryAdminService.start(getConfig());
146 }
147 } else {
148 if (kafkaTelemetryAdminService.isRunning()) {
149 kafkaTelemetryAdminService.stop();
150 }
151 }
Jian Li4df75b12018-06-07 22:11:04 +0900152 log.info("Modified");
153 }
154
Jian Lib9fe3492018-06-07 17:19:07 +0900155 @Override
156 public TelemetryConfig getConfig() {
Jian Li4df75b12018-06-07 22:11:04 +0900157 return new DefaultKafkaTelemetryConfig.DefaultBuilder()
158 .withAddress(address)
159 .withPort(port)
160 .withRetries(retries)
161 .withRequiredAcks(requiredAcks)
162 .withBatchSize(batchSize)
163 .withLingerMs(lingerMs)
164 .withMemoryBuffer(memoryBuffer)
165 .withKeySerializer(keySerializer)
166 .withValueSerializer(valueSerializer)
167 .build();
168 }
169
170 /**
171 * Extracts properties from the component configuration context.
172 *
173 * @param context the component context
174 */
175 private void readComponentConfiguration(ComponentContext context) {
176 Dictionary<?, ?> properties = context.getProperties();
177
178 String addressStr = Tools.get(properties, ADDRESS);
179 address = addressStr != null ? addressStr : DEFAULT_KAFKA_SERVER_IP;
180 log.info("Configured. Kafka server address is {}", address);
181
182 Integer portConfigured = Tools.getIntegerProperty(properties, PORT);
183 if (portConfigured == null) {
184 port = DEFAULT_KAFKA_SERVER_PORT;
185 log.info("Kafka server port is NOT configured, default value is {}", port);
186 } else {
187 port = portConfigured;
188 log.info("Configured. Kafka server port is {}", port);
189 }
190
191 Integer retriesConfigured = Tools.getIntegerProperty(properties, RETRIES);
192 if (retriesConfigured == null) {
193 retries = DEFAULT_KAFKA_RETRIES;
194 log.info("Kafka number of retries property is NOT configured, default value is {}", retries);
195 } else {
196 retries = retriesConfigured;
197 log.info("Configured. Kafka number of retries is {}", retries);
198 }
199
200 String requiredAcksStr = Tools.get(properties, REQUIRED_ACKS);
201 requiredAcks = requiredAcksStr != null ? requiredAcksStr : DEFAULT_KAFKA_REQUIRED_ACKS;
202 log.info("Configured, Kafka required acknowledgement is {}", requiredAcks);
203
204 Integer batchSizeConfigured = Tools.getIntegerProperty(properties, BATCH_SIZE);
205 if (batchSizeConfigured == null) {
206 batchSize = DEFAULT_KAFKA_BATCH_SIZE;
207 log.info("Kafka batch size property is NOT configured, default value is {}", batchSize);
208 } else {
209 batchSize = batchSizeConfigured;
210 log.info("Configured. Kafka batch size is {}", batchSize);
211 }
212
213 Integer lingerMsConfigured = Tools.getIntegerProperty(properties, LINGER_MS);
214 if (lingerMsConfigured == null) {
215 lingerMs = DEFAULT_KAFKA_LINGER_MS;
216 log.info("Kafka lingerMs property is NOT configured, default value is {}", lingerMs);
217 } else {
218 lingerMs = lingerMsConfigured;
219 log.info("Configured. Kafka lingerMs is {}", lingerMs);
220 }
221
222 Integer memoryBufferConfigured = Tools.getIntegerProperty(properties, MEMORY_BUFFER);
223 if (memoryBufferConfigured == null) {
224 memoryBuffer = DEFAULT_KAFKA_MEMORY_BUFFER;
225 log.info("Kafka memory buffer property is NOT configured, default value is {}", memoryBuffer);
226 } else {
227 memoryBuffer = memoryBufferConfigured;
228 log.info("Configured. Kafka memory buffer is {}", memoryBuffer);
229 }
230
231 String keySerializerStr = Tools.get(properties, KEY_SERIALIZER);
232 keySerializer = keySerializerStr != null ? keySerializerStr : DEFAULT_KAFKA_KEY_SERIALIZER;
233 log.info("Configured, Kafka key serializer is {}", keySerializer);
234
235 String valueSerializerStr = Tools.get(properties, VALUE_SERIALIZER);
236 valueSerializer = valueSerializerStr != null ? valueSerializerStr : DEFAULT_KAFKA_VALUE_SERIALIZER;
237 log.info("Configured, Kafka value serializer is {}", valueSerializer);
Jian Lid1ce10a2018-06-12 13:47:23 +0900238
239 Boolean enableServiceConfigured =
240 getBooleanProperty(properties, ENABLE_SERVICE);
241 if (enableServiceConfigured == null) {
242 enableService = DEFAULT_DISABLE;
243 log.info("Kafka service enable flag is NOT " +
244 "configured, default value is {}", enableService);
245 } else {
246 enableService = enableServiceConfigured;
247 log.info("Configured. Kafka service enable flag is {}", enableService);
248 }
Jian Lib9fe3492018-06-07 17:19:07 +0900249 }
250}