blob: 796da19c3b853e55f71c0accc62dbd0d9bd59d4c [file] [log] [blame]
Jian Li6803ccd2018-06-08 09:26:09 +09001/*
2 * Copyright 2018-present Open Networking Foundation
3 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
16package org.onosproject.openstacktelemetry.impl;
17
Jian Li6803ccd2018-06-08 09:26:09 +090018import org.influxdb.InfluxDB;
19import org.influxdb.InfluxDBFactory;
Boyoung Jeong4d1c9d12018-07-20 17:09:20 +090020import org.influxdb.dto.BatchPoints;
21import org.influxdb.dto.Point;
22import org.onlab.packet.TpPort;
23import org.onosproject.openstacktelemetry.api.FlowInfo;
Jian Li6803ccd2018-06-08 09:26:09 +090024import org.onosproject.openstacktelemetry.api.InfluxDbTelemetryAdminService;
25import org.onosproject.openstacktelemetry.api.InfluxRecord;
Jian Lid1ce10a2018-06-12 13:47:23 +090026import org.onosproject.openstacktelemetry.api.OpenstackTelemetryService;
Jian Li69600e02018-12-24 13:21:18 +090027import org.onosproject.openstacktelemetry.api.TelemetryConfigService;
Jian Li6803ccd2018-06-08 09:26:09 +090028import org.onosproject.openstacktelemetry.api.config.InfluxDbTelemetryConfig;
29import org.onosproject.openstacktelemetry.api.config.TelemetryConfig;
Ray Milkeyd84f89b2018-08-17 14:54:17 -070030import org.osgi.service.component.annotations.Activate;
31import org.osgi.service.component.annotations.Component;
32import org.osgi.service.component.annotations.Deactivate;
33import org.osgi.service.component.annotations.Reference;
34import org.osgi.service.component.annotations.ReferenceCardinality;
Jian Li6803ccd2018-06-08 09:26:09 +090035import org.slf4j.Logger;
36import org.slf4j.LoggerFactory;
37
Jian Li69600e02018-12-24 13:21:18 +090038import java.util.Map;
Boyoung Jeong4d1c9d12018-07-20 17:09:20 +090039import java.util.Set;
40
Jian Li69600e02018-12-24 13:21:18 +090041import static org.onosproject.openstacktelemetry.api.Constants.INFLUXDB_SCHEME;
42import static org.onosproject.openstacktelemetry.api.config.TelemetryConfig.ConfigType.INFLUXDB;
43import static org.onosproject.openstacktelemetry.config.DefaultInfluxDbTelemetryConfig.fromTelemetryConfig;
44
Jian Li6803ccd2018-06-08 09:26:09 +090045/**
46 * InfluxDB telemetry manager.
47 */
Ray Milkeyd84f89b2018-08-17 14:54:17 -070048@Component(immediate = true, service = InfluxDbTelemetryAdminService.class)
Jian Li6803ccd2018-06-08 09:26:09 +090049public class InfluxDbTelemetryManager implements InfluxDbTelemetryAdminService {
50
51 private final Logger log = LoggerFactory.getLogger(getClass());
52
Boyoung Jeong4d1c9d12018-07-20 17:09:20 +090053 private static final String FLOW_TYPE = "flowType";
54 private static final String DEVICE_ID = "deviceId";
55 private static final String INPUT_INTERFACE_ID = "inputInterfaceId";
56 private static final String OUTPUT_INTERFACE_ID = "outputInterfaceId";
57
58 private static final String VLAN_ID = "vlanId";
59 private static final String VXLAN_ID = "vxlanId";
60 private static final String SRC_IP = "srcIp";
61 private static final String DST_IP = "dstIp";
62 private static final String SRC_PORT = "srcPort";
63 private static final String DST_PORT = "dstPort";
64 private static final String PROTOCOL = "protocol";
Boyoung Jeong4d1c9d12018-07-20 17:09:20 +090065
66 private static final String STARTUP_TIME = "startupTime";
67 private static final String FST_PKT_ARR_TIME = "fstPktArrTime";
68 private static final String LST_PKT_OFFSET = "lstPktOffset";
69 private static final String PREV_ACC_BYTES = "prevAccBytes";
70 private static final String PREV_ACC_PKTS = "prevAccPkts";
71 private static final String CURR_ACC_BYTES = "currAccBytes";
72 private static final String CURR_ACC_PKTS = "currAccPkts";
73 private static final String ERROR_PKTS = "errorPkts";
74 private static final String DROP_PKTS = "dropPkts";
75
Ray Milkeyd84f89b2018-08-17 14:54:17 -070076 @Reference(cardinality = ReferenceCardinality.MANDATORY)
Jian Lid1ce10a2018-06-12 13:47:23 +090077 protected OpenstackTelemetryService openstackTelemetryService;
78
Jian Li69600e02018-12-24 13:21:18 +090079 @Reference(cardinality = ReferenceCardinality.MANDATORY)
80 protected TelemetryConfigService telemetryConfigService;
81
Boyoung Jeong4d1c9d12018-07-20 17:09:20 +090082 private static final String INFLUX_PROTOCOL = "http";
Jian Li69600e02018-12-24 13:21:18 +090083 private Map<String, InfluxDB> producers = null;
Jian Li6803ccd2018-06-08 09:26:09 +090084
85 @Activate
86 protected void activate() {
Jian Lid1ce10a2018-06-12 13:47:23 +090087
88 openstackTelemetryService.addTelemetryService(this);
89
Jian Li6803ccd2018-06-08 09:26:09 +090090 log.info("Started");
91 }
92
93 @Deactivate
94 protected void deactivate() {
95 stop();
Jian Lid1ce10a2018-06-12 13:47:23 +090096
97 openstackTelemetryService.removeTelemetryService(this);
98
Jian Li6803ccd2018-06-08 09:26:09 +090099 log.info("Stopped");
100 }
101
102 @Override
Jian Li69600e02018-12-24 13:21:18 +0900103 public void start() {
Jian Li6803ccd2018-06-08 09:26:09 +0900104
Jian Li69600e02018-12-24 13:21:18 +0900105 telemetryConfigService.getConfigsByType(INFLUXDB).forEach(c -> {
106 InfluxDbTelemetryConfig influxDbConfig = fromTelemetryConfig(c);
Jian Li6803ccd2018-06-08 09:26:09 +0900107
Jian Li69600e02018-12-24 13:21:18 +0900108 if (influxDbConfig != null && !c.name().equals(INFLUXDB_SCHEME) && c.enabled()) {
109 StringBuilder influxDbServerBuilder = new StringBuilder();
110 influxDbServerBuilder.append(INFLUX_PROTOCOL);
111 influxDbServerBuilder.append(":");
112 influxDbServerBuilder.append("//");
113 influxDbServerBuilder.append(influxDbConfig.address());
114 influxDbServerBuilder.append(":");
115 influxDbServerBuilder.append(influxDbConfig.port());
Jian Li6803ccd2018-06-08 09:26:09 +0900116
Jian Li69600e02018-12-24 13:21:18 +0900117 InfluxDB producer = InfluxDBFactory.connect(influxDbServerBuilder.toString(),
118 influxDbConfig.username(), influxDbConfig.password());
119 producers.put(c.name(), producer);
120
121 createDB(producer, influxDbConfig.database());
122 }
123 });
Boyoung Jeong4d1c9d12018-07-20 17:09:20 +0900124
Jian Li6803ccd2018-06-08 09:26:09 +0900125 log.info("InfluxDB producer has Started");
126 }
127
128 @Override
129 public void stop() {
Jian Li69600e02018-12-24 13:21:18 +0900130 if (producers != null) {
131 producers.values().forEach(InfluxDB::close);
Jian Li6803ccd2018-06-08 09:26:09 +0900132 }
133
boyoung21c5f5f42018-09-27 20:29:41 +0900134 log.info("InfluxDB producer has stopped");
Jian Li6803ccd2018-06-08 09:26:09 +0900135 }
136
137 @Override
Jian Li69600e02018-12-24 13:21:18 +0900138 public void restart() {
Jian Li6803ccd2018-06-08 09:26:09 +0900139 stop();
Jian Li69600e02018-12-24 13:21:18 +0900140 start();
Jian Li6803ccd2018-06-08 09:26:09 +0900141 }
142
143 @Override
Boyoung Jeong4d1c9d12018-07-20 17:09:20 +0900144 public void publish(InfluxRecord<String, Set<FlowInfo>> record) {
Jian Li69600e02018-12-24 13:21:18 +0900145 if (producers == null || producers.isEmpty()) {
Jian Li6c92b3c2018-08-03 11:26:55 +0900146 log.debug("InfluxDB telemetry service has not been enabled!");
Boyoung Jeong4d1c9d12018-07-20 17:09:20 +0900147 return;
Jian Lid1ce10a2018-06-12 13:47:23 +0900148 }
Boyoung Jeong4d1c9d12018-07-20 17:09:20 +0900149
150 if (record.flowInfos().size() == 0) {
Jian Li6c92b3c2018-08-03 11:26:55 +0900151 log.debug("No record to publish");
Boyoung Jeong4d1c9d12018-07-20 17:09:20 +0900152 return;
153 }
154
155 log.debug("Publish {} stats records to InfluxDB", record.flowInfos().size());
156
Jian Li69600e02018-12-24 13:21:18 +0900157 producers.forEach((k, v) -> {
158 TelemetryConfig config = telemetryConfigService.getConfig(k);
159 InfluxDbTelemetryConfig influxDbConfig = fromTelemetryConfig(config);
Boyoung Jeong4d1c9d12018-07-20 17:09:20 +0900160
Jian Li69600e02018-12-24 13:21:18 +0900161 String database = influxDbConfig.database();
162 String measurement = influxDbConfig.measurement();
Jian Lif8b8c7f2018-08-27 18:49:04 +0900163
Jian Li69600e02018-12-24 13:21:18 +0900164 BatchPoints batchPoints = BatchPoints.database(database).build();
165
166 for (FlowInfo flowInfo: record.flowInfos()) {
167 Point.Builder pointBuilder = Point
168 .measurement((measurement == null) ? record.measurement() : measurement)
169 .tag(FLOW_TYPE, String.valueOf(flowInfo.flowType()))
170 .tag(DEVICE_ID, flowInfo.deviceId().toString())
171 .tag(INPUT_INTERFACE_ID, String.valueOf(flowInfo.inputInterfaceId()))
172 .tag(OUTPUT_INTERFACE_ID, String.valueOf(flowInfo.outputInterfaceId()))
173 .tag(VXLAN_ID, String.valueOf(flowInfo.vxlanId()))
174 .tag(SRC_IP, flowInfo.srcIp().toString())
175 .tag(DST_IP, flowInfo.dstIp().toString())
176 .tag(DST_PORT, getTpPort(flowInfo.dstPort()))
177 .tag(PROTOCOL, String.valueOf(flowInfo.protocol()))
178 .addField(STARTUP_TIME, flowInfo.statsInfo().startupTime())
179 .addField(FST_PKT_ARR_TIME, flowInfo.statsInfo().fstPktArrTime())
180 .addField(LST_PKT_OFFSET, flowInfo.statsInfo().lstPktOffset())
181 .addField(PREV_ACC_BYTES, flowInfo.statsInfo().prevAccBytes())
182 .addField(PREV_ACC_PKTS, flowInfo.statsInfo().prevAccPkts())
183 .addField(CURR_ACC_BYTES, flowInfo.statsInfo().currAccBytes())
184 .addField(CURR_ACC_PKTS, flowInfo.statsInfo().currAccPkts())
185 .addField(ERROR_PKTS, flowInfo.statsInfo().errorPkts())
186 .addField(DROP_PKTS, flowInfo.statsInfo().dropPkts());
187
188 if (flowInfo.vlanId() != null) {
189 pointBuilder.tag(VLAN_ID, flowInfo.vlanId().toString());
190 }
191
192 if (flowInfo.srcPort() != null) {
193 pointBuilder.tag(SRC_PORT, getTpPort(flowInfo.srcPort()));
194 }
195
196 if (flowInfo.dstPort() != null) {
197 pointBuilder.tag(DST_PORT, getTpPort(flowInfo.dstPort()));
198 }
199
200 batchPoints.point(pointBuilder.build());
Jian Lif8b8c7f2018-08-27 18:49:04 +0900201 }
Jian Li69600e02018-12-24 13:21:18 +0900202 v.write(batchPoints);
203 });
Jian Lid1ce10a2018-06-12 13:47:23 +0900204 }
205
206 @Override
207 public boolean isRunning() {
Jian Li69600e02018-12-24 13:21:18 +0900208 return !producers.isEmpty();
Jian Li6803ccd2018-06-08 09:26:09 +0900209 }
Boyoung Jeong4d1c9d12018-07-20 17:09:20 +0900210
Jian Li69600e02018-12-24 13:21:18 +0900211 private void createDB(InfluxDB producer, String database) {
Boyoung Jeong4d1c9d12018-07-20 17:09:20 +0900212 if (producer.databaseExists(database)) {
213 log.debug("Database {} is already created", database);
214 } else {
215 producer.createDatabase(database);
216 log.debug("Database {} is created", database);
217 }
218 }
219
220 private String getTpPort(TpPort tpPort) {
221 if (tpPort == null) {
222 return "";
223 }
224 return tpPort.toString();
225 }
Jian Li6803ccd2018-06-08 09:26:09 +0900226}