blob: e35d662da5f691c5ec0f3fd9ca776215632e6252 [file] [log] [blame]
Jian Li6803ccd2018-06-08 09:26:09 +09001/*
2 * Copyright 2018-present Open Networking Foundation
3 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
16package org.onosproject.openstacktelemetry.impl;
17
18import org.apache.felix.scr.annotations.Activate;
19import org.apache.felix.scr.annotations.Component;
20import org.apache.felix.scr.annotations.Deactivate;
Jian Lid1ce10a2018-06-12 13:47:23 +090021import org.apache.felix.scr.annotations.Reference;
22import org.apache.felix.scr.annotations.ReferenceCardinality;
Jian Li6803ccd2018-06-08 09:26:09 +090023import org.apache.felix.scr.annotations.Service;
24import org.influxdb.InfluxDB;
25import org.influxdb.InfluxDBFactory;
Boyoung Jeong4d1c9d12018-07-20 17:09:20 +090026import org.influxdb.dto.BatchPoints;
27import org.influxdb.dto.Point;
28import org.onlab.packet.TpPort;
29import org.onosproject.openstacktelemetry.api.FlowInfo;
Jian Li6803ccd2018-06-08 09:26:09 +090030import org.onosproject.openstacktelemetry.api.InfluxDbTelemetryAdminService;
31import org.onosproject.openstacktelemetry.api.InfluxRecord;
Jian Lid1ce10a2018-06-12 13:47:23 +090032import org.onosproject.openstacktelemetry.api.OpenstackTelemetryService;
Jian Li6803ccd2018-06-08 09:26:09 +090033import org.onosproject.openstacktelemetry.api.config.InfluxDbTelemetryConfig;
34import org.onosproject.openstacktelemetry.api.config.TelemetryConfig;
35import org.slf4j.Logger;
36import org.slf4j.LoggerFactory;
37
Boyoung Jeong4d1c9d12018-07-20 17:09:20 +090038import java.util.Set;
39
Jian Li6803ccd2018-06-08 09:26:09 +090040/**
41 * InfluxDB telemetry manager.
42 */
43@Component(immediate = true)
44@Service
45public class InfluxDbTelemetryManager implements InfluxDbTelemetryAdminService {
46
47 private final Logger log = LoggerFactory.getLogger(getClass());
48
Boyoung Jeong4d1c9d12018-07-20 17:09:20 +090049 private static final String FLOW_TYPE = "flowType";
50 private static final String DEVICE_ID = "deviceId";
51 private static final String INPUT_INTERFACE_ID = "inputInterfaceId";
52 private static final String OUTPUT_INTERFACE_ID = "outputInterfaceId";
53
54 private static final String VLAN_ID = "vlanId";
55 private static final String VXLAN_ID = "vxlanId";
56 private static final String SRC_IP = "srcIp";
57 private static final String DST_IP = "dstIp";
58 private static final String SRC_PORT = "srcPort";
59 private static final String DST_PORT = "dstPort";
60 private static final String PROTOCOL = "protocol";
61 private static final String SRC_MAC = "srcMac";
62 private static final String DST_MAC = "dstMac";
63
64 private static final String STARTUP_TIME = "startupTime";
65 private static final String FST_PKT_ARR_TIME = "fstPktArrTime";
66 private static final String LST_PKT_OFFSET = "lstPktOffset";
67 private static final String PREV_ACC_BYTES = "prevAccBytes";
68 private static final String PREV_ACC_PKTS = "prevAccPkts";
69 private static final String CURR_ACC_BYTES = "currAccBytes";
70 private static final String CURR_ACC_PKTS = "currAccPkts";
71 private static final String ERROR_PKTS = "errorPkts";
72 private static final String DROP_PKTS = "dropPkts";
73
Jian Lid1ce10a2018-06-12 13:47:23 +090074 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
75 protected OpenstackTelemetryService openstackTelemetryService;
76
Boyoung Jeong4d1c9d12018-07-20 17:09:20 +090077 private static final String INFLUX_PROTOCOL = "http";
Jian Li6803ccd2018-06-08 09:26:09 +090078 private InfluxDB producer = null;
Boyoung Jeong4d1c9d12018-07-20 17:09:20 +090079 private String database = null;
80 private String measurement = null;
Jian Li6803ccd2018-06-08 09:26:09 +090081
82 @Activate
83 protected void activate() {
Jian Lid1ce10a2018-06-12 13:47:23 +090084
85 openstackTelemetryService.addTelemetryService(this);
86
Jian Li6803ccd2018-06-08 09:26:09 +090087 log.info("Started");
88 }
89
90 @Deactivate
91 protected void deactivate() {
92 stop();
Jian Lid1ce10a2018-06-12 13:47:23 +090093
94 openstackTelemetryService.removeTelemetryService(this);
95
Jian Li6803ccd2018-06-08 09:26:09 +090096 log.info("Stopped");
97 }
98
99 @Override
100 public void start(TelemetryConfig config) {
101 if (producer != null) {
102 log.info("InfluxDB producer has already been started");
103 return;
104 }
105
106 InfluxDbTelemetryConfig influxDbConfig = (InfluxDbTelemetryConfig) config;
107
108 StringBuilder influxDbServerBuilder = new StringBuilder();
Boyoung Jeong4d1c9d12018-07-20 17:09:20 +0900109 influxDbServerBuilder.append(INFLUX_PROTOCOL);
Jian Li6803ccd2018-06-08 09:26:09 +0900110 influxDbServerBuilder.append(":");
111 influxDbServerBuilder.append("//");
112 influxDbServerBuilder.append(influxDbConfig.address());
113 influxDbServerBuilder.append(":");
114 influxDbServerBuilder.append(influxDbConfig.port());
115
116 producer = InfluxDBFactory.connect(influxDbServerBuilder.toString(),
117 influxDbConfig.username(), influxDbConfig.password());
Boyoung Jeong4d1c9d12018-07-20 17:09:20 +0900118 database = influxDbConfig.database();
119 measurement = influxDbConfig.measurement();
120
Jian Li6803ccd2018-06-08 09:26:09 +0900121 log.info("InfluxDB producer has Started");
Boyoung Jeong4d1c9d12018-07-20 17:09:20 +0900122
123 createDB();
Jian Li6803ccd2018-06-08 09:26:09 +0900124 }
125
126 @Override
127 public void stop() {
128 if (producer != null) {
Boyoung Jeong4d1c9d12018-07-20 17:09:20 +0900129 producer.close();
Jian Li6803ccd2018-06-08 09:26:09 +0900130 producer = null;
131 }
132
Boyoung Jeong9e8faec2018-06-17 21:19:23 +0900133 log.info("InfluxDB producer has Stopped");
Jian Li6803ccd2018-06-08 09:26:09 +0900134 }
135
136 @Override
137 public void restart(TelemetryConfig config) {
138 stop();
139 start(config);
140 }
141
142 @Override
Boyoung Jeong4d1c9d12018-07-20 17:09:20 +0900143 public void publish(InfluxRecord<String, Set<FlowInfo>> record) {
Jian Lid1ce10a2018-06-12 13:47:23 +0900144 if (producer == null) {
Jian Li6c92b3c2018-08-03 11:26:55 +0900145 log.debug("InfluxDB telemetry service has not been enabled!");
Boyoung Jeong4d1c9d12018-07-20 17:09:20 +0900146 return;
Jian Lid1ce10a2018-06-12 13:47:23 +0900147 }
Boyoung Jeong4d1c9d12018-07-20 17:09:20 +0900148
149 if (record.flowInfos().size() == 0) {
Jian Li6c92b3c2018-08-03 11:26:55 +0900150 log.debug("No record to publish");
Boyoung Jeong4d1c9d12018-07-20 17:09:20 +0900151 return;
152 }
153
154 log.debug("Publish {} stats records to InfluxDB", record.flowInfos().size());
155
156 BatchPoints batchPoints = BatchPoints.database(database).build();
157
158 for (FlowInfo flowInfo: record.flowInfos()) {
Jian Lif8b8c7f2018-08-27 18:49:04 +0900159 Point.Builder pointBuilder = Point
Boyoung Jeong4d1c9d12018-07-20 17:09:20 +0900160 .measurement((measurement == null) ? record.measurement() : measurement)
161 .tag(FLOW_TYPE, String.valueOf(flowInfo.flowType()))
162 .tag(DEVICE_ID, flowInfo.deviceId().toString())
163 .tag(INPUT_INTERFACE_ID, String.valueOf(flowInfo.inputInterfaceId()))
164 .tag(OUTPUT_INTERFACE_ID, String.valueOf(flowInfo.outputInterfaceId()))
Boyoung Jeong4d1c9d12018-07-20 17:09:20 +0900165 .tag(VXLAN_ID, String.valueOf(flowInfo.vxlanId()))
166 .tag(SRC_IP, flowInfo.srcIp().toString())
167 .tag(DST_IP, flowInfo.dstIp().toString())
Boyoung Jeong4d1c9d12018-07-20 17:09:20 +0900168 .tag(DST_PORT, getTpPort(flowInfo.dstPort()))
169 .tag(PROTOCOL, String.valueOf(flowInfo.protocol()))
Boyoung Jeong4d1c9d12018-07-20 17:09:20 +0900170 .addField(STARTUP_TIME, flowInfo.statsInfo().startupTime())
171 .addField(FST_PKT_ARR_TIME, flowInfo.statsInfo().fstPktArrTime())
172 .addField(LST_PKT_OFFSET, flowInfo.statsInfo().lstPktOffset())
173 .addField(PREV_ACC_BYTES, flowInfo.statsInfo().prevAccBytes())
174 .addField(PREV_ACC_PKTS, flowInfo.statsInfo().prevAccPkts())
175 .addField(CURR_ACC_BYTES, flowInfo.statsInfo().currAccBytes())
176 .addField(CURR_ACC_PKTS, flowInfo.statsInfo().currAccPkts())
177 .addField(ERROR_PKTS, flowInfo.statsInfo().errorPkts())
Jian Lif8b8c7f2018-08-27 18:49:04 +0900178 .addField(DROP_PKTS, flowInfo.statsInfo().dropPkts());
179
180 if (flowInfo.vlanId() != null) {
181 pointBuilder.tag(VLAN_ID, flowInfo.vlanId().toString());
182 }
183
184 if (flowInfo.srcPort() != null) {
185 pointBuilder.tag(SRC_PORT, getTpPort(flowInfo.srcPort()));
186 }
187
188 if (flowInfo.dstPort() != null) {
189 pointBuilder.tag(DST_PORT, getTpPort(flowInfo.dstPort()));
190 }
191
192 batchPoints.point(pointBuilder.build());
Boyoung Jeong4d1c9d12018-07-20 17:09:20 +0900193 }
194 producer.write(batchPoints);
Jian Lid1ce10a2018-06-12 13:47:23 +0900195 }
196
197 @Override
198 public boolean isRunning() {
199 return producer != null;
Jian Li6803ccd2018-06-08 09:26:09 +0900200 }
Boyoung Jeong4d1c9d12018-07-20 17:09:20 +0900201
202 private void createDB() {
203 if (producer.databaseExists(database)) {
204 log.debug("Database {} is already created", database);
205 } else {
206 producer.createDatabase(database);
207 log.debug("Database {} is created", database);
208 }
209 }
210
211 private String getTpPort(TpPort tpPort) {
212 if (tpPort == null) {
213 return "";
214 }
215 return tpPort.toString();
216 }
Jian Li6803ccd2018-06-08 09:26:09 +0900217}