blob: bbd8527f7c3f0925f3cd9c92439acf7e1811469a [file] [log] [blame]
/*
 * Copyright 2018-present Open Networking Foundation
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
16package org.onosproject.openstacktelemetry.impl;
17
Jian Li69600e02018-12-24 13:21:18 +090018import com.google.common.collect.Maps;
19import com.google.common.collect.Sets;
Jian Li6803ccd2018-06-08 09:26:09 +090020import org.apache.kafka.clients.producer.KafkaProducer;
21import org.apache.kafka.clients.producer.Producer;
22import org.apache.kafka.clients.producer.ProducerRecord;
23import org.apache.kafka.clients.producer.RecordMetadata;
Jian Li69600e02018-12-24 13:21:18 +090024import org.onosproject.openstacktelemetry.api.FlowInfo;
Jian Li6803ccd2018-06-08 09:26:09 +090025import org.onosproject.openstacktelemetry.api.KafkaTelemetryAdminService;
Jian Lid1ce10a2018-06-12 13:47:23 +090026import org.onosproject.openstacktelemetry.api.OpenstackTelemetryService;
Jian Li69600e02018-12-24 13:21:18 +090027import org.onosproject.openstacktelemetry.api.TelemetryCodec;
28import org.onosproject.openstacktelemetry.api.TelemetryConfigService;
Jian Li6803ccd2018-06-08 09:26:09 +090029import org.onosproject.openstacktelemetry.api.config.KafkaTelemetryConfig;
30import org.onosproject.openstacktelemetry.api.config.TelemetryConfig;
Ray Milkeyd84f89b2018-08-17 14:54:17 -070031import org.osgi.service.component.annotations.Activate;
32import org.osgi.service.component.annotations.Component;
33import org.osgi.service.component.annotations.Deactivate;
34import org.osgi.service.component.annotations.Reference;
35import org.osgi.service.component.annotations.ReferenceCardinality;
Jian Li6803ccd2018-06-08 09:26:09 +090036import org.slf4j.Logger;
37import org.slf4j.LoggerFactory;
38
Jian Li69600e02018-12-24 13:21:18 +090039import java.nio.ByteBuffer;
40import java.util.Map;
Jian Li6803ccd2018-06-08 09:26:09 +090041import java.util.Properties;
Jian Li69600e02018-12-24 13:21:18 +090042import java.util.Set;
Jian Li6803ccd2018-06-08 09:26:09 +090043import java.util.concurrent.Future;
44
Jian Li69600e02018-12-24 13:21:18 +090045import static org.onosproject.openstacktelemetry.api.Constants.KAFKA_SCHEME;
46import static org.onosproject.openstacktelemetry.api.config.TelemetryConfig.ConfigType.KAFKA;
Jian Li667c6eb2019-01-07 23:01:12 +090047import static org.onosproject.openstacktelemetry.api.config.TelemetryConfig.Status.ENABLED;
Jian Li69600e02018-12-24 13:21:18 +090048import static org.onosproject.openstacktelemetry.config.DefaultKafkaTelemetryConfig.fromTelemetryConfig;
Jian Li667c6eb2019-01-07 23:01:12 +090049import static org.onosproject.openstacktelemetry.util.OpenstackTelemetryUtil.testConnectivity;
Jian Li69600e02018-12-24 13:21:18 +090050
Jian Li6803ccd2018-06-08 09:26:09 +090051/**
52 * Kafka telemetry manager.
53 */
Ray Milkeyd84f89b2018-08-17 14:54:17 -070054@Component(immediate = true, service = KafkaTelemetryAdminService.class)
Jian Li6803ccd2018-06-08 09:26:09 +090055public class KafkaTelemetryManager implements KafkaTelemetryAdminService {
56
57 private final Logger log = LoggerFactory.getLogger(getClass());
58
59 private static final String BOOTSTRAP_SERVERS = "bootstrap.servers";
60 private static final String RETRIES = "retries";
61 private static final String ACKS = "acks";
62 private static final String BATCH_SIZE = "batch.size";
63 private static final String LINGER_MS = "linger.ms";
64 private static final String MEMORY_BUFFER = "buffer.memory";
65 private static final String KEY_SERIALIZER = "key.serializer";
66 private static final String VALUE_SERIALIZER = "value.serializer";
67
Ray Milkeyd84f89b2018-08-17 14:54:17 -070068 @Reference(cardinality = ReferenceCardinality.MANDATORY)
Jian Lid1ce10a2018-06-12 13:47:23 +090069 protected OpenstackTelemetryService openstackTelemetryService;
70
Jian Li69600e02018-12-24 13:21:18 +090071 @Reference(cardinality = ReferenceCardinality.MANDATORY)
72 protected TelemetryConfigService telemetryConfigService;
73
74 private Map<String, Producer<String, byte[]>> producers = Maps.newConcurrentMap();
Jian Li6803ccd2018-06-08 09:26:09 +090075
76 @Activate
77 protected void activate() {
Jian Lid1ce10a2018-06-12 13:47:23 +090078
79 openstackTelemetryService.addTelemetryService(this);
80
Jian Li6803ccd2018-06-08 09:26:09 +090081 log.info("Started");
82 }
83
84 @Deactivate
85 protected void deactivate() {
Jian Lia61e0b62018-12-28 19:10:10 +090086 stopAll();
Jian Lid1ce10a2018-06-12 13:47:23 +090087
88 openstackTelemetryService.removeTelemetryService(this);
89
Jian Li6803ccd2018-06-08 09:26:09 +090090 log.info("Stopped");
91 }
92
93 @Override
Jian Li69600e02018-12-24 13:21:18 +090094 public Set<Future<RecordMetadata>> publish(Set<FlowInfo> flowInfos) {
Jian Lid1ce10a2018-06-12 13:47:23 +090095
Jian Li69600e02018-12-24 13:21:18 +090096 if (producers == null || producers.isEmpty()) {
Jian Lia4947682018-07-07 14:53:32 +090097 log.debug("Kafka telemetry service has not been enabled!");
Jian Lid1ce10a2018-06-12 13:47:23 +090098 return null;
99 }
100
Jian Li85573f42018-06-27 22:29:14 +0900101 log.debug("Send telemetry record to kafka server...");
Jian Li69600e02018-12-24 13:21:18 +0900102 Set<Future<RecordMetadata>> futureSet = Sets.newHashSet();
103 producers.forEach((k, v) -> {
104 TelemetryConfig config = telemetryConfigService.getConfig(k);
Jian Li7fe7eaf2018-12-31 17:00:33 +0900105 KafkaTelemetryConfig kafkaConfig = fromTelemetryConfig(config);
Jian Li69600e02018-12-24 13:21:18 +0900106
107 try {
Jian Li7fe7eaf2018-12-31 17:00:33 +0900108 Class codecClazz = Class.forName(kafkaConfig.codec());
Jian Li69600e02018-12-24 13:21:18 +0900109 TelemetryCodec codec = (TelemetryCodec) codecClazz.newInstance();
110
111 ByteBuffer buffer = codec.encode(flowInfos);
112 ProducerRecord record = new ProducerRecord<>(
113 kafkaConfig.topic(), kafkaConfig.key(), buffer.array());
114 futureSet.add(v.send(record));
115 } catch (ClassNotFoundException | IllegalAccessException | InstantiationException e) {
116 log.warn("Failed to send telemetry record due to {}", e);
117 }
118 });
119 return futureSet;
Jian Li6803ccd2018-06-08 09:26:09 +0900120 }
Jian Lid1ce10a2018-06-12 13:47:23 +0900121
122 @Override
123 public boolean isRunning() {
Jian Li69600e02018-12-24 13:21:18 +0900124 return !producers.isEmpty();
Jian Lid1ce10a2018-06-12 13:47:23 +0900125 }
Jian Lia61e0b62018-12-28 19:10:10 +0900126
127 @Override
Jian Li667c6eb2019-01-07 23:01:12 +0900128 public boolean start(String name) {
129 boolean success = false;
Jian Lia61e0b62018-12-28 19:10:10 +0900130 TelemetryConfig config = telemetryConfigService.getConfig(name);
131 KafkaTelemetryConfig kafkaConfig = fromTelemetryConfig(config);
132
Jian Li667c6eb2019-01-07 23:01:12 +0900133 if (kafkaConfig != null && !config.name().equals(KAFKA_SCHEME) &&
134 config.status() == ENABLED) {
Jian Lia61e0b62018-12-28 19:10:10 +0900135 StringBuilder kafkaServerBuilder = new StringBuilder();
136 kafkaServerBuilder.append(kafkaConfig.address());
137 kafkaServerBuilder.append(":");
138 kafkaServerBuilder.append(kafkaConfig.port());
139
140 // Configure Kafka server properties
141 Properties prop = new Properties();
142 prop.put(BOOTSTRAP_SERVERS, kafkaServerBuilder.toString());
143 prop.put(RETRIES, kafkaConfig.retries());
144 prop.put(ACKS, kafkaConfig.requiredAcks());
145 prop.put(BATCH_SIZE, kafkaConfig.batchSize());
146 prop.put(LINGER_MS, kafkaConfig.lingerMs());
147 prop.put(MEMORY_BUFFER, kafkaConfig.memoryBuffer());
148 prop.put(KEY_SERIALIZER, kafkaConfig.keySerializer());
149 prop.put(VALUE_SERIALIZER, kafkaConfig.valueSerializer());
150
Jian Li667c6eb2019-01-07 23:01:12 +0900151 if (testConnectivity(kafkaConfig.address(), kafkaConfig.port())) {
152 producers.put(name, new KafkaProducer<>(prop));
153 success = true;
154 } else {
155 log.warn("Unable to connect to {}:{}, " +
156 "please check the connectivity manually",
157 kafkaConfig.address(), kafkaConfig.port());
158 }
Jian Lia61e0b62018-12-28 19:10:10 +0900159 }
Jian Li667c6eb2019-01-07 23:01:12 +0900160
161 return success;
Jian Lia61e0b62018-12-28 19:10:10 +0900162 }
163
164 @Override
165 public void stop(String name) {
166 Producer<String, byte[]> producer = producers.get(name);
167
168 if (producer != null) {
169 producer.close();
170 producers.remove(name);
171 }
172 }
173
174 @Override
Jian Li667c6eb2019-01-07 23:01:12 +0900175 public boolean restart(String name) {
Jian Lia61e0b62018-12-28 19:10:10 +0900176 stop(name);
Jian Li667c6eb2019-01-07 23:01:12 +0900177 return start(name);
Jian Lia61e0b62018-12-28 19:10:10 +0900178 }
179
180 @Override
181 public void startAll() {
182 telemetryConfigService.getConfigsByType(KAFKA).forEach(c -> start(c.name()));
183 log.info("Kafka producer has Started");
184 }
185
186 @Override
187 public void stopAll() {
188 if (!producers.isEmpty()) {
189 producers.values().forEach(Producer::close);
190 }
191
192 producers.clear();
193
194 log.info("Kafka producer has Stopped");
195 }
196
197 @Override
198 public void restartAll() {
199 stopAll();
200 startAll();
201 }
Jian Li6803ccd2018-06-08 09:26:09 +0900202}