blob: 242709d64d53340c10080ee273d37e599246c05d [file] [log] [blame]
/*
 * Copyright 2018-present Open Networking Foundation
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
16package org.onosproject.openstacktelemetry.impl;
17
Jian Li4df75b12018-06-07 22:11:04 +090018import org.apache.felix.scr.annotations.Activate;
19import org.apache.felix.scr.annotations.Deactivate;
20import org.apache.felix.scr.annotations.Modified;
21import org.apache.felix.scr.annotations.Property;
22import org.apache.felix.scr.annotations.Reference;
23import org.apache.felix.scr.annotations.ReferenceCardinality;
24import org.onlab.util.Tools;
25import org.onosproject.cfg.ComponentConfigService;
26import org.onosproject.openstacktelemetry.api.KafkaTelemetryAdminService;
Jian Lib9fe3492018-06-07 17:19:07 +090027import org.onosproject.openstacktelemetry.api.KafkaTelemetryConfigService;
28import org.onosproject.openstacktelemetry.api.config.TelemetryConfig;
Jian Li4df75b12018-06-07 22:11:04 +090029import org.onosproject.openstacktelemetry.config.DefaultKafkaTelemetryConfig;
30import org.osgi.service.component.ComponentContext;
31import org.slf4j.Logger;
32import org.slf4j.LoggerFactory;
33
34import java.util.Dictionary;
35
36import static org.onosproject.openstacktelemetry.api.Constants.DEFAULT_KAFKA_BATCH_SIZE;
37import static org.onosproject.openstacktelemetry.api.Constants.DEFAULT_KAFKA_KEY_SERIALIZER;
38import static org.onosproject.openstacktelemetry.api.Constants.DEFAULT_KAFKA_LINGER_MS;
39import static org.onosproject.openstacktelemetry.api.Constants.DEFAULT_KAFKA_MEMORY_BUFFER;
40import static org.onosproject.openstacktelemetry.api.Constants.DEFAULT_KAFKA_REQUIRED_ACKS;
41import static org.onosproject.openstacktelemetry.api.Constants.DEFAULT_KAFKA_RETRIES;
42import static org.onosproject.openstacktelemetry.api.Constants.DEFAULT_KAFKA_SERVER_IP;
43import static org.onosproject.openstacktelemetry.api.Constants.DEFAULT_KAFKA_SERVER_PORT;
44import static org.onosproject.openstacktelemetry.api.Constants.DEFAULT_KAFKA_VALUE_SERIALIZER;
Jian Lib9fe3492018-06-07 17:19:07 +090045
46/**
47 * Kafka server configuration manager for publishing openstack telemetry.
48 */
49public class KafkaTelemetryConfigManager implements KafkaTelemetryConfigService {
50
Jian Li4df75b12018-06-07 22:11:04 +090051 private final Logger log = LoggerFactory.getLogger(getClass());
52
53 private static final String ADDRESS = "address";
54 private static final String PORT = "port";
55 private static final String RETRIES = "retries";
56 private static final String REQUIRED_ACKS = "requiredAcks";
57 private static final String BATCH_SIZE = "batchSize";
58 private static final String LINGER_MS = "lingerMs";
59 private static final String MEMORY_BUFFER = "memoryBuffer";
60 private static final String KEY_SERIALIZER = "keySerializer";
61 private static final String VALUE_SERIALIZER = "valueSerializer";
62
63 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
64 protected ComponentConfigService componentConfigService;
65
66 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
67 protected KafkaTelemetryAdminService kafkaTelemetryAdminService;
68
69 @Property(name = ADDRESS, value = DEFAULT_KAFKA_SERVER_IP,
70 label = "Default IP address to establish initial connection to Kafka server")
71 protected String address = DEFAULT_KAFKA_SERVER_IP;
72
73 @Property(name = PORT, intValue = DEFAULT_KAFKA_SERVER_PORT,
74 label = "Default port number to establish initial connection to Kafka server")
75 protected Integer port = DEFAULT_KAFKA_SERVER_PORT;
76
77 @Property(name = RETRIES, intValue = DEFAULT_KAFKA_RETRIES,
78 label = "Number of times the producer can retry to send after first failure")
79 protected int retries = DEFAULT_KAFKA_RETRIES;
80
81 @Property(name = REQUIRED_ACKS, value = DEFAULT_KAFKA_REQUIRED_ACKS,
82 label = "Producer will get an acknowledgement after the leader has replicated the data")
83 protected String requiredAcks = DEFAULT_KAFKA_REQUIRED_ACKS;
84
85 @Property(name = BATCH_SIZE, intValue = DEFAULT_KAFKA_BATCH_SIZE,
86 label = "The largest record batch size allowed by Kafka")
87 protected Integer batchSize = DEFAULT_KAFKA_BATCH_SIZE;
88
89 @Property(name = LINGER_MS, intValue = DEFAULT_KAFKA_LINGER_MS,
90 label = "The producer groups together any records that arrive in " +
91 "between request transmissions into a single batched request")
92 protected Integer lingerMs = DEFAULT_KAFKA_LINGER_MS;
93
94 @Property(name = MEMORY_BUFFER, intValue = DEFAULT_KAFKA_MEMORY_BUFFER,
95 label = "The total memory used for log cleaner I/O buffers across all cleaner threads")
96 protected Integer memoryBuffer = DEFAULT_KAFKA_MEMORY_BUFFER;
97
98 @Property(name = KEY_SERIALIZER, value = DEFAULT_KAFKA_KEY_SERIALIZER,
99 label = "Serializer class for key that implements the Serializer interface")
100 protected String keySerializer = DEFAULT_KAFKA_KEY_SERIALIZER;
101
102 @Property(name = VALUE_SERIALIZER, value = DEFAULT_KAFKA_VALUE_SERIALIZER,
103 label = "Serializer class for value that implements the Serializer interface")
104 protected String valueSerializer = DEFAULT_KAFKA_VALUE_SERIALIZER;
105
106 @Activate
107 protected void activate(ComponentContext context) {
108 componentConfigService.registerProperties(getClass());
109 kafkaTelemetryAdminService.start(getConfig());
110 log.info("Started");
111 }
112
113 @Deactivate
114 protected void deactivate() {
115 componentConfigService.unregisterProperties(getClass(), false);
116 kafkaTelemetryAdminService.stop();
117 log.info("Stopped");
118 }
119
120 @Modified
121 private void modified(ComponentContext context) {
122 readComponentConfiguration(context);
123 kafkaTelemetryAdminService.restart(getConfig());
124 log.info("Modified");
125 }
126
Jian Lib9fe3492018-06-07 17:19:07 +0900127 @Override
128 public TelemetryConfig getConfig() {
Jian Li4df75b12018-06-07 22:11:04 +0900129 return new DefaultKafkaTelemetryConfig.DefaultBuilder()
130 .withAddress(address)
131 .withPort(port)
132 .withRetries(retries)
133 .withRequiredAcks(requiredAcks)
134 .withBatchSize(batchSize)
135 .withLingerMs(lingerMs)
136 .withMemoryBuffer(memoryBuffer)
137 .withKeySerializer(keySerializer)
138 .withValueSerializer(valueSerializer)
139 .build();
140 }
141
142 /**
143 * Extracts properties from the component configuration context.
144 *
145 * @param context the component context
146 */
147 private void readComponentConfiguration(ComponentContext context) {
148 Dictionary<?, ?> properties = context.getProperties();
149
150 String addressStr = Tools.get(properties, ADDRESS);
151 address = addressStr != null ? addressStr : DEFAULT_KAFKA_SERVER_IP;
152 log.info("Configured. Kafka server address is {}", address);
153
154 Integer portConfigured = Tools.getIntegerProperty(properties, PORT);
155 if (portConfigured == null) {
156 port = DEFAULT_KAFKA_SERVER_PORT;
157 log.info("Kafka server port is NOT configured, default value is {}", port);
158 } else {
159 port = portConfigured;
160 log.info("Configured. Kafka server port is {}", port);
161 }
162
163 Integer retriesConfigured = Tools.getIntegerProperty(properties, RETRIES);
164 if (retriesConfigured == null) {
165 retries = DEFAULT_KAFKA_RETRIES;
166 log.info("Kafka number of retries property is NOT configured, default value is {}", retries);
167 } else {
168 retries = retriesConfigured;
169 log.info("Configured. Kafka number of retries is {}", retries);
170 }
171
172 String requiredAcksStr = Tools.get(properties, REQUIRED_ACKS);
173 requiredAcks = requiredAcksStr != null ? requiredAcksStr : DEFAULT_KAFKA_REQUIRED_ACKS;
174 log.info("Configured, Kafka required acknowledgement is {}", requiredAcks);
175
176 Integer batchSizeConfigured = Tools.getIntegerProperty(properties, BATCH_SIZE);
177 if (batchSizeConfigured == null) {
178 batchSize = DEFAULT_KAFKA_BATCH_SIZE;
179 log.info("Kafka batch size property is NOT configured, default value is {}", batchSize);
180 } else {
181 batchSize = batchSizeConfigured;
182 log.info("Configured. Kafka batch size is {}", batchSize);
183 }
184
185 Integer lingerMsConfigured = Tools.getIntegerProperty(properties, LINGER_MS);
186 if (lingerMsConfigured == null) {
187 lingerMs = DEFAULT_KAFKA_LINGER_MS;
188 log.info("Kafka lingerMs property is NOT configured, default value is {}", lingerMs);
189 } else {
190 lingerMs = lingerMsConfigured;
191 log.info("Configured. Kafka lingerMs is {}", lingerMs);
192 }
193
194 Integer memoryBufferConfigured = Tools.getIntegerProperty(properties, MEMORY_BUFFER);
195 if (memoryBufferConfigured == null) {
196 memoryBuffer = DEFAULT_KAFKA_MEMORY_BUFFER;
197 log.info("Kafka memory buffer property is NOT configured, default value is {}", memoryBuffer);
198 } else {
199 memoryBuffer = memoryBufferConfigured;
200 log.info("Configured. Kafka memory buffer is {}", memoryBuffer);
201 }
202
203 String keySerializerStr = Tools.get(properties, KEY_SERIALIZER);
204 keySerializer = keySerializerStr != null ? keySerializerStr : DEFAULT_KAFKA_KEY_SERIALIZER;
205 log.info("Configured, Kafka key serializer is {}", keySerializer);
206
207 String valueSerializerStr = Tools.get(properties, VALUE_SERIALIZER);
208 valueSerializer = valueSerializerStr != null ? valueSerializerStr : DEFAULT_KAFKA_VALUE_SERIALIZER;
209 log.info("Configured, Kafka value serializer is {}", valueSerializer);
Jian Lib9fe3492018-06-07 17:19:07 +0900210 }
211}