#
# Copyright (c) 2013 Big Switch Networks, Inc.
#
# Licensed under the Eclipse Public License, Version 1.0 (the
# "License"); you may not use this file except in compliance with the
# License. You may obtain a copy of the License at
#
#      http://www.eclipse.org/legal/epl-v10.html
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
# implied. See the License for the specific language governing
# permissions and limitations under the License.
#

from cassandra.ttypes import KsDef, CfDef, InvalidRequestException, TTransport, \
    SlicePredicate, SliceRange, ColumnParent, ConsistencyLevel, ColumnPath, \
    Mutation, Deletion, KeyRange, Column, ColumnOrSuperColumn, SuperColumn
from django.conf import settings
from .utils import CassandraConnection
import random
import datetime
import time
from sdncon.controller.config import get_local_controller_id

CONTROLLER_STATS_NAME = 'controller'
SWITCH_STATS_NAME = 'switch'
PORT_STATS_NAME = 'port'
TARGET_INDEX_NAME = 'target_index'
STATS_TYPE_INDEX_NAME = 'stats_type_index'

STATS_COLUMN_FAMILY_NAME_SUFFIX = '_stats'
STATS_TARGET_TYPE_PUT_DATA_SUFFIX = '-stats'

STATS_BUCKET_PERIOD = 60*60*24*1000    # 1 day in milliseconds
STATS_PADDED_COLUMN_TIME_LENGTH = len(str(STATS_BUCKET_PERIOD))

EVENTS_COLUMN_FAMILY_NAME = 'events'
EVENTS_BUCKET_PERIOD = 60*60*24*1000    # 1 day in milliseconds
EVENTS_PADDED_COLUMN_TIME_LENGTH = len(str(EVENTS_BUCKET_PERIOD))

THIRTY_SECOND_INTERVAL = 30 * 1000
ONE_MINUTE_INTERVAL = 60 * 1000
FIVE_MINUTE_INTERVAL = 5 * ONE_MINUTE_INTERVAL
TEN_MINUTE_INTERVAL = 10 * ONE_MINUTE_INTERVAL
ONE_HOUR_INTERVAL = 60 * ONE_MINUTE_INTERVAL
FOUR_HOUR_INTERVAL = 4 * ONE_HOUR_INTERVAL
ONE_DAY_INTERVAL = 24 * ONE_HOUR_INTERVAL
ONE_WEEK_INTERVAL = 7 * ONE_DAY_INTERVAL
FOUR_WEEK_INTERVAL = 4 * ONE_WEEK_INTERVAL

DOWNSAMPLE_INTERVALS = (ONE_MINUTE_INTERVAL, TEN_MINUTE_INTERVAL,
                        ONE_HOUR_INTERVAL, FOUR_HOUR_INTERVAL,
                        ONE_DAY_INTERVAL, ONE_WEEK_INTERVAL,
                        FOUR_WEEK_INTERVAL)

WINDOW_INTERVALS = (THIRTY_SECOND_INTERVAL, ONE_MINUTE_INTERVAL,
                    FIVE_MINUTE_INTERVAL, TEN_MINUTE_INTERVAL)

VALUE_DATA_FORMAT = 'value'
RATE_DATA_FORMAT = 'rate'


class StatsException(Exception):
    pass


class StatsInvalidStatsDataException(StatsException):
    def __init__(self):
        super(StatsInvalidStatsDataException, self).__init__(
            'Error adding stats data with incorrect format')


class StatsDatabaseConnectionException(StatsException):
    def __init__(self):
        super(StatsDatabaseConnectionException, self).__init__(
            'Error connecting to stats database')


class StatsDatabaseAccessException(StatsException):
    def __init__(self):
        super(StatsDatabaseAccessException, self).__init__(
            'Error accessing stats database')

class StatsNonnumericValueException(StatsException):
    def __init__(self, value):
        super(StatsNonnumericValueException, self).__init__(
            'Invalid non-numeric stat value for rate or '
            'average value computation: ' + str(value))


class StatsRateComputationException(StatsException):
    def __init__(self):
        super(StatsRateComputationException, self).__init__(
            'Error computing rate; not enough raw data')


class StatsInvalidDataFormatException(StatsException):
    def __init__(self, data_format):
        super(StatsInvalidDataFormatException, self).__init__(
            'Invalid data format: ' + str(data_format))


class StatsInvalidStatsTimeRangeException(StatsException):
    def __init__(self, start_time, end_time):
        super(StatsInvalidStatsTimeRangeException, self).__init__(
            'Invalid stats time range; start = %s; end = %s' %
            (str(start_time), str(end_time)))


class StatsInvalidStatsTypeException(StatsException):
    def __init__(self, stats_type):
        super(StatsInvalidStatsTypeException, self).__init__(
            'Invalid stats type; name = %s' % str(stats_type))


class StatsInvalidStatsMetadataException(StatsException):
    def __init__(self, file_name):
        super(StatsInvalidStatsMetadataException, self).__init__(
            'Invalid stats metadata from file "%s"' % str(file_name))


class StatsInternalException(StatsException):
    def __init__(self, message):
        super(StatsInternalException, self).__init__(
            'Stats internal error: "%s"' % str(message))

class StatsCreateColumnFamilyException(StatsException):
    def __init__(self, name):
        super(StatsCreateColumnFamilyException, self).__init__(
            'Error creating column family; name = %s' % name)

# The following code is a hack to get the stats code to use a freshly
# created test keyspace when we're running the unit tests. I'm guessing
# there must be some way to detect if we're running under the Django
# (or Python) unit test framework, but I couldn't find any info about
# how to do that. So for now, we just provide this function that the
# unit tests must call before they make any stats call to get the stats
# code to use the test keyspace instead of the normal production
# keyspace. Bother.

use_test_keyspace = False

def set_use_test_keyspace():
    global use_test_keyspace
    use_test_keyspace = True


stats_db_connection = None

# FIXME: Should ideally create the column families on demand as the data
# is added, so the different target types don't need to be predefined here.
# Probably not a big problem for right now, though.
COLUMN_FAMILY_INFO_LIST = (
    {'name': TARGET_INDEX_NAME,
     'column_type': 'Super',
     'comparator_type': 'UTF8Type',
     'subcomparator_type': 'UTF8Type'},
    {'name': STATS_TYPE_INDEX_NAME,
     'column_type': 'Super',
     'comparator_type': 'UTF8Type',
     'subcomparator_type': 'UTF8Type'},
    {'name': CONTROLLER_STATS_NAME + STATS_COLUMN_FAMILY_NAME_SUFFIX,
     'comparator_type': 'UTF8Type'},
    {'name': SWITCH_STATS_NAME + STATS_COLUMN_FAMILY_NAME_SUFFIX,
     'comparator_type': 'UTF8Type'},
    {'name': PORT_STATS_NAME + STATS_COLUMN_FAMILY_NAME_SUFFIX,
     'comparator_type': 'UTF8Type'},
    {'name': EVENTS_COLUMN_FAMILY_NAME,
     'column_type': 'Super',
     'comparator_type': 'UTF8Type',
     'subcomparator_type': 'UTF8Type'},
)


def init_stats_db_connection(host, port, keyspace, user, password,
                             replication_factor, column_family_def_default_settings):
    global stats_db_connection
    if not stats_db_connection:
        if use_test_keyspace:
            keyspace = "test_" + keyspace

        try:
            stats_db_connection = CassandraConnection(host, port, keyspace, user, password)
            stats_db_connection.connect()
        except Exception:
            stats_db_connection = None
            raise StatsException("Error connecting to Cassandra daemon")

        if use_test_keyspace:
            try:
                stats_db_connection.get_client().system_drop_keyspace(keyspace)
            except Exception:
                pass

        try:
            stats_db_connection.set_keyspace()
            create_keyspace = False
        except Exception:
            create_keyspace = True

        if create_keyspace:
            keyspace_def = KsDef(name=keyspace,
                                 strategy_class='org.apache.cassandra.locator.SimpleStrategy',
                                 replication_factor=replication_factor,
                                 cf_defs=[])
            try:
                stats_db_connection.get_client().system_add_keyspace(keyspace_def)
                stats_db_connection.set_keyspace()
            except Exception, _e:
                stats_db_connection = None
                raise StatsException("Error creating stats keyspace")

        for column_family_info in COLUMN_FAMILY_INFO_LIST:
            try:
                column_family_def_settings = column_family_def_default_settings.copy()
                column_family_def_settings.update(column_family_info)
                column_family_def_settings['keyspace'] = keyspace
                # pylint: disable=W0142
                stats_db_connection.get_client().system_add_column_family(
                    CfDef(**column_family_def_settings))
            except InvalidRequestException, _e:
                # Assume this is because the column family already exists.
                # FIXME. Could check exception message for specific string
                pass
            except Exception, _e:
                stats_db_connection = None
                raise StatsCreateColumnFamilyException(column_family_info.get('name'))


def get_stats_db_connection():
    return stats_db_connection
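

# Illustrative usage sketch (not part of the original module): how a caller
# might initialize the module-level connection before reading or writing
# stats. The host, port, keyspace, credentials, and the empty column family
# defaults below are hypothetical placeholders, not values required by sdncon.
#
#   init_stats_db_connection('localhost', 9160, 'sdnstats', '', '',
#                            replication_factor=1,
#                            column_family_def_default_settings={})
#   client = get_stats_db_connection().get_client()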


# The following function is mainly intended to be used by the unit tests. It lets
# you clear out all of the data from the database. Note that since the stats DB
# is not managed by the normal Django DB mechanism you don't get the automatic
# DB flushing from the Django TestCase code, so it has to be done explicitly.
# There's a StatsTestCase subclass of TestCase in the stats unit tests that
# implements the tearDown method to call flush_stats_db after each test.
def flush_stats_db():
    if stats_db_connection is not None:
        for column_family_info in COLUMN_FAMILY_INFO_LIST:
            stats_db_connection.get_client().truncate(column_family_info['name'])

def call_cassandra_with_reconnect(fn, *args, **kwargs):
    try:
        try:
            results = fn(*args, **kwargs)
        except TTransport.TTransportException:
            stats_db_connection.reconnect()
            results = fn(*args, **kwargs)
    except TTransport.TTransportException, _e:
        raise StatsDatabaseConnectionException()
    except Exception, _e:
        raise StatsDatabaseAccessException()

    return results


def get_stats_padded_column_part(column_part):
    """
    For the columns to be sorted correctly by time we need to pad with
    leading zeroes up to the maximum range of the bucket
    """
    column_part = str(column_part)
    leading_zeroes = ('0'*(STATS_PADDED_COLUMN_TIME_LENGTH-len(column_part)))
    column_part = leading_zeroes + column_part
    return column_part


def split_stats_timestamp(timestamp):
    key_part = timestamp / STATS_BUCKET_PERIOD
    column_part = timestamp % STATS_BUCKET_PERIOD
    return (key_part, column_part)


def construct_stats_key(cluster, target_id, stats_type, timestamp_key_part):
    """
    Constructs the keys for the controller or switch stats.
    For the controller stats the target_id is the controller node id.
    For switch stats the target_id is the dpid of the switch.
    """
    return cluster + '|' + target_id + '|' + stats_type + '|' + str(timestamp_key_part)
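

# Worked example (illustrative, not part of the original module): the cluster
# name, switch DPID, and stats type below are hypothetical. With
# STATS_BUCKET_PERIOD equal to one day in milliseconds (86400000):
#
#   split_stats_timestamp(1376956800123)            # -> (15937, 123)
#   get_stats_padded_column_part(123)               # -> '00000123'
#   construct_stats_key('c1', '00:00:00:00:00:00:00:01', 'flow-count', 15937)
#   # -> 'c1|00:00:00:00:00:00:00:01|flow-count|15937'
#
# The key selects the one-day row bucket and the zero-padded column name keeps
# the samples within that row sorted by time under the UTF8Type comparator.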


def append_stats_results(get_results, values, timestamp_key_part):
    shifted_timestamp_key_part = int(timestamp_key_part) * STATS_BUCKET_PERIOD
    for item in get_results:
        timestamp_column_part = int(item.column.name)
        value = item.column.value
        timestamp = shifted_timestamp_key_part + timestamp_column_part
        values.append((timestamp, value))


def get_stats_slice_predicate(column_start, column_end):
    if column_start != '':
        column_start = get_stats_padded_column_part(column_start)
    if column_end != '':
        column_end = get_stats_padded_column_part(column_end)
    slice_predicate = SlicePredicate(slice_range=SliceRange(
        start=column_start, finish=column_end, count=1000000))
    return slice_predicate


def check_time_range(start_time, end_time):
    if int(end_time) < int(start_time):
        raise StatsInvalidStatsTimeRangeException(start_time, end_time)


def check_valid_data_format(data_format):
    if data_format != VALUE_DATA_FORMAT and data_format != RATE_DATA_FORMAT:
        raise StatsInvalidDataFormatException(data_format)


def get_window_range(raw_stats_values, index, window):

    if window == 0:
        return (index, index)

    # Get start index
    timestamp = raw_stats_values[index][0]
    start_timestamp = timestamp - (window / 2)
    end_timestamp = timestamp + (window / 2)

    start_index = index
    while start_index > 0:
        next_timestamp = raw_stats_values[start_index - 1][0]
        if next_timestamp < start_timestamp:
            break
        start_index -= 1

    end_index = index
    while end_index < len(raw_stats_values) - 1:
        next_timestamp = raw_stats_values[end_index + 1][0]
        if next_timestamp > end_timestamp:
            break
        end_index += 1

    return (start_index, end_index)


def convert_stat_string_to_value(stat_string):
    try:
        stat_value = int(stat_string)
    except ValueError:
        try:
            stat_value = float(stat_string)
        except ValueError:
            stat_value = stat_string
    return stat_value


def get_rate_over_stats_values(stats_values):

    if len(stats_values) < 2:
        return None

    start_stat = stats_values[0]
    end_stat = stats_values[-1]

    timestamp_delta = end_stat[0] - start_stat[0]
    # NOTE: In computing the value_delta here it's safe to assume floats
    # rather than calling convert_stat_string_to_value because we're going
    # to be converting to float anyway when we do the rate calculation later.
    # So there's no point in trying to differentiate between int and float
    # and rate doesn't make sense for any other type of stat data (e.g. string)
    value_delta = float(end_stat[1]) - float(start_stat[1])
    if timestamp_delta == 0:
        rate = float('inf' if value_delta > 0 else '-inf')
    else:
        rate = value_delta / timestamp_delta

    return rate
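

# Worked example (illustrative, not part of the original module). The sample
# points below are hypothetical (timestamp in ms, value as the stored string):
#
#   get_rate_over_stats_values([(1000, '10'), (2000, '40'), (4000, '70')])
#   # -> (70.0 - 10.0) / (4000 - 1000) = 0.02, i.e. units per millisecond
#
# Only the first and last points are used; intermediate samples don't affect
# the computed rate.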


def get_rate_over_window(raw_stats_values, index, window):
    if len(raw_stats_values) < 2:
        return None

    if window == 0:
        if index == 0:
            start_index = 0
            end_index = 1
        else:
            start_index = index - 1
            end_index = index
    else:
        start_index, end_index = get_window_range(raw_stats_values, index, window)

    return get_rate_over_stats_values(raw_stats_values[start_index:end_index + 1])


def get_average_over_stats_values(stats_values):

    total = 0
    count = 0
    for stat_value in stats_values:
        # FIXME: Should we just always convert to float here?
        # This would give a more accurate result for the average calculation
        # but would mean that the data type is different for a
        # zero vs. non-zero window size.
        value = convert_stat_string_to_value(stat_value[1])
        if type(value) not in (int, float):
            raise StatsNonnumericValueException(value)
        total += value
        count += 1

    return (total / count) if count > 0 else None


def get_average_value_over_window(raw_stats_values, index, window):
    start_index, end_index = get_window_range(raw_stats_values, index, window)
    stats_values = raw_stats_values[start_index:end_index + 1]
    return get_average_over_stats_values(stats_values)
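

# Illustrative example (hypothetical sample data, not from the original
# module): with a 60000 ms (one minute) window centered on index 1, all three
# samples below fall inside the window, so the average is computed over them.
#
#   samples = [(10000, '10'), (20000, '20'), (30000, '60')]
#   get_average_value_over_window(samples, 1, 60000)   # -> 30
#
# Note the division in get_average_over_stats_values: because all of these
# sample values parse as ints, the result is an int under Python 2.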


def reverse_stats_data_generator(cluster, target_type, target_id, stats_type,
                                 start_time=None, end_time=None,
                                 chunk_interval=3600000):
    if start_time is None:
        start_time = int(time.time() * 1000)
    if end_time is None:
        # By default, don't go back past 1/1/2011. This was before we had stats support
        # in the controller, so we shouldn't have any data earlier than that (except if
        # the clock on the controller was set incorrectly).
        end_time = int(time.mktime(datetime.datetime(2011,1,1).timetuple()) * 1000)
    end_key_part, _end_column_part = split_stats_timestamp(end_time)
    key_part, column_part = split_stats_timestamp(start_time)
    column_family = target_type + STATS_COLUMN_FAMILY_NAME_SUFFIX
    column_parent = ColumnParent(column_family)
    # FIXME: Should add support for chunk_interval to be either iterable or a
    # list/tuple to give a sequence of chunk intervals to use. The last available
    # chunk interval from the list/tuple/iterator would then be used for any
    # subsequent cassandra calls
    #chunk_interval_iter = (chunk_interval if isinstance(chunk_interval, list) or
    #    isinstance(chunk_interval, tuple) else [chunk_interval])
    while key_part >= 0:
        key = construct_stats_key(cluster, target_id, stats_type, key_part)

        while True:
            column_start = column_part - chunk_interval
            if column_start < 0:
                column_start = 0
            slice_predicate = get_stats_slice_predicate(column_start, column_part)
            for attempt in (1,2):
                try:
                    get_results = stats_db_connection.get_client().get_slice(key,
                        column_parent, slice_predicate, ConsistencyLevel.ONE)
                    for item in reversed(get_results):
                        timestamp = (key_part * STATS_BUCKET_PERIOD) + int(item.column.name)
                        value = item.column.value
                        yield (timestamp, value)
                    break
                except TTransport.TTransportException:
                    # Only retry once, so if it's the second time through,
                    # propagate the exception
                    if attempt == 2:
                        raise StatsDatabaseConnectionException()
                    stats_db_connection.reconnect()
                except Exception:
                    raise StatsDatabaseAccessException()

            column_part = column_start
            if column_part == 0:
                break

        if key_part == end_key_part:
            break
        key_part -= 1
        column_part = STATS_BUCKET_PERIOD - 1


def get_latest_stat_data(cluster, target_type, target_id, stats_type,
                         window=0, data_format=VALUE_DATA_FORMAT):

    check_valid_data_format(data_format)

    minimum_data_points = 2 if data_format == RATE_DATA_FORMAT else 1
    stats_data_window = []
    latest_stat_timestamp = None

    start_time = int(time.time() * 1000)
    # Limit how far back we'll look for the latest stat value.
    # 86400000 is 1 day in ms
    end_time = start_time - 86400000
    for stat_data_point in reverse_stats_data_generator(cluster,
            target_type, target_id, stats_type, start_time, end_time):
        current_stat_timestamp = stat_data_point[0]
        if latest_stat_timestamp is None:
            latest_stat_timestamp = current_stat_timestamp

        # NOTE: For stats operations we treat the window for the rate or
        # average calculation as centered around the current timestamp.
        # For the latest stat case there is no data after the current point.
        # We could extend the window back further so that the current timestamp
        # is the end of the window range instead of the middle, but then that
        # would be inconsistent with the other cases, so instead we just go
        # back to half the window size. I haven't been able to convince myself
        # strongly one way or the other which is better (or how much it matters).
        outside_window = (latest_stat_timestamp - current_stat_timestamp) > (window / 2)
        if len(stats_data_window) >= minimum_data_points and outside_window:
            break

        stats_data_window.insert(0, stat_data_point)

        if (window == 0) and (len(stats_data_window) >= minimum_data_points):
            break

    stat_data_point = None

    if latest_stat_timestamp is not None:
        if data_format == VALUE_DATA_FORMAT:
            value = get_average_over_stats_values(stats_data_window)
        else:
            assert data_format == RATE_DATA_FORMAT, "Invalid data format"
            value = get_rate_over_stats_values(stats_data_window)
        if value is not None:
            stat_data_point = (latest_stat_timestamp, value)

    return stat_data_point
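

# Illustrative usage sketch (not part of the original module). The cluster
# name and DPID are hypothetical placeholders; ONE_MINUTE_INTERVAL comes from
# the constants defined above.
#
#   latest = get_latest_stat_data('c1', SWITCH_STATS_NAME,
#                                 '00:00:00:00:00:00:00:01', 'flow-count',
#                                 window=ONE_MINUTE_INTERVAL,
#                                 data_format=RATE_DATA_FORMAT)
#   # -> (timestamp_ms, rate) for the most recent sample, or None if no data
#   #    was written in the last day.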


def get_stats_data(cluster, target_type, target_id, stats_type,
                   start_time, end_time, sample_interval=0, window=0,
                   data_format=VALUE_DATA_FORMAT, limit=None):

    check_time_range(start_time, end_time)
    check_valid_data_format(data_format)
    # FIXME: Add validation of other arguments

    start_key_part, start_column_part = split_stats_timestamp(int(start_time))
    end_key_part, end_column_part = split_stats_timestamp(int(end_time))

    raw_stats_values = []
    column_family = target_type + STATS_COLUMN_FAMILY_NAME_SUFFIX
    column_parent = ColumnParent(column_family)

    for key_part in range(start_key_part, end_key_part+1):
        current_start = start_column_part if key_part == start_key_part else ''
        current_end = end_column_part if key_part == end_key_part else ''
        # FIXME: How big can the count be?
        slice_predicate = get_stats_slice_predicate(current_start, current_end)
        key = construct_stats_key(cluster, target_id, stats_type, key_part)
        for attempt in (1,2):
            try:
                get_results = stats_db_connection.get_client().get_slice(key,
                    column_parent, slice_predicate, ConsistencyLevel.ONE)
                break
            except TTransport.TTransportException:
                # Only retry once, so if it's the second time through,
                # propagate the exception
                if attempt == 2:
                    raise StatsDatabaseConnectionException()
                stats_db_connection.reconnect()
            except Exception:
                raise StatsDatabaseAccessException()

        append_stats_results(get_results, raw_stats_values, key_part)

        # FIXME: This logic to handle the limit argument isn't complete.
        # It doesn't account for a non-zero window or downsampling.
        if (limit != None and sample_interval == 0 and window == 0 and
                len(raw_stats_values) > limit):
            raw_stats_values = raw_stats_values[:limit]
            break

    stats_values = []
    last_downsample_index = None
    for i in range(0, len(raw_stats_values)):
        # Handle downsampling
        if sample_interval != 0:
            downsample_index = raw_stats_values[i][0] / sample_interval
            if downsample_index == last_downsample_index:
                continue
            last_downsample_index = downsample_index

        # Get the value based on the specified data format
        if data_format == VALUE_DATA_FORMAT:
            if sample_interval == 0:
                value = convert_stat_string_to_value(raw_stats_values[i][1])
            else:
                value = get_average_value_over_window(raw_stats_values, i, window)
        else:
            assert data_format == RATE_DATA_FORMAT, "Invalid data format"
            value = get_rate_over_window(raw_stats_values, i, window)

        if value is not None:
            stats_values.append((raw_stats_values[i][0], value))

    return stats_values
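

# Illustrative usage sketch (not part of the original module). The cluster
# name and controller_id are hypothetical; this asks for one-minute averages
# of a 'cpu-user' stat over the last hour, assuming now_ms is the current
# time in milliseconds.
#
#   now_ms = int(time.time() * 1000)
#   series = get_stats_data('c1', CONTROLLER_STATS_NAME, controller_id,
#                           'cpu-user', now_ms - ONE_HOUR_INTERVAL, now_ms,
#                           sample_interval=ONE_MINUTE_INTERVAL,
#                           window=ONE_MINUTE_INTERVAL)
#   # -> list of (timestamp_ms, value) tuples, at most one per minute bucket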


def delete_stats_data(cluster, target_type, target_id, stats_type,
                      start_time, end_time):

    check_time_range(start_time, end_time)
    # FIXME: Add validation of other arguments

    start_key_part, start_column_part = split_stats_timestamp(int(start_time))
    end_key_part, end_column_part = split_stats_timestamp(int(end_time))

    column_family = target_type + STATS_COLUMN_FAMILY_NAME_SUFFIX
    column_parent = ColumnParent(column_family)
    # The Cassandra timestamps are in microseconds, not milliseconds,
    # so we convert to microseconds. The Cassandra timestamp is derived
    # from the stat timestamp (i.e. same time converted to microseconds),
    # so we use the end_time + 1, since that's guaranteed to be greater
    # than any of the timestamps for the sample points we're deleting.
    timestamp = (int(end_time) * 1000) + 1
    for key_part in range(start_key_part, end_key_part+1):
        key = construct_stats_key(cluster, target_id, stats_type, key_part)
        current_start = start_column_part if key_part == start_key_part else ''
        current_end = end_column_part if key_part == end_key_part else ''
        if current_start == '' and current_end == '':
            call_cassandra_with_reconnect(stats_db_connection.get_client().remove,
                key, ColumnPath(column_family=column_family), timestamp,
                ConsistencyLevel.ONE)
        else:
            # grrr. Cassandra currently doesn't support doing deletions via a
            # slice range (i.e. a column start and end). You need to give it a
            # list of columns. So we do a get_slice with the slice range and then
            # extract the individual column names from the result of that and
            # build up the column list that we can use to delete the columns
            # using batch_mutate.
            slice_predicate = get_stats_slice_predicate(current_start, current_end)
            get_results = call_cassandra_with_reconnect(
                stats_db_connection.get_client().get_slice,
                key, column_parent, slice_predicate, ConsistencyLevel.ONE)
            column_names = []
            for item in get_results:
                column_names.append(item.column.name)

            deletion = Deletion(timestamp=timestamp, predicate=SlicePredicate(column_names=column_names))
            mutation_map = {key: {column_family: [Mutation(deletion=deletion)]}}
            call_cassandra_with_reconnect(stats_db_connection.get_client().batch_mutate,
                mutation_map, ConsistencyLevel.ONE)


STATS_METADATA_VARIABLE_NAME = 'STATS_METADATA'

stats_metadata = None

def init_stats_metadata(_cluster):
    """
    Initialize the dictionary of stats metadata. Currently this is initialized
    from a set of metadata modules listed in the Django settings. There is no
    differentiation in the stats types that are supported across clusters, so
    we ignore the cluster argument and just maintain a global map of stat type
    metadata.
    """
    global stats_metadata
    if not stats_metadata:
        stats_metadata = {}
        for module_name in settings.STATS_METADATA_MODULES:
            metadata_module = __import__(module_name,
                fromlist=[STATS_METADATA_VARIABLE_NAME])
            if not metadata_module:
                # FIXME: log error
                continue

            if STATS_METADATA_VARIABLE_NAME not in dir(metadata_module):
                # FIXME: log error
                continue

            metadata_list = getattr(metadata_module, STATS_METADATA_VARIABLE_NAME)

            if type(metadata_list) is dict:
                metadata_list = [metadata_list]

            if type(metadata_list) is not list and type(metadata_list) is not tuple:
                raise StatsInvalidStatsMetadataException(module_name)

            for metadata in metadata_list:
                if type(metadata) is not dict:
                    raise StatsInvalidStatsMetadataException(module_name)

                name = metadata.get('name')
                if not name:
                    raise StatsInvalidStatsMetadataException(module_name)
                name = str(name)

                # Auto-set the verbose_name to the name if it's not set explicitly
                verbose_name = metadata.get('verbose_name')
                if not verbose_name:
                    metadata['verbose_name'] = name

                # FIXME: Validate other contents of metadata.
                # e.g. flag name conflicts between files.

                stats_metadata[name] = metadata

def get_stats_metadata(cluster, stats_type=None):
    init_stats_metadata(cluster)
    # If no stats_type is specified return the entire dictionary of stat types
    metadata = stats_metadata.get(stats_type) if stats_type else stats_metadata
    if metadata is None:
        raise StatsInvalidStatsTypeException(stats_type)
    return metadata


STATS_INDEX_ATTRIBUTE_TYPES = {
    'last-updated': int
}

def stats_type_slice_to_index_data(stats_type_slice):
    index_data = {}
    for super_column in stats_type_slice:
        name = super_column.super_column.name
        column_list = super_column.super_column.columns
        if name == 'base':
            insert_map = index_data
        elif name.startswith('param:'):
            colon_index = name.find(':')
            parameter_name = name[colon_index+1:]
            parameter_map = index_data.get('parameters')
            if not parameter_map:
                parameter_map = {}
                index_data['parameters'] = parameter_map
            insert_map = {}
            parameter_map[parameter_name] = insert_map
        else:
            raise StatsInternalException('Invalid stats type index name: ' + str(name))

        for column in column_list:
            value = column.value
            attribute_type = STATS_INDEX_ATTRIBUTE_TYPES.get(column.name)
            if attribute_type is not None:
                value = attribute_type(value)
            insert_map[column.name] = value

    return index_data


def get_stats_type_index(cluster, target_type, target_id, stats_type=None):
    column_parent = ColumnParent(STATS_TYPE_INDEX_NAME)
    slice_predicate = SlicePredicate(slice_range=SliceRange(
        start='', finish='', count=1000000))
    key_prefix = cluster + ':' + target_type + ':' + target_id
    if stats_type is None:
        key_range = KeyRange(start_key=key_prefix+':', end_key=key_prefix+';', count=100000)
        key_slice_list = call_cassandra_with_reconnect(
            stats_db_connection.get_client().get_range_slices,
            column_parent, slice_predicate, key_range, ConsistencyLevel.ONE)
        stats_index_data = {}
        for key_slice in key_slice_list:
            key = key_slice.key
            colon_index = key.rfind(':')
            if colon_index < 0:
                raise StatsInternalException('Invalid stats type index key: ' + str(key))
            stats_type = key[colon_index+1:]
            stats_index_data[stats_type] = stats_type_slice_to_index_data(key_slice.columns)
    else:
        key = key_prefix + ':' + stats_type
        stats_type_slice = call_cassandra_with_reconnect(
            stats_db_connection.get_client().get_slice, key, column_parent,
            slice_predicate, ConsistencyLevel.ONE)
        stats_index_data = stats_type_slice_to_index_data(stats_type_slice)

    return stats_index_data


def get_stats_target_types(cluster):
    column_parent = ColumnParent(TARGET_INDEX_NAME)
    slice_predicate = SlicePredicate(column_names=[])
    key_range = KeyRange(start_key=cluster+':', end_key=cluster+';', count=100000)
    key_slice_list = call_cassandra_with_reconnect(
        stats_db_connection.get_client().get_range_slices,
        column_parent, slice_predicate, key_range, ConsistencyLevel.ONE)

    target_types = {}
    for key_slice in key_slice_list:
        target_type = key_slice.key[len(cluster)+1:]
        target_types[target_type] = {}

    return target_types


STATS_TARGET_ATTRIBUTE_TYPES = {
    'last-updated': int
}

def get_stats_targets(cluster, target_type):
    key = cluster + ':' + target_type
    column_parent = ColumnParent(TARGET_INDEX_NAME)
    slice_predicate = SlicePredicate(slice_range=SliceRange(
        start='', finish='', count=1000000))
    super_column_list = call_cassandra_with_reconnect(
        stats_db_connection.get_client().get_slice, key, column_parent,
        slice_predicate, ConsistencyLevel.ONE)
    target_list = {}
    for item in super_column_list:
        target = {}
        for column in item.super_column.columns:
            value = column.value
            attribute_type = STATS_TARGET_ATTRIBUTE_TYPES.get(column.name)
            if attribute_type is not None:
                value = attribute_type(value)
            target[column.name] = value
        target_list[item.super_column.name] = target

    return target_list


# FIXME: Should update the code below to use these constants
# instead of string literals
LAST_UPDATED_ATTRIBUTE_NAME = 'last-updated'
CONTROLLER_ATTRIBUTE_NAME = 'controller'
BASE_SUPER_COLUMN_NAME = 'base'
PARAMETERS_SUPER_COLUMN_NAME = 'parameters'
PARAM_SUPER_COLUMN_NAME_PREFIX = 'param:'

def append_attributes_to_mutation_list(attributes, supercolumn_name, mutation_list):
    column_list = []
    for name, info in attributes.iteritems():
        timestamp, value = info
        column = Column(name=name, value=str(value), timestamp=timestamp*1000)
        column_list.append(column)
    mutation = Mutation(column_or_supercolumn=ColumnOrSuperColumn(
        super_column=SuperColumn(name=supercolumn_name, columns=column_list)))
    mutation_list.append(mutation)


def add_stat_type_index_info_to_mutation_map(cluster, target_type,
                                             stats_type_index, mutation_map):
    for key, stats_type_info in stats_type_index.iteritems():
        separator_index = key.find(':')
        assert separator_index >= 0
        base_stat_type_name = key[:separator_index]
        target_id = key[separator_index + 1:]
        stats_type_base_attributes, stats_type_params = stats_type_info
        mutation_list = []
        append_attributes_to_mutation_list(stats_type_base_attributes,
            'base', mutation_list)
        for name, attributes in stats_type_params.iteritems():
            append_attributes_to_mutation_list(attributes,
                'param:' + name, mutation_list)
        mutation_key = cluster + ':' + target_type + ':' + target_id + ':' + base_stat_type_name
        mutation_map[mutation_key] = {STATS_TYPE_INDEX_NAME: mutation_list}


def add_target_id_list_to_mutation_map(cluster, target_type,
                                       target_id_list, mutation_map):
    mutation_list = []
    for target_id, attributes in target_id_list:
        append_attributes_to_mutation_list(attributes, target_id, mutation_list)
    key = cluster + ':' + target_type
    mutation_map[key] = {TARGET_INDEX_NAME: mutation_list}


def _put_stats_data(cluster, target_type, stats_data):
    try:
        controller_id = get_local_controller_id()
        mutation_map = {}
        target_id_list = []
        stats_type_index = {}
        column_family = target_type + STATS_COLUMN_FAMILY_NAME_SUFFIX
        for (target_id, target_id_stats) in stats_data.iteritems():
            # Map 'localhost' controller to the actual ID for the local controller
            # FIXME: Eventually we should fix up the other components (e.g. statd)
            # that invoke this REST API to not use localhost and instead use the
            # REST API to obtain the real ID for the local controller, but for now
            # this works to ensure we're not using localhost in any of the data we
            # store in the DB (unless, of course, the uuid version of the controller
            # ID hasn't been written to the boot-config file, in which case it will
            # default to the old localhost value).
            if target_type == 'controller' and target_id == 'localhost':
                target_id = controller_id
            latest_id_timestamp = None
            for (stats_type, stats_data_array) in target_id_stats.iteritems():
                # Check if it's a parameterized type and extract the base
                # stat type and parameter name.
                parameter_separator = stats_type.find('__')
                if parameter_separator >= 0:
                    stats_type_base = stats_type[:parameter_separator]
                    stats_type_parameter = stats_type[parameter_separator+2:]
                else:
                    stats_type_base = stats_type
                    stats_type_parameter = None

                latest_stat_type_timestamp = None

                # Add the stats values to the mutation map
                for stats_value in stats_data_array:
                    timestamp = int(stats_value['timestamp'])
                    if latest_stat_type_timestamp is None or timestamp > latest_stat_type_timestamp:
                        latest_stat_type_timestamp = timestamp
                    if latest_id_timestamp is None or timestamp > latest_id_timestamp:
                        latest_id_timestamp = timestamp
                    value = stats_value['value']
                    timestamp_key_part, timestamp_column_part = split_stats_timestamp(timestamp)
                    key = construct_stats_key(cluster, target_id, stats_type, timestamp_key_part)
                    key_entry = mutation_map.get(key)
                    if not key_entry:
                        mutation_list = []
                        mutation_map[key] = {column_family: mutation_list}
                    else:
                        mutation_list = key_entry[column_family]

                    # Note: convert the Cassandra timestamp value to microseconds to
                    # be consistent with standard Cassandra timestamp format.
                    mutation = Mutation(column_or_supercolumn=ColumnOrSuperColumn(
                        column=Column(name=get_stats_padded_column_part(timestamp_column_part),
                            value=str(value), timestamp=timestamp*1000)))
                    mutation_list.append(mutation)

                # Update the stat type index info.
                # There can be multiple parameterized types for each base stats type,
                # so we need to be careful about checking for existing data for
                # the index_entry. Because of the dictionary nature of the put data
                # and the way this is serialized into a Python dictionary, though,
                # we are guaranteed that there won't be multiple entries for a
                # specific parameterized stats type or the base stats type, so we
                # don't need to handle duplicates for those.
                if latest_stat_type_timestamp is not None:
                    stats_type_index_key = stats_type_base + ':' + target_id
                    stats_type_info = stats_type_index.get(stats_type_index_key)
                    if not stats_type_info:
                        # This is a tuple of two dictionaries: the attributes for
                        # the base stat type and a dictionary of the parameterized
                        # types that have been seen for that stat type. The
                        # parameterized type dictionary is keyed by the name of
                        # the parameterized type and the value is the associated
                        # attribute dictionary.
                        stats_type_info = ({},{})
                        stats_type_index[stats_type_index_key] = stats_type_info
                    stats_type_base_attributes, stats_type_params = stats_type_info
                    if stats_type_parameter is None:
                        attributes = stats_type_base_attributes
                    else:
                        attributes = stats_type_params.get(stats_type_parameter)
                        if attributes is None:
                            attributes = {}
                            stats_type_params[stats_type_parameter] = attributes
                    last_updated_entry = attributes.get('last-updated')
                    if last_updated_entry is None or latest_stat_type_timestamp > last_updated_entry[0]:
                        attributes['last-updated'] = (latest_stat_type_timestamp, latest_stat_type_timestamp)

            # Update the target index
            if latest_id_timestamp is not None:
                # FIXME: Always set the controller attributes for now.
                # This could/should be optimized to not set this for stats
                # whose target type is 'controller' since those will
                # always be coming from the same controller (i.e. itself).
                # But that change requires some changes (albeit minor) to
                # syncd to work correctly which I don't want to mess with
                # right now.
                #attributes = {'last-updated': (latest_id_timestamp, latest_id_timestamp)}
                #if target_type != 'controller':
                #    attributes['controller'] = controller_id
                attributes = {'last-updated': (latest_id_timestamp, latest_id_timestamp),
                              'controller': (latest_id_timestamp, controller_id)}
                target_id_list.append((target_id, attributes))
    except Exception, _e:
        raise StatsInvalidStatsDataException()

    add_stat_type_index_info_to_mutation_map(cluster, target_type, stats_type_index, mutation_map)
    add_target_id_list_to_mutation_map(cluster, target_type, target_id_list, mutation_map)

    call_cassandra_with_reconnect(stats_db_connection.get_client().batch_mutate,
        mutation_map, ConsistencyLevel.ONE)


def put_stats_data(cluster, stats_data):
    for target_type, target_stats_data in stats_data.items():
        if target_type.endswith(STATS_TARGET_TYPE_PUT_DATA_SUFFIX):
            # Strip off the '-stats' suffix
            target_type = target_type[:-len(STATS_TARGET_TYPE_PUT_DATA_SUFFIX)]
        _put_stats_data(cluster, target_type, target_stats_data)
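

# Illustrative payload sketch (not part of the original module): the shape of
# the dictionary expected by put_stats_data(). The cluster name, stat type,
# and values are hypothetical placeholders.
#
#   stats_data = {
#       'controller-stats': {                  # target type + '-stats' suffix
#           'localhost': {                     # target id (mapped to real id)
#               'cpu-user': [                  # stats type
#                   {'timestamp': 1376956800123, 'value': 42},
#                   {'timestamp': 1376956815123, 'value': 45},
#               ],
#           },
#       },
#   }
#   put_stats_data('c1', stats_data)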


def get_events_padded_column_part(column_part):
    """
    For the columns to be sorted correctly by time we need to pad with
    leading zeroes up to the maximum range of the bucket
    """
    column_part = str(column_part)
    leading_zeroes = ('0'*(EVENTS_PADDED_COLUMN_TIME_LENGTH-len(column_part)))
    column_part = leading_zeroes + column_part
    return column_part


def split_events_timestamp(timestamp):
    key_part = timestamp / EVENTS_BUCKET_PERIOD
    column_part = timestamp % EVENTS_BUCKET_PERIOD
    return (key_part, column_part)


def construct_log_events_key(cluster, node_id, timestamp_key_part):
    return cluster + '|' + node_id + '|' + str(timestamp_key_part)


def get_events_slice_predicate(column_start, column_end):
    if column_start != '':
        column_start = get_events_padded_column_part(column_start)
    if column_end != '':
        # For the final key in the range of keys we want all of the
        # supercolumns whose name starts with end_column_part.
        # If the event includes a pk tag then the format of the
        # supercolumn name is <timestamp-part>:<pk-tag>.
        # Otherwise it's just the <timestamp-part>. To get both of these
        # cases we set the column end value to be the <timestamp-part>
        # suffixed with ';' (which has an ordinal value 1 greater than
        # ':'). So this will get all of the events with the given column
        # end regardless of whether or not they include the pk tag.
        column_end = get_events_padded_column_part(column_end) + ';'
    slice_predicate = SlicePredicate(slice_range=SliceRange(
        start=column_start, finish=column_end, count=1000000))
    return slice_predicate


def append_log_events_results(event_results, event_list, timestamp_key_part, include_pk_tag=False):
    shifted_timestamp_key_part = int(timestamp_key_part) * EVENTS_BUCKET_PERIOD
    for item in event_results:
        event = {}
        super_column_name = item.super_column.name
        colon_index = super_column_name.find(":")
        if colon_index >= 0:
            if include_pk_tag:
                pk_tag = super_column_name[colon_index:]
                event['pk-tag'] = pk_tag
            timestamp_column_part = super_column_name[:colon_index]
        else:
            timestamp_column_part = super_column_name
        timestamp = shifted_timestamp_key_part + int(timestamp_column_part)
        event['timestamp'] = timestamp
        for column in item.super_column.columns:
            event[column.name] = column.value
        event_list.append(event)


def get_log_event_data(cluster, node_id, start_time, end_time, include_pk_tag=False):
    # FIXME: Add some validation of arguments
    start_key_part, start_column_part = split_events_timestamp(int(start_time))
    end_key_part, end_column_part = split_events_timestamp(int(end_time))

    event_list = []
    column_parent = ColumnParent(column_family=EVENTS_COLUMN_FAMILY_NAME)
    for key_part in range(start_key_part, end_key_part+1):
        current_start = start_column_part if key_part == start_key_part else ''
        current_end = end_column_part if key_part == end_key_part else ''
        # FIXME: How big can the count be?
        slice_predicate = get_events_slice_predicate(current_start, current_end)
        key = construct_log_events_key(cluster, node_id, key_part)
        for attempt in (1,2):
            try:
                results = stats_db_connection.get_client().get_slice(key,
                    column_parent, slice_predicate, ConsistencyLevel.ONE)
                break
            except TTransport.TTransportException:
                # Only retry once, so if it's the second time through,
                # propagate the exception
                if attempt == 2:
                    raise StatsDatabaseConnectionException()
                stats_db_connection.reconnect()
            except Exception:
                raise StatsDatabaseAccessException()
        append_log_events_results(results, event_list, key_part, include_pk_tag)

    return event_list
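

# Illustrative usage sketch (not part of the original module). The cluster
# and node id are hypothetical placeholders; this fetches the last hour of
# log events for one controller node, assuming now_ms is the current time
# in milliseconds.
#
#   now_ms = int(time.time() * 1000)
#   events = get_log_event_data('c1', controller_id,
#                               now_ms - ONE_HOUR_INTERVAL, now_ms)
#   # -> list of dicts, each with a 'timestamp' key plus one key per stored
#   #    event column (and 'pk-tag' if include_pk_tag=True and a tag exists)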


def put_log_event_data(cluster, log_events_data):
    try:
        mutation_map = {}
        for (node_id, node_events) in log_events_data.iteritems():
            for event in node_events:
                timestamp = event['timestamp']
                pk_tag = event.get('pk-tag')
                # If the entry in the put data does not specify a tag, then generate a random one.
                # This is so that we can have multiple events with the same timestamp.
                # FIXME: Is there something better we can do here?
                if not pk_tag:
                    pk_tag = random.randint(0,10000000000)
                timestamp = int(timestamp)
                timestamp_key_part, timestamp_column_part = split_events_timestamp(timestamp)
                key = construct_log_events_key(cluster, node_id, timestamp_key_part)
                key_entry = mutation_map.get(key)
                if not key_entry:
                    mutation_list = []
                    mutation_map[key] = {EVENTS_COLUMN_FAMILY_NAME: mutation_list}
                else:
                    mutation_list = key_entry[EVENTS_COLUMN_FAMILY_NAME]
                supercolumn_name = get_events_padded_column_part(timestamp_column_part)
                if pk_tag is not None:
                    supercolumn_name += (':' + str(pk_tag))
                # Build the list of columns in the supercolumn
                column_list = []
                for (name, value) in event.iteritems():
                    if name != 'timestamp':
                        column_list.append(Column(name=name, value=str(value),
                            timestamp=timestamp*1000))
                mutation = Mutation(column_or_supercolumn=ColumnOrSuperColumn(
                    super_column=SuperColumn(name=supercolumn_name,
                        columns=column_list)))
                mutation_list.append(mutation)
    except Exception:
        raise StatsInvalidStatsDataException()

    call_cassandra_with_reconnect(stats_db_connection.get_client().batch_mutate,
        mutation_map, ConsistencyLevel.ONE)
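

# Illustrative payload sketch (not part of the original module): the shape of
# the dictionary expected by put_log_event_data(). The node id, component
# name, and message are hypothetical placeholders; every key other than
# 'timestamp' and 'pk-tag' is stored as an event column.
#
#   log_events_data = {
#       'controller-node-1': [
#           {'timestamp': 1376956800123,
#            'component': 'sdncon',
#            'log-level': 'ERROR',
#            'message': 'something went wrong'},
#       ],
#   }
#   put_log_event_data('c1', log_events_data)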

def delete_log_event_data(cluster, node_id, start_time, end_time):
    start_key_part, start_column_part = split_events_timestamp(int(start_time))
    end_key_part, end_column_part = split_events_timestamp(int(end_time))
    # The Cassandra timestamps are in microseconds, not milliseconds,
    # so we convert to microseconds. The Cassandra timestamp is derived
    # from the event timestamp (i.e. same time converted to microseconds),
    # so we use the end_time + 1, since that's guaranteed to be greater
    # than any of the timestamps for the sample points we're deleting.
    timestamp = (int(end_time) * 1000) + 1
    column_path = ColumnPath(column_family=EVENTS_COLUMN_FAMILY_NAME)
    column_parent = ColumnParent(column_family=EVENTS_COLUMN_FAMILY_NAME)
    for key_part in range(start_key_part, end_key_part+1):
        key = construct_log_events_key(cluster, node_id, key_part)
        current_start = start_column_part if key_part == start_key_part else ''
        current_end = end_column_part if key_part == end_key_part else ''
        if current_start == '' and current_end == '':
            call_cassandra_with_reconnect(stats_db_connection.get_client().remove,
                key, column_path, timestamp, ConsistencyLevel.ONE)
        else:
            # grrr. Cassandra currently doesn't support doing deletions via a
            # slice range (i.e. a column start and end). You need to give it a
            # list of columns. So we do a get_slice with the slice range and then
            # extract the individual column names from the result of that and
            # build up the column list that we can use to delete the columns
            # using batch_mutate.
            slice_predicate = get_events_slice_predicate(current_start, current_end)
            get_results = call_cassandra_with_reconnect(
                stats_db_connection.get_client().get_slice,
                key, column_parent, slice_predicate, ConsistencyLevel.ONE)
            column_names = []
            for item in get_results:
                column_names.append(item.super_column.name)

            deletion = Deletion(timestamp=timestamp, predicate=SlicePredicate(column_names=column_names))
            mutation_map = {key: {EVENTS_COLUMN_FAMILY_NAME: [Mutation(deletion=deletion)]}}
            call_cassandra_with_reconnect(stats_db_connection.get_client().batch_mutate,
                mutation_map, ConsistencyLevel.ONE)


def get_closest_sample_interval(requested_sample_interval):
    for i in range(0, len(DOWNSAMPLE_INTERVALS)):
        if DOWNSAMPLE_INTERVALS[i] > requested_sample_interval:
            if i == 0:
                return requested_sample_interval
            downsample_interval = DOWNSAMPLE_INTERVALS[i - 1]
            break
    else:
        downsample_interval = DOWNSAMPLE_INTERVALS[-1]
    # Return the closest multiple of the downsampled interval
    return downsample_interval * (requested_sample_interval // downsample_interval)
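

# Worked example (illustrative): a requested interval of 90000 ms (1.5
# minutes) falls between ONE_MINUTE_INTERVAL and TEN_MINUTE_INTERVAL, so the
# one-minute downsample interval is chosen and the result is rounded down to
# a multiple of it:
#
#   get_closest_sample_interval(90000)   # -> 60000
#   get_closest_sample_interval(45000)   # -> 45000 (smaller than all
#                                        #    downsample intervals)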


def get_closest_window_interval(requested_window):
    for i in range(0, len(WINDOW_INTERVALS)):
        if WINDOW_INTERVALS[i] > requested_window:
            return WINDOW_INTERVALS[i - 1] if i > 0 else 0
    return WINDOW_INTERVALS[-1]
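

# Worked example (illustrative): a requested window of 45000 ms is snapped
# down to the nearest supported window interval, and anything below the
# smallest interval collapses to 0 (no windowing):
#
#   get_closest_window_interval(45000)     # -> 30000 (THIRTY_SECOND_INTERVAL)
#   get_closest_window_interval(10000)     # -> 0
#   get_closest_window_interval(900000)    # -> 600000 (TEN_MINUTE_INTERVAL)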