blob: 6621e0bbc917fc040f31e8756c7f6c203c4b337c [file] [log] [blame]
Jian Lib9fe3492018-06-07 17:19:07 +09001/*
2 * Copyright 2018-present Open Networking Foundation
3 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
16package org.onosproject.openstacktelemetry.impl;
17
Jian Li4df75b12018-06-07 22:11:04 +090018import org.onlab.util.Tools;
19import org.onosproject.cfg.ComponentConfigService;
20import org.onosproject.openstacktelemetry.api.KafkaTelemetryAdminService;
Jian Lib9fe3492018-06-07 17:19:07 +090021import org.onosproject.openstacktelemetry.api.KafkaTelemetryConfigService;
22import org.onosproject.openstacktelemetry.api.config.TelemetryConfig;
Jian Li4df75b12018-06-07 22:11:04 +090023import org.onosproject.openstacktelemetry.config.DefaultKafkaTelemetryConfig;
24import org.osgi.service.component.ComponentContext;
Ray Milkeyd84f89b2018-08-17 14:54:17 -070025import org.osgi.service.component.annotations.Activate;
26import org.osgi.service.component.annotations.Component;
27import org.osgi.service.component.annotations.Deactivate;
28import org.osgi.service.component.annotations.Modified;
29import org.osgi.service.component.annotations.Reference;
30import org.osgi.service.component.annotations.ReferenceCardinality;
Jian Li4df75b12018-06-07 22:11:04 +090031import org.slf4j.Logger;
32import org.slf4j.LoggerFactory;
33
34import java.util.Dictionary;
35
Ray Milkey8e406512018-10-24 15:56:50 -070036import static org.onosproject.openstacktelemetry.impl.OsgiPropertyConstants.PROP_KAFKA_ADDRESS;
37import static org.onosproject.openstacktelemetry.impl.OsgiPropertyConstants.PROP_KAFKA_ADDRESS_DEFAULT;
38import static org.onosproject.openstacktelemetry.impl.OsgiPropertyConstants.PROP_KAFKA_BATCH_SIZE;
39import static org.onosproject.openstacktelemetry.impl.OsgiPropertyConstants.PROP_KAFKA_BATCH_SIZE_DEFAULT;
40import static org.onosproject.openstacktelemetry.impl.OsgiPropertyConstants.PROP_KAFKA_ENABLE_SERVICE;
41import static org.onosproject.openstacktelemetry.impl.OsgiPropertyConstants.PROP_KAFKA_ENABLE_SERVICE_DEFAULT;
42import static org.onosproject.openstacktelemetry.impl.OsgiPropertyConstants.PROP_KAFKA_KEY_SERIALIZER;
43import static org.onosproject.openstacktelemetry.impl.OsgiPropertyConstants.PROP_KAFKA_KEY_SERIALIZER_DEFAULT;
44import static org.onosproject.openstacktelemetry.impl.OsgiPropertyConstants.PROP_KAFKA_LINGER_MS;
45import static org.onosproject.openstacktelemetry.impl.OsgiPropertyConstants.PROP_KAFKA_LINGER_MS_DEFAULT;
46import static org.onosproject.openstacktelemetry.impl.OsgiPropertyConstants.PROP_KAFKA_MEMORY_BUFFER;
47import static org.onosproject.openstacktelemetry.impl.OsgiPropertyConstants.PROP_KAFKA_MEMORY_BUFFER_DEFAULT;
48import static org.onosproject.openstacktelemetry.impl.OsgiPropertyConstants.PROP_KAFKA_PORT;
49import static org.onosproject.openstacktelemetry.impl.OsgiPropertyConstants.PROP_KAFKA_PORT_DEFAULT;
50import static org.onosproject.openstacktelemetry.impl.OsgiPropertyConstants.PROP_KAFKA_REQUIRED_ACKS;
51import static org.onosproject.openstacktelemetry.impl.OsgiPropertyConstants.PROP_KAFKA_REQUIRED_ACKS_DEFAULT;
52import static org.onosproject.openstacktelemetry.impl.OsgiPropertyConstants.PROP_KAFKA_RETRIES;
53import static org.onosproject.openstacktelemetry.impl.OsgiPropertyConstants.PROP_KAFKA_RETRIES_DEFAULT;
54import static org.onosproject.openstacktelemetry.impl.OsgiPropertyConstants.PROP_KAFKA_VALUE_SERIALIZER;
55import static org.onosproject.openstacktelemetry.impl.OsgiPropertyConstants.PROP_KAFKA_VALUE_SERIALIZER_DEFAULT;
Jian Lid1ce10a2018-06-12 13:47:23 +090056import static org.onosproject.openstacktelemetry.util.OpenstackTelemetryUtil.getBooleanProperty;
Jian Li3ed7f302018-08-27 17:16:27 +090057import static org.onosproject.openstacktelemetry.util.OpenstackTelemetryUtil.initTelemetryService;
Jian Lib9fe3492018-06-07 17:19:07 +090058
59/**
60 * Kafka server configuration manager for publishing openstack telemetry.
61 */
Ray Milkey8e406512018-10-24 15:56:50 -070062@Component(
63 immediate = true,
64 service = KafkaTelemetryConfigService.class,
65 property = {
66 PROP_KAFKA_ADDRESS + "=" + PROP_KAFKA_ADDRESS_DEFAULT,
67 PROP_KAFKA_PORT + ":Integer=" + PROP_KAFKA_PORT_DEFAULT,
68 PROP_KAFKA_RETRIES + ":Integer=" + PROP_KAFKA_RETRIES_DEFAULT,
69 PROP_KAFKA_REQUIRED_ACKS + "=" + PROP_KAFKA_REQUIRED_ACKS_DEFAULT,
70 PROP_KAFKA_BATCH_SIZE + ":Integer=" + PROP_KAFKA_BATCH_SIZE_DEFAULT,
71 PROP_KAFKA_LINGER_MS + ":Integer=" + PROP_KAFKA_LINGER_MS_DEFAULT,
72 PROP_KAFKA_MEMORY_BUFFER + ":Integer=" + PROP_KAFKA_MEMORY_BUFFER_DEFAULT,
73 PROP_KAFKA_KEY_SERIALIZER + "=" + PROP_KAFKA_KEY_SERIALIZER_DEFAULT,
74 PROP_KAFKA_VALUE_SERIALIZER + "=" + PROP_KAFKA_VALUE_SERIALIZER_DEFAULT,
75 PROP_KAFKA_ENABLE_SERVICE + ":Boolean=" + PROP_KAFKA_ENABLE_SERVICE_DEFAULT
76 }
77)
Jian Lib9fe3492018-06-07 17:19:07 +090078public class KafkaTelemetryConfigManager implements KafkaTelemetryConfigService {
79
Jian Li4df75b12018-06-07 22:11:04 +090080 private final Logger log = LoggerFactory.getLogger(getClass());
81
Ray Milkeyd84f89b2018-08-17 14:54:17 -070082 @Reference(cardinality = ReferenceCardinality.MANDATORY)
Jian Li4df75b12018-06-07 22:11:04 +090083 protected ComponentConfigService componentConfigService;
84
Ray Milkeyd84f89b2018-08-17 14:54:17 -070085 @Reference(cardinality = ReferenceCardinality.MANDATORY)
Jian Li4df75b12018-06-07 22:11:04 +090086 protected KafkaTelemetryAdminService kafkaTelemetryAdminService;
87
Ray Milkey8e406512018-10-24 15:56:50 -070088 /** Default IP address to establish initial connection to Kafka server. */
89 protected String address = PROP_KAFKA_ADDRESS_DEFAULT;
Jian Li4df75b12018-06-07 22:11:04 +090090
Ray Milkey8e406512018-10-24 15:56:50 -070091 /** Default port number to establish initial connection to Kafka server. */
92 protected Integer port = PROP_KAFKA_PORT_DEFAULT;
Jian Li4df75b12018-06-07 22:11:04 +090093
Ray Milkey8e406512018-10-24 15:56:50 -070094 /** Number of times the producer can retry to send after first failure. */
95 protected int retries = PROP_KAFKA_RETRIES_DEFAULT;
Jian Li4df75b12018-06-07 22:11:04 +090096
Ray Milkey8e406512018-10-24 15:56:50 -070097 /** Producer will get an acknowledgement after the leader has replicated the data. */
98 protected String requiredAcks = PROP_KAFKA_REQUIRED_ACKS_DEFAULT;
Jian Li4df75b12018-06-07 22:11:04 +090099
Ray Milkey8e406512018-10-24 15:56:50 -0700100 /** The largest record batch size allowed by Kafka. */
101 protected Integer batchSize = PROP_KAFKA_BATCH_SIZE_DEFAULT;
Jian Li4df75b12018-06-07 22:11:04 +0900102
Ray Milkey8e406512018-10-24 15:56:50 -0700103 /** The producer groups together any records that arrive between request transmissions into a single batch. */
104 protected Integer lingerMs = PROP_KAFKA_LINGER_MS_DEFAULT;
Jian Li4df75b12018-06-07 22:11:04 +0900105
Ray Milkey8e406512018-10-24 15:56:50 -0700106 /** The total memory used for log cleaner I/O buffers across all cleaner threads. */
107 protected Integer memoryBuffer = PROP_KAFKA_MEMORY_BUFFER_DEFAULT;
Jian Li4df75b12018-06-07 22:11:04 +0900108
Ray Milkey8e406512018-10-24 15:56:50 -0700109 /** Serializer class for key that implements the Serializer interface. */
110 protected String keySerializer = PROP_KAFKA_KEY_SERIALIZER_DEFAULT;
Jian Li4df75b12018-06-07 22:11:04 +0900111
Ray Milkey8e406512018-10-24 15:56:50 -0700112 /** Serializer class for value that implements the Serializer interface. */
113 protected String valueSerializer = PROP_KAFKA_VALUE_SERIALIZER_DEFAULT;
Jian Li4df75b12018-06-07 22:11:04 +0900114
Ray Milkey8e406512018-10-24 15:56:50 -0700115 /** Specify the default behavior of telemetry service. */
116 protected Boolean enableService = PROP_KAFKA_ENABLE_SERVICE_DEFAULT;
Jian Lid1ce10a2018-06-12 13:47:23 +0900117
Jian Li4df75b12018-06-07 22:11:04 +0900118 @Activate
119 protected void activate(ComponentContext context) {
120 componentConfigService.registerProperties(getClass());
Jian Lid1ce10a2018-06-12 13:47:23 +0900121
122 if (enableService) {
123 kafkaTelemetryAdminService.start(getConfig());
124 }
Jian Li4df75b12018-06-07 22:11:04 +0900125 log.info("Started");
126 }
127
128 @Deactivate
129 protected void deactivate() {
130 componentConfigService.unregisterProperties(getClass(), false);
Jian Lid1ce10a2018-06-12 13:47:23 +0900131
132 if (enableService) {
133 kafkaTelemetryAdminService.stop();
134 }
Jian Li4df75b12018-06-07 22:11:04 +0900135 log.info("Stopped");
136 }
137
138 @Modified
139 private void modified(ComponentContext context) {
140 readComponentConfiguration(context);
Jian Li3ed7f302018-08-27 17:16:27 +0900141 initTelemetryService(kafkaTelemetryAdminService, getConfig(), enableService);
Jian Li4df75b12018-06-07 22:11:04 +0900142 log.info("Modified");
143 }
144
Jian Lib9fe3492018-06-07 17:19:07 +0900145 @Override
146 public TelemetryConfig getConfig() {
Jian Li4df75b12018-06-07 22:11:04 +0900147 return new DefaultKafkaTelemetryConfig.DefaultBuilder()
148 .withAddress(address)
149 .withPort(port)
150 .withRetries(retries)
151 .withRequiredAcks(requiredAcks)
152 .withBatchSize(batchSize)
153 .withLingerMs(lingerMs)
154 .withMemoryBuffer(memoryBuffer)
155 .withKeySerializer(keySerializer)
156 .withValueSerializer(valueSerializer)
157 .build();
158 }
159
160 /**
161 * Extracts properties from the component configuration context.
162 *
163 * @param context the component context
164 */
165 private void readComponentConfiguration(ComponentContext context) {
166 Dictionary<?, ?> properties = context.getProperties();
167
Ray Milkey8e406512018-10-24 15:56:50 -0700168 String addressStr = Tools.get(properties, PROP_KAFKA_ADDRESS);
169 address = addressStr != null ? addressStr : PROP_KAFKA_ADDRESS_DEFAULT;
Jian Li4df75b12018-06-07 22:11:04 +0900170 log.info("Configured. Kafka server address is {}", address);
171
Ray Milkey8e406512018-10-24 15:56:50 -0700172 Integer portConfigured = Tools.getIntegerProperty(properties, PROP_KAFKA_PORT);
Jian Li4df75b12018-06-07 22:11:04 +0900173 if (portConfigured == null) {
Ray Milkey8e406512018-10-24 15:56:50 -0700174 port = PROP_KAFKA_PORT_DEFAULT;
Jian Li4df75b12018-06-07 22:11:04 +0900175 log.info("Kafka server port is NOT configured, default value is {}", port);
176 } else {
177 port = portConfigured;
178 log.info("Configured. Kafka server port is {}", port);
179 }
180
Ray Milkey8e406512018-10-24 15:56:50 -0700181 Integer retriesConfigured = Tools.getIntegerProperty(properties, PROP_KAFKA_RETRIES);
Jian Li4df75b12018-06-07 22:11:04 +0900182 if (retriesConfigured == null) {
Ray Milkey8e406512018-10-24 15:56:50 -0700183 retries = PROP_KAFKA_RETRIES_DEFAULT;
Jian Li4df75b12018-06-07 22:11:04 +0900184 log.info("Kafka number of retries property is NOT configured, default value is {}", retries);
185 } else {
186 retries = retriesConfigured;
187 log.info("Configured. Kafka number of retries is {}", retries);
188 }
189
Ray Milkey8e406512018-10-24 15:56:50 -0700190 String requiredAcksStr = Tools.get(properties, PROP_KAFKA_REQUIRED_ACKS);
191 requiredAcks = requiredAcksStr != null ? requiredAcksStr : PROP_KAFKA_REQUIRED_ACKS_DEFAULT;
Jian Li4df75b12018-06-07 22:11:04 +0900192 log.info("Configured, Kafka required acknowledgement is {}", requiredAcks);
193
Ray Milkey8e406512018-10-24 15:56:50 -0700194 Integer batchSizeConfigured = Tools.getIntegerProperty(properties, PROP_KAFKA_BATCH_SIZE);
Jian Li4df75b12018-06-07 22:11:04 +0900195 if (batchSizeConfigured == null) {
Ray Milkey8e406512018-10-24 15:56:50 -0700196 batchSize = PROP_KAFKA_BATCH_SIZE_DEFAULT;
Jian Li4df75b12018-06-07 22:11:04 +0900197 log.info("Kafka batch size property is NOT configured, default value is {}", batchSize);
198 } else {
199 batchSize = batchSizeConfigured;
200 log.info("Configured. Kafka batch size is {}", batchSize);
201 }
202
Ray Milkey8e406512018-10-24 15:56:50 -0700203 Integer lingerMsConfigured = Tools.getIntegerProperty(properties, PROP_KAFKA_LINGER_MS);
Jian Li4df75b12018-06-07 22:11:04 +0900204 if (lingerMsConfigured == null) {
Ray Milkey8e406512018-10-24 15:56:50 -0700205 lingerMs = PROP_KAFKA_LINGER_MS_DEFAULT;
Jian Li4df75b12018-06-07 22:11:04 +0900206 log.info("Kafka lingerMs property is NOT configured, default value is {}", lingerMs);
207 } else {
208 lingerMs = lingerMsConfigured;
209 log.info("Configured. Kafka lingerMs is {}", lingerMs);
210 }
211
Ray Milkey8e406512018-10-24 15:56:50 -0700212 Integer memoryBufferConfigured = Tools.getIntegerProperty(properties, PROP_KAFKA_MEMORY_BUFFER);
Jian Li4df75b12018-06-07 22:11:04 +0900213 if (memoryBufferConfigured == null) {
Ray Milkey8e406512018-10-24 15:56:50 -0700214 memoryBuffer = PROP_KAFKA_MEMORY_BUFFER_DEFAULT;
Jian Li4df75b12018-06-07 22:11:04 +0900215 log.info("Kafka memory buffer property is NOT configured, default value is {}", memoryBuffer);
216 } else {
217 memoryBuffer = memoryBufferConfigured;
218 log.info("Configured. Kafka memory buffer is {}", memoryBuffer);
219 }
220
Ray Milkey8e406512018-10-24 15:56:50 -0700221 String keySerializerStr = Tools.get(properties, PROP_KAFKA_KEY_SERIALIZER);
222 keySerializer = keySerializerStr != null ? keySerializerStr : PROP_KAFKA_KEY_SERIALIZER_DEFAULT;
Jian Li4df75b12018-06-07 22:11:04 +0900223 log.info("Configured, Kafka key serializer is {}", keySerializer);
224
Ray Milkey8e406512018-10-24 15:56:50 -0700225 String valueSerializerStr = Tools.get(properties, PROP_KAFKA_VALUE_SERIALIZER);
226 valueSerializer = valueSerializerStr != null ? valueSerializerStr : PROP_KAFKA_VALUE_SERIALIZER_DEFAULT;
Jian Li4df75b12018-06-07 22:11:04 +0900227 log.info("Configured, Kafka value serializer is {}", valueSerializer);
Jian Lid1ce10a2018-06-12 13:47:23 +0900228
229 Boolean enableServiceConfigured =
Ray Milkey8e406512018-10-24 15:56:50 -0700230 getBooleanProperty(properties, PROP_KAFKA_ENABLE_SERVICE);
Jian Lid1ce10a2018-06-12 13:47:23 +0900231 if (enableServiceConfigured == null) {
Ray Milkey8e406512018-10-24 15:56:50 -0700232 enableService = PROP_KAFKA_ENABLE_SERVICE_DEFAULT;
Jian Lid1ce10a2018-06-12 13:47:23 +0900233 log.info("Kafka service enable flag is NOT " +
234 "configured, default value is {}", enableService);
235 } else {
236 enableService = enableServiceConfigured;
237 log.info("Configured. Kafka service enable flag is {}", enableService);
238 }
Jian Lib9fe3492018-06-07 17:19:07 +0900239 }
240}