blob: c911e27b24575b4754806fa4ba057c249fc67e00 [file] [log] [blame]
Jian Li65f5aa22016-03-22 11:49:42 -07001/*
2 * Copyright 2016 Open Networking Laboratory
3 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
16package org.onosproject.influxdbmetrics;
17
18import com.google.common.collect.BiMap;
19import com.google.common.collect.EnumHashBiMap;
20import com.google.common.collect.Lists;
21import com.google.common.collect.Maps;
22import com.google.common.collect.Sets;
23import org.apache.commons.lang.StringUtils;
24import org.apache.felix.scr.annotations.Activate;
25import org.apache.felix.scr.annotations.Component;
26import org.apache.felix.scr.annotations.Deactivate;
27import org.apache.felix.scr.annotations.Modified;
28import org.apache.felix.scr.annotations.Property;
29import org.apache.felix.scr.annotations.Reference;
30import org.apache.felix.scr.annotations.ReferenceCardinality;
31import org.apache.felix.scr.annotations.Service;
32import org.influxdb.InfluxDB;
33import org.influxdb.InfluxDBFactory;
34import org.influxdb.dto.Query;
35import org.influxdb.dto.QueryResult;
36import org.onlab.util.Tools;
37import org.onosproject.cfg.ComponentConfigService;
38import org.onosproject.cluster.NodeId;
39import org.onosproject.core.CoreService;
40import org.osgi.service.component.ComponentContext;
41import org.slf4j.Logger;
42
43import java.util.Dictionary;
44import java.util.List;
45import java.util.Map;
46import java.util.Set;
47import java.util.concurrent.TimeUnit;
48
49import static org.slf4j.LoggerFactory.getLogger;
50
51/**
52 * A Metric retriever implementation for querying metrics from influxDB server.
53 */
54@Component(immediate = true)
55@Service
56public class DefaultInfluxDbMetricsRetriever implements InfluxDbMetricsRetriever {
57
58 private final Logger log = getLogger(getClass());
59
60 private static final String DEFAULT_PROTOCOL = "http";
61 private static final String DEFAULT_ADDRESS = "localhost";
62 private static final int DEFAULT_PORT = 8086;
63 private static final String DEFAULT_DATABASE = "onos";
64 private static final String DEFAULT_USERNAME = "onos";
65 private static final String DEFAULT_PASSWORD = "onos.password";
66 private static final String DEFAULT_POLICY = "default";
67 private static final String COLON_SEPARATOR = ":";
68 private static final String SLASH_SEPARATOR = "//";
69 private static final String BRACKET_START = "[";
70 private static final String BRACKET_END = "]";
71 private static final String METRIC_DELIMITER = ".";
72 private static final int NUB_OF_DELIMITER = 3;
73
74 private static final BiMap<TimeUnit, String> TIME_UNIT_MAP =
75 EnumHashBiMap.create(TimeUnit.class);
76
77 static {
78 // key is TimeUnit enumeration type
79 // value is influx database time unit keyword
80 TIME_UNIT_MAP.put(TimeUnit.DAYS, "d");
81 TIME_UNIT_MAP.put(TimeUnit.HOURS, "h");
82 TIME_UNIT_MAP.put(TimeUnit.MINUTES, "m");
83 TIME_UNIT_MAP.put(TimeUnit.SECONDS, "s");
84 }
85
86 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
87 protected CoreService coreService;
88
89 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
90 protected ComponentConfigService cfgService;
91
92 @Property(name = "address", value = DEFAULT_ADDRESS,
93 label = "IP address of influxDB server; " +
94 "default is localhost")
95 protected String address = DEFAULT_ADDRESS;
96
97 @Property(name = "port", intValue = DEFAULT_PORT,
98 label = "Port number of influxDB server; " +
99 "default is 8086")
100 protected int port = DEFAULT_PORT;
101
102 @Property(name = "database", value = DEFAULT_DATABASE,
103 label = "Database name of influxDB server; " +
104 "default is onos")
105 protected String database = DEFAULT_DATABASE;
106
107 @Property(name = "username", value = DEFAULT_USERNAME,
108 label = "Username of influxDB server; default is onos")
109 protected String username = DEFAULT_USERNAME;
110
111 @Property(name = "password", value = DEFAULT_PASSWORD,
112 label = "Password of influxDB server; default is onos.password")
113 protected String password = DEFAULT_PASSWORD;
114
115 InfluxDB influxDB;
116
117 @Activate
118 public void activate() {
119 cfgService.registerProperties(getClass());
120 coreService.registerApplication("org.onosproject.influxdbmetrics");
121
122 config();
123
124 log.info("Started");
125 }
126
127 @Deactivate
128 public void deactivate() {
129 cfgService.unregisterProperties(getClass(), false);
130
131 log.info("Stopped");
132 }
133
134 @Modified
135 public void modified(ComponentContext context) {
136 readComponentConfiguration(context);
137 config();
138 }
139
140 private void config() {
141 StringBuilder url = new StringBuilder();
142 url.append(DEFAULT_PROTOCOL);
143 url.append(COLON_SEPARATOR + SLASH_SEPARATOR);
144 url.append(address);
145 url.append(COLON_SEPARATOR);
146 url.append(port);
147
148 influxDB = InfluxDBFactory.connect(url.toString(), username, password);
149 }
150
151 @Override
152 public Map<NodeId, Map<String, List<InfluxMetric>>> allMetrics(int period,
153 TimeUnit unit) {
154 Map<NodeId, Set<String>> nameMap = allMetricNames();
155 Map<NodeId, Map<String, List<InfluxMetric>>> metricsMap = Maps.newHashMap();
156
157 nameMap.forEach((nodeId, metricNames) ->
158 metricsMap.putIfAbsent(nodeId, metricsByNodeId(nodeId, period, unit))
159 );
160
161 return metricsMap;
162 }
163
164 @Override
165 public Map<String, List<InfluxMetric>> metricsByNodeId(NodeId nodeId, int period,
166 TimeUnit unit) {
167 Map<NodeId, Set<String>> nameMap = allMetricNames();
168 Map<String, List<InfluxMetric>> map = Maps.newHashMap();
169
170 nameMap.get(nodeId).forEach(metricName -> {
171 List<InfluxMetric> value = metric(nodeId, metricName, period, unit);
172 if (value != null) {
173 map.putIfAbsent(metricName, value);
174 }
175 });
176
177 return map;
178 }
179
180 @Override
181 public Map<NodeId, List<InfluxMetric>> metricsByName(String metricName, int period,
182 TimeUnit unit) {
183 Map<NodeId, List<InfluxMetric>> map = Maps.newHashMap();
184 List<InfluxMetric> metrics = Lists.newArrayList();
185 String queryPrefix = new StringBuilder()
186 .append("SELECT m1_rate FROM")
187 .append(database)
188 .append(METRIC_DELIMITER)
189 .append(quote(DEFAULT_POLICY))
190 .append(METRIC_DELIMITER)
191 .toString();
192 String querySuffix = new StringBuilder()
193 .append(" WHERE time > now() - ")
194 .append(period)
195 .append(unitString(unit))
196 .toString();
197
198 allMetricNames().keySet().forEach(nodeId -> {
199 String queryString = new StringBuilder()
200 .append(queryPrefix)
201 .append(quote(nodeId + METRIC_DELIMITER + metricName))
202 .append(querySuffix)
203 .toString();
204 Query query = new Query(queryString, database);
205 List<QueryResult.Result> results = influxDB.query(query).getResults();
206
207 if (results != null && results.get(0) != null
208 && results.get(0).getSeries() != null) {
209
210 results.get(0).getSeries().get(0).getValues().forEach(value ->
211 metrics.add(new DefaultInfluxMetric.Builder()
212 .time((String) value.get(0))
213 .oneMinRate((Double) value.get(1))
214 .build()));
215 map.putIfAbsent(nodeId, metrics);
216 }
217 });
218
219 return map;
220 }
221
222 @Override
223 public List<InfluxMetric> metric(NodeId nodeId, String metricName,
224 int period, TimeUnit unit) {
225 List<InfluxMetric> metrics = Lists.newArrayList();
226 String queryString = new StringBuilder()
227 .append("SELECT m1_rate FROM ")
228 .append(database)
229 .append(METRIC_DELIMITER)
230 .append(quote(DEFAULT_POLICY))
231 .append(METRIC_DELIMITER)
232 .append(quote(nodeId + METRIC_DELIMITER + metricName))
233 .append(" WHERE time > now() - ")
234 .append(period)
235 .append(unitString(unit))
236 .toString();
237
238 Query query = new Query(queryString, database);
239 List<QueryResult.Result> results = influxDB.query(query).getResults();
240
241 if (results != null && results.get(0) != null
242 && results.get(0).getSeries() != null) {
243
244 results.get(0).getSeries().get(0).getValues().forEach(value ->
245 metrics.add(new DefaultInfluxMetric.Builder()
246 .time((String) value.get(0))
247 .oneMinRate((Double) value.get(1))
248 .build()));
249 return metrics;
250 }
251
252 return null;
253 }
254
255 @Override
256 public Map<NodeId, Map<String, InfluxMetric>> allMetrics() {
257 Map<NodeId, Set<String>> nameMap = allMetricNames();
258 Map<NodeId, Map<String, InfluxMetric>> metricsMap = Maps.newHashMap();
259
260 nameMap.forEach((nodeId, metricNames) ->
261 metricsMap.putIfAbsent(nodeId, metricsByNodeId(nodeId))
262 );
263
264 return metricsMap;
265 }
266
267 @Override
268 public Map<String, InfluxMetric> metricsByNodeId(NodeId nodeId) {
269 Map<NodeId, Set<String>> nameMap = allMetricNames();
270 Map<String, InfluxMetric> map = Maps.newHashMap();
271
272 nameMap.get(nodeId).forEach(metricName -> {
273 InfluxMetric value = metric(nodeId, metricName);
274 if (value != null) {
275 map.putIfAbsent(metricName, value);
276 }
277 });
278
279 return map;
280 }
281
282 @Override
283 public Map<NodeId, InfluxMetric> metricsByName(String metricName) {
284 Map<NodeId, InfluxMetric> map = Maps.newHashMap();
285 String queryPrefix = new StringBuilder()
286 .append("SELECT m1_rate FROM")
287 .append(database)
288 .append(METRIC_DELIMITER)
289 .append(quote(DEFAULT_POLICY))
290 .append(METRIC_DELIMITER)
291 .toString();
292 String querySuffix = new StringBuilder()
293 .append(" LIMIT 1")
294 .toString();
295
296 allMetricNames().keySet().forEach(nodeId -> {
297 String queryString = new StringBuilder()
298 .append(queryPrefix)
299 .append(quote(nodeId + METRIC_DELIMITER + metricName))
300 .append(querySuffix)
301 .toString();
302 Query query = new Query(queryString, database);
303 List<QueryResult.Result> results = influxDB.query(query).getResults();
304
305 if (results != null && results.get(0) != null
306 && results.get(0).getSeries() != null) {
307 InfluxMetric metric = new DefaultInfluxMetric.Builder()
308 .time((String) results.get(0).getSeries().get(0).getValues().get(0).get(0))
309 .oneMinRate((Double) results.get(0).getSeries().get(0)
310 .getValues().get(0).get(1)).build();
311 map.putIfAbsent(nodeId, metric);
312 }
313 });
314
315 return map;
316 }
317
318 @Override
319 public InfluxMetric metric(NodeId nodeId, String metricName) {
320 String queryString = new StringBuilder()
321 .append("SELECT m1_rate FROM ")
322 .append(database)
323 .append(METRIC_DELIMITER)
324 .append(quote(DEFAULT_POLICY))
325 .append(METRIC_DELIMITER)
326 .append(quote(nodeId + METRIC_DELIMITER + metricName))
327 .append(" LIMIT 1")
328 .toString();
329
330 Query query = new Query(queryString, database);
331 List<QueryResult.Result> results = influxDB.query(query).getResults();
332
333 if (results != null && results.get(0) != null
334 && results.get(0).getSeries() != null) {
335 return new DefaultInfluxMetric.Builder()
336 .time((String) results.get(0).getSeries().get(0).getValues().get(0).get(0))
337 .oneMinRate((Double) results.get(0).getSeries().get(0)
338 .getValues().get(0).get(1)).build();
339 }
340
341 return null;
342 }
343
344 private String unitString(TimeUnit unit) {
345 return TIME_UNIT_MAP.get(unit) == null ? "h" : TIME_UNIT_MAP.get(unit);
346 }
347
348 private String quote(String str) {
349 return "\"" + str + "\"";
350 }
351
352 /**
353 * Returns all metric names that bound with node identification.
354 *
355 * @return all metric names
356 */
357 protected Map<NodeId, Set<String>> allMetricNames() {
358 Map<NodeId, Set<String>> metricNameMap = Maps.newHashMap();
359 Query query = new Query("SHOW MEASUREMENTS", database);
360 List<QueryResult.Result> results = influxDB.query(query).getResults();
361 List<List<Object>> rawMetricNames = results.get(0).getSeries().get(0).getValues();
362
363 rawMetricNames.forEach(rawMetricName -> {
364 String nodeIdStr = getNodeId(strip(rawMetricName.toString()));
365
366 if (nodeIdStr != null) {
367 NodeId nodeId = NodeId.nodeId(nodeIdStr);
368 String metricName = getMetricName(strip(rawMetricName.toString()));
369
370 if (!metricNameMap.containsKey(nodeId)) {
371 metricNameMap.putIfAbsent(nodeId, Sets.newHashSet());
372 }
373
374 if (metricName != null) {
375 metricNameMap.get(nodeId).add(metricName);
376 }
377 }
378 });
379
380 return metricNameMap;
381 }
382
383 /**
384 * Strips special bracket from the full name.
385 *
386 * @param fullName full name
387 * @return bracket stripped string
388 */
389 private String strip(String fullName) {
390 return StringUtils.strip(StringUtils.strip(fullName, BRACKET_START), BRACKET_END);
391 }
392
393 /**
394 * Returns metric name from full name.
395 * The elements in full name is split by using '.';
396 * We assume that the metric name always comes after the last three '.'
397 *
398 * @param fullName full name
399 * @return metric name
400 */
401 private String getMetricName(String fullName) {
402 int index = StringUtils.lastOrdinalIndexOf(fullName,
403 METRIC_DELIMITER, NUB_OF_DELIMITER);
404 if (index != -1) {
405 return StringUtils.substring(fullName, index + 1);
406 } else {
407 log.warn("Database {} contains malformed metric name.", database);
408 return null;
409 }
410 }
411
412 /**
413 * Returns node id from full name.
414 * The elements in full name is split by using '.';
415 * We assume that the node id always comes before the last three '.'
416 *
417 * @param fullName full name
418 * @return node id
419 */
420 private String getNodeId(String fullName) {
421 int index = StringUtils.lastOrdinalIndexOf(fullName,
422 METRIC_DELIMITER, NUB_OF_DELIMITER);
423 if (index != -1) {
424 return StringUtils.substring(fullName, 0, index);
425 } else {
426 log.warn("Database {} contains malformed node id.", database);
427 return null;
428 }
429 }
430
431 /**
432 * Extracts properties from the component configuration context.
433 *
434 * @param context the component context
435 */
436 private void readComponentConfiguration(ComponentContext context) {
437 Dictionary<?, ?> properties = context.getProperties();
438
439 String addressStr = Tools.get(properties, "address");
440 address = addressStr != null ? addressStr : DEFAULT_ADDRESS;
441 log.info("Configured. InfluxDB server address is {}", address);
442
443 String databaseStr = Tools.get(properties, "database");
444 database = databaseStr != null ? databaseStr : DEFAULT_DATABASE;
445 log.info("Configured. InfluxDB server database is {}", database);
446
447 String usernameStr = Tools.get(properties, "username");
448 username = usernameStr != null ? usernameStr : DEFAULT_USERNAME;
449 log.info("Configured. InfluxDB server username is {}", username);
450
451 String passwordStr = Tools.get(properties, "password");
452 password = passwordStr != null ? passwordStr : DEFAULT_PASSWORD;
453 log.info("Configured. InfluxDB server password is {}", password);
454
455 Integer portConfigured = Tools.getIntegerProperty(properties, "port");
456 if (portConfigured == null) {
457 port = DEFAULT_PORT;
458 log.info("InfluxDB port is not configured, default value is {}", port);
459 } else {
460 port = portConfigured;
461 log.info("Configured. InfluxDB port is configured to {}", port);
462 }
463 }
464}