Base net-virt CLI files on top of which ONOS specific changes will be done
diff --git a/cli/sdncon/stats/data.py b/cli/sdncon/stats/data.py
new file mode 100755
index 0000000..268422c
--- /dev/null
+++ b/cli/sdncon/stats/data.py
@@ -0,0 +1,1160 @@
+#
+# Copyright (c) 2013 Big Switch Networks, Inc.
+#
+# Licensed under the Eclipse Public License, Version 1.0 (the
+# "License"); you may not use this file except in compliance with the
+# License. You may obtain a copy of the License at
+#
+# http://www.eclipse.org/legal/epl-v10.html
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+# implied. See the License for the specific language governing
+# permissions and limitations under the License.
+#
+
+from cassandra.ttypes import KsDef, CfDef, InvalidRequestException, TTransport, \
+ SlicePredicate, SliceRange, ColumnParent, ConsistencyLevel, ColumnPath, \
+ Mutation, Deletion, KeyRange, Column, ColumnOrSuperColumn, SuperColumn
+from django.conf import settings
+from .utils import CassandraConnection
+import random
+import datetime
+import time
+from sdncon.controller.config import get_local_controller_id
+
+# Target types that have their own stats column family; the column family
+# name is the target type name plus STATS_COLUMN_FAMILY_NAME_SUFFIX.
+CONTROLLER_STATS_NAME = 'controller'
+SWITCH_STATS_NAME = 'switch'
+PORT_STATS_NAME = 'port'
+# Super column families indexing the stats targets and the stats types
+# recorded for each target.
+TARGET_INDEX_NAME = 'target_index'
+STATS_TYPE_INDEX_NAME = 'stats_type_index'
+
+STATS_COLUMN_FAMILY_NAME_SUFFIX = '_stats'
+# NOTE(review): not referenced in this part of the file; presumably the
+# suffix of target type names in PUT data — confirm with the callers.
+STATS_TARGET_TYPE_PUT_DATA_SUFFIX = '-stats'
+
+# Stats timestamps (ms) are split into a row key part (the bucket index) and
+# a column name part (the offset within the bucket). Column names are
+# zero-padded to the width of the bucket period so they sort by time.
+STATS_BUCKET_PERIOD = 60*60*24*1000 # 1 day in milliseconds
+STATS_PADDED_COLUMN_TIME_LENGTH = len(str(STATS_BUCKET_PERIOD))
+
+EVENTS_COLUMN_FAMILY_NAME = 'events'
+EVENTS_BUCKET_PERIOD = 60*60*24*1000 # 1 day in milliseconds
+EVENTS_PADDED_COLUMN_TIME_LENGTH = len(str(EVENTS_BUCKET_PERIOD))
+
+# Interval lengths, in milliseconds.
+THIRTY_SECOND_INTERVAL = 30 * 1000
+ONE_MINUTE_INTERVAL = 60 * 1000
+FIVE_MINUTE_INTERVAL = 5 * ONE_MINUTE_INTERVAL
+TEN_MINUTE_INTERVAL = 10 * ONE_MINUTE_INTERVAL
+ONE_HOUR_INTERVAL = 60 * ONE_MINUTE_INTERVAL
+FOUR_HOUR_INTERVAL = 4 * ONE_HOUR_INTERVAL
+ONE_DAY_INTERVAL = 24 * ONE_HOUR_INTERVAL
+ONE_WEEK_INTERVAL = 7 * ONE_DAY_INTERVAL
+FOUR_WEEK_INTERVAL = 4 * ONE_WEEK_INTERVAL
+
+# NOTE(review): these tuples aren't used in this part of the file;
+# presumably the offered downsample / window sizes — confirm.
+DOWNSAMPLE_INTERVALS = (ONE_MINUTE_INTERVAL, TEN_MINUTE_INTERVAL,
+                        ONE_HOUR_INTERVAL, FOUR_HOUR_INTERVAL,
+                        ONE_DAY_INTERVAL, ONE_WEEK_INTERVAL,
+                        FOUR_WEEK_INTERVAL)
+
+WINDOW_INTERVALS = (THIRTY_SECOND_INTERVAL, ONE_MINUTE_INTERVAL,
+                    FIVE_MINUTE_INTERVAL, TEN_MINUTE_INTERVAL)
+
+# Accepted values of the data_format argument of the query functions.
+VALUE_DATA_FORMAT = 'value'
+RATE_DATA_FORMAT = 'rate'
+
+
+class StatsException(Exception):
+ pass
+
+
+class StatsInvalidStatsDataException(StatsException):
+ def __init__(self):
+ super(StatsInvalidStatsDataException,self).__init__(
+ 'Error adding stats data with incorrect format')
+
+
+class StatsDatabaseConnectionException(StatsException):
+ def __init__(self):
+ super(StatsDatabaseConnectionException,self).__init__(
+ 'Error connecting to stats database')
+
+
+class StatsDatabaseAccessException(StatsException):
+ def __init__(self):
+ super(StatsDatabaseAccessException,self).__init__(
+ 'Error accessing stats database')
+
+class StatsNonnumericValueException(StatsException):
+ def __init__(self, value):
+ super(StatsNonnumericValueException,self).__init__(
+ 'Invalid non-numeric stat value for rate or '
+ 'average value computation: ' + str(value))
+
+
+class StatsRateComputationException(StatsException):
+ def __init__(self):
+ super(StatsRateComputationException,self).__init__(
+ 'Error computing rate; not enough raw data')
+
+
+class StatsInvalidDataFormatException(StatsException):
+ def __init__(self, data_format):
+ super(StatsInvalidDataFormatException,self).__init__(
+ 'Invalid data format: ' + str(data_format))
+
+
+class StatsInvalidStatsTimeRangeException(StatsException):
+ def __init__(self, start_time, end_time):
+ super(StatsInvalidStatsTimeRangeException,self).__init__(
+ 'Invalid stats time range; start = %s; end = %s' %
+ (str(start_time), str(end_time)))
+
+
+class StatsInvalidStatsTypeException(StatsException):
+ def __init__(self, stats_type):
+ super(StatsInvalidStatsTypeException,self).__init__(
+ 'Invalid stats type; name = %s' % str(stats_type))
+
+
+class StatsInvalidStatsMetadataException(StatsException):
+ def __init__(self, file_name):
+ super(StatsInvalidStatsMetadataException,self).__init__(
+ 'Invalid stats metadata from file \"%s\"' % str(file_name))
+
+
+class StatsInternalException(StatsException):
+ def __init__(self, message):
+ super(StatsInternalException,self).__init__(
+ 'Stats internal error: \"%s\"' % str(message))
+
+class StatsCreateColumnFamilyException(StatsException):
+ def __init__(self, name):
+ super(StatsCreateColumnFamilyException,self).__init__(
+ 'Error creating column family; name = ' % name)
+
+# The following code is a hack to get the stats code to use a freshly
+# created test keyspace when we're running the unit tests. I'm guessing
+# there must be some way to detect if we're running under the Django
+# (or Python) unit test framework, but I couldn't find any info about
+# how to do that. So for now, we just provide this function that the
+# unit tests must call before they make any stats call to get the stats
+# code to use the test keyspace instead of the normal production
+# keyspace. Bother.
+
+# Set by set_use_test_keyspace(); read by init_stats_db_connection() when it
+# creates the connection.
+use_test_keyspace = False
+
+def set_use_test_keyspace():
+    # Make later init_stats_db_connection() calls use the "test_"-prefixed
+    # keyspace, which they drop and then recreate. An already-created
+    # connection is not affected.
+    global use_test_keyspace
+    use_test_keyspace = True
+
+
+# Module-wide Cassandra connection used by the functions below. Set by
+# init_stats_db_connection(); None until then, or after a failed attempt.
+stats_db_connection = None
+
+# FIXME: Should ideally create the column families on demand as the data
+# is added, so the different target types don't need to be predefined here.
+# Probably not a big problem for right now, though.
+#
+# init_stats_db_connection() merges each entry over its
+# column_family_def_default_settings and passes the result as CfDef keyword
+# arguments. The index and events families are super column families. The
+# per-target stats families don't set column_type here, so they take it
+# from the default settings.
+COLUMN_FAMILY_INFO_LIST = (
+    {'name': TARGET_INDEX_NAME,
+     'column_type': 'Super',
+     'comparator_type': 'UTF8Type',
+     'subcomparator_type': 'UTF8Type'},
+    {'name': STATS_TYPE_INDEX_NAME,
+     'column_type': 'Super',
+     'comparator_type': 'UTF8Type',
+     'subcomparator_type': 'UTF8Type'},
+    {'name': CONTROLLER_STATS_NAME + STATS_COLUMN_FAMILY_NAME_SUFFIX,
+     'comparator_type': 'UTF8Type'},
+    {'name': SWITCH_STATS_NAME + STATS_COLUMN_FAMILY_NAME_SUFFIX,
+     'comparator_type': 'UTF8Type'},
+    {'name': PORT_STATS_NAME + STATS_COLUMN_FAMILY_NAME_SUFFIX,
+     'comparator_type': 'UTF8Type'},
+    {'name': EVENTS_COLUMN_FAMILY_NAME,
+     'column_type': 'Super',
+     'comparator_type': 'UTF8Type',
+     'subcomparator_type': 'UTF8Type'},
+)
+
+
+def init_stats_db_connection(host, port, keyspace, user, password,
+                             replication_factor, column_family_def_default_settings):
+    """
+    Create the module-wide stats DB connection (stats_db_connection) if it
+    doesn't already exist; otherwise do nothing.
+
+    The function connects to Cassandra and selects the keyspace. If the
+    keyspace can't be selected, it creates it with SimpleStrategy and
+    replication_factor. It then tries to add every column family in
+    COLUMN_FAMILY_INFO_LIST. Each definition is column_family_def_default_settings
+    overridden by the entry's own settings. A family that already exists is
+    expected to fail with InvalidRequestException, which is ignored.
+
+    After set_use_test_keyspace(), the keyspace name gets a "test_" prefix and
+    any existing keyspace with that name is dropped first.
+
+    Raises StatsException if connecting or creating the keyspace fails.
+    Raises StatsCreateColumnFamilyException if adding a column family fails
+    with any other error. On these failures stats_db_connection is reset
+    to None.
+    """
+    global stats_db_connection
+    if not stats_db_connection:
+        if use_test_keyspace:
+            keyspace = "test_" + keyspace
+
+        try:
+            stats_db_connection = CassandraConnection(host, port, keyspace, user, password)
+            stats_db_connection.connect()
+        except Exception:
+            stats_db_connection = None
+            raise StatsException("Error connecting to Cassandra daemon")
+
+        if use_test_keyspace:
+            # Start from an empty test keyspace. A failure here (presumably
+            # because the keyspace doesn't exist yet) is ignored.
+            try:
+                stats_db_connection.get_client().system_drop_keyspace(keyspace)
+            except Exception:
+                pass
+
+        # Any failure to select the keyspace is taken to mean it doesn't exist.
+        try:
+            stats_db_connection.set_keyspace()
+            create_keyspace = False
+        except Exception:
+            create_keyspace = True
+
+        if create_keyspace:
+            keyspace_def = KsDef(name=keyspace,
+                                 strategy_class='org.apache.cassandra.locator.SimpleStrategy',
+                                 replication_factor=replication_factor,
+                                 cf_defs=[])
+            try:
+                stats_db_connection.get_client().system_add_keyspace(keyspace_def)
+                stats_db_connection.set_keyspace()
+            except Exception, _e:
+                stats_db_connection = None
+                raise StatsException("Error creating stats keyspace")
+
+        for column_family_info in COLUMN_FAMILY_INFO_LIST:
+            try:
+                column_family_def_settings = column_family_def_default_settings.copy()
+                column_family_def_settings.update(column_family_info)
+                column_family_def_settings['keyspace'] = keyspace
+                # pylint: disable=W0142
+                stats_db_connection.get_client().system_add_column_family(
+                    CfDef(**column_family_def_settings))
+            except InvalidRequestException, _e:
+                # Assume this is because the column family already exists.
+                # FIXME. Could check exception message for specific string
+                pass
+            except Exception, _e:
+                stats_db_connection = None
+                raise StatsCreateColumnFamilyException(column_family_info.get('name'))
+
+
+def get_stats_db_connection():
+ return stats_db_connection
+
+
+# The following function is mainly intended to be used by the unit tests. It lets
+# you clear out all of the data from the database. Note that since the stats DB
+# is not managed by the normal Django DB mechanism you don't get the automatic
+# DB flushing from the Django TestCase code, so it has to be done explicitly.
+# There's a StatsTestCase subclass of TestCase in the stats unit tests that
+# implements the tearDown method to call flush_stats_db after each test.
+def flush_stats_db():
+ if stats_db_connection is not None:
+ for column_family_info in COLUMN_FAMILY_INFO_LIST:
+ stats_db_connection.get_client().truncate(column_family_info['name'])
+
+def call_cassandra_with_reconnect(fn, *args, **kwargs):
+ try:
+ try:
+ results = fn(*args, **kwargs)
+ except TTransport.TTransportException:
+ stats_db_connection.reconnect()
+ results = fn(*args, **kwargs)
+ except TTransport.TTransportException, _e:
+ raise StatsDatabaseConnectionException()
+ except Exception, _e:
+ raise StatsDatabaseAccessException()
+
+ return results
+
+
+def get_stats_padded_column_part(column_part):
+ """
+ For the columns to be sorted correctly by time we need to pad with
+ leading zeroes up to the maximum range of the bucket
+ """
+ column_part = str(column_part)
+ leading_zeroes = ('0'*(STATS_PADDED_COLUMN_TIME_LENGTH-len(column_part)))
+ column_part = leading_zeroes + column_part
+ return column_part
+
+
+def split_stats_timestamp(timestamp):
+ key_part = timestamp / STATS_BUCKET_PERIOD
+ column_part = timestamp % STATS_BUCKET_PERIOD
+ return (key_part, column_part)
+
+
+def construct_stats_key(cluster, target_id, stats_type, timestamp_key_part):
+ """
+ Constructs the keys for the controller or switch stats.
+ For the controller stats the target_id is the controller node id.
+ For switch stats the target_id is the dpid of the switch.
+ """
+ return cluster + '|' + target_id + '|' + stats_type + '|' + str(timestamp_key_part)
+
+
+def append_stats_results(get_results, values, timestamp_key_part):
+ shifted_timestamp_key_part = int(timestamp_key_part) * STATS_BUCKET_PERIOD
+ for item in get_results:
+ timestamp_column_part = int(item.column.name)
+ value = item.column.value
+ timestamp = shifted_timestamp_key_part + timestamp_column_part
+ values.append((timestamp, value))
+
+
+def get_stats_slice_predicate(column_start, column_end):
+ if column_start != '':
+ column_start = get_stats_padded_column_part(column_start)
+ if column_end != '':
+ column_end = get_stats_padded_column_part(column_end)
+ slice_predicate = SlicePredicate(slice_range=SliceRange(
+ start=column_start, finish=column_end, count=1000000))
+ return slice_predicate
+
+
+def check_time_range(start_time, end_time):
+ if int(end_time) < int(start_time):
+ raise StatsInvalidStatsTimeRangeException(start_time, end_time)
+
+
+def check_valid_data_format(data_format):
+ if data_format != VALUE_DATA_FORMAT and data_format != RATE_DATA_FORMAT:
+ raise StatsInvalidDataFormatException(data_format)
+
+
+def get_window_range(raw_stats_values, index, window):
+
+ if window == 0:
+ return (index, index)
+
+ # Get start index
+ timestamp = raw_stats_values[index][0]
+ start_timestamp = timestamp - (window / 2)
+ end_timestamp = timestamp + (window / 2)
+
+ start_index = index
+ while start_index > 0:
+ next_timestamp = raw_stats_values[start_index - 1][0]
+ if next_timestamp < start_timestamp:
+ break
+ start_index -= 1
+
+ end_index = index
+ while end_index < len(raw_stats_values) - 1:
+ next_timestamp = raw_stats_values[end_index + 1][0]
+ if next_timestamp > end_timestamp:
+ break
+ end_index += 1
+
+ return (start_index, end_index)
+
+
+def convert_stat_string_to_value(stat_string):
+ try:
+ stat_value = int(stat_string)
+ except ValueError:
+ try:
+ stat_value = float(stat_string)
+ except ValueError:
+ stat_value = stat_string
+ return stat_value
+
+
+def get_rate_over_stats_values(stats_values):
+
+ if len(stats_values) < 2:
+ return None
+
+ start_stat = stats_values[0]
+ end_stat = stats_values[-1]
+
+ timestamp_delta = end_stat[0] - start_stat[0]
+ # NOTE: In computing the value_delta here it's safe to assume floats
+ # rather than calling convert_stat_string_to_value because we're going
+ # to be converting to float anyway when we do the rate calculation later.
+ # So there's no point in trying to differentiate between int and float
+ # and rate doesn't make sense for any other type of stat data (e.g. string)
+ value_delta = float(end_stat[1]) - float(start_stat[1])
+ if timestamp_delta == 0:
+ rate = float('inf' if value_delta > 0 else '-inf')
+ else:
+ rate = value_delta / timestamp_delta
+
+ return rate
+
+
+def get_rate_over_window(raw_stats_values, index, window):
+ if len(raw_stats_values) < 2:
+ return None
+
+ if window == 0:
+ if index == 0:
+ start_index = 0
+ end_index = 1
+ else:
+ start_index = index - 1
+ end_index = index
+ else:
+ start_index, end_index = get_window_range(raw_stats_values, index, window)
+
+ return get_rate_over_stats_values(raw_stats_values[start_index:end_index + 1])
+
+
+def get_average_over_stats_values(stats_values):
+
+ total = 0
+ count = 0
+ for stat_value in stats_values:
+ # FIXME: Should we just always convert to float here?
+ # This would give a more accurate result for the average calculation
+ # but would mean that the data type is different for a
+ # zero vs. non-zero window size.
+ value = convert_stat_string_to_value(stat_value[1])
+ if type(value) not in (int,float):
+ raise StatsNonnumericValueException(value)
+ total += value
+ count += 1
+
+ return (total / count) if count > 0 else None
+
+
+def get_average_value_over_window(raw_stats_values, index, window):
+ start_index, end_index = get_window_range(raw_stats_values, index, window)
+ stats_values = raw_stats_values[start_index:end_index + 1]
+ return get_average_over_stats_values(stats_values)
+
+
+def reverse_stats_data_generator(cluster, target_type, target_id, stats_type,
+ start_time=None, end_time=None,
+ chunk_interval=3600000):
+ if start_time is None:
+ start_time = int(time.time() * 1000)
+ if end_time is None:
+ # By default, don't go back past 1/1/2011. This was before we had stats support
+ # in the controller, so we shouldn't have any data earlier than that (except if
+ # the clock on the controller was set incorrectly).
+ end_time = int(time.mktime(datetime.datetime(2011,1,1).timetuple()) * 1000)
+ end_key_part, _end_column_part = split_stats_timestamp(end_time)
+ key_part, column_part = split_stats_timestamp(start_time)
+ column_family = target_type + STATS_COLUMN_FAMILY_NAME_SUFFIX
+ column_parent = ColumnParent(column_family)
+ # FIXME: Should add support for chunk_interval to be either iterable or a
+ # list/tuple to give a sequence of chunk intervals to use. The last available
+ # chunk interval from the list/tuple/iterator would then be used for any
+ # subsequent cassandra calls
+ #chunk_interval_iter = (chunk_interval if isinstance(chunk_interval, list) or
+ # isinstance(chunk_interval, tuple) else [chunk_interval])
+ while key_part >= 0:
+ key = construct_stats_key(cluster, target_id, stats_type, key_part)
+
+ while True:
+ column_start = column_part - chunk_interval
+ if column_start < 0:
+ column_start = 0
+ slice_predicate = get_stats_slice_predicate(column_start, column_part)
+ for attempt in (1,2):
+ try:
+ get_results = stats_db_connection.get_client().get_slice(key,
+ column_parent, slice_predicate, ConsistencyLevel.ONE)
+ for item in reversed(get_results):
+ timestamp = (key_part * STATS_BUCKET_PERIOD) + int(item.column.name)
+ value = item.column.value
+ yield (timestamp, value)
+ break
+ except TTransport.TTransportException:
+ # Only retry once, so if it's the second time through,
+ # propagate the exception
+ if attempt == 2:
+ raise StatsDatabaseConnectionException()
+ stats_db_connection.reconnect()
+ except Exception:
+ raise StatsDatabaseAccessException()
+
+ column_part = column_start
+ if column_part == 0:
+ break
+
+ if key_part == end_key_part:
+ break
+ key_part -= 1
+ column_part = STATS_BUCKET_PERIOD - 1
+
+
+def get_latest_stat_data(cluster, target_type, target_id, stats_type,
+                         window=0, data_format=VALUE_DATA_FORMAT):
+    """
+    Return (timestamp, value) for the most recent sample of a stats type for
+    a target. Looks back at most one day. Returns None if no sample is found
+    or no value can be computed.
+
+    The returned timestamp is always the latest sample's. The samples used
+    are those no more than window/2 older than the latest one. The window is
+    extended further back if needed to reach the minimum number of samples:
+    one for 'value', two for 'rate'.
+
+    - 'value': the average of those samples. With window 0 this is just the
+      latest sample's numeric value. Non-numeric samples raise
+      StatsNonnumericValueException.
+    - 'rate': their rate of change (None if fewer than two samples exist).
+
+    Raises StatsInvalidDataFormatException for an unknown data_format.
+    """
+
+    check_valid_data_format(data_format)
+
+    minimum_data_points = 2 if data_format == RATE_DATA_FORMAT else 1
+    # Collected samples, kept in chronological order (prepended while
+    # walking backwards in time).
+    stats_data_window = []
+    latest_stat_timestamp = None
+
+    start_time = int(time.time() * 1000)
+    # Limit how far back we'll look for the latest stat value.
+    # 86400000 is 1 day in ms
+    end_time = start_time - 86400000
+    # The generator walks backwards from start_time (now) towards end_time.
+    for stat_data_point in reverse_stats_data_generator(cluster,
+            target_type, target_id, stats_type, start_time, end_time):
+        current_stat_timestamp = stat_data_point[0]
+        if latest_stat_timestamp is None:
+            latest_stat_timestamp = current_stat_timestamp
+
+        # NOTE: For stats operations we treat the window for the rate or
+        # average calculation to be centered around the current timestamp.
+        # For the latest stat case there is no data after the current point.
+        # We could extend the window back further so that the current timestamp
+        # is the end of the window range instead of the middle, but then that
+        # would be inconsistent with the other cases, so instead we just go
+        # back to half the window size. I haven't been able to convince myself
+        # strongly one way or the other which is better (or how much it matters)
+        outside_window = (latest_stat_timestamp - current_stat_timestamp) > (window / 2)
+        if len(stats_data_window) >= minimum_data_points and outside_window:
+            break
+
+        stats_data_window.insert(0, stat_data_point)
+
+        if (window == 0) and (len(stats_data_window) >= minimum_data_points):
+            break
+
+    # The loop variable is reused to hold the result.
+    stat_data_point = None
+
+    if latest_stat_timestamp is not None:
+        if data_format == VALUE_DATA_FORMAT:
+            value = get_average_over_stats_values(stats_data_window)
+        else:
+            assert data_format == RATE_DATA_FORMAT, "Invalid data format"
+            value = get_rate_over_stats_values(stats_data_window)
+        if value is not None:
+            stat_data_point = (latest_stat_timestamp, value)
+
+    return stat_data_point
+
+
+def get_stats_data(cluster, target_type, target_id, stats_type,
+ start_time, end_time, sample_interval=0, window=0,
+ data_format=VALUE_DATA_FORMAT, limit=None):
+
+ check_time_range(start_time, end_time)
+ check_valid_data_format(data_format)
+ # FIXME: Add validation of other arguments
+
+ start_key_part, start_column_part = split_stats_timestamp(int(start_time))
+ end_key_part, end_column_part = split_stats_timestamp(int(end_time))
+
+ raw_stats_values = []
+ column_family = target_type + STATS_COLUMN_FAMILY_NAME_SUFFIX
+ column_parent = ColumnParent(column_family)
+
+ for key_part in range(start_key_part, end_key_part+1):
+ current_start = start_column_part if key_part == start_key_part else ''
+ current_end = end_column_part if key_part == end_key_part else ''
+ # FIXME: How big can the count be?
+ slice_predicate = get_stats_slice_predicate(current_start, current_end)
+ key = construct_stats_key(cluster, target_id, stats_type, key_part)
+ for attempt in (1,2):
+ try:
+ get_results = stats_db_connection.get_client().get_slice(key,
+ column_parent, slice_predicate, ConsistencyLevel.ONE)
+ break
+ except TTransport.TTransportException:
+ # Only retry once, so if it's the second time through,
+ # propagate the exception
+ if attempt == 2:
+ raise StatsDatabaseConnectionException()
+ stats_db_connection.reconnect()
+ except Exception:
+ raise StatsDatabaseAccessException()
+
+ append_stats_results(get_results, raw_stats_values, key_part)
+
+ # FIXME: This logic to handle the limit argument isn't complete.
+ # It doesn't account for a non-zero window or dpwnsampling.
+ if (limit != None and sample_interval == 0 and window == 0 and
+ len(raw_stats_values) > limit):
+ raw_stats_values = raw_stats_values[:limit]
+ break
+
+ stats_values = []
+ last_downsample_index = None
+ for i in range(0, len(raw_stats_values)):
+ # Handle downsampling
+ if sample_interval != 0:
+ downsample_index = raw_stats_values[i][0] / sample_interval
+ if downsample_index == last_downsample_index:
+ continue
+ last_downsample_index = downsample_index
+
+ # Get the value based for the specified data format
+ if data_format == VALUE_DATA_FORMAT:
+ if sample_interval == 0:
+ value = convert_stat_string_to_value(raw_stats_values[i][1])
+ else:
+ value = get_average_value_over_window(raw_stats_values, i, window)
+ else:
+ assert data_format == RATE_DATA_FORMAT, "Invalid data format"
+ value = get_rate_over_window(raw_stats_values, i, window)
+
+ if value is not None:
+ stats_values.append((raw_stats_values[i][0], value))
+
+ return stats_values
+
+
+def delete_stats_data(cluster, target_type, target_id, stats_type,
+ start_time, end_time):
+
+ check_time_range(start_time, end_time)
+ # FIXME: Add validation of other arguments
+
+ start_key_part, start_column_part = split_stats_timestamp(int(start_time))
+ end_key_part, end_column_part = split_stats_timestamp(int(end_time))
+
+ column_family = target_type + STATS_COLUMN_FAMILY_NAME_SUFFIX
+ column_parent = ColumnParent(column_family)
+ # The Cassandra timestamps are in microseconds, not milliseconds,
+ # so we convert to microseconds. The Cassandra timestamp is derived
+ # from the stat timestamp (i.e. same time converted to microseconds),
+ # so we use the end_time + 1, since that's guaranteed to be greater
+ # than any of the timestamps for the sample points we're deleting.
+ timestamp = (int(end_time) * 1000) + 1
+ for key_part in range(start_key_part, end_key_part+1):
+ key = construct_stats_key(cluster, target_id, stats_type, key_part)
+ current_start = start_column_part if key_part == start_key_part else ''
+ current_end = end_column_part if key_part == end_key_part else ''
+ if current_start == '' and current_end == '':
+ call_cassandra_with_reconnect(stats_db_connection.get_client().remove,
+ key, ColumnPath(column_family=column_family), timestamp,
+ ConsistencyLevel.ONE)
+ else:
+ # grrr. Cassandra currently doesn't support doing deletions via a
+ # slice range (i.e. a column start and end). You need to give it a
+ # list of columns. So we do a get_slice with the slice range and then
+ # extract the individual column names from the result of that and
+ # build up the column list that we can use to delete the column
+ # using batch_mutate.
+ slice_predicate = get_stats_slice_predicate(current_start, current_end)
+ get_results = call_cassandra_with_reconnect(
+ stats_db_connection.get_client().get_slice,
+ key, column_parent, slice_predicate, ConsistencyLevel.ONE)
+ column_names = []
+ for item in get_results:
+ column_names.append(item.column.name)
+
+ deletion = Deletion(timestamp=timestamp, predicate=SlicePredicate(column_names=column_names))
+ mutation_map = {key: {column_family: [Mutation(deletion=deletion)]}}
+ call_cassandra_with_reconnect(stats_db_connection.get_client().batch_mutate,
+ mutation_map, ConsistencyLevel.ONE)
+
+
+# Name of the module-level variable that each module listed in
+# settings.STATS_METADATA_MODULES defines to describe its stats types.
+STATS_METADATA_VARIABLE_NAME = 'STATS_METADATA'
+
+# Map of stats type name -> metadata dict, built lazily by init_stats_metadata().
+stats_metadata = None
+
+def init_stats_metadata(_cluster):
+ """
+ Initialize the dictionary of stats metadata. Currently this is initialized
+ from a directory of metadata files that contain the metadata. Currently,
+ there is no differentiation in the stats types that are supported across
+ clusters, so we ignore the cluster argument and we just maintain a
+ global map of stat type metadata.
+ """
+ global stats_metadata
+ if not stats_metadata:
+ stats_metadata = {}
+ for module_name in settings.STATS_METADATA_MODULES:
+ metadata_module = __import__(module_name,
+ fromlist=[STATS_METADATA_VARIABLE_NAME])
+ if not metadata_module:
+ # FIXME: log error
+ continue
+
+ if STATS_METADATA_VARIABLE_NAME not in dir(metadata_module):
+ # FIXME: log error
+ continue
+
+ metadata_list = getattr(metadata_module, STATS_METADATA_VARIABLE_NAME)
+
+ if type(metadata_list) is dict:
+ metadata_list = [metadata_list]
+
+ if type(metadata_list) is not list and type(metadata_list) is not tuple:
+ raise StatsInvalidStatsMetadataException(module_name)
+
+ for metadata in metadata_list:
+ if type(metadata) is not dict:
+ raise StatsInvalidStatsMetadataException(module_name)
+
+ name = metadata.get('name')
+ if not name:
+ raise StatsInvalidStatsMetadataException(module_name)
+ name = str(name)
+
+ # Auto-set the verbose_name to the name if it's not set explicitly
+ verbose_name = metadata.get('verbose_name')
+ if not verbose_name:
+ metadata['verbose_name'] = name
+
+ # FIXME: Validate other contents of metadata.
+ # e.g. flag name conflicts between files.
+
+ stats_metadata[name] = metadata
+
+def get_stats_metadata(cluster, stats_type=None):
+ init_stats_metadata(cluster)
+ # If no stat_type is specified return the entire dictionary of stat types
+ metadata = stats_metadata.get(stats_type) if stats_type else stats_metadata
+ if metadata is None:
+ raise StatsInvalidStatsTypeException(stats_type)
+ return metadata
+
+
+STATS_INDEX_ATTRIBUTE_TYPES = {
+ 'last-updated': int
+}
+def stats_type_slice_to_index_data(stats_type_slice):
+ index_data = {}
+ for super_column in stats_type_slice:
+ name = super_column.super_column.name
+ column_list = super_column.super_column.columns
+ if name == 'base':
+ insert_map = index_data
+ elif name.startswith('param:'):
+ colon_index = name.find(':')
+ parameter_name = name[colon_index+1:]
+ parameter_map = index_data.get('parameters')
+ if not parameter_map:
+ parameter_map = {}
+ index_data['parameters'] = parameter_map
+ insert_map = {}
+ parameter_map[parameter_name] = insert_map
+ else:
+ raise StatsInternalException('Invalid stats type index name: ' + str(name))
+
+ for column in column_list:
+ value = column.value
+ attribute_type = STATS_INDEX_ATTRIBUTE_TYPES.get(column.name)
+ if attribute_type is not None:
+ value = attribute_type(value)
+ insert_map[column.name] = value
+
+ return index_data
+
+
+def get_stats_type_index(cluster, target_type, target_id, stats_type=None):
+ column_parent = ColumnParent(STATS_TYPE_INDEX_NAME)
+ slice_predicate = SlicePredicate(slice_range=SliceRange(
+ start='', finish='', count=1000000))
+ key_prefix = cluster + ':' + target_type + ':' + target_id
+ if stats_type is None:
+ key_range = KeyRange(start_key=key_prefix+':', end_key=key_prefix+';', count=100000)
+ key_slice_list = call_cassandra_with_reconnect(
+ stats_db_connection.get_client().get_range_slices,
+ column_parent, slice_predicate, key_range, ConsistencyLevel.ONE)
+ stats_index_data = {}
+ for key_slice in key_slice_list:
+ key = key_slice.key
+ colon_index = key.rfind(':')
+ if colon_index < 0:
+ raise StatsInternalException('Invalid stats type index key: ' + str(key))
+ stats_type = key[colon_index+1:]
+ stats_index_data[stats_type] = stats_type_slice_to_index_data(key_slice.columns)
+ else:
+ key = key_prefix + ':' + stats_type
+ stats_type_slice = call_cassandra_with_reconnect(
+ stats_db_connection.get_client().get_slice, key, column_parent,
+ slice_predicate, ConsistencyLevel.ONE)
+ stats_index_data = stats_type_slice_to_index_data(stats_type_slice)
+
+ return stats_index_data
+
+
+def get_stats_target_types(cluster):
+ column_parent = ColumnParent(TARGET_INDEX_NAME)
+ slice_predicate = SlicePredicate(column_names=[])
+ key_range = KeyRange(start_key=cluster+':', end_key=cluster+';', count=100000)
+ key_slice_list = call_cassandra_with_reconnect(
+ stats_db_connection.get_client().get_range_slices,
+ column_parent, slice_predicate, key_range, ConsistencyLevel.ONE)
+
+ target_types = {}
+ for key_slice in key_slice_list:
+ target_type = key_slice.key[len(cluster)+1:]
+
+ target_types[target_type] = {}
+
+ return target_types
+
+
+STATS_TARGET_ATTRIBUTE_TYPES = {
+ 'last-updated': int
+}
+
+def get_stats_targets(cluster, target_type):
+ key = cluster + ':' + target_type
+ column_parent = ColumnParent(TARGET_INDEX_NAME)
+ slice_predicate = SlicePredicate(slice_range=SliceRange(
+ start='', finish='', count=1000000))
+ super_column_list = call_cassandra_with_reconnect(
+ stats_db_connection.get_client().get_slice, key, column_parent,
+ slice_predicate, ConsistencyLevel.ONE)
+ target_list = {}
+ for item in super_column_list:
+ target = {}
+ for column in item.super_column.columns:
+ value = column.value
+ attribute_type = STATS_TARGET_ATTRIBUTE_TYPES.get(column.name)
+ if attribute_type is not None:
+ value = attribute_type(value)
+ target[column.name] = value
+ target_list[item.super_column.name] = target
+
+ return target_list
+
+
+# FIXME: Should update the code below to use these constants
+# instead of string literals
+# Attribute holding the (millisecond) time an index entry was last updated.
+LAST_UPDATED_ATTRIBUTE_NAME = 'last-updated'
+CONTROLLER_ATTRIBUTE_NAME = 'controller'
+# Stats type index rows hold a 'base' super column for the base stats type
+# and a 'param:<name>' super column per parameterized variant.
+BASE_SUPER_COLUMN_NAME = 'base'
+# Key under which stats_type_slice_to_index_data returns the parameter map.
+PARAMETERS_SUPER_COLUMN_NAME = 'parameters'
+PARAM_SUPER_COLUMN_NAME_PREFIX = 'param:'
+
+def append_attributes_to_mutation_list(attributes, supercolumn_name, mutation_list):
+ column_list = []
+ for name, info in attributes.iteritems():
+ timestamp, value = info
+ column = Column(name=name, value=str(value), timestamp=timestamp*1000)
+ column_list.append(column)
+ mutation = Mutation(column_or_supercolumn=ColumnOrSuperColumn(
+ super_column=SuperColumn(name=supercolumn_name, columns=column_list)))
+ mutation_list.append(mutation)
+
+
+def add_stat_type_index_info_to_mutation_map(cluster, target_type,
+ stats_type_index, mutation_map):
+ for key, stats_type_info in stats_type_index.iteritems():
+ separator_index = key.find(':')
+ assert separator_index >= 0
+ base_stat_type_name = key[:separator_index]
+ target_id = key[separator_index + 1:]
+ stats_type_base_attributes, stats_type_params = stats_type_info
+ mutation_list = []
+ append_attributes_to_mutation_list(stats_type_base_attributes,
+ 'base', mutation_list)
+ for name, attributes in stats_type_params.iteritems():
+ append_attributes_to_mutation_list(attributes,
+ 'param:' + name, mutation_list)
+ mutation_key = cluster + ':' + target_type + ':' + target_id + ':' + base_stat_type_name
+ mutation_map[mutation_key] = {STATS_TYPE_INDEX_NAME: mutation_list}
+
+
+def add_target_id_list_to_mutation_map(cluster, target_type,
+ target_id_list, mutation_map):
+ mutation_list = []
+ for target_id, attributes in target_id_list:
+ append_attributes_to_mutation_list(attributes, target_id, mutation_list)
+ key = cluster + ':' + target_type
+ mutation_map[key] = {TARGET_INDEX_NAME: mutation_list}
+
+
+def _put_stats_data(cluster, target_type, stats_data):
+    """
+    Write the stats samples for a single target type to the database.
+
+    stats_data maps each target id to a dictionary of stats types. Each
+    stats type maps to a list of samples, and every sample has 'timestamp'
+    and 'value' entries. A stats type of the form '<base>__<parameter>' is
+    treated as a parameterized instance of <base>.
+
+    A single mutation map is built containing:
+      - one column per sample in the '<target_type>_stats' column family;
+      - stats type index entries ('last-updated' per base stats type and
+        per parameter), added by add_stat_type_index_info_to_mutation_map;
+      - target index entries ('last-updated' and 'controller' per target),
+        added by add_target_id_list_to_mutation_map.
+    The map is then written with one batch_mutate at consistency ONE.
+
+    Sample timestamps are presumably milliseconds: they are multiplied by
+    1000 to form the Cassandra (microsecond) column timestamp.
+
+    Raises StatsInvalidStatsDataException if anything fails while building
+    the mutation map, including the get_local_controller_id() call. The
+    original exception is discarded. Errors from the database write itself
+    come from call_cassandra_with_reconnect.
+    """
+    try:
+        controller_id = get_local_controller_id()
+        mutation_map = {}
+        # (target_id, attributes) pairs for the target index.
+        target_id_list = []
+        # Keyed by '<base stats type>:<target id>'; the tuple layout of the
+        # values is described where entries are created below.
+        stats_type_index = {}
+        column_family = target_type + STATS_COLUMN_FAMILY_NAME_SUFFIX
+        for (target_id, target_id_stats) in stats_data.iteritems():
+            # Map 'localhost' controller to the actual ID for the local controller
+            # FIXME: Eventually we should fix up the other components (e.g. statd)
+            # that invoke this REST API to not use localhost and instead use the
+            # REST API to obtain the real ID for the local controller, but for now
+            # this works to ensure we're not using localhost in any of the data we
+            # store in the DB (unless, of course, the uuid version of the controller
+            # ID hasn't been written to the boot-config file, in which case it will
+            # default to the old localhost value).
+            if target_type == 'controller' and target_id == 'localhost':
+                target_id = controller_id
+            # Newest sample timestamp seen for this target across all of its
+            # stats types; used for the target index 'last-updated' value.
+            latest_id_timestamp = None
+            for (stats_type, stats_data_array) in target_id_stats.iteritems():
+                # Check if it's a parameterized type and extract the base
+                # stat type and parameter name.
+                parameter_separator = stats_type.find('__')
+                if parameter_separator >= 0:
+                    stats_type_base = stats_type[:parameter_separator]
+                    stats_type_parameter = stats_type[parameter_separator+2:]
+                else:
+                    stats_type_base = stats_type
+                    stats_type_parameter = None
+
+                # Newest sample timestamp for this (possibly parameterized)
+                # stats type; stays None if the sample list is empty.
+                latest_stat_type_timestamp = None
+
+                # Add the stats values to the mutation map
+                for stats_value in stats_data_array:
+                    timestamp = int(stats_value['timestamp'])
+                    if latest_stat_type_timestamp is None or timestamp > latest_stat_type_timestamp:
+                        latest_stat_type_timestamp = timestamp
+                    if latest_id_timestamp is None or timestamp > latest_id_timestamp:
+                        latest_id_timestamp = timestamp
+                    value = stats_value['value']
+                    timestamp_key_part, timestamp_column_part = split_stats_timestamp(timestamp)
+                    # The row key uses the full stats type, including any
+                    # '__<parameter>' suffix.
+                    key = construct_stats_key(cluster, target_id, stats_type, timestamp_key_part)
+                    key_entry = mutation_map.get(key)
+                    if not key_entry:
+                        mutation_list = []
+                        mutation_map[key] = {column_family: mutation_list}
+                    else:
+                        mutation_list = key_entry[column_family]
+
+                    # Note: convert the Cassandra timestamp value to microseconds to
+                    # be consistent with standard Cassandra timestamp format.
+                    mutation = Mutation(column_or_supercolumn=ColumnOrSuperColumn(
+                        column=Column(name=get_stats_padded_column_part(timestamp_column_part),
+                                      value=str(value), timestamp=timestamp*1000)))
+                    mutation_list.append(mutation)
+
+                # Update the stat type index info.
+                # There can be multiple parameterized types for each base stats type,
+                # so we need to be careful about checking for existing data for
+                # the index_entry. Because of the dictionary nature of the put data
+                # and the way this is serialized into a Python dictionary, though,
+                # we are guaranteed that there won't be multiple entries for a
+                # specific parameters stats type or the base stats type, so we don't
+                # need to handle duplicates for those.
+                if latest_stat_type_timestamp is not None:
+                    stats_type_index_key = stats_type_base + ':' + target_id
+                    stats_type_info = stats_type_index.get(stats_type_index_key)
+                    if not stats_type_info:
+                        # This is a tuple of two dictionaries: the attributes for
+                        # the base stat type and a dictionary of the parameterized
+                        # types that have been seen for that stat type. The
+                        # parameterized type dictionary is keyed by the name of
+                        # the parameterized type and the value is the associated
+                        # attribute dictionary.
+                        stats_type_info = ({},{})
+                        stats_type_index[stats_type_index_key] = stats_type_info
+                    stats_type_base_attributes, stats_type_params = stats_type_info
+                    if stats_type_parameter is None:
+                        attributes = stats_type_base_attributes
+                    else:
+                        attributes = stats_type_params.get(stats_type_parameter)
+                        if attributes is None:
+                            attributes = {}
+                            stats_type_params[stats_type_parameter] = attributes
+                    # Attribute values are stored as 2-tuples whose first element
+                    # is the timestamp used for the comparison below
+                    # (NOTE(review): the tuple format is presumably what
+                    # append_attributes_to_mutation_list expects; confirm).
+                    last_updated_entry = attributes.get('last-updated')
+                    if last_updated_entry is None or latest_stat_type_timestamp > last_updated_entry[0]:
+                        attributes['last-updated'] = (latest_stat_type_timestamp, latest_stat_type_timestamp)
+
+            # Update the target index
+            # (This rebinds 'attributes' to a fresh dict for the target index
+            # entry; the stats type index dicts above are not affected.)
+            if latest_id_timestamp is not None:
+                # FIXME: Always set the controller attributes for now.
+                # This could/should be optimized to not set this for stats
+                # whose target type is 'controller' since those will
+                # always be coming from the same controller (i.e. itself).
+                # But that change requires some changes (albeit minor) to
+                # syncd to work correctly which I don't want to mess with
+                # right now.
+                #attributes = {'last-updated': (latest_id_timestamp, latest_id_timestamp)}
+                #if target_type != 'controller':
+                #    attributes['controller'] = controller_id
+                attributes = {'last-updated': (latest_id_timestamp, latest_id_timestamp),
+                              'controller': (latest_id_timestamp, controller_id)}
+                target_id_list.append((target_id, attributes))
+    except Exception, _e:
+        # Any failure while building the mutation map is reported as invalid
+        # stats data; the underlying exception is not preserved.
+        raise StatsInvalidStatsDataException()
+
+    add_stat_type_index_info_to_mutation_map(cluster, target_type, stats_type_index, mutation_map)
+    add_target_id_list_to_mutation_map(cluster, target_type, target_id_list, mutation_map)
+
+    call_cassandra_with_reconnect(stats_db_connection.get_client().batch_mutate,
+                                  mutation_map, ConsistencyLevel.ONE)
+
+
+def put_stats_data(cluster, stats_data):
+    """
+    Store the stats samples for every target type present in stats_data.
+
+    stats_data is keyed by target type. A key ending in the put-data
+    suffix ('-stats', e.g. 'switch-stats') has that suffix removed before
+    its data is passed to _put_stats_data. _put_stats_data performs a
+    separate batch write for each target type.
+    """
+    suffix = STATS_TARGET_TYPE_PUT_DATA_SUFFIX
+    for raw_target_type, per_target_data in stats_data.items():
+        if raw_target_type.endswith(suffix):
+            target_type = raw_target_type[:-len(suffix)]
+        else:
+            target_type = raw_target_type
+        _put_stats_data(cluster, target_type, per_target_data)
+
+
+def get_events_padded_column_part(column_part):
+    """
+    Return column_part as a string, left-padded with '0' characters to
+    EVENTS_PADDED_COLUMN_TIME_LENGTH (the width of EVENTS_BUCKET_PERIOD).
+
+    Fixed-width zero padding makes the lexical order of event column names
+    match their numeric time order within a bucket. A value that is already
+    at least that wide is returned without padding.
+    """
+    return str(column_part).rjust(EVENTS_PADDED_COLUMN_TIME_LENGTH, '0')
+
+
+def split_events_timestamp(timestamp):
+    """
+    Split an event timestamp into (bucket number, offset within the bucket).
+
+    The bucket number goes into the events row key and the offset becomes
+    the (padded) super column name. Floor division is done explicitly with
+    divmod, so the bucket stays an integer even under true division
+    (Python 3 or 'from __future__ import division'). With the plain '/'
+    operator it would become a float and corrupt the row key. For the int
+    timestamps the callers pass, the result is unchanged.
+    """
+    key_part, column_part = divmod(timestamp, EVENTS_BUCKET_PERIOD)
+    return (key_part, column_part)
+
+
+def construct_log_events_key(cluster, node_id, timestamp_key_part):
+    """Return the events row key '<cluster>|<node_id>|<bucket number>'."""
+    return '|'.join((cluster, node_id, str(timestamp_key_part)))
+
+
+def get_events_slice_predicate(column_start, column_end):
+    """
+    Build the SlicePredicate that selects events between two bucket offsets.
+
+    Either bound may be '' to leave that end of the slice open. A non-empty
+    start is padded so it compares correctly against stored super column
+    names.
+
+    A stored name is either '<padded-offset>' or, when the event carries a
+    pk tag, '<padded-offset>:<pk-tag>'. To include every event at the end
+    offset in both forms, the finish bound is the padded end offset followed
+    by ';', whose ordinal value is one greater than that of ':'.
+
+    The slice count is fixed at 1000000.
+    """
+    padded_start = (get_events_padded_column_part(column_start)
+                    if column_start != '' else column_start)
+    padded_end = (get_events_padded_column_part(column_end) + ';'
+                  if column_end != '' else column_end)
+    slice_range = SliceRange(start=padded_start, finish=padded_end,
+                             count=1000000)
+    return SlicePredicate(slice_range=slice_range)
+
+
+def append_log_events_results(event_results, event_list, timestamp_key_part, include_pk_tag=False):
+    """
+    Convert the super columns returned by get_slice for one events row into
+    event dictionaries and append them to event_list.
+
+    timestamp_key_part is the row's bucket number. Each super column name is
+    '<offset>' or '<offset>:<pk-tag>', and the event timestamp is rebuilt as
+    bucket * EVENTS_BUCKET_PERIOD + offset. Every sub-column becomes a key of
+    the event dictionary.
+
+    When include_pk_tag is true and the name carries a pk tag, the tag
+    (without the ':' separator) is stored under 'pk-tag'. A stored 'pk-tag'
+    sub-column, if present, overrides that value.
+    """
+    bucket_start_time = int(timestamp_key_part) * EVENTS_BUCKET_PERIOD
+    for item in event_results:
+        super_column = item.super_column
+        # Split on the first ':' and keep only the text after it, so the
+        # returned tag matches the one put_log_event_data appended after ':'.
+        timestamp_column_part, separator, pk_tag = super_column.name.partition(':')
+        event = {}
+        if separator and include_pk_tag:
+            event['pk-tag'] = pk_tag
+        event['timestamp'] = bucket_start_time + int(timestamp_column_part)
+        for column in super_column.columns:
+            event[column.name] = column.value
+        event_list.append(event)
+
+
+def get_log_event_data(cluster, node_id, start_time, end_time, include_pk_tag=False):
+    """
+    Return the list of log events stored for node_id in cluster from
+    start_time through end_time.
+
+    Each time bucket in the range is read with its own get_slice call.
+    Only the first and last buckets are bounded by the start and end
+    offsets. A transport failure is retried once after reconnecting.
+
+    Raises StatsDatabaseConnectionException if the transport fails on the
+    retry as well, and StatsDatabaseAccessException for any other error
+    from the read.
+    """
+    # FIXME: Add some validation of arguments
+    first_bucket, first_offset = split_events_timestamp(int(start_time))
+    last_bucket, last_offset = split_events_timestamp(int(end_time))
+
+    events = []
+    parent = ColumnParent(column_family=EVENTS_COLUMN_FAMILY_NAME)
+    for bucket in range(first_bucket, last_bucket + 1):
+        offset_from = first_offset if bucket == first_bucket else ''
+        offset_to = last_offset if bucket == last_bucket else ''
+        # FIXME: How big can the count be?
+        predicate = get_events_slice_predicate(offset_from, offset_to)
+        row_key = construct_log_events_key(cluster, node_id, bucket)
+        retries_left = 1
+        while True:
+            try:
+                results = stats_db_connection.get_client().get_slice(
+                    row_key, parent, predicate, ConsistencyLevel.ONE)
+            except TTransport.TTransportException:
+                # Only one reconnect-and-retry; a second transport failure
+                # is reported to the caller.
+                if retries_left == 0:
+                    raise StatsDatabaseConnectionException()
+                retries_left -= 1
+                stats_db_connection.reconnect()
+            except Exception:
+                raise StatsDatabaseAccessException()
+            else:
+                break
+        append_log_events_results(results, events, bucket, include_pk_tag)
+
+    return events
+
+
+def put_log_event_data(cluster, log_events_data):
+    """
+    Store log events, given as a mapping of node id to a list of events.
+
+    Each event is a dictionary with a 'timestamp' entry and an optional
+    'pk-tag'. Each event becomes one super column named
+    '<padded offset>:<pk-tag>' in its node's time-bucket row. Every entry
+    other than 'timestamp' (including a supplied 'pk-tag') is stored as a
+    stringified sub-column, with Cassandra timestamp = event timestamp * 1000.
+    An event without a (truthy) pk tag gets a random integer tag.
+
+    Raises StatsInvalidStatsDataException if the input cannot be converted
+    into mutations. All events are written with a single batch_mutate.
+    """
+    try:
+        mutation_map = {}
+        for (node_id, node_events) in log_events_data.iteritems():
+            for event in node_events:
+                raw_timestamp = event['timestamp']
+                # A random tag lets several events share a timestamp without
+                # colliding on the same super column name.
+                # FIXME: Is there something better we can do here?
+                pk_tag = event.get('pk-tag') or random.randint(0,10000000000)
+                timestamp = int(raw_timestamp)
+                bucket, offset = split_events_timestamp(timestamp)
+                row_key = construct_log_events_key(cluster, node_id, bucket)
+                row_mutations = mutation_map.setdefault(
+                    row_key, {EVENTS_COLUMN_FAMILY_NAME: []})[EVENTS_COLUMN_FAMILY_NAME]
+                # pk_tag is never None at this point, so the tag suffix is
+                # always present.
+                supercolumn_name = (get_events_padded_column_part(offset) +
+                                    ':' + str(pk_tag))
+                cassandra_timestamp = timestamp * 1000
+                columns = [Column(name=name, value=str(value),
+                                  timestamp=cassandra_timestamp)
+                           for (name, value) in event.iteritems()
+                           if name != 'timestamp']
+                row_mutations.append(Mutation(
+                    column_or_supercolumn=ColumnOrSuperColumn(
+                        super_column=SuperColumn(name=supercolumn_name,
+                                                 columns=columns))))
+    except Exception:
+        raise StatsInvalidStatsDataException()
+
+    call_cassandra_with_reconnect(stats_db_connection.get_client().batch_mutate,
+                                  mutation_map, ConsistencyLevel.ONE)
+
+def delete_log_event_data(cluster, node_id, start_time, end_time):
+    """
+    Delete the log events stored for node_id in cluster from start_time
+    through end_time.
+
+    Buckets lying strictly between the start and end buckets are removed as
+    whole rows. In the start and end buckets, only the super columns in range
+    are deleted: they are looked up with get_slice and then deleted by name
+    with batch_mutate. A boundary bucket with no events in range is skipped
+    instead of sending an empty deletion.
+    """
+    start_key_part, start_column_part = split_events_timestamp(int(start_time))
+    end_key_part, end_column_part = split_events_timestamp(int(end_time))
+    # Event columns are written with a Cassandra timestamp equal to the event
+    # time * 1000 (milliseconds -> microseconds). end_time * 1000 + 1 is
+    # therefore greater than the write timestamp of every event being
+    # deleted, so the deletion takes precedence over those writes.
+    timestamp = (int(end_time) * 1000) + 1
+    column_path = ColumnPath(column_family=EVENTS_COLUMN_FAMILY_NAME)
+    column_parent = ColumnParent(column_family=EVENTS_COLUMN_FAMILY_NAME)
+    for key_part in range(start_key_part, end_key_part+1):
+        key = construct_log_events_key(cluster, node_id, key_part)
+        current_start = start_column_part if key_part == start_key_part else ''
+        current_end = end_column_part if key_part == end_key_part else ''
+        if current_start == '' and current_end == '':
+            # Interior bucket: every event in the row is in range.
+            call_cassandra_with_reconnect(stats_db_connection.get_client().remove,
+                                          key, column_path, timestamp, ConsistencyLevel.ONE)
+            continue
+
+        # Cassandra doesn't support deletions via a slice range (column start
+        # and end) here; it needs an explicit list of column names. So fetch
+        # the in-range super columns with get_slice, then delete them by name
+        # using batch_mutate.
+        slice_predicate = get_events_slice_predicate(current_start, current_end)
+        get_results = call_cassandra_with_reconnect(
+            stats_db_connection.get_client().get_slice,
+            key, column_parent, slice_predicate, ConsistencyLevel.ONE)
+        column_names = [item.super_column.name for item in get_results]
+        if not column_names:
+            # Nothing in range in this bucket; avoid an empty deletion
+            # round trip.
+            continue
+
+        deletion = Deletion(timestamp=timestamp, predicate=SlicePredicate(column_names=column_names))
+        mutation_map = {key: {EVENTS_COLUMN_FAMILY_NAME: [Mutation(deletion=deletion)]}}
+        call_cassandra_with_reconnect(stats_db_connection.get_client().batch_mutate,
+                                      mutation_map, ConsistencyLevel.ONE)
+
+
+def get_closest_sample_interval(requested_sample_interval):
+    """
+    Round a requested sample interval down to a supported one.
+
+    The base interval is the DOWNSAMPLE_INTERVALS entry just before the
+    first entry greater than the request, or the last entry if none is
+    greater. The result is the request rounded down (floor division) to a
+    multiple of that base interval. A request smaller than the first entry
+    is returned unchanged.
+    """
+    intervals = DOWNSAMPLE_INTERVALS
+    first_larger = next((index for index, interval in enumerate(intervals)
+                         if interval > requested_sample_interval), None)
+    if first_larger == 0:
+        return requested_sample_interval
+    if first_larger is None:
+        base_interval = intervals[-1]
+    else:
+        base_interval = intervals[first_larger - 1]
+    return base_interval * (requested_sample_interval // base_interval)
+
+
+def get_closest_window_interval(requested_window):
+    """
+    Return the WINDOW_INTERVALS entry immediately preceding the first entry
+    greater than requested_window. Returns 0 if the first entry is already
+    greater, or the last entry if none is greater.
+    """
+    previous_interval = 0
+    for interval in WINDOW_INTERVALS:
+        if interval > requested_window:
+            return previous_interval
+        previous_interval = interval
+    return WINDOW_INTERVALS[-1]