blob: 1212b0b108f2259fc351c81762dafb1a692bb0b3 [file] [log] [blame]
Jian Lib9fe3492018-06-07 17:19:07 +09001/*
2 * Copyright 2018-present Open Networking Foundation
3 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
16package org.onosproject.openstacktelemetry.impl;
17
Jian Li4df75b12018-06-07 22:11:04 +090018import org.onlab.util.Tools;
19import org.onosproject.cfg.ComponentConfigService;
20import org.onosproject.openstacktelemetry.api.KafkaTelemetryAdminService;
Jian Lib9fe3492018-06-07 17:19:07 +090021import org.onosproject.openstacktelemetry.api.KafkaTelemetryConfigService;
22import org.onosproject.openstacktelemetry.api.config.TelemetryConfig;
Jian Li4df75b12018-06-07 22:11:04 +090023import org.onosproject.openstacktelemetry.config.DefaultKafkaTelemetryConfig;
24import org.osgi.service.component.ComponentContext;
Ray Milkeyd84f89b2018-08-17 14:54:17 -070025import org.osgi.service.component.annotations.Activate;
26import org.osgi.service.component.annotations.Component;
27import org.osgi.service.component.annotations.Deactivate;
28import org.osgi.service.component.annotations.Modified;
29import org.osgi.service.component.annotations.Reference;
30import org.osgi.service.component.annotations.ReferenceCardinality;
Jian Li4df75b12018-06-07 22:11:04 +090031import org.slf4j.Logger;
32import org.slf4j.LoggerFactory;
33
34import java.util.Dictionary;
35
Jian Lid1ce10a2018-06-12 13:47:23 +090036import static org.onosproject.openstacktelemetry.api.Constants.DEFAULT_DISABLE;
Jian Li4df75b12018-06-07 22:11:04 +090037import static org.onosproject.openstacktelemetry.api.Constants.DEFAULT_KAFKA_BATCH_SIZE;
38import static org.onosproject.openstacktelemetry.api.Constants.DEFAULT_KAFKA_KEY_SERIALIZER;
39import static org.onosproject.openstacktelemetry.api.Constants.DEFAULT_KAFKA_LINGER_MS;
40import static org.onosproject.openstacktelemetry.api.Constants.DEFAULT_KAFKA_MEMORY_BUFFER;
41import static org.onosproject.openstacktelemetry.api.Constants.DEFAULT_KAFKA_REQUIRED_ACKS;
42import static org.onosproject.openstacktelemetry.api.Constants.DEFAULT_KAFKA_RETRIES;
43import static org.onosproject.openstacktelemetry.api.Constants.DEFAULT_KAFKA_SERVER_IP;
44import static org.onosproject.openstacktelemetry.api.Constants.DEFAULT_KAFKA_SERVER_PORT;
45import static org.onosproject.openstacktelemetry.api.Constants.DEFAULT_KAFKA_VALUE_SERIALIZER;
Jian Lid1ce10a2018-06-12 13:47:23 +090046import static org.onosproject.openstacktelemetry.util.OpenstackTelemetryUtil.getBooleanProperty;
Jian Li3ed7f302018-08-27 17:16:27 +090047import static org.onosproject.openstacktelemetry.util.OpenstackTelemetryUtil.initTelemetryService;
Jian Lib9fe3492018-06-07 17:19:07 +090048
49/**
50 * Kafka server configuration manager for publishing openstack telemetry.
51 */
Ray Milkeyd84f89b2018-08-17 14:54:17 -070052@Component(immediate = true, service = KafkaTelemetryConfigService.class)
Jian Lib9fe3492018-06-07 17:19:07 +090053public class KafkaTelemetryConfigManager implements KafkaTelemetryConfigService {
54
Jian Li4df75b12018-06-07 22:11:04 +090055 private final Logger log = LoggerFactory.getLogger(getClass());
56
Jian Lid1ce10a2018-06-12 13:47:23 +090057 private static final String ENABLE_SERVICE = "enableService";
Jian Li4df75b12018-06-07 22:11:04 +090058 private static final String ADDRESS = "address";
59 private static final String PORT = "port";
60 private static final String RETRIES = "retries";
61 private static final String REQUIRED_ACKS = "requiredAcks";
62 private static final String BATCH_SIZE = "batchSize";
63 private static final String LINGER_MS = "lingerMs";
64 private static final String MEMORY_BUFFER = "memoryBuffer";
65 private static final String KEY_SERIALIZER = "keySerializer";
66 private static final String VALUE_SERIALIZER = "valueSerializer";
67
Ray Milkeyd84f89b2018-08-17 14:54:17 -070068 @Reference(cardinality = ReferenceCardinality.MANDATORY)
Jian Li4df75b12018-06-07 22:11:04 +090069 protected ComponentConfigService componentConfigService;
70
Ray Milkeyd84f89b2018-08-17 14:54:17 -070071 @Reference(cardinality = ReferenceCardinality.MANDATORY)
Jian Li4df75b12018-06-07 22:11:04 +090072 protected KafkaTelemetryAdminService kafkaTelemetryAdminService;
73
Ray Milkeyd84f89b2018-08-17 14:54:17 -070074 //@Property(name = ADDRESS, value = DEFAULT_KAFKA_SERVER_IP,
75 // label = "Default IP address to establish initial connection to Kafka server")
Jian Li4df75b12018-06-07 22:11:04 +090076 protected String address = DEFAULT_KAFKA_SERVER_IP;
77
Ray Milkeyd84f89b2018-08-17 14:54:17 -070078 //@Property(name = PORT, intValue = DEFAULT_KAFKA_SERVER_PORT,
79 // label = "Default port number to establish initial connection to Kafka server")
Jian Li4df75b12018-06-07 22:11:04 +090080 protected Integer port = DEFAULT_KAFKA_SERVER_PORT;
81
Ray Milkeyd84f89b2018-08-17 14:54:17 -070082 //@Property(name = RETRIES, intValue = DEFAULT_KAFKA_RETRIES,
83 // label = "Number of times the producer can retry to send after first failure")
Jian Li4df75b12018-06-07 22:11:04 +090084 protected int retries = DEFAULT_KAFKA_RETRIES;
85
Ray Milkeyd84f89b2018-08-17 14:54:17 -070086 //@Property(name = REQUIRED_ACKS, value = DEFAULT_KAFKA_REQUIRED_ACKS,
87 // label = "Producer will get an acknowledgement after the leader has replicated the data")
Jian Li4df75b12018-06-07 22:11:04 +090088 protected String requiredAcks = DEFAULT_KAFKA_REQUIRED_ACKS;
89
Ray Milkeyd84f89b2018-08-17 14:54:17 -070090 //@Property(name = BATCH_SIZE, intValue = DEFAULT_KAFKA_BATCH_SIZE,
91 // label = "The largest record batch size allowed by Kafka")
Jian Li4df75b12018-06-07 22:11:04 +090092 protected Integer batchSize = DEFAULT_KAFKA_BATCH_SIZE;
93
Ray Milkeyd84f89b2018-08-17 14:54:17 -070094 //@Property(name = LINGER_MS, intValue = DEFAULT_KAFKA_LINGER_MS,
95 // label = "The producer groups together any records that arrive in " +
96 // "between request transmissions into a single batched request")
Jian Li4df75b12018-06-07 22:11:04 +090097 protected Integer lingerMs = DEFAULT_KAFKA_LINGER_MS;
98
Ray Milkeyd84f89b2018-08-17 14:54:17 -070099 //@Property(name = MEMORY_BUFFER, intValue = DEFAULT_KAFKA_MEMORY_BUFFER,
100 // label = "The total memory used for log cleaner I/O buffers across all cleaner threads")
Jian Li4df75b12018-06-07 22:11:04 +0900101 protected Integer memoryBuffer = DEFAULT_KAFKA_MEMORY_BUFFER;
102
Ray Milkeyd84f89b2018-08-17 14:54:17 -0700103 //@Property(name = KEY_SERIALIZER, value = DEFAULT_KAFKA_KEY_SERIALIZER,
104 // label = "Serializer class for key that implements the Serializer interface")
Jian Li4df75b12018-06-07 22:11:04 +0900105 protected String keySerializer = DEFAULT_KAFKA_KEY_SERIALIZER;
106
Ray Milkeyd84f89b2018-08-17 14:54:17 -0700107 //@Property(name = VALUE_SERIALIZER, value = DEFAULT_KAFKA_VALUE_SERIALIZER,
108 // label = "Serializer class for value that implements the Serializer interface")
Jian Li4df75b12018-06-07 22:11:04 +0900109 protected String valueSerializer = DEFAULT_KAFKA_VALUE_SERIALIZER;
110
Ray Milkeyd84f89b2018-08-17 14:54:17 -0700111 //@Property(name = ENABLE_SERVICE, boolValue = DEFAULT_DISABLE,
112 // label = "Specify the default behavior of telemetry service")
Jian Lid1ce10a2018-06-12 13:47:23 +0900113 protected Boolean enableService = DEFAULT_DISABLE;
114
Jian Li4df75b12018-06-07 22:11:04 +0900115 @Activate
116 protected void activate(ComponentContext context) {
117 componentConfigService.registerProperties(getClass());
Jian Lid1ce10a2018-06-12 13:47:23 +0900118
119 if (enableService) {
120 kafkaTelemetryAdminService.start(getConfig());
121 }
Jian Li4df75b12018-06-07 22:11:04 +0900122 log.info("Started");
123 }
124
125 @Deactivate
126 protected void deactivate() {
127 componentConfigService.unregisterProperties(getClass(), false);
Jian Lid1ce10a2018-06-12 13:47:23 +0900128
129 if (enableService) {
130 kafkaTelemetryAdminService.stop();
131 }
Jian Li4df75b12018-06-07 22:11:04 +0900132 log.info("Stopped");
133 }
134
135 @Modified
136 private void modified(ComponentContext context) {
137 readComponentConfiguration(context);
Jian Li3ed7f302018-08-27 17:16:27 +0900138 initTelemetryService(kafkaTelemetryAdminService, getConfig(), enableService);
Jian Li4df75b12018-06-07 22:11:04 +0900139 log.info("Modified");
140 }
141
Jian Lib9fe3492018-06-07 17:19:07 +0900142 @Override
143 public TelemetryConfig getConfig() {
Jian Li4df75b12018-06-07 22:11:04 +0900144 return new DefaultKafkaTelemetryConfig.DefaultBuilder()
145 .withAddress(address)
146 .withPort(port)
147 .withRetries(retries)
148 .withRequiredAcks(requiredAcks)
149 .withBatchSize(batchSize)
150 .withLingerMs(lingerMs)
151 .withMemoryBuffer(memoryBuffer)
152 .withKeySerializer(keySerializer)
153 .withValueSerializer(valueSerializer)
154 .build();
155 }
156
157 /**
158 * Extracts properties from the component configuration context.
159 *
160 * @param context the component context
161 */
162 private void readComponentConfiguration(ComponentContext context) {
163 Dictionary<?, ?> properties = context.getProperties();
164
165 String addressStr = Tools.get(properties, ADDRESS);
166 address = addressStr != null ? addressStr : DEFAULT_KAFKA_SERVER_IP;
167 log.info("Configured. Kafka server address is {}", address);
168
169 Integer portConfigured = Tools.getIntegerProperty(properties, PORT);
170 if (portConfigured == null) {
171 port = DEFAULT_KAFKA_SERVER_PORT;
172 log.info("Kafka server port is NOT configured, default value is {}", port);
173 } else {
174 port = portConfigured;
175 log.info("Configured. Kafka server port is {}", port);
176 }
177
178 Integer retriesConfigured = Tools.getIntegerProperty(properties, RETRIES);
179 if (retriesConfigured == null) {
180 retries = DEFAULT_KAFKA_RETRIES;
181 log.info("Kafka number of retries property is NOT configured, default value is {}", retries);
182 } else {
183 retries = retriesConfigured;
184 log.info("Configured. Kafka number of retries is {}", retries);
185 }
186
187 String requiredAcksStr = Tools.get(properties, REQUIRED_ACKS);
188 requiredAcks = requiredAcksStr != null ? requiredAcksStr : DEFAULT_KAFKA_REQUIRED_ACKS;
189 log.info("Configured, Kafka required acknowledgement is {}", requiredAcks);
190
191 Integer batchSizeConfigured = Tools.getIntegerProperty(properties, BATCH_SIZE);
192 if (batchSizeConfigured == null) {
193 batchSize = DEFAULT_KAFKA_BATCH_SIZE;
194 log.info("Kafka batch size property is NOT configured, default value is {}", batchSize);
195 } else {
196 batchSize = batchSizeConfigured;
197 log.info("Configured. Kafka batch size is {}", batchSize);
198 }
199
200 Integer lingerMsConfigured = Tools.getIntegerProperty(properties, LINGER_MS);
201 if (lingerMsConfigured == null) {
202 lingerMs = DEFAULT_KAFKA_LINGER_MS;
203 log.info("Kafka lingerMs property is NOT configured, default value is {}", lingerMs);
204 } else {
205 lingerMs = lingerMsConfigured;
206 log.info("Configured. Kafka lingerMs is {}", lingerMs);
207 }
208
209 Integer memoryBufferConfigured = Tools.getIntegerProperty(properties, MEMORY_BUFFER);
210 if (memoryBufferConfigured == null) {
211 memoryBuffer = DEFAULT_KAFKA_MEMORY_BUFFER;
212 log.info("Kafka memory buffer property is NOT configured, default value is {}", memoryBuffer);
213 } else {
214 memoryBuffer = memoryBufferConfigured;
215 log.info("Configured. Kafka memory buffer is {}", memoryBuffer);
216 }
217
218 String keySerializerStr = Tools.get(properties, KEY_SERIALIZER);
219 keySerializer = keySerializerStr != null ? keySerializerStr : DEFAULT_KAFKA_KEY_SERIALIZER;
220 log.info("Configured, Kafka key serializer is {}", keySerializer);
221
222 String valueSerializerStr = Tools.get(properties, VALUE_SERIALIZER);
223 valueSerializer = valueSerializerStr != null ? valueSerializerStr : DEFAULT_KAFKA_VALUE_SERIALIZER;
224 log.info("Configured, Kafka value serializer is {}", valueSerializer);
Jian Lid1ce10a2018-06-12 13:47:23 +0900225
226 Boolean enableServiceConfigured =
227 getBooleanProperty(properties, ENABLE_SERVICE);
228 if (enableServiceConfigured == null) {
229 enableService = DEFAULT_DISABLE;
230 log.info("Kafka service enable flag is NOT " +
231 "configured, default value is {}", enableService);
232 } else {
233 enableService = enableServiceConfigured;
234 log.info("Configured. Kafka service enable flag is {}", enableService);
235 }
Jian Lib9fe3492018-06-07 17:19:07 +0900236 }
237}