blob: b79d7037ee72276b5102da91203894f52ec9b842 [file] [log] [blame]
Jian Li6803ccd2018-06-08 09:26:09 +09001/*
2 * Copyright 2018-present Open Networking Foundation
3 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
16package org.onosproject.openstacktelemetry.impl;
17
18import org.apache.felix.scr.annotations.Activate;
19import org.apache.felix.scr.annotations.Component;
20import org.apache.felix.scr.annotations.Deactivate;
21import org.apache.felix.scr.annotations.Service;
22import org.influxdb.InfluxDB;
23import org.influxdb.InfluxDBFactory;
24import org.onosproject.openstacktelemetry.api.InfluxDbTelemetryAdminService;
25import org.onosproject.openstacktelemetry.api.InfluxRecord;
26import org.onosproject.openstacktelemetry.api.config.InfluxDbTelemetryConfig;
27import org.onosproject.openstacktelemetry.api.config.TelemetryConfig;
28import org.slf4j.Logger;
29import org.slf4j.LoggerFactory;
30
31/**
32 * InfluxDB telemetry manager.
33 */
34@Component(immediate = true)
35@Service
36public class InfluxDbTelemetryManager implements InfluxDbTelemetryAdminService {
37
38 private final Logger log = LoggerFactory.getLogger(getClass());
39
40 private static final String PROTOCOL = "http";
41 private InfluxDB producer = null;
42
43 @Activate
44 protected void activate() {
45 log.info("Started");
46 }
47
48 @Deactivate
49 protected void deactivate() {
50 stop();
51 log.info("Stopped");
52 }
53
54 @Override
55 public void start(TelemetryConfig config) {
56 if (producer != null) {
57 log.info("InfluxDB producer has already been started");
58 return;
59 }
60
61 InfluxDbTelemetryConfig influxDbConfig = (InfluxDbTelemetryConfig) config;
62
63 StringBuilder influxDbServerBuilder = new StringBuilder();
64 influxDbServerBuilder.append(PROTOCOL);
65 influxDbServerBuilder.append(":");
66 influxDbServerBuilder.append("//");
67 influxDbServerBuilder.append(influxDbConfig.address());
68 influxDbServerBuilder.append(":");
69 influxDbServerBuilder.append(influxDbConfig.port());
70
71 producer = InfluxDBFactory.connect(influxDbServerBuilder.toString(),
72 influxDbConfig.username(), influxDbConfig.password());
73 log.info("InfluxDB producer has Started");
74 }
75
76 @Override
77 public void stop() {
78 if (producer != null) {
79 producer = null;
80 }
81
82 log.info("Kafka producer has Stopped");
83 }
84
85 @Override
86 public void restart(TelemetryConfig config) {
87 stop();
88 start(config);
89 }
90
91 @Override
92 public void publish(InfluxRecord<String, Object> record) {
93 // TODO: need to find a way to invoke InfluxDB endpoint using producer
94 }
95}