blob: 85f36e3a359d7b5004603bb33528f61fba21612e [file] [log] [blame]
/*
 * Copyright 2018-present Open Networking Foundation
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
16package org.onosproject.openstacktelemetry.impl;
17
Jian Li69600e02018-12-24 13:21:18 +090018import com.google.common.collect.Maps;
19import com.google.common.collect.Sets;
Jian Li6803ccd2018-06-08 09:26:09 +090020import org.apache.kafka.clients.producer.KafkaProducer;
21import org.apache.kafka.clients.producer.Producer;
22import org.apache.kafka.clients.producer.ProducerRecord;
23import org.apache.kafka.clients.producer.RecordMetadata;
Jian Li69600e02018-12-24 13:21:18 +090024import org.onosproject.openstacktelemetry.api.FlowInfo;
Jian Li6803ccd2018-06-08 09:26:09 +090025import org.onosproject.openstacktelemetry.api.KafkaTelemetryAdminService;
Jian Lid1ce10a2018-06-12 13:47:23 +090026import org.onosproject.openstacktelemetry.api.OpenstackTelemetryService;
Jian Li69600e02018-12-24 13:21:18 +090027import org.onosproject.openstacktelemetry.api.TelemetryCodec;
28import org.onosproject.openstacktelemetry.api.TelemetryConfigService;
Jian Li6803ccd2018-06-08 09:26:09 +090029import org.onosproject.openstacktelemetry.api.config.KafkaTelemetryConfig;
30import org.onosproject.openstacktelemetry.api.config.TelemetryConfig;
Ray Milkeyd84f89b2018-08-17 14:54:17 -070031import org.osgi.service.component.annotations.Activate;
32import org.osgi.service.component.annotations.Component;
33import org.osgi.service.component.annotations.Deactivate;
34import org.osgi.service.component.annotations.Reference;
35import org.osgi.service.component.annotations.ReferenceCardinality;
Jian Li6803ccd2018-06-08 09:26:09 +090036import org.slf4j.Logger;
37import org.slf4j.LoggerFactory;
38
Jian Li69600e02018-12-24 13:21:18 +090039import java.nio.ByteBuffer;
40import java.util.Map;
Jian Li6803ccd2018-06-08 09:26:09 +090041import java.util.Properties;
Jian Li69600e02018-12-24 13:21:18 +090042import java.util.Set;
Jian Li6803ccd2018-06-08 09:26:09 +090043import java.util.concurrent.Future;
44
Jian Li69600e02018-12-24 13:21:18 +090045import static org.onosproject.openstacktelemetry.api.Constants.KAFKA_SCHEME;
46import static org.onosproject.openstacktelemetry.api.config.TelemetryConfig.ConfigType.KAFKA;
47import static org.onosproject.openstacktelemetry.config.DefaultKafkaTelemetryConfig.fromTelemetryConfig;
48
Jian Li6803ccd2018-06-08 09:26:09 +090049/**
50 * Kafka telemetry manager.
51 */
Ray Milkeyd84f89b2018-08-17 14:54:17 -070052@Component(immediate = true, service = KafkaTelemetryAdminService.class)
Jian Li6803ccd2018-06-08 09:26:09 +090053public class KafkaTelemetryManager implements KafkaTelemetryAdminService {
54
55 private final Logger log = LoggerFactory.getLogger(getClass());
56
Jian Li69600e02018-12-24 13:21:18 +090057 private static final String CODEC_PREFIX = "org.onosproject.openstacktelemetry.codec.";
58
Jian Li6803ccd2018-06-08 09:26:09 +090059 private static final String BOOTSTRAP_SERVERS = "bootstrap.servers";
60 private static final String RETRIES = "retries";
61 private static final String ACKS = "acks";
62 private static final String BATCH_SIZE = "batch.size";
63 private static final String LINGER_MS = "linger.ms";
64 private static final String MEMORY_BUFFER = "buffer.memory";
65 private static final String KEY_SERIALIZER = "key.serializer";
66 private static final String VALUE_SERIALIZER = "value.serializer";
67
Ray Milkeyd84f89b2018-08-17 14:54:17 -070068 @Reference(cardinality = ReferenceCardinality.MANDATORY)
Jian Lid1ce10a2018-06-12 13:47:23 +090069 protected OpenstackTelemetryService openstackTelemetryService;
70
Jian Li69600e02018-12-24 13:21:18 +090071 @Reference(cardinality = ReferenceCardinality.MANDATORY)
72 protected TelemetryConfigService telemetryConfigService;
73
74 private Map<String, Producer<String, byte[]>> producers = Maps.newConcurrentMap();
Jian Li6803ccd2018-06-08 09:26:09 +090075
76 @Activate
77 protected void activate() {
Jian Lid1ce10a2018-06-12 13:47:23 +090078
79 openstackTelemetryService.addTelemetryService(this);
80
Jian Li6803ccd2018-06-08 09:26:09 +090081 log.info("Started");
82 }
83
84 @Deactivate
85 protected void deactivate() {
86 stop();
Jian Lid1ce10a2018-06-12 13:47:23 +090087
88 openstackTelemetryService.removeTelemetryService(this);
89
Jian Li6803ccd2018-06-08 09:26:09 +090090 log.info("Stopped");
91 }
92
93 @Override
Jian Li69600e02018-12-24 13:21:18 +090094 public void start() {
95 telemetryConfigService.getConfigsByType(KAFKA).forEach(c -> {
96 KafkaTelemetryConfig kafkaConfig = fromTelemetryConfig(c);
Jian Li6803ccd2018-06-08 09:26:09 +090097
Jian Li69600e02018-12-24 13:21:18 +090098 if (kafkaConfig != null && !c.name().equals(KAFKA_SCHEME) && c.enabled()) {
99 StringBuilder kafkaServerBuilder = new StringBuilder();
100 kafkaServerBuilder.append(kafkaConfig.address());
101 kafkaServerBuilder.append(":");
102 kafkaServerBuilder.append(kafkaConfig.port());
Jian Li6803ccd2018-06-08 09:26:09 +0900103
Jian Li69600e02018-12-24 13:21:18 +0900104 // Configure Kafka server properties
105 Properties prop = new Properties();
106 prop.put(BOOTSTRAP_SERVERS, kafkaServerBuilder.toString());
107 prop.put(RETRIES, kafkaConfig.retries());
108 prop.put(ACKS, kafkaConfig.requiredAcks());
109 prop.put(BATCH_SIZE, kafkaConfig.batchSize());
110 prop.put(LINGER_MS, kafkaConfig.lingerMs());
111 prop.put(MEMORY_BUFFER, kafkaConfig.memoryBuffer());
112 prop.put(KEY_SERIALIZER, kafkaConfig.keySerializer());
113 prop.put(VALUE_SERIALIZER, kafkaConfig.valueSerializer());
Jian Li6803ccd2018-06-08 09:26:09 +0900114
Jian Li69600e02018-12-24 13:21:18 +0900115 producers.put(c.name(), new KafkaProducer<>(prop));
116 }
117 });
Jian Li6803ccd2018-06-08 09:26:09 +0900118
Jian Li6803ccd2018-06-08 09:26:09 +0900119 log.info("Kafka producer has Started");
120 }
121
122 @Override
123 public void stop() {
Jian Li69600e02018-12-24 13:21:18 +0900124 if (!producers.isEmpty()) {
125 producers.values().forEach(Producer::close);
Jian Li6803ccd2018-06-08 09:26:09 +0900126 }
127
Jian Li69600e02018-12-24 13:21:18 +0900128 producers.clear();
129
Jian Li6803ccd2018-06-08 09:26:09 +0900130 log.info("Kafka producer has Stopped");
131 }
132
133 @Override
Jian Li69600e02018-12-24 13:21:18 +0900134 public void restart() {
Jian Li6803ccd2018-06-08 09:26:09 +0900135 stop();
Jian Li69600e02018-12-24 13:21:18 +0900136 start();
Jian Li6803ccd2018-06-08 09:26:09 +0900137 }
138
139 @Override
Jian Li69600e02018-12-24 13:21:18 +0900140 public Set<Future<RecordMetadata>> publish(Set<FlowInfo> flowInfos) {
Jian Lid1ce10a2018-06-12 13:47:23 +0900141
Jian Li69600e02018-12-24 13:21:18 +0900142 if (producers == null || producers.isEmpty()) {
Jian Lia4947682018-07-07 14:53:32 +0900143 log.debug("Kafka telemetry service has not been enabled!");
Jian Lid1ce10a2018-06-12 13:47:23 +0900144 return null;
145 }
146
Jian Li85573f42018-06-27 22:29:14 +0900147 log.debug("Send telemetry record to kafka server...");
Jian Li69600e02018-12-24 13:21:18 +0900148 Set<Future<RecordMetadata>> futureSet = Sets.newHashSet();
149 producers.forEach((k, v) -> {
150 TelemetryConfig config = telemetryConfigService.getConfig(k);
151 KafkaTelemetryConfig kafkaConfig =
152 fromTelemetryConfig(config);
153
154 try {
155 Class codecClazz = Class.forName(CODEC_PREFIX + kafkaConfig.codec());
156 TelemetryCodec codec = (TelemetryCodec) codecClazz.newInstance();
157
158 ByteBuffer buffer = codec.encode(flowInfos);
159 ProducerRecord record = new ProducerRecord<>(
160 kafkaConfig.topic(), kafkaConfig.key(), buffer.array());
161 futureSet.add(v.send(record));
162 } catch (ClassNotFoundException | IllegalAccessException | InstantiationException e) {
163 log.warn("Failed to send telemetry record due to {}", e);
164 }
165 });
166 return futureSet;
Jian Li6803ccd2018-06-08 09:26:09 +0900167 }
Jian Lid1ce10a2018-06-12 13:47:23 +0900168
169 @Override
170 public boolean isRunning() {
Jian Li69600e02018-12-24 13:21:18 +0900171 return !producers.isEmpty();
Jian Lid1ce10a2018-06-12 13:47:23 +0900172 }
Jian Li6803ccd2018-06-08 09:26:09 +0900173}