blob: 956e275a56a3b95a6a84b02d5450943723b2aae8 [file] [log] [blame]
/*
 * Copyright 2018-present Open Networking Foundation
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
16package org.onosproject.openstacktelemetry.impl;
17
Jian Li4df75b12018-06-07 22:11:04 +090018import org.apache.felix.scr.annotations.Activate;
Jian Lid1ce10a2018-06-12 13:47:23 +090019import org.apache.felix.scr.annotations.Component;
Jian Li4df75b12018-06-07 22:11:04 +090020import org.apache.felix.scr.annotations.Deactivate;
21import org.apache.felix.scr.annotations.Modified;
22import org.apache.felix.scr.annotations.Property;
23import org.apache.felix.scr.annotations.Reference;
24import org.apache.felix.scr.annotations.ReferenceCardinality;
Jian Lid1ce10a2018-06-12 13:47:23 +090025import org.apache.felix.scr.annotations.Service;
Jian Li4df75b12018-06-07 22:11:04 +090026import org.onlab.util.Tools;
27import org.onosproject.cfg.ComponentConfigService;
28import org.onosproject.openstacktelemetry.api.KafkaTelemetryAdminService;
Jian Lib9fe3492018-06-07 17:19:07 +090029import org.onosproject.openstacktelemetry.api.KafkaTelemetryConfigService;
30import org.onosproject.openstacktelemetry.api.config.TelemetryConfig;
Jian Li4df75b12018-06-07 22:11:04 +090031import org.onosproject.openstacktelemetry.config.DefaultKafkaTelemetryConfig;
32import org.osgi.service.component.ComponentContext;
33import org.slf4j.Logger;
34import org.slf4j.LoggerFactory;
35
36import java.util.Dictionary;
37
Jian Lid1ce10a2018-06-12 13:47:23 +090038import static org.onosproject.openstacktelemetry.api.Constants.DEFAULT_DISABLE;
Jian Li4df75b12018-06-07 22:11:04 +090039import static org.onosproject.openstacktelemetry.api.Constants.DEFAULT_KAFKA_BATCH_SIZE;
40import static org.onosproject.openstacktelemetry.api.Constants.DEFAULT_KAFKA_KEY_SERIALIZER;
41import static org.onosproject.openstacktelemetry.api.Constants.DEFAULT_KAFKA_LINGER_MS;
42import static org.onosproject.openstacktelemetry.api.Constants.DEFAULT_KAFKA_MEMORY_BUFFER;
43import static org.onosproject.openstacktelemetry.api.Constants.DEFAULT_KAFKA_REQUIRED_ACKS;
44import static org.onosproject.openstacktelemetry.api.Constants.DEFAULT_KAFKA_RETRIES;
45import static org.onosproject.openstacktelemetry.api.Constants.DEFAULT_KAFKA_SERVER_IP;
46import static org.onosproject.openstacktelemetry.api.Constants.DEFAULT_KAFKA_SERVER_PORT;
47import static org.onosproject.openstacktelemetry.api.Constants.DEFAULT_KAFKA_VALUE_SERIALIZER;
Jian Lid1ce10a2018-06-12 13:47:23 +090048import static org.onosproject.openstacktelemetry.util.OpenstackTelemetryUtil.getBooleanProperty;
Jian Li3ed7f302018-08-27 17:16:27 +090049import static org.onosproject.openstacktelemetry.util.OpenstackTelemetryUtil.initTelemetryService;
Jian Lib9fe3492018-06-07 17:19:07 +090050
51/**
52 * Kafka server configuration manager for publishing openstack telemetry.
53 */
Jian Lid1ce10a2018-06-12 13:47:23 +090054@Component(immediate = true)
55@Service
Jian Lib9fe3492018-06-07 17:19:07 +090056public class KafkaTelemetryConfigManager implements KafkaTelemetryConfigService {
57
Jian Li4df75b12018-06-07 22:11:04 +090058 private final Logger log = LoggerFactory.getLogger(getClass());
59
Jian Lid1ce10a2018-06-12 13:47:23 +090060 private static final String ENABLE_SERVICE = "enableService";
Jian Li4df75b12018-06-07 22:11:04 +090061 private static final String ADDRESS = "address";
62 private static final String PORT = "port";
63 private static final String RETRIES = "retries";
64 private static final String REQUIRED_ACKS = "requiredAcks";
65 private static final String BATCH_SIZE = "batchSize";
66 private static final String LINGER_MS = "lingerMs";
67 private static final String MEMORY_BUFFER = "memoryBuffer";
68 private static final String KEY_SERIALIZER = "keySerializer";
69 private static final String VALUE_SERIALIZER = "valueSerializer";
70
71 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
72 protected ComponentConfigService componentConfigService;
73
74 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
75 protected KafkaTelemetryAdminService kafkaTelemetryAdminService;
76
77 @Property(name = ADDRESS, value = DEFAULT_KAFKA_SERVER_IP,
78 label = "Default IP address to establish initial connection to Kafka server")
79 protected String address = DEFAULT_KAFKA_SERVER_IP;
80
81 @Property(name = PORT, intValue = DEFAULT_KAFKA_SERVER_PORT,
82 label = "Default port number to establish initial connection to Kafka server")
83 protected Integer port = DEFAULT_KAFKA_SERVER_PORT;
84
85 @Property(name = RETRIES, intValue = DEFAULT_KAFKA_RETRIES,
86 label = "Number of times the producer can retry to send after first failure")
87 protected int retries = DEFAULT_KAFKA_RETRIES;
88
89 @Property(name = REQUIRED_ACKS, value = DEFAULT_KAFKA_REQUIRED_ACKS,
90 label = "Producer will get an acknowledgement after the leader has replicated the data")
91 protected String requiredAcks = DEFAULT_KAFKA_REQUIRED_ACKS;
92
93 @Property(name = BATCH_SIZE, intValue = DEFAULT_KAFKA_BATCH_SIZE,
94 label = "The largest record batch size allowed by Kafka")
95 protected Integer batchSize = DEFAULT_KAFKA_BATCH_SIZE;
96
97 @Property(name = LINGER_MS, intValue = DEFAULT_KAFKA_LINGER_MS,
98 label = "The producer groups together any records that arrive in " +
99 "between request transmissions into a single batched request")
100 protected Integer lingerMs = DEFAULT_KAFKA_LINGER_MS;
101
102 @Property(name = MEMORY_BUFFER, intValue = DEFAULT_KAFKA_MEMORY_BUFFER,
103 label = "The total memory used for log cleaner I/O buffers across all cleaner threads")
104 protected Integer memoryBuffer = DEFAULT_KAFKA_MEMORY_BUFFER;
105
106 @Property(name = KEY_SERIALIZER, value = DEFAULT_KAFKA_KEY_SERIALIZER,
107 label = "Serializer class for key that implements the Serializer interface")
108 protected String keySerializer = DEFAULT_KAFKA_KEY_SERIALIZER;
109
110 @Property(name = VALUE_SERIALIZER, value = DEFAULT_KAFKA_VALUE_SERIALIZER,
111 label = "Serializer class for value that implements the Serializer interface")
112 protected String valueSerializer = DEFAULT_KAFKA_VALUE_SERIALIZER;
113
Jian Lid1ce10a2018-06-12 13:47:23 +0900114 @Property(name = ENABLE_SERVICE, boolValue = DEFAULT_DISABLE,
115 label = "Specify the default behavior of telemetry service")
116 protected Boolean enableService = DEFAULT_DISABLE;
117
Jian Li4df75b12018-06-07 22:11:04 +0900118 @Activate
119 protected void activate(ComponentContext context) {
120 componentConfigService.registerProperties(getClass());
Jian Lid1ce10a2018-06-12 13:47:23 +0900121
122 if (enableService) {
123 kafkaTelemetryAdminService.start(getConfig());
124 }
Jian Li4df75b12018-06-07 22:11:04 +0900125 log.info("Started");
126 }
127
128 @Deactivate
129 protected void deactivate() {
130 componentConfigService.unregisterProperties(getClass(), false);
Jian Lid1ce10a2018-06-12 13:47:23 +0900131
132 if (enableService) {
133 kafkaTelemetryAdminService.stop();
134 }
Jian Li4df75b12018-06-07 22:11:04 +0900135 log.info("Stopped");
136 }
137
138 @Modified
139 private void modified(ComponentContext context) {
140 readComponentConfiguration(context);
Jian Li3ed7f302018-08-27 17:16:27 +0900141 initTelemetryService(kafkaTelemetryAdminService, getConfig(), enableService);
Jian Li4df75b12018-06-07 22:11:04 +0900142 log.info("Modified");
143 }
144
Jian Lib9fe3492018-06-07 17:19:07 +0900145 @Override
146 public TelemetryConfig getConfig() {
Jian Li4df75b12018-06-07 22:11:04 +0900147 return new DefaultKafkaTelemetryConfig.DefaultBuilder()
148 .withAddress(address)
149 .withPort(port)
150 .withRetries(retries)
151 .withRequiredAcks(requiredAcks)
152 .withBatchSize(batchSize)
153 .withLingerMs(lingerMs)
154 .withMemoryBuffer(memoryBuffer)
155 .withKeySerializer(keySerializer)
156 .withValueSerializer(valueSerializer)
157 .build();
158 }
159
160 /**
161 * Extracts properties from the component configuration context.
162 *
163 * @param context the component context
164 */
165 private void readComponentConfiguration(ComponentContext context) {
166 Dictionary<?, ?> properties = context.getProperties();
167
168 String addressStr = Tools.get(properties, ADDRESS);
169 address = addressStr != null ? addressStr : DEFAULT_KAFKA_SERVER_IP;
170 log.info("Configured. Kafka server address is {}", address);
171
172 Integer portConfigured = Tools.getIntegerProperty(properties, PORT);
173 if (portConfigured == null) {
174 port = DEFAULT_KAFKA_SERVER_PORT;
175 log.info("Kafka server port is NOT configured, default value is {}", port);
176 } else {
177 port = portConfigured;
178 log.info("Configured. Kafka server port is {}", port);
179 }
180
181 Integer retriesConfigured = Tools.getIntegerProperty(properties, RETRIES);
182 if (retriesConfigured == null) {
183 retries = DEFAULT_KAFKA_RETRIES;
184 log.info("Kafka number of retries property is NOT configured, default value is {}", retries);
185 } else {
186 retries = retriesConfigured;
187 log.info("Configured. Kafka number of retries is {}", retries);
188 }
189
190 String requiredAcksStr = Tools.get(properties, REQUIRED_ACKS);
191 requiredAcks = requiredAcksStr != null ? requiredAcksStr : DEFAULT_KAFKA_REQUIRED_ACKS;
192 log.info("Configured, Kafka required acknowledgement is {}", requiredAcks);
193
194 Integer batchSizeConfigured = Tools.getIntegerProperty(properties, BATCH_SIZE);
195 if (batchSizeConfigured == null) {
196 batchSize = DEFAULT_KAFKA_BATCH_SIZE;
197 log.info("Kafka batch size property is NOT configured, default value is {}", batchSize);
198 } else {
199 batchSize = batchSizeConfigured;
200 log.info("Configured. Kafka batch size is {}", batchSize);
201 }
202
203 Integer lingerMsConfigured = Tools.getIntegerProperty(properties, LINGER_MS);
204 if (lingerMsConfigured == null) {
205 lingerMs = DEFAULT_KAFKA_LINGER_MS;
206 log.info("Kafka lingerMs property is NOT configured, default value is {}", lingerMs);
207 } else {
208 lingerMs = lingerMsConfigured;
209 log.info("Configured. Kafka lingerMs is {}", lingerMs);
210 }
211
212 Integer memoryBufferConfigured = Tools.getIntegerProperty(properties, MEMORY_BUFFER);
213 if (memoryBufferConfigured == null) {
214 memoryBuffer = DEFAULT_KAFKA_MEMORY_BUFFER;
215 log.info("Kafka memory buffer property is NOT configured, default value is {}", memoryBuffer);
216 } else {
217 memoryBuffer = memoryBufferConfigured;
218 log.info("Configured. Kafka memory buffer is {}", memoryBuffer);
219 }
220
221 String keySerializerStr = Tools.get(properties, KEY_SERIALIZER);
222 keySerializer = keySerializerStr != null ? keySerializerStr : DEFAULT_KAFKA_KEY_SERIALIZER;
223 log.info("Configured, Kafka key serializer is {}", keySerializer);
224
225 String valueSerializerStr = Tools.get(properties, VALUE_SERIALIZER);
226 valueSerializer = valueSerializerStr != null ? valueSerializerStr : DEFAULT_KAFKA_VALUE_SERIALIZER;
227 log.info("Configured, Kafka value serializer is {}", valueSerializer);
Jian Lid1ce10a2018-06-12 13:47:23 +0900228
229 Boolean enableServiceConfigured =
230 getBooleanProperty(properties, ENABLE_SERVICE);
231 if (enableServiceConfigured == null) {
232 enableService = DEFAULT_DISABLE;
233 log.info("Kafka service enable flag is NOT " +
234 "configured, default value is {}", enableService);
235 } else {
236 enableService = enableServiceConfigured;
237 log.info("Configured. Kafka service enable flag is {}", enableService);
238 }
Jian Lib9fe3492018-06-07 17:19:07 +0900239 }
240}