1#
2# Copyright (c) 2013 Big Switch Networks, Inc.
3#
4# Licensed under the Eclipse Public License, Version 1.0 (the
5# "License"); you may not use this file except in compliance with the
6# License. You may obtain a copy of the License at
7#
8# http://www.eclipse.org/legal/epl-v10.html
9#
10# Unless required by applicable law or agreed to in writing, software
11# distributed under the License is distributed on an "AS IS" BASIS,
12# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
13# implied. See the License for the specific language governing
14# permissions and limitations under the License.
15#
16
17from cassandra.ttypes import KsDef, CfDef, InvalidRequestException, TTransport, \
18 SlicePredicate, SliceRange, ColumnParent, ConsistencyLevel, ColumnPath, \
19 Mutation, Deletion, KeyRange, Column, ColumnOrSuperColumn, SuperColumn
20from django.conf import settings
21from .utils import CassandraConnection
22import random
23import datetime
24import time
25from sdncon.controller.config import get_local_controller_id
26
27CONTROLLER_STATS_NAME = 'controller'
28SWITCH_STATS_NAME = 'switch'
29PORT_STATS_NAME = 'port'
30TARGET_INDEX_NAME = 'target_index'
31STATS_TYPE_INDEX_NAME = 'stats_type_index'
32
33STATS_COLUMN_FAMILY_NAME_SUFFIX = '_stats'
34STATS_TARGET_TYPE_PUT_DATA_SUFFIX = '-stats'
35
36STATS_BUCKET_PERIOD = 60*60*24*1000 # 1 day in milliseconds
37STATS_PADDED_COLUMN_TIME_LENGTH = len(str(STATS_BUCKET_PERIOD))
38
39EVENTS_COLUMN_FAMILY_NAME = 'events'
40EVENTS_BUCKET_PERIOD = 60*60*24*1000 # 1 day in milliseconds
41EVENTS_PADDED_COLUMN_TIME_LENGTH = len(str(EVENTS_BUCKET_PERIOD))
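# For illustration (hypothetical timestamp): with the 1-day buckets above,
# STATS_PADDED_COLUMN_TIME_LENGTH is len('86400000') == 8, so a raw millisecond
# timestamp such as 172804500 splits into a row-key part of
# 172804500 // 86400000 == 2 and a column part of 172804500 % 86400000 == 4500,
# which is stored under the zero-padded column name '00004500' so that the
# lexical ordering of column names matches time ordering.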
42
43THIRTY_SECOND_INTERVAL = 30 * 1000
44ONE_MINUTE_INTERVAL = 60 * 1000
45FIVE_MINUTE_INTERVAL = 5 * ONE_MINUTE_INTERVAL
46TEN_MINUTE_INTERVAL = 10 * ONE_MINUTE_INTERVAL
47ONE_HOUR_INTERVAL = 60 * ONE_MINUTE_INTERVAL
48FOUR_HOUR_INTERVAL = 4 * ONE_HOUR_INTERVAL
49ONE_DAY_INTERVAL = 24 * ONE_HOUR_INTERVAL
50ONE_WEEK_INTERVAL = 7 * ONE_DAY_INTERVAL
51FOUR_WEEK_INTERVAL = 4 * ONE_WEEK_INTERVAL
52
53DOWNSAMPLE_INTERVALS = (ONE_MINUTE_INTERVAL, TEN_MINUTE_INTERVAL,
54 ONE_HOUR_INTERVAL, FOUR_HOUR_INTERVAL,
55 ONE_DAY_INTERVAL, ONE_WEEK_INTERVAL,
56 FOUR_WEEK_INTERVAL)
57
58WINDOW_INTERVALS = (THIRTY_SECOND_INTERVAL, ONE_MINUTE_INTERVAL,
59 FIVE_MINUTE_INTERVAL, TEN_MINUTE_INTERVAL)
60
61VALUE_DATA_FORMAT = 'value'
62RATE_DATA_FORMAT = 'rate'
63
64
65class StatsException(Exception):
66 pass
67
68
69class StatsInvalidStatsDataException(StatsException):
70 def __init__(self):
71 super(StatsInvalidStatsDataException,self).__init__(
72 'Error adding stats data with incorrect format')
73
74
75class StatsDatabaseConnectionException(StatsException):
76 def __init__(self):
77 super(StatsDatabaseConnectionException,self).__init__(
78 'Error connecting to stats database')
79
80
81class StatsDatabaseAccessException(StatsException):
82 def __init__(self):
83 super(StatsDatabaseAccessException,self).__init__(
84 'Error accessing stats database')
85
86class StatsNonnumericValueException(StatsException):
87 def __init__(self, value):
88 super(StatsNonnumericValueException,self).__init__(
89 'Invalid non-numeric stat value for rate or '
90 'average value computation: ' + str(value))
91
92
93class StatsRateComputationException(StatsException):
94 def __init__(self):
95 super(StatsRateComputationException,self).__init__(
96 'Error computing rate; not enough raw data')
97
98
99class StatsInvalidDataFormatException(StatsException):
100 def __init__(self, data_format):
101 super(StatsInvalidDataFormatException,self).__init__(
102 'Invalid data format: ' + str(data_format))
103
104
105class StatsInvalidStatsTimeRangeException(StatsException):
106 def __init__(self, start_time, end_time):
107 super(StatsInvalidStatsTimeRangeException,self).__init__(
108 'Invalid stats time range; start = %s; end = %s' %
109 (str(start_time), str(end_time)))
110
111
112class StatsInvalidStatsTypeException(StatsException):
113 def __init__(self, stats_type):
114 super(StatsInvalidStatsTypeException,self).__init__(
115 'Invalid stats type; name = %s' % str(stats_type))
116
117
118class StatsInvalidStatsMetadataException(StatsException):
119 def __init__(self, file_name):
120 super(StatsInvalidStatsMetadataException,self).__init__(
121 'Invalid stats metadata from file \"%s\"' % str(file_name))
122
123
124class StatsInternalException(StatsException):
125 def __init__(self, message):
126 super(StatsInternalException,self).__init__(
127 'Stats internal error: \"%s\"' % str(message))
128
129class StatsCreateColumnFamilyException(StatsException):
130 def __init__(self, name):
131 super(StatsCreateColumnFamilyException,self).__init__(
132 'Error creating column family; name = ' % name)
133
134# The following code is a hack to get the stats code to use a freshly
135# created test keyspace when we're running the unit tests. I'm guessing
136# there must be some way to detect if we're running under the Django
137# (or Python) unit test framework, but I couldn't find any info about
138# how to do that. So for now, we just provide this function that the
139# unit tests must call before they make any stats call to get the stats
140# code to use the test keyspace instead of the normal production
141# keyspace. Bother.
142
143use_test_keyspace = False
144
145def set_use_test_keyspace():
146 global use_test_keyspace
147 use_test_keyspace = True
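# A minimal sketch of the intended call order (hypothetical test setup; the
# host/port/keyspace/credentials below are placeholders, not values this
# module defines):
#
#   set_use_test_keyspace()
#   init_stats_db_connection('localhost', 9160, 'sdnstats', '', '', 1, {})
#
# i.e. the test keyspace flag must be set before the connection is initialized.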
148
149
150stats_db_connection = None
151
152# FIXME: Should ideally create the column families on demand as the data
153# is added, so the different target types don't need to be predefined here.
154# Probably not a big problem for right now, though.
155COLUMN_FAMILY_INFO_LIST = (
156 {'name': TARGET_INDEX_NAME,
157 'column_type': 'Super',
158 'comparator_type': 'UTF8Type',
159 'subcomparator_type': 'UTF8Type'},
160 {'name': STATS_TYPE_INDEX_NAME,
161 'column_type': 'Super',
162 'comparator_type': 'UTF8Type',
163 'subcomparator_type': 'UTF8Type'},
164 {'name': CONTROLLER_STATS_NAME + STATS_COLUMN_FAMILY_NAME_SUFFIX,
165 'comparator_type': 'UTF8Type'},
166 {'name': SWITCH_STATS_NAME + STATS_COLUMN_FAMILY_NAME_SUFFIX,
167 'comparator_type': 'UTF8Type'},
168 {'name': PORT_STATS_NAME + STATS_COLUMN_FAMILY_NAME_SUFFIX,
169 'comparator_type': 'UTF8Type'},
170 {'name': EVENTS_COLUMN_FAMILY_NAME,
171 'column_type': 'Super',
172 'comparator_type': 'UTF8Type',
173 'subcomparator_type': 'UTF8Type'},
174)
175
176
177def init_stats_db_connection(host, port, keyspace, user, password,
178 replication_factor, column_family_def_default_settings):
179 global stats_db_connection
180 if not stats_db_connection:
181 if use_test_keyspace:
182 keyspace = "test_" + keyspace
183
184 try:
185 stats_db_connection = CassandraConnection(host, port, keyspace, user, password)
186 stats_db_connection.connect()
187 except Exception:
188 stats_db_connection = None
189 raise StatsException("Error connecting to Cassandra daemon")
190
191 if use_test_keyspace:
192 try:
193 stats_db_connection.get_client().system_drop_keyspace(keyspace)
194 except Exception:
195 pass
196
197 try:
198 stats_db_connection.set_keyspace()
199 create_keyspace = False
200 except Exception:
201 create_keyspace = True
202
203 if create_keyspace:
204 keyspace_def = KsDef(name=keyspace,
205 strategy_class='org.apache.cassandra.locator.SimpleStrategy',
206 replication_factor=replication_factor,
207 cf_defs=[])
208 try:
209 stats_db_connection.get_client().system_add_keyspace(keyspace_def)
210 stats_db_connection.set_keyspace()
211 except Exception, _e:
212 stats_db_connection = None
213 raise StatsException("Error creating stats keyspace")
214
215 for column_family_info in COLUMN_FAMILY_INFO_LIST:
216 try:
217 column_family_def_settings = column_family_def_default_settings.copy()
218 column_family_def_settings.update(column_family_info)
219 column_family_def_settings['keyspace'] = keyspace
220 # pylint: disable=W0142
221 stats_db_connection.get_client().system_add_column_family(
222 CfDef(**column_family_def_settings))
223 except InvalidRequestException, _e:
224 # Assume this is because the column family already exists.
225 # FIXME. Could check exception message for specific string
226 pass
227 except Exception, _e:
228 stats_db_connection = None
229 raise StatsCreateColumnFamilyException(column_family_info.get('name'))
230
231
232def get_stats_db_connection():
233 return stats_db_connection
234
235
236# The following function is mainly intended to be used by the unit tests. It lets
237# you clear out all of the data from the database. Note that since the stats DB
238# is not managed by the normal Django DB mechanism you don't get the automatic
239# DB flushing from the Django TestCase code, so it has to be done explicitly.
240# There's a StatsTestCase subclass of TestCase in the stats unit tests that
241# implements the tearDown method to call flush_stats_db after each test.
242def flush_stats_db():
243 if stats_db_connection is not None:
244 for column_family_info in COLUMN_FAMILY_INFO_LIST:
245 stats_db_connection.get_client().truncate(column_family_info['name'])
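# A sketch of the tearDown pattern described above (hypothetical test code,
# assuming Django's TestCase):
#
#   from django.test import TestCase
#
#   class StatsTestCase(TestCase):
#       def tearDown(self):
#           flush_stats_db()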
246
247def call_cassandra_with_reconnect(fn, *args, **kwargs):
248 try:
249 try:
250 results = fn(*args, **kwargs)
251 except TTransport.TTransportException:
252 stats_db_connection.reconnect()
253 results = fn(*args, **kwargs)
254 except TTransport.TTransportException, _e:
255 raise StatsDatabaseConnectionException()
256 except Exception, _e:
257 raise StatsDatabaseAccessException()
258
259 return results
260
261
262def get_stats_padded_column_part(column_part):
263 """
264 For the columns to be sorted correctly by time we need to pad with
265 leading zeroes up to the maximum range of the bucket
266 """
267 column_part = str(column_part)
268 leading_zeroes = ('0'*(STATS_PADDED_COLUMN_TIME_LENGTH-len(column_part)))
269 column_part = leading_zeroes + column_part
270 return column_part
271
272
273def split_stats_timestamp(timestamp):
274 key_part = timestamp / STATS_BUCKET_PERIOD
275 column_part = timestamp % STATS_BUCKET_PERIOD
276 return (key_part, column_part)
277
278
279def construct_stats_key(cluster, target_id, stats_type, timestamp_key_part):
280 """
281 Constructs the keys for the controller or switch stats.
282 For the controller stats the target_id is the controller node id.
283 For switch stats the target_id is the dpid of the switch.
284 """
285 return cluster + '|' + target_id + '|' + stats_type + '|' + str(timestamp_key_part)
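# For example (hypothetical identifiers): construct_stats_key('cluster-1',
# '00:00:00:00:00:00:00:01', 'packet-count', 16234) returns
# 'cluster-1|00:00:00:00:00:00:00:01|packet-count|16234'.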
286
287
288def append_stats_results(get_results, values, timestamp_key_part):
289 shifted_timestamp_key_part = int(timestamp_key_part) * STATS_BUCKET_PERIOD
290 for item in get_results:
291 timestamp_column_part = int(item.column.name)
292 value = item.column.value
293 timestamp = shifted_timestamp_key_part + timestamp_column_part
294 values.append((timestamp, value))
295
296
297def get_stats_slice_predicate(column_start, column_end):
298 if column_start != '':
299 column_start = get_stats_padded_column_part(column_start)
300 if column_end != '':
301 column_end = get_stats_padded_column_part(column_end)
302 slice_predicate = SlicePredicate(slice_range=SliceRange(
303 start=column_start, finish=column_end, count=1000000))
304 return slice_predicate
305
306
307def check_time_range(start_time, end_time):
308 if int(end_time) < int(start_time):
309 raise StatsInvalidStatsTimeRangeException(start_time, end_time)
310
311
312def check_valid_data_format(data_format):
313 if data_format != VALUE_DATA_FORMAT and data_format != RATE_DATA_FORMAT:
314 raise StatsInvalidDataFormatException(data_format)
315
316
317def get_window_range(raw_stats_values, index, window):
318
319 if window == 0:
320 return (index, index)
321
322 # Get start index
323 timestamp = raw_stats_values[index][0]
324 start_timestamp = timestamp - (window / 2)
325 end_timestamp = timestamp + (window / 2)
326
327 start_index = index
328 while start_index > 0:
329 next_timestamp = raw_stats_values[start_index - 1][0]
330 if next_timestamp < start_timestamp:
331 break
332 start_index -= 1
333
334 end_index = index
335 while end_index < len(raw_stats_values) - 1:
336 next_timestamp = raw_stats_values[end_index + 1][0]
337 if next_timestamp > end_timestamp:
338 break
339 end_index += 1
340
341 return (start_index, end_index)
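# For example (hypothetical data): with raw_stats_values timestamps of
# [1000, 2000, 3000, 4000, 5000], index == 2 and window == 2000, the result
# is (1, 3): the entries whose timestamps fall within +/- 1000 ms of 3000.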
342
343
344def convert_stat_string_to_value(stat_string):
345 try:
346 stat_value = int(stat_string)
347 except ValueError:
348 try:
349 stat_value = float(stat_string)
350 except ValueError:
351 stat_value = stat_string
352 return stat_value
353
354
355def get_rate_over_stats_values(stats_values):
356
357 if len(stats_values) < 2:
358 return None
359
360 start_stat = stats_values[0]
361 end_stat = stats_values[-1]
362
363 timestamp_delta = end_stat[0] - start_stat[0]
364 # NOTE: In computing the value_delta here it's safe to assume floats
365 # rather than calling convert_stat_string_to_value because we're going
366 # to be converting to float anyway when we do the rate calculation later.
367 # So there's no point in trying to differentiate between int and float
368 # and rate doesn't make sense for any other type of stat data (e.g. string).
369 value_delta = float(end_stat[1]) - float(start_stat[1])
370 if timestamp_delta == 0:
371 rate = float('inf' if value_delta > 0 else '-inf')
372 else:
373 rate = value_delta / timestamp_delta
374
375 return rate
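# For example (hypothetical values): stats_values of [(0, '10'), (5000, '60')]
# gives a value_delta of 50.0 over a timestamp_delta of 5000 ms, i.e. a rate
# of 0.01 (stat units per millisecond).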
376
377
378def get_rate_over_window(raw_stats_values, index, window):
379 if len(raw_stats_values) < 2:
380 return None
381
382 if window == 0:
383 if index == 0:
384 start_index = 0
385 end_index = 1
386 else:
387 start_index = index - 1
388 end_index = index
389 else:
390 start_index, end_index = get_window_range(raw_stats_values, index, window)
391
392 return get_rate_over_stats_values(raw_stats_values[start_index:end_index + 1])
393
394
395def get_average_over_stats_values(stats_values):
396
397 total = 0
398 count = 0
399 for stat_value in stats_values:
400 # FIXME: Should we just always convert to float here?
401 # This would give a more accurate result for the average calculation
402 # but would mean that the data type is different for a
403 # zero vs. non-zero window size.
404 value = convert_stat_string_to_value(stat_value[1])
405 if type(value) not in (int,float):
406 raise StatsNonnumericValueException(value)
407 total += value
408 count += 1
409
410 return (total / count) if count > 0 else None
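# For example (hypothetical values): stats_values of
# [(0, '10'), (1000, '20'), (2000, '40')] convert to the ints 10, 20 and 40,
# so with Python 2 integer division the result is 70 / 3 == 23 rather than
# 23.33..., which is the behavior the FIXME above is about.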
411
412
413def get_average_value_over_window(raw_stats_values, index, window):
414 start_index, end_index = get_window_range(raw_stats_values, index, window)
415 stats_values = raw_stats_values[start_index:end_index + 1]
416 return get_average_over_stats_values(stats_values)
417
418
419def reverse_stats_data_generator(cluster, target_type, target_id, stats_type,
420 start_time=None, end_time=None,
421 chunk_interval=3600000):
422 if start_time is None:
423 start_time = int(time.time() * 1000)
424 if end_time is None:
425 # By default, don't go back past 1/1/2011. This was before we had stats support
426 # in the controller, so we shouldn't have any data earlier than that (except if
427 # the clock on the controller was set incorrectly).
428 end_time = int(time.mktime(datetime.datetime(2011,1,1).timetuple()) * 1000)
429 end_key_part, _end_column_part = split_stats_timestamp(end_time)
430 key_part, column_part = split_stats_timestamp(start_time)
431 column_family = target_type + STATS_COLUMN_FAMILY_NAME_SUFFIX
432 column_parent = ColumnParent(column_family)
433 # FIXME: Should add support for chunk_interval to be either iterable or a
434 # list/tuple to give a sequence of chunk intervals to use. The last available
435 # chunk interval from the list/tuple/iterator would then be used for any
436 # subsequent cassandra calls
437 #chunk_interval_iter = (chunk_interval if isinstance(chunk_interval, list) or
438 # isinstance(chunk_interval, tuple) else [chunk_interval])
439 while key_part >= 0:
440 key = construct_stats_key(cluster, target_id, stats_type, key_part)
441
442 while True:
443 column_start = column_part - chunk_interval
444 if column_start < 0:
445 column_start = 0
446 slice_predicate = get_stats_slice_predicate(column_start, column_part)
447 for attempt in (1,2):
448 try:
449 get_results = stats_db_connection.get_client().get_slice(key,
450 column_parent, slice_predicate, ConsistencyLevel.ONE)
451 for item in reversed(get_results):
452 timestamp = (key_part * STATS_BUCKET_PERIOD) + int(item.column.name)
453 value = item.column.value
454 yield (timestamp, value)
455 break
456 except TTransport.TTransportException:
457 # Only retry once, so if it's the second time through,
458 # propagate the exception
459 if attempt == 2:
460 raise StatsDatabaseConnectionException()
461 stats_db_connection.reconnect()
462 except Exception:
463 raise StatsDatabaseAccessException()
464
465 column_part = column_start
466 if column_part == 0:
467 break
468
469 if key_part == end_key_part:
470 break
471 key_part -= 1
472 column_part = STATS_BUCKET_PERIOD - 1
473
474
475def get_latest_stat_data(cluster, target_type, target_id, stats_type,
476 window=0, data_format=VALUE_DATA_FORMAT):
477
478 check_valid_data_format(data_format)
479
480 minimum_data_points = 2 if data_format == RATE_DATA_FORMAT else 1
481 stats_data_window = []
482 latest_stat_timestamp = None
483
484 start_time = int(time.time() * 1000)
485 # Limit how far back we'll look for the latest stat value.
486 # 86400000 is 1 day in ms
487 end_time = start_time - 86400000
488 for stat_data_point in reverse_stats_data_generator(cluster,
489 target_type, target_id, stats_type, start_time, end_time):
490 current_stat_timestamp = stat_data_point[0]
491 if latest_stat_timestamp is None:
492 latest_stat_timestamp = current_stat_timestamp
493
494 # NOTE: For stats operations we treat the window for the rate or
495 # average calculation to be centered around the current timestamp.
496 # For the latest stat case there is no data after the current point.
497 # We could extend the window back further so that the current timestamp
498 # is the end of the window range instead of the middle, but then that
499 # would be inconsistent with the other cases, so instead we just go
500 # back to half the window size. I haven't been able to convince myself
501 # strongly one way or the other which is better (or how much it matters).
502 outside_window = (latest_stat_timestamp - current_stat_timestamp) > (window / 2)
503 if len(stats_data_window) >= minimum_data_points and outside_window:
504 break
505
506 stats_data_window.insert(0, stat_data_point)
507
508 if (window == 0) and (len(stats_data_window) >= minimum_data_points):
509 break
510
511 stat_data_point = None
512
513 if latest_stat_timestamp is not None:
514 if data_format == VALUE_DATA_FORMAT:
515 value = get_average_over_stats_values(stats_data_window)
516 else:
517 assert data_format == RATE_DATA_FORMAT, "Invalid data format"
518 value = get_rate_over_stats_values(stats_data_window)
519 if value is not None:
520 stat_data_point = (latest_stat_timestamp, value)
521
522 return stat_data_point
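# A minimal call sketch (hypothetical cluster, dpid and stats type names):
#
#   latest = get_latest_stat_data('cluster-1', SWITCH_STATS_NAME,
#                                 '00:00:00:00:00:00:00:01', 'packet-count',
#                                 window=ONE_MINUTE_INTERVAL,
#                                 data_format=RATE_DATA_FORMAT)
#
# which returns either None or a (timestamp-in-ms, value) tuple for the most
# recent sample, with the rate computed over the trailing half-window.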
523
524
525def get_stats_data(cluster, target_type, target_id, stats_type,
526 start_time, end_time, sample_interval=0, window=0,
527 data_format=VALUE_DATA_FORMAT, limit=None):
528
529 check_time_range(start_time, end_time)
530 check_valid_data_format(data_format)
531 # FIXME: Add validation of other arguments
532
533 start_key_part, start_column_part = split_stats_timestamp(int(start_time))
534 end_key_part, end_column_part = split_stats_timestamp(int(end_time))
535
536 raw_stats_values = []
537 column_family = target_type + STATS_COLUMN_FAMILY_NAME_SUFFIX
538 column_parent = ColumnParent(column_family)
539
540 for key_part in range(start_key_part, end_key_part+1):
541 current_start = start_column_part if key_part == start_key_part else ''
542 current_end = end_column_part if key_part == end_key_part else ''
543 # FIXME: How big can the count be?
544 slice_predicate = get_stats_slice_predicate(current_start, current_end)
545 key = construct_stats_key(cluster, target_id, stats_type, key_part)
546 for attempt in (1,2):
547 try:
548 get_results = stats_db_connection.get_client().get_slice(key,
549 column_parent, slice_predicate, ConsistencyLevel.ONE)
550 break
551 except TTransport.TTransportException:
552 # Only retry once, so if it's the second time through,
553 # propagate the exception
554 if attempt == 2:
555 raise StatsDatabaseConnectionException()
556 stats_db_connection.reconnect()
557 except Exception:
558 raise StatsDatabaseAccessException()
559
560 append_stats_results(get_results, raw_stats_values, key_part)
561
562 # FIXME: This logic to handle the limit argument isn't complete.
563 # It doesn't account for a non-zero window or downsampling.
564 if (limit != None and sample_interval == 0 and window == 0 and
565 len(raw_stats_values) > limit):
566 raw_stats_values = raw_stats_values[:limit]
567 break
568
569 stats_values = []
570 last_downsample_index = None
571 for i in range(0, len(raw_stats_values)):
572 # Handle downsampling
573 if sample_interval != 0:
574 downsample_index = raw_stats_values[i][0] / sample_interval
575 if downsample_index == last_downsample_index:
576 continue
577 last_downsample_index = downsample_index
578
579 # Get the value based for the specified data format
580 if data_format == VALUE_DATA_FORMAT:
581 if sample_interval == 0:
582 value = convert_stat_string_to_value(raw_stats_values[i][1])
583 else:
584 value = get_average_value_over_window(raw_stats_values, i, window)
585 else:
586 assert data_format == RATE_DATA_FORMAT, "Invalid data format"
587 value = get_rate_over_window(raw_stats_values, i, window)
588
589 if value is not None:
590 stats_values.append((raw_stats_values[i][0], value))
591
592 return stats_values
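# A usage sketch (hypothetical identifiers): fetch the last hour of
# 'packet-count' samples for a switch as a rate, downsampled to roughly one
# point per minute:
#
#   end = int(time.time() * 1000)
#   start = end - ONE_HOUR_INTERVAL
#   points = get_stats_data('cluster-1', SWITCH_STATS_NAME,
#                           '00:00:00:00:00:00:00:01', 'packet-count',
#                           start, end,
#                           sample_interval=ONE_MINUTE_INTERVAL,
#                           window=ONE_MINUTE_INTERVAL,
#                           data_format=RATE_DATA_FORMAT)
#
# points is a list of (timestamp-in-ms, value) tuples.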
593
594
595def delete_stats_data(cluster, target_type, target_id, stats_type,
596 start_time, end_time):
597
598 check_time_range(start_time, end_time)
599 # FIXME: Add validation of other arguments
600
601 start_key_part, start_column_part = split_stats_timestamp(int(start_time))
602 end_key_part, end_column_part = split_stats_timestamp(int(end_time))
603
604 column_family = target_type + STATS_COLUMN_FAMILY_NAME_SUFFIX
605 column_parent = ColumnParent(column_family)
606 # The Cassandra timestamps are in microseconds, not milliseconds,
607 # so we convert to microseconds. The Cassandra timestamp is derived
608 # from the stat timestamp (i.e. same time converted to microseconds),
609 # so we use the end_time + 1, since that's guaranteed to be greater
610 # than any of the timestamps for the sample points we're deleting.
611 timestamp = (int(end_time) * 1000) + 1
612 for key_part in range(start_key_part, end_key_part+1):
613 key = construct_stats_key(cluster, target_id, stats_type, key_part)
614 current_start = start_column_part if key_part == start_key_part else ''
615 current_end = end_column_part if key_part == end_key_part else ''
616 if current_start == '' and current_end == '':
617 call_cassandra_with_reconnect(stats_db_connection.get_client().remove,
618 key, ColumnPath(column_family=column_family), timestamp,
619 ConsistencyLevel.ONE)
620 else:
621 # grrr. Cassandra currently doesn't support doing deletions via a
622 # slice range (i.e. a column start and end). You need to give it a
623 # list of columns. So we do a get_slice with the slice range and then
624 # extract the individual column names from the result of that and
625 # build up the column list that we can use to delete the column
626 # using batch_mutate.
627 slice_predicate = get_stats_slice_predicate(current_start, current_end)
628 get_results = call_cassandra_with_reconnect(
629 stats_db_connection.get_client().get_slice,
630 key, column_parent, slice_predicate, ConsistencyLevel.ONE)
631 column_names = []
632 for item in get_results:
633 column_names.append(item.column.name)
634
635 deletion = Deletion(timestamp=timestamp, predicate=SlicePredicate(column_names=column_names))
636 mutation_map = {key: {column_family: [Mutation(deletion=deletion)]}}
637 call_cassandra_with_reconnect(stats_db_connection.get_client().batch_mutate,
638 mutation_map, ConsistencyLevel.ONE)
639
640
641STATS_METADATA_VARIABLE_NAME = 'STATS_METADATA'
642
643stats_metadata = None
644
645def init_stats_metadata(_cluster):
646 """
647 Initialize the dictionary of stats metadata. Currently this is initialized
648 from the metadata modules listed in settings.STATS_METADATA_MODULES.
649 There is currently no differentiation in the stats types that are supported
650 across clusters, so we ignore the cluster argument and we just maintain a
651 global map of stat type metadata.
652 """
653 global stats_metadata
654 if not stats_metadata:
655 stats_metadata = {}
656 for module_name in settings.STATS_METADATA_MODULES:
657 metadata_module = __import__(module_name,
658 fromlist=[STATS_METADATA_VARIABLE_NAME])
659 if not metadata_module:
660 # FIXME: log error
661 continue
662
663 if STATS_METADATA_VARIABLE_NAME not in dir(metadata_module):
664 # FIXME: log error
665 continue
666
667 metadata_list = getattr(metadata_module, STATS_METADATA_VARIABLE_NAME)
668
669 if type(metadata_list) is dict:
670 metadata_list = [metadata_list]
671
672 if type(metadata_list) is not list and type(metadata_list) is not tuple:
673 raise StatsInvalidStatsMetadataException(module_name)
674
675 for metadata in metadata_list:
676 if type(metadata) is not dict:
677 raise StatsInvalidStatsMetadataException(module_name)
678
679 name = metadata.get('name')
680 if not name:
681 raise StatsInvalidStatsMetadataException(module_name)
682 name = str(name)
683
684 # Auto-set the verbose_name to the name if it's not set explicitly
685 verbose_name = metadata.get('verbose_name')
686 if not verbose_name:
687 metadata['verbose_name'] = name
688
689 # FIXME: Validate other contents of metadata.
690 # e.g. flag name conflicts between files.
691
692 stats_metadata[name] = metadata
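# A sketch of what a metadata module might export (hypothetical stat names;
# the only key this code requires is 'name', and 'verbose_name' defaults to
# the name when omitted):
#
#   STATS_METADATA = [
#       {'name': 'cpu-user', 'verbose_name': 'CPU user time'},
#       {'name': 'packet-count'},
#   ]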
693
694def get_stats_metadata(cluster, stats_type=None):
695 init_stats_metadata(cluster)
696 # If no stats_type is specified, return the entire dictionary of stat types
697 metadata = stats_metadata.get(stats_type) if stats_type else stats_metadata
698 if metadata is None:
699 raise StatsInvalidStatsTypeException(stats_type)
700 return metadata
701
702
703STATS_INDEX_ATTRIBUTE_TYPES = {
704 'last-updated': int
705}
706def stats_type_slice_to_index_data(stats_type_slice):
707 index_data = {}
708 for super_column in stats_type_slice:
709 name = super_column.super_column.name
710 column_list = super_column.super_column.columns
711 if name == 'base':
712 insert_map = index_data
713 elif name.startswith('param:'):
714 colon_index = name.find(':')
715 parameter_name = name[colon_index+1:]
716 parameter_map = index_data.get('parameters')
717 if not parameter_map:
718 parameter_map = {}
719 index_data['parameters'] = parameter_map
720 insert_map = {}
721 parameter_map[parameter_name] = insert_map
722 else:
723 raise StatsInternalException('Invalid stats type index name: ' + str(name))
724
725 for column in column_list:
726 value = column.value
727 attribute_type = STATS_INDEX_ATTRIBUTE_TYPES.get(column.name)
728 if attribute_type is not None:
729 value = attribute_type(value)
730 insert_map[column.name] = value
731
732 return index_data
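# For illustration (hypothetical values), the returned index_data has the form:
#
#   {'last-updated': 1397000000000,
#    'parameters': {'eth1': {'last-updated': 1397000000000}}}
#
# where the top-level attributes come from the 'base' supercolumn and each
# 'param:<name>' supercolumn contributes one entry under 'parameters'.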
733
734
735def get_stats_type_index(cluster, target_type, target_id, stats_type=None):
736 column_parent = ColumnParent(STATS_TYPE_INDEX_NAME)
737 slice_predicate = SlicePredicate(slice_range=SliceRange(
738 start='', finish='', count=1000000))
739 key_prefix = cluster + ':' + target_type + ':' + target_id
740 if stats_type is None:
741 key_range = KeyRange(start_key=key_prefix+':', end_key=key_prefix+';', count=100000)
742 key_slice_list = call_cassandra_with_reconnect(
743 stats_db_connection.get_client().get_range_slices,
744 column_parent, slice_predicate, key_range, ConsistencyLevel.ONE)
745 stats_index_data = {}
746 for key_slice in key_slice_list:
747 key = key_slice.key
748 colon_index = key.rfind(':')
749 if colon_index < 0:
750 raise StatsInternalException('Invalid stats type index key: ' + str(key))
751 stats_type = key[colon_index+1:]
752 stats_index_data[stats_type] = stats_type_slice_to_index_data(key_slice.columns)
753 else:
754 key = key_prefix + ':' + stats_type
755 stats_type_slice = call_cassandra_with_reconnect(
756 stats_db_connection.get_client().get_slice, key, column_parent,
757 slice_predicate, ConsistencyLevel.ONE)
758 stats_index_data = stats_type_slice_to_index_data(stats_type_slice)
759
760 return stats_index_data
761
762
763def get_stats_target_types(cluster):
764 column_parent = ColumnParent(TARGET_INDEX_NAME)
765 slice_predicate = SlicePredicate(column_names=[])
766 key_range = KeyRange(start_key=cluster+':', end_key=cluster+';', count=100000)
767 key_slice_list = call_cassandra_with_reconnect(
768 stats_db_connection.get_client().get_range_slices,
769 column_parent, slice_predicate, key_range, ConsistencyLevel.ONE)
770
771 target_types = {}
772 for key_slice in key_slice_list:
773 target_type = key_slice.key[len(cluster)+1:]
774
775 target_types[target_type] = {}
776
777 return target_types
778
779
780STATS_TARGET_ATTRIBUTE_TYPES = {
781 'last-updated': int
782}
783
784def get_stats_targets(cluster, target_type):
785 key = cluster + ':' + target_type
786 column_parent = ColumnParent(TARGET_INDEX_NAME)
787 slice_predicate = SlicePredicate(slice_range=SliceRange(
788 start='', finish='', count=1000000))
789 super_column_list = call_cassandra_with_reconnect(
790 stats_db_connection.get_client().get_slice, key, column_parent,
791 slice_predicate, ConsistencyLevel.ONE)
792 target_list = {}
793 for item in super_column_list:
794 target = {}
795 for column in item.super_column.columns:
796 value = column.value
797 attribute_type = STATS_TARGET_ATTRIBUTE_TYPES.get(column.name)
798 if attribute_type is not None:
799 value = attribute_type(value)
800 target[column.name] = value
801 target_list[item.super_column.name] = target
802
803 return target_list
804
805
806# FIXME: Should update the code below to use these constants
807# instead of string literals
808LAST_UPDATED_ATTRIBUTE_NAME = 'last-updated'
809CONTROLLER_ATTRIBUTE_NAME = 'controller'
810BASE_SUPER_COLUMN_NAME = 'base'
811PARAMETERS_SUPER_COLUMN_NAME = 'parameters'
812PARAM_SUPER_COLUMN_NAME_PREFIX = 'param:'
813
814def append_attributes_to_mutation_list(attributes, supercolumn_name, mutation_list):
815 column_list = []
816 for name, info in attributes.iteritems():
817 timestamp, value = info
818 column = Column(name=name, value=str(value), timestamp=timestamp*1000)
819 column_list.append(column)
820 mutation = Mutation(column_or_supercolumn=ColumnOrSuperColumn(
821 super_column=SuperColumn(name=supercolumn_name, columns=column_list)))
822 mutation_list.append(mutation)
823
824
825def add_stat_type_index_info_to_mutation_map(cluster, target_type,
826 stats_type_index, mutation_map):
827 for key, stats_type_info in stats_type_index.iteritems():
828 separator_index = key.find(':')
829 assert separator_index >= 0
830 base_stat_type_name = key[:separator_index]
831 target_id = key[separator_index + 1:]
832 stats_type_base_attributes, stats_type_params = stats_type_info
833 mutation_list = []
834 append_attributes_to_mutation_list(stats_type_base_attributes,
835 'base', mutation_list)
836 for name, attributes in stats_type_params.iteritems():
837 append_attributes_to_mutation_list(attributes,
838 'param:' + name, mutation_list)
839 mutation_key = cluster + ':' + target_type + ':' + target_id + ':' + base_stat_type_name
840 mutation_map[mutation_key] = {STATS_TYPE_INDEX_NAME: mutation_list}
841
842
843def add_target_id_list_to_mutation_map(cluster, target_type,
844 target_id_list, mutation_map):
845 mutation_list = []
846 for target_id, attributes in target_id_list:
847 append_attributes_to_mutation_list(attributes, target_id, mutation_list)
848 key = cluster + ':' + target_type
849 mutation_map[key] = {TARGET_INDEX_NAME: mutation_list}
850
851
852def _put_stats_data(cluster, target_type, stats_data):
853 try:
854 controller_id = get_local_controller_id()
855 mutation_map = {}
856 target_id_list = []
857 stats_type_index = {}
858 column_family = target_type + STATS_COLUMN_FAMILY_NAME_SUFFIX
859 for (target_id, target_id_stats) in stats_data.iteritems():
860 # Map 'localhost' controller to the actual ID for the local controller
861 # FIXME: Eventually we should fix up the other components (e.g. statd)
862 # that invoke this REST API to not use localhost and instead use the
863 # REST API to obtain the real ID for the local controller, but for now
864 # this works to ensure we're not using localhost in any of the data we
865 # store in the DB (unless, of course, the uuid version of the controller
866 # ID hasn't been written to the boot-config file, in which case it will
867 # default to the old localhost value).
868 if target_type == 'controller' and target_id == 'localhost':
869 target_id = controller_id
870 latest_id_timestamp = None
871 for (stats_type, stats_data_array) in target_id_stats.iteritems():
872 # Check if it's a parameterized type and extract the base
873 # stat type and parameter name.
874 parameter_separator = stats_type.find('__')
875 if parameter_separator >= 0:
876 stats_type_base = stats_type[:parameter_separator]
877 stats_type_parameter = stats_type[parameter_separator+2:]
878 else:
879 stats_type_base = stats_type
880 stats_type_parameter = None
881
882 latest_stat_type_timestamp = None
883
884 # Add the stats values to the mutation map
885 for stats_value in stats_data_array:
886 timestamp = int(stats_value['timestamp'])
887 if latest_stat_type_timestamp is None or timestamp > latest_stat_type_timestamp:
888 latest_stat_type_timestamp = timestamp
889 if latest_id_timestamp is None or timestamp > latest_id_timestamp:
890 latest_id_timestamp = timestamp
891 value = stats_value['value']
892 timestamp_key_part, timestamp_column_part = split_stats_timestamp(timestamp)
893 key = construct_stats_key(cluster, target_id, stats_type, timestamp_key_part)
894 key_entry = mutation_map.get(key)
895 if not key_entry:
896 mutation_list = []
897 mutation_map[key] = {column_family: mutation_list}
898 else:
899 mutation_list = key_entry[column_family]
900
901 # Note: convert the Cassandra timestamp value to microseconds to
902 # be consistent with standard Cassandra timestamp format.
903 mutation = Mutation(column_or_supercolumn=ColumnOrSuperColumn(
904 column=Column(name=get_stats_padded_column_part(timestamp_column_part),
905 value=str(value), timestamp=timestamp*1000)))
906 mutation_list.append(mutation)
907
908 # Update the stat type index info.
909 # There can be multiple parameterized types for each base stats type,
910 # so we need to be careful about checking for existing data for
911 # the index_entry. Because of the dictionary nature of the put data
912 # and the way this is serialized into a Python dictionary, though,
913 # we are guaranteed that there won't be multiple entries for a
914 # specific parameterized stats type or the base stats type, so we don't
915 # need to handle duplicates for those.
916 if latest_stat_type_timestamp is not None:
917 stats_type_index_key = stats_type_base + ':' + target_id
918 stats_type_info = stats_type_index.get(stats_type_index_key)
919 if not stats_type_info:
920 # This is a tuple of two dictionaries: the attributes for
921 # the base stat type and a dictionary of the parameterized
922 # types that have been seen for that stat type. The
923 # parameterized type dictionary is keyed by the name of
924 # the parameterized type and the value is the associated
925 # attribute dictionary.
926 stats_type_info = ({},{})
927 stats_type_index[stats_type_index_key] = stats_type_info
928 stats_type_base_attributes, stats_type_params = stats_type_info
929 if stats_type_parameter is None:
930 attributes = stats_type_base_attributes
931 else:
932 attributes = stats_type_params.get(stats_type_parameter)
933 if attributes is None:
934 attributes = {}
935 stats_type_params[stats_type_parameter] = attributes
936 last_updated_entry = attributes.get('last-updated')
937 if last_updated_entry is None or latest_stat_type_timestamp > last_updated_entry[0]:
938 attributes['last-updated'] = (latest_stat_type_timestamp, latest_stat_type_timestamp)
939
940 # Update the target index
941 if latest_id_timestamp is not None:
942 # FIXME: Always set the controller attributes for now.
943 # This could/should be optimized to not set this for stats
944 # whose target type is 'controller' since those will
945 # always be coming from the same controller (i.e. itself).
946 # But that change requires some changes (albeit minor) to
947 # syncd to work correctly which I don't want to mess with
948 # right now.
949 #attributes = {'last-updated': (latest_id_timestamp, latest_id_timestamp)}
950 #if target_type != 'controller':
951 # attributes['controller'] = controller_id
952 attributes = {'last-updated': (latest_id_timestamp, latest_id_timestamp),
953 'controller': (latest_id_timestamp, controller_id)}
954 target_id_list.append((target_id, attributes))
955 except Exception, _e:
956 raise StatsInvalidStatsDataException()
957
958 add_stat_type_index_info_to_mutation_map(cluster, target_type, stats_type_index, mutation_map)
959 add_target_id_list_to_mutation_map(cluster, target_type, target_id_list, mutation_map)
960
961 call_cassandra_with_reconnect(stats_db_connection.get_client().batch_mutate,
962 mutation_map, ConsistencyLevel.ONE)
963
964
965def put_stats_data(cluster, stats_data):
966 for target_type, target_stats_data in stats_data.items():
967 if target_type.endswith(STATS_TARGET_TYPE_PUT_DATA_SUFFIX):
968 # Strip off the '-stats' suffix
969 target_type = target_type[:-len(STATS_TARGET_TYPE_PUT_DATA_SUFFIX)]
970 _put_stats_data(cluster, target_type, target_stats_data)
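# A sketch of the expected put data (hypothetical identifiers); note the
# '-stats' suffix on the target type and the '__' separator for parameterized
# stats types:
#
#   put_stats_data('cluster-1', {
#       'switch-stats': {
#           '00:00:00:00:00:00:00:01': {
#               'packet-count': [
#                   {'timestamp': 1397000000000, 'value': 12345},
#               ],
#               'port-packet-count__eth1': [
#                   {'timestamp': 1397000000000, 'value': 678},
#               ],
#           },
#       },
#   })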
971
972
973def get_events_padded_column_part(column_part):
974 """
975 For the columns to be sorted correctly by time we need to pad with
976 leading zeroes up to the maximum range of the bucket
977 """
978 column_part = str(column_part)
979 leading_zeroes = ('0'*(EVENTS_PADDED_COLUMN_TIME_LENGTH-len(column_part)))
980 column_part = leading_zeroes + column_part
981 return column_part
982
983
984def split_events_timestamp(timestamp):
985 key_part = timestamp / EVENTS_BUCKET_PERIOD
986 column_part = timestamp % EVENTS_BUCKET_PERIOD
987 return (key_part, column_part)
988
989
990def construct_log_events_key(cluster, node_id, timestamp_key_part):
991 return cluster + '|' + node_id + '|' + str(timestamp_key_part)
992
993
994def get_events_slice_predicate(column_start, column_end):
995 if column_start != '':
996 column_start = get_events_padded_column_part(column_start)
997 if column_end != '':
998 # For the final key in the range of keys we want all of the
999 # supercolumns whose name starts with end_column_part.
1000 # If the event includes a pk tag then the format of the
1001 # supercolumn name is <timestamp-part>:<pk-tag>.
1002 # Otherwise it's just the <timestamp-part>. To get both of these
1003 # cases we set the column end value to be the <timestamp-part>
1004 # suffixed with ';' (which has an ordinal value 1 greater than
1005 # ':'). So this will get all of the events with the given column
1006 # end regardless of whether or not they include the pk tag.
1007 column_end = get_events_padded_column_part(column_end) + ';'
1008 slice_predicate = SlicePredicate(slice_range=SliceRange(
1009 start=column_start, finish=column_end, count=1000000))
1010 return slice_predicate
1011
1012
1013def append_log_events_results(event_results, event_list, timestamp_key_part, include_pk_tag=False):
1014 shifted_timestamp_key_part = int(timestamp_key_part) * EVENTS_BUCKET_PERIOD
1015 for item in event_results:
1016 event = {}
1017 super_column_name = item.super_column.name
1018 colon_index = super_column_name.find(":")
1019 if colon_index >= 0:
1020 if include_pk_tag:
1021 pk_tag = super_column_name[colon_index+1:]
1022 event['pk-tag'] = pk_tag
1023 timestamp_column_part = super_column_name[:colon_index]
1024 else:
1025 timestamp_column_part = super_column_name
1026 timestamp = shifted_timestamp_key_part + int(timestamp_column_part)
1027 event['timestamp'] = timestamp
1028 for column in item.super_column.columns:
1029 event[column.name] = column.value
1030 event_list.append(event)
1031
1032
1033def get_log_event_data(cluster, node_id, start_time, end_time, include_pk_tag=False):
1034 # FIXME: Add some validation of arguments
1035 start_key_part, start_column_part = split_events_timestamp(int(start_time))
1036 end_key_part, end_column_part = split_events_timestamp(int(end_time))
1037
1038 event_list = []
1039 column_parent = ColumnParent(column_family=EVENTS_COLUMN_FAMILY_NAME)
1040 for key_part in range(start_key_part, end_key_part+1):
1041 current_start = start_column_part if key_part == start_key_part else ''
1042 current_end = end_column_part if key_part == end_key_part else ''
1043 # FIXME: How big can the count be?
1044 slice_predicate = get_events_slice_predicate(current_start, current_end)
1045 key = construct_log_events_key(cluster, node_id, key_part)
1046 for attempt in (1,2):
1047 try:
1048 results = stats_db_connection.get_client().get_slice(key,
1049 column_parent, slice_predicate, ConsistencyLevel.ONE)
1050 break
1051 except TTransport.TTransportException:
1052 # Only retry once, so if it's the second time through,
1053 # propagate the exception
1054 if attempt == 2:
1055 raise StatsDatabaseConnectionException()
1056 stats_db_connection.reconnect()
1057 except Exception:
1058 raise StatsDatabaseAccessException()
1059 append_log_events_results(results, event_list, key_part, include_pk_tag)
1060
1061 return event_list
1062
1063
1064def put_log_event_data(cluster, log_events_data):
1065 try:
1066 mutation_map = {}
1067 for (node_id, node_events) in log_events_data.iteritems():
1068 for event in node_events:
1069 timestamp = event['timestamp']
1070 pk_tag = event.get('pk-tag')
1071 # If the entry in the put data does not specify a tag, then generate a random one.
1072 # This is so that we can have multiple events with the same timestamp.
1073 # FIXME: Is there something better we can do here?
1074 if not pk_tag:
1075 pk_tag = random.randint(0,10000000000)
1076 timestamp = int(timestamp)
1077 timestamp_key_part, timestamp_column_part = split_events_timestamp(timestamp)
1078 key = construct_log_events_key(cluster, node_id, timestamp_key_part)
1079 key_entry = mutation_map.get(key)
1080 if not key_entry:
1081 mutation_list = []
1082 mutation_map[key] = {EVENTS_COLUMN_FAMILY_NAME: mutation_list}
1083 else:
1084 mutation_list = key_entry[EVENTS_COLUMN_FAMILY_NAME]
1085 supercolumn_name = get_events_padded_column_part(timestamp_column_part)
1086 if pk_tag is not None:
1087 supercolumn_name += (':' + str(pk_tag))
1088 # Build the list of columns in the supercolumn
1089 column_list = []
1090 for (name, value) in event.iteritems():
1091 if name != 'timestamp':
1092 column_list.append(Column(name=name, value=str(value),
1093 timestamp=timestamp*1000))
1094 mutation = Mutation(column_or_supercolumn=ColumnOrSuperColumn(
1095 super_column=SuperColumn(name=supercolumn_name,
1096 columns=column_list)))
1097 mutation_list.append(mutation)
1098 except Exception:
1099 raise StatsInvalidStatsDataException()
1100
1101 call_cassandra_with_reconnect(stats_db_connection.get_client().batch_mutate,
1102 mutation_map, ConsistencyLevel.ONE)
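# A sketch of the expected put data (hypothetical values); each event needs a
# 'timestamp', may carry an optional 'pk-tag', and every other key/value pair
# becomes a column of the event's supercolumn:
#
#   put_log_event_data('cluster-1', {
#       'node-1': [
#           {'timestamp': 1397000000000,
#            'pk-tag': 'abc123',
#            'severity': 'ERROR',
#            'message': 'something happened'},
#       ],
#   })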
1103
1104def delete_log_event_data(cluster, node_id, start_time, end_time):
1105 start_key_part, start_column_part = split_events_timestamp(int(start_time))
1106 end_key_part, end_column_part = split_events_timestamp(int(end_time))
1107 # The Cassandra timestamps are in microseconds, not milliseconds,
1108 # so we convert to microseconds. The Cassandra timestamp is derived
1109 # from the event timestamp (i.e. same time converted to microseconds),
1110 # so we use the end_time + 1, since that's guaranteed to be greater
1111 # than any of the timestamps for the sample points we're deleting.
1112 timestamp = (int(end_time) * 1000) + 1
1113 column_path = ColumnPath(column_family=EVENTS_COLUMN_FAMILY_NAME)
1114 column_parent = ColumnParent(column_family=EVENTS_COLUMN_FAMILY_NAME)
1115 for key_part in range(start_key_part, end_key_part+1):
1116 key = construct_log_events_key(cluster, node_id, key_part)
1117 current_start = start_column_part if key_part == start_key_part else ''
1118 current_end = end_column_part if key_part == end_key_part else ''
1119 if current_start == '' and current_end == '':
1120 call_cassandra_with_reconnect(stats_db_connection.get_client().remove,
1121 key, column_path, timestamp, ConsistencyLevel.ONE)
1122 else:
1123 # grrr. Cassandra currently doesn't support doing deletions via a
1124 # slice range (i.e. a column start and end). You need to give it a
1125 # list of columns. So we do a get_slice with the slice range and then
1126 # extract the individual column names from the result of that and
1127 # build up the column list that we can use to delete the column
1128 # using batch_mutate.
1129 slice_predicate = get_events_slice_predicate(current_start, current_end)
1130 get_results = call_cassandra_with_reconnect(
1131 stats_db_connection.get_client().get_slice,
1132 key, column_parent, slice_predicate, ConsistencyLevel.ONE)
1133 column_names = []
1134 for item in get_results:
1135 column_names.append(item.super_column.name)
1136
1137 deletion = Deletion(timestamp=timestamp, predicate=SlicePredicate(column_names=column_names))
1138 mutation_map = {key: {EVENTS_COLUMN_FAMILY_NAME: [Mutation(deletion=deletion)]}}
1139 call_cassandra_with_reconnect(stats_db_connection.get_client().batch_mutate,
1140 mutation_map, ConsistencyLevel.ONE)
1141
1142
1143def get_closest_sample_interval(requested_sample_interval):
1144 for i in range(0, len(DOWNSAMPLE_INTERVALS)):
1145 if DOWNSAMPLE_INTERVALS[i] > requested_sample_interval:
1146 if i == 0:
1147 return requested_sample_interval
1148 downsample_interval = DOWNSAMPLE_INTERVALS[i - 1]
1149 break
1150 else:
1151 downsample_interval = DOWNSAMPLE_INTERVALS[-1]
1152 # Return the closest multiple of the downsampled interval
1153 return downsample_interval * (requested_sample_interval // downsample_interval)
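# For example: a requested_sample_interval of 25 minutes (1500000 ms) falls
# between TEN_MINUTE_INTERVAL and ONE_HOUR_INTERVAL, so the downsample
# interval is TEN_MINUTE_INTERVAL and the function returns 1200000 ms
# (20 minutes), the closest lower multiple of ten minutes.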
1154
1155
1156def get_closest_window_interval(requested_window):
1157 for i in range(0, len(WINDOW_INTERVALS)):
1158 if WINDOW_INTERVALS[i] > requested_window:
1159 return WINDOW_INTERVALS[i - 1] if i > 0 else 0
1160 return WINDOW_INTERVALS[-1]
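# For example: a requested_window of 7 minutes (420000 ms) is rounded down to
# FIVE_MINUTE_INTERVAL (300000 ms); anything below THIRTY_SECOND_INTERVAL
# rounds down to a window of 0.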