blob: 41a318b07e137bca579cef8dd027bfb01f913622 [file] [log] [blame]
Jian Li6803ccd2018-06-08 09:26:09 +09001/*
2 * Copyright 2018-present Open Networking Foundation
3 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
16package org.onosproject.openstacktelemetry.impl;
17
18import org.apache.felix.scr.annotations.Activate;
19import org.apache.felix.scr.annotations.Component;
20import org.apache.felix.scr.annotations.Deactivate;
Jian Lid1ce10a2018-06-12 13:47:23 +090021import org.apache.felix.scr.annotations.Reference;
22import org.apache.felix.scr.annotations.ReferenceCardinality;
Jian Li6803ccd2018-06-08 09:26:09 +090023import org.apache.felix.scr.annotations.Service;
24import org.influxdb.InfluxDB;
25import org.influxdb.InfluxDBFactory;
Boyoung Jeong4d1c9d12018-07-20 17:09:20 +090026import org.influxdb.dto.BatchPoints;
27import org.influxdb.dto.Point;
28import org.onlab.packet.TpPort;
29import org.onosproject.openstacktelemetry.api.FlowInfo;
Jian Li6803ccd2018-06-08 09:26:09 +090030import org.onosproject.openstacktelemetry.api.InfluxDbTelemetryAdminService;
31import org.onosproject.openstacktelemetry.api.InfluxRecord;
Jian Lid1ce10a2018-06-12 13:47:23 +090032import org.onosproject.openstacktelemetry.api.OpenstackTelemetryService;
Jian Li6803ccd2018-06-08 09:26:09 +090033import org.onosproject.openstacktelemetry.api.config.InfluxDbTelemetryConfig;
34import org.onosproject.openstacktelemetry.api.config.TelemetryConfig;
35import org.slf4j.Logger;
36import org.slf4j.LoggerFactory;
37
Boyoung Jeong4d1c9d12018-07-20 17:09:20 +090038import java.util.Set;
39
Jian Li6803ccd2018-06-08 09:26:09 +090040/**
41 * InfluxDB telemetry manager.
42 */
43@Component(immediate = true)
44@Service
45public class InfluxDbTelemetryManager implements InfluxDbTelemetryAdminService {
46
47 private final Logger log = LoggerFactory.getLogger(getClass());
48
Boyoung Jeong4d1c9d12018-07-20 17:09:20 +090049 private static final String FLOW_TYPE = "flowType";
50 private static final String DEVICE_ID = "deviceId";
51 private static final String INPUT_INTERFACE_ID = "inputInterfaceId";
52 private static final String OUTPUT_INTERFACE_ID = "outputInterfaceId";
53
54 private static final String VLAN_ID = "vlanId";
55 private static final String VXLAN_ID = "vxlanId";
56 private static final String SRC_IP = "srcIp";
57 private static final String DST_IP = "dstIp";
58 private static final String SRC_PORT = "srcPort";
59 private static final String DST_PORT = "dstPort";
60 private static final String PROTOCOL = "protocol";
Boyoung Jeong4d1c9d12018-07-20 17:09:20 +090061
62 private static final String STARTUP_TIME = "startupTime";
63 private static final String FST_PKT_ARR_TIME = "fstPktArrTime";
64 private static final String LST_PKT_OFFSET = "lstPktOffset";
65 private static final String PREV_ACC_BYTES = "prevAccBytes";
66 private static final String PREV_ACC_PKTS = "prevAccPkts";
67 private static final String CURR_ACC_BYTES = "currAccBytes";
68 private static final String CURR_ACC_PKTS = "currAccPkts";
69 private static final String ERROR_PKTS = "errorPkts";
70 private static final String DROP_PKTS = "dropPkts";
71
Jian Lid1ce10a2018-06-12 13:47:23 +090072 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
73 protected OpenstackTelemetryService openstackTelemetryService;
74
Boyoung Jeong4d1c9d12018-07-20 17:09:20 +090075 private static final String INFLUX_PROTOCOL = "http";
Jian Li6803ccd2018-06-08 09:26:09 +090076 private InfluxDB producer = null;
Boyoung Jeong4d1c9d12018-07-20 17:09:20 +090077 private String database = null;
78 private String measurement = null;
Jian Li6803ccd2018-06-08 09:26:09 +090079
80 @Activate
81 protected void activate() {
Jian Lid1ce10a2018-06-12 13:47:23 +090082
83 openstackTelemetryService.addTelemetryService(this);
84
Jian Li6803ccd2018-06-08 09:26:09 +090085 log.info("Started");
86 }
87
88 @Deactivate
89 protected void deactivate() {
90 stop();
Jian Lid1ce10a2018-06-12 13:47:23 +090091
92 openstackTelemetryService.removeTelemetryService(this);
93
Jian Li6803ccd2018-06-08 09:26:09 +090094 log.info("Stopped");
95 }
96
97 @Override
98 public void start(TelemetryConfig config) {
99 if (producer != null) {
100 log.info("InfluxDB producer has already been started");
101 return;
102 }
103
104 InfluxDbTelemetryConfig influxDbConfig = (InfluxDbTelemetryConfig) config;
105
106 StringBuilder influxDbServerBuilder = new StringBuilder();
Boyoung Jeong4d1c9d12018-07-20 17:09:20 +0900107 influxDbServerBuilder.append(INFLUX_PROTOCOL);
Jian Li6803ccd2018-06-08 09:26:09 +0900108 influxDbServerBuilder.append(":");
109 influxDbServerBuilder.append("//");
110 influxDbServerBuilder.append(influxDbConfig.address());
111 influxDbServerBuilder.append(":");
112 influxDbServerBuilder.append(influxDbConfig.port());
113
114 producer = InfluxDBFactory.connect(influxDbServerBuilder.toString(),
115 influxDbConfig.username(), influxDbConfig.password());
Boyoung Jeong4d1c9d12018-07-20 17:09:20 +0900116 database = influxDbConfig.database();
117 measurement = influxDbConfig.measurement();
118
Jian Li6803ccd2018-06-08 09:26:09 +0900119 log.info("InfluxDB producer has Started");
Boyoung Jeong4d1c9d12018-07-20 17:09:20 +0900120
121 createDB();
Jian Li6803ccd2018-06-08 09:26:09 +0900122 }
123
124 @Override
125 public void stop() {
126 if (producer != null) {
Boyoung Jeong4d1c9d12018-07-20 17:09:20 +0900127 producer.close();
Jian Li6803ccd2018-06-08 09:26:09 +0900128 producer = null;
129 }
130
boyoung21c5f5f42018-09-27 20:29:41 +0900131 log.info("InfluxDB producer has stopped");
Jian Li6803ccd2018-06-08 09:26:09 +0900132 }
133
134 @Override
135 public void restart(TelemetryConfig config) {
136 stop();
137 start(config);
138 }
139
140 @Override
Boyoung Jeong4d1c9d12018-07-20 17:09:20 +0900141 public void publish(InfluxRecord<String, Set<FlowInfo>> record) {
Jian Lid1ce10a2018-06-12 13:47:23 +0900142 if (producer == null) {
Jian Li6c92b3c2018-08-03 11:26:55 +0900143 log.debug("InfluxDB telemetry service has not been enabled!");
Boyoung Jeong4d1c9d12018-07-20 17:09:20 +0900144 return;
Jian Lid1ce10a2018-06-12 13:47:23 +0900145 }
Boyoung Jeong4d1c9d12018-07-20 17:09:20 +0900146
147 if (record.flowInfos().size() == 0) {
Jian Li6c92b3c2018-08-03 11:26:55 +0900148 log.debug("No record to publish");
Boyoung Jeong4d1c9d12018-07-20 17:09:20 +0900149 return;
150 }
151
152 log.debug("Publish {} stats records to InfluxDB", record.flowInfos().size());
153
154 BatchPoints batchPoints = BatchPoints.database(database).build();
155
156 for (FlowInfo flowInfo: record.flowInfos()) {
Jian Lif8b8c7f2018-08-27 18:49:04 +0900157 Point.Builder pointBuilder = Point
Boyoung Jeong4d1c9d12018-07-20 17:09:20 +0900158 .measurement((measurement == null) ? record.measurement() : measurement)
159 .tag(FLOW_TYPE, String.valueOf(flowInfo.flowType()))
160 .tag(DEVICE_ID, flowInfo.deviceId().toString())
161 .tag(INPUT_INTERFACE_ID, String.valueOf(flowInfo.inputInterfaceId()))
162 .tag(OUTPUT_INTERFACE_ID, String.valueOf(flowInfo.outputInterfaceId()))
Boyoung Jeong4d1c9d12018-07-20 17:09:20 +0900163 .tag(VXLAN_ID, String.valueOf(flowInfo.vxlanId()))
164 .tag(SRC_IP, flowInfo.srcIp().toString())
165 .tag(DST_IP, flowInfo.dstIp().toString())
Boyoung Jeong4d1c9d12018-07-20 17:09:20 +0900166 .tag(DST_PORT, getTpPort(flowInfo.dstPort()))
167 .tag(PROTOCOL, String.valueOf(flowInfo.protocol()))
Boyoung Jeong4d1c9d12018-07-20 17:09:20 +0900168 .addField(STARTUP_TIME, flowInfo.statsInfo().startupTime())
169 .addField(FST_PKT_ARR_TIME, flowInfo.statsInfo().fstPktArrTime())
170 .addField(LST_PKT_OFFSET, flowInfo.statsInfo().lstPktOffset())
171 .addField(PREV_ACC_BYTES, flowInfo.statsInfo().prevAccBytes())
172 .addField(PREV_ACC_PKTS, flowInfo.statsInfo().prevAccPkts())
173 .addField(CURR_ACC_BYTES, flowInfo.statsInfo().currAccBytes())
174 .addField(CURR_ACC_PKTS, flowInfo.statsInfo().currAccPkts())
175 .addField(ERROR_PKTS, flowInfo.statsInfo().errorPkts())
Jian Lif8b8c7f2018-08-27 18:49:04 +0900176 .addField(DROP_PKTS, flowInfo.statsInfo().dropPkts());
177
178 if (flowInfo.vlanId() != null) {
179 pointBuilder.tag(VLAN_ID, flowInfo.vlanId().toString());
180 }
181
182 if (flowInfo.srcPort() != null) {
183 pointBuilder.tag(SRC_PORT, getTpPort(flowInfo.srcPort()));
184 }
185
186 if (flowInfo.dstPort() != null) {
187 pointBuilder.tag(DST_PORT, getTpPort(flowInfo.dstPort()));
188 }
189
190 batchPoints.point(pointBuilder.build());
Boyoung Jeong4d1c9d12018-07-20 17:09:20 +0900191 }
192 producer.write(batchPoints);
Jian Lid1ce10a2018-06-12 13:47:23 +0900193 }
194
195 @Override
196 public boolean isRunning() {
197 return producer != null;
Jian Li6803ccd2018-06-08 09:26:09 +0900198 }
Boyoung Jeong4d1c9d12018-07-20 17:09:20 +0900199
200 private void createDB() {
201 if (producer.databaseExists(database)) {
202 log.debug("Database {} is already created", database);
203 } else {
204 producer.createDatabase(database);
205 log.debug("Database {} is created", database);
206 }
207 }
208
209 private String getTpPort(TpPort tpPort) {
210 if (tpPort == null) {
211 return "";
212 }
213 return tpPort.toString();
214 }
Jian Li6803ccd2018-06-08 09:26:09 +0900215}