blob: b7c3985a20626d085f787222e55d170b6bb2a2dc [file] [log] [blame]
Jian Li6803ccd2018-06-08 09:26:09 +09001/*
2 * Copyright 2018-present Open Networking Foundation
3 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
16package org.onosproject.openstacktelemetry.impl;
17
Jian Li16f1f532018-12-27 17:45:09 +090018import com.google.common.collect.Maps;
Jian Li6803ccd2018-06-08 09:26:09 +090019import org.influxdb.InfluxDB;
20import org.influxdb.InfluxDBFactory;
Boyoung Jeong4d1c9d12018-07-20 17:09:20 +090021import org.influxdb.dto.BatchPoints;
22import org.influxdb.dto.Point;
23import org.onlab.packet.TpPort;
24import org.onosproject.openstacktelemetry.api.FlowInfo;
Jian Li6803ccd2018-06-08 09:26:09 +090025import org.onosproject.openstacktelemetry.api.InfluxDbTelemetryAdminService;
26import org.onosproject.openstacktelemetry.api.InfluxRecord;
Jian Lid1ce10a2018-06-12 13:47:23 +090027import org.onosproject.openstacktelemetry.api.OpenstackTelemetryService;
Jian Li69600e02018-12-24 13:21:18 +090028import org.onosproject.openstacktelemetry.api.TelemetryConfigService;
Jian Li6803ccd2018-06-08 09:26:09 +090029import org.onosproject.openstacktelemetry.api.config.InfluxDbTelemetryConfig;
30import org.onosproject.openstacktelemetry.api.config.TelemetryConfig;
Ray Milkeyd84f89b2018-08-17 14:54:17 -070031import org.osgi.service.component.annotations.Activate;
32import org.osgi.service.component.annotations.Component;
33import org.osgi.service.component.annotations.Deactivate;
34import org.osgi.service.component.annotations.Reference;
35import org.osgi.service.component.annotations.ReferenceCardinality;
Jian Li6803ccd2018-06-08 09:26:09 +090036import org.slf4j.Logger;
37import org.slf4j.LoggerFactory;
38
Jian Li69600e02018-12-24 13:21:18 +090039import java.util.Map;
Boyoung Jeong4d1c9d12018-07-20 17:09:20 +090040import java.util.Set;
41
Jian Li69600e02018-12-24 13:21:18 +090042import static org.onosproject.openstacktelemetry.api.Constants.INFLUXDB_SCHEME;
43import static org.onosproject.openstacktelemetry.api.config.TelemetryConfig.ConfigType.INFLUXDB;
Jian Li667c6eb2019-01-07 23:01:12 +090044import static org.onosproject.openstacktelemetry.api.config.TelemetryConfig.Status.ENABLED;
Jian Li69600e02018-12-24 13:21:18 +090045import static org.onosproject.openstacktelemetry.config.DefaultInfluxDbTelemetryConfig.fromTelemetryConfig;
Jian Li667c6eb2019-01-07 23:01:12 +090046import static org.onosproject.openstacktelemetry.util.OpenstackTelemetryUtil.testConnectivity;
Jian Li69600e02018-12-24 13:21:18 +090047
Jian Li6803ccd2018-06-08 09:26:09 +090048/**
49 * InfluxDB telemetry manager.
50 */
Ray Milkeyd84f89b2018-08-17 14:54:17 -070051@Component(immediate = true, service = InfluxDbTelemetryAdminService.class)
Jian Li6803ccd2018-06-08 09:26:09 +090052public class InfluxDbTelemetryManager implements InfluxDbTelemetryAdminService {
53
54 private final Logger log = LoggerFactory.getLogger(getClass());
55
Boyoung Jeong4d1c9d12018-07-20 17:09:20 +090056 private static final String FLOW_TYPE = "flowType";
57 private static final String DEVICE_ID = "deviceId";
58 private static final String INPUT_INTERFACE_ID = "inputInterfaceId";
59 private static final String OUTPUT_INTERFACE_ID = "outputInterfaceId";
60
61 private static final String VLAN_ID = "vlanId";
62 private static final String VXLAN_ID = "vxlanId";
63 private static final String SRC_IP = "srcIp";
64 private static final String DST_IP = "dstIp";
65 private static final String SRC_PORT = "srcPort";
66 private static final String DST_PORT = "dstPort";
67 private static final String PROTOCOL = "protocol";
Boyoung Jeong4d1c9d12018-07-20 17:09:20 +090068
69 private static final String STARTUP_TIME = "startupTime";
70 private static final String FST_PKT_ARR_TIME = "fstPktArrTime";
71 private static final String LST_PKT_OFFSET = "lstPktOffset";
72 private static final String PREV_ACC_BYTES = "prevAccBytes";
73 private static final String PREV_ACC_PKTS = "prevAccPkts";
74 private static final String CURR_ACC_BYTES = "currAccBytes";
75 private static final String CURR_ACC_PKTS = "currAccPkts";
76 private static final String ERROR_PKTS = "errorPkts";
77 private static final String DROP_PKTS = "dropPkts";
78
Ray Milkeyd84f89b2018-08-17 14:54:17 -070079 @Reference(cardinality = ReferenceCardinality.MANDATORY)
Jian Lid1ce10a2018-06-12 13:47:23 +090080 protected OpenstackTelemetryService openstackTelemetryService;
81
Jian Li69600e02018-12-24 13:21:18 +090082 @Reference(cardinality = ReferenceCardinality.MANDATORY)
83 protected TelemetryConfigService telemetryConfigService;
84
Boyoung Jeong4d1c9d12018-07-20 17:09:20 +090085 private static final String INFLUX_PROTOCOL = "http";
Jian Li16f1f532018-12-27 17:45:09 +090086 private Map<String, InfluxDB> producers = Maps.newConcurrentMap();
Jian Li6803ccd2018-06-08 09:26:09 +090087
88 @Activate
89 protected void activate() {
Jian Lid1ce10a2018-06-12 13:47:23 +090090
91 openstackTelemetryService.addTelemetryService(this);
92
Jian Li6803ccd2018-06-08 09:26:09 +090093 log.info("Started");
94 }
95
96 @Deactivate
97 protected void deactivate() {
Jian Lia61e0b62018-12-28 19:10:10 +090098 stopAll();
Jian Lid1ce10a2018-06-12 13:47:23 +090099
100 openstackTelemetryService.removeTelemetryService(this);
101
Jian Li6803ccd2018-06-08 09:26:09 +0900102 log.info("Stopped");
103 }
104
105 @Override
Boyoung Jeong4d1c9d12018-07-20 17:09:20 +0900106 public void publish(InfluxRecord<String, Set<FlowInfo>> record) {
Jian Li69600e02018-12-24 13:21:18 +0900107 if (producers == null || producers.isEmpty()) {
Jian Li6c92b3c2018-08-03 11:26:55 +0900108 log.debug("InfluxDB telemetry service has not been enabled!");
Boyoung Jeong4d1c9d12018-07-20 17:09:20 +0900109 return;
Jian Lid1ce10a2018-06-12 13:47:23 +0900110 }
Boyoung Jeong4d1c9d12018-07-20 17:09:20 +0900111
112 if (record.flowInfos().size() == 0) {
Jian Li6c92b3c2018-08-03 11:26:55 +0900113 log.debug("No record to publish");
Boyoung Jeong4d1c9d12018-07-20 17:09:20 +0900114 return;
115 }
116
117 log.debug("Publish {} stats records to InfluxDB", record.flowInfos().size());
118
Jian Li69600e02018-12-24 13:21:18 +0900119 producers.forEach((k, v) -> {
120 TelemetryConfig config = telemetryConfigService.getConfig(k);
121 InfluxDbTelemetryConfig influxDbConfig = fromTelemetryConfig(config);
Boyoung Jeong4d1c9d12018-07-20 17:09:20 +0900122
Jian Li69600e02018-12-24 13:21:18 +0900123 String database = influxDbConfig.database();
124 String measurement = influxDbConfig.measurement();
Jian Lif8b8c7f2018-08-27 18:49:04 +0900125
Jian Li69600e02018-12-24 13:21:18 +0900126 BatchPoints batchPoints = BatchPoints.database(database).build();
127
128 for (FlowInfo flowInfo: record.flowInfos()) {
129 Point.Builder pointBuilder = Point
130 .measurement((measurement == null) ? record.measurement() : measurement)
131 .tag(FLOW_TYPE, String.valueOf(flowInfo.flowType()))
132 .tag(DEVICE_ID, flowInfo.deviceId().toString())
133 .tag(INPUT_INTERFACE_ID, String.valueOf(flowInfo.inputInterfaceId()))
134 .tag(OUTPUT_INTERFACE_ID, String.valueOf(flowInfo.outputInterfaceId()))
135 .tag(VXLAN_ID, String.valueOf(flowInfo.vxlanId()))
136 .tag(SRC_IP, flowInfo.srcIp().toString())
137 .tag(DST_IP, flowInfo.dstIp().toString())
138 .tag(DST_PORT, getTpPort(flowInfo.dstPort()))
139 .tag(PROTOCOL, String.valueOf(flowInfo.protocol()))
140 .addField(STARTUP_TIME, flowInfo.statsInfo().startupTime())
141 .addField(FST_PKT_ARR_TIME, flowInfo.statsInfo().fstPktArrTime())
142 .addField(LST_PKT_OFFSET, flowInfo.statsInfo().lstPktOffset())
143 .addField(PREV_ACC_BYTES, flowInfo.statsInfo().prevAccBytes())
144 .addField(PREV_ACC_PKTS, flowInfo.statsInfo().prevAccPkts())
145 .addField(CURR_ACC_BYTES, flowInfo.statsInfo().currAccBytes())
146 .addField(CURR_ACC_PKTS, flowInfo.statsInfo().currAccPkts())
147 .addField(ERROR_PKTS, flowInfo.statsInfo().errorPkts())
148 .addField(DROP_PKTS, flowInfo.statsInfo().dropPkts());
149
150 if (flowInfo.vlanId() != null) {
151 pointBuilder.tag(VLAN_ID, flowInfo.vlanId().toString());
152 }
153
154 if (flowInfo.srcPort() != null) {
155 pointBuilder.tag(SRC_PORT, getTpPort(flowInfo.srcPort()));
156 }
157
158 if (flowInfo.dstPort() != null) {
159 pointBuilder.tag(DST_PORT, getTpPort(flowInfo.dstPort()));
160 }
161
162 batchPoints.point(pointBuilder.build());
Jian Lif8b8c7f2018-08-27 18:49:04 +0900163 }
Jian Li69600e02018-12-24 13:21:18 +0900164 v.write(batchPoints);
165 });
Jian Lid1ce10a2018-06-12 13:47:23 +0900166 }
167
168 @Override
169 public boolean isRunning() {
Jian Li69600e02018-12-24 13:21:18 +0900170 return !producers.isEmpty();
Jian Li6803ccd2018-06-08 09:26:09 +0900171 }
Boyoung Jeong4d1c9d12018-07-20 17:09:20 +0900172
Jian Lia61e0b62018-12-28 19:10:10 +0900173 @Override
Jian Li667c6eb2019-01-07 23:01:12 +0900174 public boolean start(String name) {
175 boolean success = false;
Jian Lia61e0b62018-12-28 19:10:10 +0900176 TelemetryConfig config = telemetryConfigService.getConfig(name);
177 InfluxDbTelemetryConfig influxDbConfig = fromTelemetryConfig(config);
178
Jian Li667c6eb2019-01-07 23:01:12 +0900179 if (influxDbConfig != null && !config.name().equals(INFLUXDB_SCHEME) &&
180 config.status() == ENABLED) {
Jian Lia61e0b62018-12-28 19:10:10 +0900181 StringBuilder influxDbServerBuilder = new StringBuilder();
182 influxDbServerBuilder.append(INFLUX_PROTOCOL);
183 influxDbServerBuilder.append(":");
184 influxDbServerBuilder.append("//");
185 influxDbServerBuilder.append(influxDbConfig.address());
186 influxDbServerBuilder.append(":");
187 influxDbServerBuilder.append(influxDbConfig.port());
188
Jian Li667c6eb2019-01-07 23:01:12 +0900189 if (testConnectivity(influxDbConfig.address(), influxDbConfig.port())) {
190 InfluxDB producer = InfluxDBFactory.connect(influxDbServerBuilder.toString(),
191 influxDbConfig.username(), influxDbConfig.password());
192 producers.put(name, producer);
193 createDB(producer, influxDbConfig.database());
194 success = true;
195 } else {
196 log.warn("Unable to connect to {}:{}, " +
197 "please check the connectivity manually",
198 influxDbConfig.address(), influxDbConfig.port());
199 }
Jian Lia61e0b62018-12-28 19:10:10 +0900200 }
Jian Li667c6eb2019-01-07 23:01:12 +0900201
202 return success;
Jian Lia61e0b62018-12-28 19:10:10 +0900203 }
204
205 @Override
206 public void stop(String name) {
207 InfluxDB producer = producers.get(name);
208
209 if (producer != null) {
210 producer.close();
211 producers.remove(name);
212 }
213 }
214
215 @Override
Jian Li667c6eb2019-01-07 23:01:12 +0900216 public boolean restart(String name) {
Jian Lia61e0b62018-12-28 19:10:10 +0900217 stop(name);
Jian Li667c6eb2019-01-07 23:01:12 +0900218 return start(name);
Jian Lia61e0b62018-12-28 19:10:10 +0900219 }
220
221 @Override
222 public void startAll() {
223 telemetryConfigService.getConfigsByType(INFLUXDB).forEach(c -> start(c.name()));
Jian Li667c6eb2019-01-07 23:01:12 +0900224 log.info("InfluxDB producer has Started");
225 }
Jian Lia61e0b62018-12-28 19:10:10 +0900226
227 @Override
228 public void stopAll() {
229 if (producers != null) {
230 producers.values().forEach(InfluxDB::close);
231 }
232
233 log.info("InfluxDB producer has stopped");
234 }
235
236 @Override
237 public void restartAll() {
238 stopAll();
239 startAll();
240 }
Jian Li667c6eb2019-01-07 23:01:12 +0900241
242
243 private void createDB(InfluxDB producer, String database) {
244 if (producer.databaseExists(database)) {
245 log.debug("Database {} is already created", database);
246 } else {
247 producer.createDatabase(database);
248 log.debug("Database {} is created", database);
249 }
250 }
251
252 private String getTpPort(TpPort tpPort) {
253 if (tpPort == null) {
254 return "";
255 }
256 return tpPort.toString();
257 }
Jian Li6803ccd2018-06-08 09:26:09 +0900258}