blob: a174925b8b5ad6b13e3103ac9adb8476abf0526f [file] [log] [blame]
Jian Li65f5aa22016-03-22 11:49:42 -07001/*
Brian O'Connora09fe5b2017-08-03 21:12:30 -07002 * Copyright 2016-present Open Networking Foundation
Jian Li65f5aa22016-03-22 11:49:42 -07003 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
16package org.onosproject.influxdbmetrics;
17
18import com.google.common.collect.BiMap;
19import com.google.common.collect.EnumHashBiMap;
20import com.google.common.collect.Lists;
21import com.google.common.collect.Maps;
22import com.google.common.collect.Sets;
23import org.apache.commons.lang.StringUtils;
24import org.apache.felix.scr.annotations.Activate;
25import org.apache.felix.scr.annotations.Component;
26import org.apache.felix.scr.annotations.Deactivate;
Jian Li65f5aa22016-03-22 11:49:42 -070027import org.apache.felix.scr.annotations.Reference;
28import org.apache.felix.scr.annotations.ReferenceCardinality;
29import org.apache.felix.scr.annotations.Service;
30import org.influxdb.InfluxDB;
31import org.influxdb.InfluxDBFactory;
32import org.influxdb.dto.Query;
33import org.influxdb.dto.QueryResult;
Jian Li65f5aa22016-03-22 11:49:42 -070034import org.onosproject.cluster.NodeId;
35import org.onosproject.core.CoreService;
Jian Li65f5aa22016-03-22 11:49:42 -070036import org.slf4j.Logger;
37
Jian Li65f5aa22016-03-22 11:49:42 -070038import java.util.List;
39import java.util.Map;
40import java.util.Set;
41import java.util.concurrent.TimeUnit;
42
43import static org.slf4j.LoggerFactory.getLogger;
44
45/**
46 * A Metric retriever implementation for querying metrics from influxDB server.
47 */
48@Component(immediate = true)
49@Service
50public class DefaultInfluxDbMetricsRetriever implements InfluxDbMetricsRetriever {
51
52 private final Logger log = getLogger(getClass());
53
54 private static final String DEFAULT_PROTOCOL = "http";
Jian Li65f5aa22016-03-22 11:49:42 -070055 private static final String DEFAULT_POLICY = "default";
56 private static final String COLON_SEPARATOR = ":";
57 private static final String SLASH_SEPARATOR = "//";
58 private static final String BRACKET_START = "[";
59 private static final String BRACKET_END = "]";
60 private static final String METRIC_DELIMITER = ".";
61 private static final int NUB_OF_DELIMITER = 3;
62
63 private static final BiMap<TimeUnit, String> TIME_UNIT_MAP =
64 EnumHashBiMap.create(TimeUnit.class);
65
66 static {
67 // key is TimeUnit enumeration type
68 // value is influx database time unit keyword
69 TIME_UNIT_MAP.put(TimeUnit.DAYS, "d");
70 TIME_UNIT_MAP.put(TimeUnit.HOURS, "h");
71 TIME_UNIT_MAP.put(TimeUnit.MINUTES, "m");
72 TIME_UNIT_MAP.put(TimeUnit.SECONDS, "s");
73 }
74
75 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
76 protected CoreService coreService;
77
Jian Libdfd37f2016-03-24 16:36:35 -070078 protected String database;
Jian Li65f5aa22016-03-22 11:49:42 -070079
80 InfluxDB influxDB;
81
82 @Activate
83 public void activate() {
Jian Li65f5aa22016-03-22 11:49:42 -070084 coreService.registerApplication("org.onosproject.influxdbmetrics");
85
Jian Li65f5aa22016-03-22 11:49:42 -070086 log.info("Started");
87 }
88
89 @Deactivate
90 public void deactivate() {
Jian Li65f5aa22016-03-22 11:49:42 -070091 log.info("Stopped");
92 }
93
Jian Libdfd37f2016-03-24 16:36:35 -070094 @Override
95 public void config(String address, int port, String database, String username, String password) {
Jian Li65f5aa22016-03-22 11:49:42 -070096 StringBuilder url = new StringBuilder();
97 url.append(DEFAULT_PROTOCOL);
98 url.append(COLON_SEPARATOR + SLASH_SEPARATOR);
99 url.append(address);
100 url.append(COLON_SEPARATOR);
101 url.append(port);
102
Jian Libdfd37f2016-03-24 16:36:35 -0700103 this.influxDB = InfluxDBFactory.connect(url.toString(), username, password);
104 this.database = database;
Jian Li65f5aa22016-03-22 11:49:42 -0700105 }
106
107 @Override
108 public Map<NodeId, Map<String, List<InfluxMetric>>> allMetrics(int period,
109 TimeUnit unit) {
110 Map<NodeId, Set<String>> nameMap = allMetricNames();
111 Map<NodeId, Map<String, List<InfluxMetric>>> metricsMap = Maps.newHashMap();
112
113 nameMap.forEach((nodeId, metricNames) ->
114 metricsMap.putIfAbsent(nodeId, metricsByNodeId(nodeId, period, unit))
115 );
116
117 return metricsMap;
118 }
119
120 @Override
121 public Map<String, List<InfluxMetric>> metricsByNodeId(NodeId nodeId, int period,
122 TimeUnit unit) {
123 Map<NodeId, Set<String>> nameMap = allMetricNames();
124 Map<String, List<InfluxMetric>> map = Maps.newHashMap();
125
126 nameMap.get(nodeId).forEach(metricName -> {
127 List<InfluxMetric> value = metric(nodeId, metricName, period, unit);
128 if (value != null) {
129 map.putIfAbsent(metricName, value);
130 }
131 });
132
133 return map;
134 }
135
136 @Override
137 public Map<NodeId, List<InfluxMetric>> metricsByName(String metricName, int period,
138 TimeUnit unit) {
139 Map<NodeId, List<InfluxMetric>> map = Maps.newHashMap();
140 List<InfluxMetric> metrics = Lists.newArrayList();
141 String queryPrefix = new StringBuilder()
142 .append("SELECT m1_rate FROM")
143 .append(database)
144 .append(METRIC_DELIMITER)
145 .append(quote(DEFAULT_POLICY))
146 .append(METRIC_DELIMITER)
147 .toString();
148 String querySuffix = new StringBuilder()
149 .append(" WHERE time > now() - ")
150 .append(period)
151 .append(unitString(unit))
152 .toString();
153
154 allMetricNames().keySet().forEach(nodeId -> {
155 String queryString = new StringBuilder()
156 .append(queryPrefix)
157 .append(quote(nodeId + METRIC_DELIMITER + metricName))
158 .append(querySuffix)
159 .toString();
160 Query query = new Query(queryString, database);
161 List<QueryResult.Result> results = influxDB.query(query).getResults();
162
163 if (results != null && results.get(0) != null
164 && results.get(0).getSeries() != null) {
165
166 results.get(0).getSeries().get(0).getValues().forEach(value ->
167 metrics.add(new DefaultInfluxMetric.Builder()
168 .time((String) value.get(0))
169 .oneMinRate((Double) value.get(1))
170 .build()));
171 map.putIfAbsent(nodeId, metrics);
172 }
173 });
174
175 return map;
176 }
177
178 @Override
179 public List<InfluxMetric> metric(NodeId nodeId, String metricName,
180 int period, TimeUnit unit) {
181 List<InfluxMetric> metrics = Lists.newArrayList();
182 String queryString = new StringBuilder()
183 .append("SELECT m1_rate FROM ")
184 .append(database)
185 .append(METRIC_DELIMITER)
186 .append(quote(DEFAULT_POLICY))
187 .append(METRIC_DELIMITER)
188 .append(quote(nodeId + METRIC_DELIMITER + metricName))
189 .append(" WHERE time > now() - ")
190 .append(period)
191 .append(unitString(unit))
192 .toString();
193
194 Query query = new Query(queryString, database);
195 List<QueryResult.Result> results = influxDB.query(query).getResults();
196
197 if (results != null && results.get(0) != null
198 && results.get(0).getSeries() != null) {
199
200 results.get(0).getSeries().get(0).getValues().forEach(value ->
201 metrics.add(new DefaultInfluxMetric.Builder()
202 .time((String) value.get(0))
203 .oneMinRate((Double) value.get(1))
204 .build()));
205 return metrics;
206 }
207
208 return null;
209 }
210
211 @Override
212 public Map<NodeId, Map<String, InfluxMetric>> allMetrics() {
213 Map<NodeId, Set<String>> nameMap = allMetricNames();
214 Map<NodeId, Map<String, InfluxMetric>> metricsMap = Maps.newHashMap();
215
216 nameMap.forEach((nodeId, metricNames) ->
217 metricsMap.putIfAbsent(nodeId, metricsByNodeId(nodeId))
218 );
219
220 return metricsMap;
221 }
222
223 @Override
224 public Map<String, InfluxMetric> metricsByNodeId(NodeId nodeId) {
225 Map<NodeId, Set<String>> nameMap = allMetricNames();
226 Map<String, InfluxMetric> map = Maps.newHashMap();
227
228 nameMap.get(nodeId).forEach(metricName -> {
229 InfluxMetric value = metric(nodeId, metricName);
230 if (value != null) {
231 map.putIfAbsent(metricName, value);
232 }
233 });
234
235 return map;
236 }
237
238 @Override
239 public Map<NodeId, InfluxMetric> metricsByName(String metricName) {
240 Map<NodeId, InfluxMetric> map = Maps.newHashMap();
241 String queryPrefix = new StringBuilder()
242 .append("SELECT m1_rate FROM")
243 .append(database)
244 .append(METRIC_DELIMITER)
245 .append(quote(DEFAULT_POLICY))
246 .append(METRIC_DELIMITER)
247 .toString();
248 String querySuffix = new StringBuilder()
249 .append(" LIMIT 1")
250 .toString();
251
252 allMetricNames().keySet().forEach(nodeId -> {
253 String queryString = new StringBuilder()
254 .append(queryPrefix)
255 .append(quote(nodeId + METRIC_DELIMITER + metricName))
256 .append(querySuffix)
257 .toString();
258 Query query = new Query(queryString, database);
259 List<QueryResult.Result> results = influxDB.query(query).getResults();
260
261 if (results != null && results.get(0) != null
262 && results.get(0).getSeries() != null) {
263 InfluxMetric metric = new DefaultInfluxMetric.Builder()
264 .time((String) results.get(0).getSeries().get(0).getValues().get(0).get(0))
265 .oneMinRate((Double) results.get(0).getSeries().get(0)
266 .getValues().get(0).get(1)).build();
267 map.putIfAbsent(nodeId, metric);
268 }
269 });
270
271 return map;
272 }
273
274 @Override
275 public InfluxMetric metric(NodeId nodeId, String metricName) {
276 String queryString = new StringBuilder()
277 .append("SELECT m1_rate FROM ")
278 .append(database)
279 .append(METRIC_DELIMITER)
280 .append(quote(DEFAULT_POLICY))
281 .append(METRIC_DELIMITER)
282 .append(quote(nodeId + METRIC_DELIMITER + metricName))
283 .append(" LIMIT 1")
284 .toString();
285
286 Query query = new Query(queryString, database);
287 List<QueryResult.Result> results = influxDB.query(query).getResults();
288
289 if (results != null && results.get(0) != null
290 && results.get(0).getSeries() != null) {
291 return new DefaultInfluxMetric.Builder()
292 .time((String) results.get(0).getSeries().get(0).getValues().get(0).get(0))
293 .oneMinRate((Double) results.get(0).getSeries().get(0)
294 .getValues().get(0).get(1)).build();
295 }
296
297 return null;
298 }
299
300 private String unitString(TimeUnit unit) {
301 return TIME_UNIT_MAP.get(unit) == null ? "h" : TIME_UNIT_MAP.get(unit);
302 }
303
304 private String quote(String str) {
305 return "\"" + str + "\"";
306 }
307
308 /**
309 * Returns all metric names that bound with node identification.
310 *
311 * @return all metric names
312 */
313 protected Map<NodeId, Set<String>> allMetricNames() {
314 Map<NodeId, Set<String>> metricNameMap = Maps.newHashMap();
315 Query query = new Query("SHOW MEASUREMENTS", database);
316 List<QueryResult.Result> results = influxDB.query(query).getResults();
317 List<List<Object>> rawMetricNames = results.get(0).getSeries().get(0).getValues();
318
319 rawMetricNames.forEach(rawMetricName -> {
320 String nodeIdStr = getNodeId(strip(rawMetricName.toString()));
321
322 if (nodeIdStr != null) {
323 NodeId nodeId = NodeId.nodeId(nodeIdStr);
324 String metricName = getMetricName(strip(rawMetricName.toString()));
325
326 if (!metricNameMap.containsKey(nodeId)) {
327 metricNameMap.putIfAbsent(nodeId, Sets.newHashSet());
328 }
329
330 if (metricName != null) {
331 metricNameMap.get(nodeId).add(metricName);
332 }
333 }
334 });
335
336 return metricNameMap;
337 }
338
339 /**
340 * Strips special bracket from the full name.
341 *
342 * @param fullName full name
343 * @return bracket stripped string
344 */
345 private String strip(String fullName) {
346 return StringUtils.strip(StringUtils.strip(fullName, BRACKET_START), BRACKET_END);
347 }
348
349 /**
350 * Returns metric name from full name.
351 * The elements in full name is split by using '.';
352 * We assume that the metric name always comes after the last three '.'
353 *
354 * @param fullName full name
355 * @return metric name
356 */
357 private String getMetricName(String fullName) {
358 int index = StringUtils.lastOrdinalIndexOf(fullName,
359 METRIC_DELIMITER, NUB_OF_DELIMITER);
360 if (index != -1) {
361 return StringUtils.substring(fullName, index + 1);
362 } else {
363 log.warn("Database {} contains malformed metric name.", database);
364 return null;
365 }
366 }
367
368 /**
369 * Returns node id from full name.
370 * The elements in full name is split by using '.';
371 * We assume that the node id always comes before the last three '.'
372 *
373 * @param fullName full name
374 * @return node id
375 */
376 private String getNodeId(String fullName) {
377 int index = StringUtils.lastOrdinalIndexOf(fullName,
378 METRIC_DELIMITER, NUB_OF_DELIMITER);
379 if (index != -1) {
380 return StringUtils.substring(fullName, 0, index);
381 } else {
382 log.warn("Database {} contains malformed node id.", database);
383 return null;
384 }
385 }
Jian Li65f5aa22016-03-22 11:49:42 -0700386}