blob: 1871b5af2b4638646784cbd3bb7983e3341e6981 [file] [log] [blame]
Jian Li65f5aa22016-03-22 11:49:42 -07001/*
Brian O'Connora09fe5b2017-08-03 21:12:30 -07002 * Copyright 2016-present Open Networking Foundation
Jian Li65f5aa22016-03-22 11:49:42 -07003 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
16package org.onosproject.influxdbmetrics;
17
18import com.google.common.collect.BiMap;
19import com.google.common.collect.EnumHashBiMap;
20import com.google.common.collect.Lists;
21import com.google.common.collect.Maps;
22import com.google.common.collect.Sets;
23import org.apache.commons.lang.StringUtils;
Jian Li65f5aa22016-03-22 11:49:42 -070024import org.influxdb.InfluxDB;
25import org.influxdb.InfluxDBFactory;
26import org.influxdb.dto.Query;
27import org.influxdb.dto.QueryResult;
Jian Li65f5aa22016-03-22 11:49:42 -070028import org.onosproject.cluster.NodeId;
29import org.onosproject.core.CoreService;
Ray Milkeyd84f89b2018-08-17 14:54:17 -070030import org.osgi.service.component.annotations.Activate;
31import org.osgi.service.component.annotations.Component;
32import org.osgi.service.component.annotations.Deactivate;
33import org.osgi.service.component.annotations.Reference;
34import org.osgi.service.component.annotations.ReferenceCardinality;
Jian Li65f5aa22016-03-22 11:49:42 -070035import org.slf4j.Logger;
36
Jian Li65f5aa22016-03-22 11:49:42 -070037import java.util.List;
38import java.util.Map;
39import java.util.Set;
40import java.util.concurrent.TimeUnit;
41
42import static org.slf4j.LoggerFactory.getLogger;
43
44/**
45 * A Metric retriever implementation for querying metrics from influxDB server.
46 */
Ray Milkeyd84f89b2018-08-17 14:54:17 -070047@Component(immediate = true, service = InfluxDbMetricsRetriever.class)
Jian Li65f5aa22016-03-22 11:49:42 -070048public class DefaultInfluxDbMetricsRetriever implements InfluxDbMetricsRetriever {
49
50 private final Logger log = getLogger(getClass());
51
52 private static final String DEFAULT_PROTOCOL = "http";
Jian Li65f5aa22016-03-22 11:49:42 -070053 private static final String DEFAULT_POLICY = "default";
54 private static final String COLON_SEPARATOR = ":";
55 private static final String SLASH_SEPARATOR = "//";
56 private static final String BRACKET_START = "[";
57 private static final String BRACKET_END = "]";
58 private static final String METRIC_DELIMITER = ".";
59 private static final int NUB_OF_DELIMITER = 3;
60
61 private static final BiMap<TimeUnit, String> TIME_UNIT_MAP =
62 EnumHashBiMap.create(TimeUnit.class);
63
64 static {
65 // key is TimeUnit enumeration type
66 // value is influx database time unit keyword
67 TIME_UNIT_MAP.put(TimeUnit.DAYS, "d");
68 TIME_UNIT_MAP.put(TimeUnit.HOURS, "h");
69 TIME_UNIT_MAP.put(TimeUnit.MINUTES, "m");
70 TIME_UNIT_MAP.put(TimeUnit.SECONDS, "s");
71 }
72
Ray Milkeyd84f89b2018-08-17 14:54:17 -070073 @Reference(cardinality = ReferenceCardinality.MANDATORY)
Jian Li65f5aa22016-03-22 11:49:42 -070074 protected CoreService coreService;
75
Jian Libdfd37f2016-03-24 16:36:35 -070076 protected String database;
Jian Li65f5aa22016-03-22 11:49:42 -070077
78 InfluxDB influxDB;
79
80 @Activate
81 public void activate() {
Jian Li65f5aa22016-03-22 11:49:42 -070082 coreService.registerApplication("org.onosproject.influxdbmetrics");
83
Jian Li65f5aa22016-03-22 11:49:42 -070084 log.info("Started");
85 }
86
87 @Deactivate
88 public void deactivate() {
Jian Li65f5aa22016-03-22 11:49:42 -070089 log.info("Stopped");
90 }
91
Jian Libdfd37f2016-03-24 16:36:35 -070092 @Override
93 public void config(String address, int port, String database, String username, String password) {
Jian Li65f5aa22016-03-22 11:49:42 -070094 StringBuilder url = new StringBuilder();
95 url.append(DEFAULT_PROTOCOL);
96 url.append(COLON_SEPARATOR + SLASH_SEPARATOR);
97 url.append(address);
98 url.append(COLON_SEPARATOR);
99 url.append(port);
100
Jian Libdfd37f2016-03-24 16:36:35 -0700101 this.influxDB = InfluxDBFactory.connect(url.toString(), username, password);
102 this.database = database;
Jian Li65f5aa22016-03-22 11:49:42 -0700103 }
104
105 @Override
106 public Map<NodeId, Map<String, List<InfluxMetric>>> allMetrics(int period,
107 TimeUnit unit) {
108 Map<NodeId, Set<String>> nameMap = allMetricNames();
109 Map<NodeId, Map<String, List<InfluxMetric>>> metricsMap = Maps.newHashMap();
110
111 nameMap.forEach((nodeId, metricNames) ->
112 metricsMap.putIfAbsent(nodeId, metricsByNodeId(nodeId, period, unit))
113 );
114
115 return metricsMap;
116 }
117
118 @Override
119 public Map<String, List<InfluxMetric>> metricsByNodeId(NodeId nodeId, int period,
120 TimeUnit unit) {
121 Map<NodeId, Set<String>> nameMap = allMetricNames();
122 Map<String, List<InfluxMetric>> map = Maps.newHashMap();
123
124 nameMap.get(nodeId).forEach(metricName -> {
125 List<InfluxMetric> value = metric(nodeId, metricName, period, unit);
126 if (value != null) {
127 map.putIfAbsent(metricName, value);
128 }
129 });
130
131 return map;
132 }
133
134 @Override
135 public Map<NodeId, List<InfluxMetric>> metricsByName(String metricName, int period,
136 TimeUnit unit) {
137 Map<NodeId, List<InfluxMetric>> map = Maps.newHashMap();
138 List<InfluxMetric> metrics = Lists.newArrayList();
139 String queryPrefix = new StringBuilder()
140 .append("SELECT m1_rate FROM")
141 .append(database)
142 .append(METRIC_DELIMITER)
143 .append(quote(DEFAULT_POLICY))
144 .append(METRIC_DELIMITER)
145 .toString();
146 String querySuffix = new StringBuilder()
147 .append(" WHERE time > now() - ")
148 .append(period)
149 .append(unitString(unit))
150 .toString();
151
152 allMetricNames().keySet().forEach(nodeId -> {
153 String queryString = new StringBuilder()
154 .append(queryPrefix)
155 .append(quote(nodeId + METRIC_DELIMITER + metricName))
156 .append(querySuffix)
157 .toString();
158 Query query = new Query(queryString, database);
159 List<QueryResult.Result> results = influxDB.query(query).getResults();
160
161 if (results != null && results.get(0) != null
162 && results.get(0).getSeries() != null) {
163
164 results.get(0).getSeries().get(0).getValues().forEach(value ->
165 metrics.add(new DefaultInfluxMetric.Builder()
166 .time((String) value.get(0))
167 .oneMinRate((Double) value.get(1))
168 .build()));
169 map.putIfAbsent(nodeId, metrics);
170 }
171 });
172
173 return map;
174 }
175
176 @Override
177 public List<InfluxMetric> metric(NodeId nodeId, String metricName,
178 int period, TimeUnit unit) {
179 List<InfluxMetric> metrics = Lists.newArrayList();
180 String queryString = new StringBuilder()
181 .append("SELECT m1_rate FROM ")
182 .append(database)
183 .append(METRIC_DELIMITER)
184 .append(quote(DEFAULT_POLICY))
185 .append(METRIC_DELIMITER)
186 .append(quote(nodeId + METRIC_DELIMITER + metricName))
187 .append(" WHERE time > now() - ")
188 .append(period)
189 .append(unitString(unit))
190 .toString();
191
192 Query query = new Query(queryString, database);
193 List<QueryResult.Result> results = influxDB.query(query).getResults();
194
195 if (results != null && results.get(0) != null
196 && results.get(0).getSeries() != null) {
197
198 results.get(0).getSeries().get(0).getValues().forEach(value ->
199 metrics.add(new DefaultInfluxMetric.Builder()
200 .time((String) value.get(0))
201 .oneMinRate((Double) value.get(1))
202 .build()));
203 return metrics;
204 }
205
206 return null;
207 }
208
209 @Override
210 public Map<NodeId, Map<String, InfluxMetric>> allMetrics() {
211 Map<NodeId, Set<String>> nameMap = allMetricNames();
212 Map<NodeId, Map<String, InfluxMetric>> metricsMap = Maps.newHashMap();
213
214 nameMap.forEach((nodeId, metricNames) ->
215 metricsMap.putIfAbsent(nodeId, metricsByNodeId(nodeId))
216 );
217
218 return metricsMap;
219 }
220
221 @Override
222 public Map<String, InfluxMetric> metricsByNodeId(NodeId nodeId) {
223 Map<NodeId, Set<String>> nameMap = allMetricNames();
224 Map<String, InfluxMetric> map = Maps.newHashMap();
225
226 nameMap.get(nodeId).forEach(metricName -> {
227 InfluxMetric value = metric(nodeId, metricName);
228 if (value != null) {
229 map.putIfAbsent(metricName, value);
230 }
231 });
232
233 return map;
234 }
235
236 @Override
237 public Map<NodeId, InfluxMetric> metricsByName(String metricName) {
238 Map<NodeId, InfluxMetric> map = Maps.newHashMap();
239 String queryPrefix = new StringBuilder()
240 .append("SELECT m1_rate FROM")
241 .append(database)
242 .append(METRIC_DELIMITER)
243 .append(quote(DEFAULT_POLICY))
244 .append(METRIC_DELIMITER)
245 .toString();
246 String querySuffix = new StringBuilder()
247 .append(" LIMIT 1")
248 .toString();
249
250 allMetricNames().keySet().forEach(nodeId -> {
251 String queryString = new StringBuilder()
252 .append(queryPrefix)
253 .append(quote(nodeId + METRIC_DELIMITER + metricName))
254 .append(querySuffix)
255 .toString();
256 Query query = new Query(queryString, database);
257 List<QueryResult.Result> results = influxDB.query(query).getResults();
258
259 if (results != null && results.get(0) != null
260 && results.get(0).getSeries() != null) {
261 InfluxMetric metric = new DefaultInfluxMetric.Builder()
262 .time((String) results.get(0).getSeries().get(0).getValues().get(0).get(0))
263 .oneMinRate((Double) results.get(0).getSeries().get(0)
264 .getValues().get(0).get(1)).build();
265 map.putIfAbsent(nodeId, metric);
266 }
267 });
268
269 return map;
270 }
271
272 @Override
273 public InfluxMetric metric(NodeId nodeId, String metricName) {
274 String queryString = new StringBuilder()
275 .append("SELECT m1_rate FROM ")
276 .append(database)
277 .append(METRIC_DELIMITER)
278 .append(quote(DEFAULT_POLICY))
279 .append(METRIC_DELIMITER)
280 .append(quote(nodeId + METRIC_DELIMITER + metricName))
281 .append(" LIMIT 1")
282 .toString();
283
284 Query query = new Query(queryString, database);
285 List<QueryResult.Result> results = influxDB.query(query).getResults();
286
287 if (results != null && results.get(0) != null
288 && results.get(0).getSeries() != null) {
289 return new DefaultInfluxMetric.Builder()
290 .time((String) results.get(0).getSeries().get(0).getValues().get(0).get(0))
291 .oneMinRate((Double) results.get(0).getSeries().get(0)
292 .getValues().get(0).get(1)).build();
293 }
294
295 return null;
296 }
297
298 private String unitString(TimeUnit unit) {
299 return TIME_UNIT_MAP.get(unit) == null ? "h" : TIME_UNIT_MAP.get(unit);
300 }
301
302 private String quote(String str) {
303 return "\"" + str + "\"";
304 }
305
306 /**
307 * Returns all metric names that bound with node identification.
308 *
309 * @return all metric names
310 */
311 protected Map<NodeId, Set<String>> allMetricNames() {
312 Map<NodeId, Set<String>> metricNameMap = Maps.newHashMap();
313 Query query = new Query("SHOW MEASUREMENTS", database);
314 List<QueryResult.Result> results = influxDB.query(query).getResults();
315 List<List<Object>> rawMetricNames = results.get(0).getSeries().get(0).getValues();
316
317 rawMetricNames.forEach(rawMetricName -> {
318 String nodeIdStr = getNodeId(strip(rawMetricName.toString()));
319
320 if (nodeIdStr != null) {
321 NodeId nodeId = NodeId.nodeId(nodeIdStr);
322 String metricName = getMetricName(strip(rawMetricName.toString()));
323
324 if (!metricNameMap.containsKey(nodeId)) {
325 metricNameMap.putIfAbsent(nodeId, Sets.newHashSet());
326 }
327
328 if (metricName != null) {
329 metricNameMap.get(nodeId).add(metricName);
330 }
331 }
332 });
333
334 return metricNameMap;
335 }
336
337 /**
338 * Strips special bracket from the full name.
339 *
340 * @param fullName full name
341 * @return bracket stripped string
342 */
343 private String strip(String fullName) {
344 return StringUtils.strip(StringUtils.strip(fullName, BRACKET_START), BRACKET_END);
345 }
346
347 /**
348 * Returns metric name from full name.
349 * The elements in full name is split by using '.';
350 * We assume that the metric name always comes after the last three '.'
351 *
352 * @param fullName full name
353 * @return metric name
354 */
355 private String getMetricName(String fullName) {
356 int index = StringUtils.lastOrdinalIndexOf(fullName,
357 METRIC_DELIMITER, NUB_OF_DELIMITER);
358 if (index != -1) {
359 return StringUtils.substring(fullName, index + 1);
360 } else {
361 log.warn("Database {} contains malformed metric name.", database);
362 return null;
363 }
364 }
365
366 /**
367 * Returns node id from full name.
368 * The elements in full name is split by using '.';
369 * We assume that the node id always comes before the last three '.'
370 *
371 * @param fullName full name
372 * @return node id
373 */
374 private String getNodeId(String fullName) {
375 int index = StringUtils.lastOrdinalIndexOf(fullName,
376 METRIC_DELIMITER, NUB_OF_DELIMITER);
377 if (index != -1) {
378 return StringUtils.substring(fullName, 0, index);
379 } else {
380 log.warn("Database {} contains malformed node id.", database);
381 return null;
382 }
383 }
Jian Li65f5aa22016-03-22 11:49:42 -0700384}