Base net-virt CLI files on top of which ONOS specific changes will be done
diff --git a/cli/sdncon/stats/__init__.py b/cli/sdncon/stats/__init__.py
new file mode 100755
index 0000000..9ab2783
--- /dev/null
+++ b/cli/sdncon/stats/__init__.py
@@ -0,0 +1,16 @@
+#
+# Copyright (c) 2013 Big Switch Networks, Inc.
+#
+# Licensed under the Eclipse Public License, Version 1.0 (the
+# "License"); you may not use this file except in compliance with the
+# License. You may obtain a copy of the License at
+#
+# http://www.eclipse.org/legal/epl-v10.html
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+# implied. See the License for the specific language governing
+# permissions and limitations under the License.
+#
+
diff --git a/cli/sdncon/stats/data.py b/cli/sdncon/stats/data.py
new file mode 100755
index 0000000..268422c
--- /dev/null
+++ b/cli/sdncon/stats/data.py
@@ -0,0 +1,1160 @@
+#
+# Copyright (c) 2013 Big Switch Networks, Inc.
+#
+# Licensed under the Eclipse Public License, Version 1.0 (the
+# "License"); you may not use this file except in compliance with the
+# License. You may obtain a copy of the License at
+#
+# http://www.eclipse.org/legal/epl-v10.html
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+# implied. See the License for the specific language governing
+# permissions and limitations under the License.
+#
+
+from cassandra.ttypes import KsDef, CfDef, InvalidRequestException, TTransport, \
+ SlicePredicate, SliceRange, ColumnParent, ConsistencyLevel, ColumnPath, \
+ Mutation, Deletion, KeyRange, Column, ColumnOrSuperColumn, SuperColumn
+from django.conf import settings
+from .utils import CassandraConnection
+import random
+import datetime
+import time
+from sdncon.controller.config import get_local_controller_id
+
+CONTROLLER_STATS_NAME = 'controller'
+SWITCH_STATS_NAME = 'switch'
+PORT_STATS_NAME = 'port'
+TARGET_INDEX_NAME = 'target_index'
+STATS_TYPE_INDEX_NAME = 'stats_type_index'
+
+STATS_COLUMN_FAMILY_NAME_SUFFIX = '_stats'
+STATS_TARGET_TYPE_PUT_DATA_SUFFIX = '-stats'
+
+STATS_BUCKET_PERIOD = 60*60*24*1000 # 1 day in milliseconds
+STATS_PADDED_COLUMN_TIME_LENGTH = len(str(STATS_BUCKET_PERIOD))
+
+EVENTS_COLUMN_FAMILY_NAME = 'events'
+EVENTS_BUCKET_PERIOD = 60*60*24*1000 # 1 day in milliseconds
+EVENTS_PADDED_COLUMN_TIME_LENGTH = len(str(EVENTS_BUCKET_PERIOD))
+
+THIRTY_SECOND_INTERVAL = 30 * 1000
+ONE_MINUTE_INTERVAL = 60 * 1000
+FIVE_MINUTE_INTERVAL = 5 * ONE_MINUTE_INTERVAL
+TEN_MINUTE_INTERVAL = 10 * ONE_MINUTE_INTERVAL
+ONE_HOUR_INTERVAL = 60 * ONE_MINUTE_INTERVAL
+FOUR_HOUR_INTERVAL = 4 * ONE_HOUR_INTERVAL
+ONE_DAY_INTERVAL = 24 * ONE_HOUR_INTERVAL
+ONE_WEEK_INTERVAL = 7 * ONE_DAY_INTERVAL
+FOUR_WEEK_INTERVAL = 4 * ONE_WEEK_INTERVAL
+
+DOWNSAMPLE_INTERVALS = (ONE_MINUTE_INTERVAL, TEN_MINUTE_INTERVAL,
+ ONE_HOUR_INTERVAL, FOUR_HOUR_INTERVAL,
+ ONE_DAY_INTERVAL, ONE_WEEK_INTERVAL,
+ FOUR_WEEK_INTERVAL)
+
+WINDOW_INTERVALS = (THIRTY_SECOND_INTERVAL, ONE_MINUTE_INTERVAL,
+ FIVE_MINUTE_INTERVAL, TEN_MINUTE_INTERVAL)
+
+VALUE_DATA_FORMAT = 'value'
+RATE_DATA_FORMAT = 'rate'
+
+
+class StatsException(Exception):
+ pass
+
+
+class StatsInvalidStatsDataException(StatsException):
+ def __init__(self):
+ super(StatsInvalidStatsDataException,self).__init__(
+ 'Error adding stats data with incorrect format')
+
+
+class StatsDatabaseConnectionException(StatsException):
+ def __init__(self):
+ super(StatsDatabaseConnectionException,self).__init__(
+ 'Error connecting to stats database')
+
+
+class StatsDatabaseAccessException(StatsException):
+ def __init__(self):
+ super(StatsDatabaseAccessException,self).__init__(
+ 'Error accessing stats database')
+
+class StatsNonnumericValueException(StatsException):
+ def __init__(self, value):
+ super(StatsNonnumericValueException,self).__init__(
+ 'Invalid non-numeric stat value for rate or '
+ 'average value computation: ' + str(value))
+
+
+class StatsRateComputationException(StatsException):
+ def __init__(self):
+ super(StatsRateComputationException,self).__init__(
+ 'Error computing rate; not enough raw data')
+
+
+class StatsInvalidDataFormatException(StatsException):
+ def __init__(self, data_format):
+ super(StatsInvalidDataFormatException,self).__init__(
+ 'Invalid data format: ' + str(data_format))
+
+
+class StatsInvalidStatsTimeRangeException(StatsException):
+ def __init__(self, start_time, end_time):
+ super(StatsInvalidStatsTimeRangeException,self).__init__(
+ 'Invalid stats time range; start = %s; end = %s' %
+ (str(start_time), str(end_time)))
+
+
+class StatsInvalidStatsTypeException(StatsException):
+ def __init__(self, stats_type):
+ super(StatsInvalidStatsTypeException,self).__init__(
+ 'Invalid stats type; name = %s' % str(stats_type))
+
+
+class StatsInvalidStatsMetadataException(StatsException):
+ def __init__(self, file_name):
+ super(StatsInvalidStatsMetadataException,self).__init__(
+ 'Invalid stats metadata from file \"%s\"' % str(file_name))
+
+
+class StatsInternalException(StatsException):
+ def __init__(self, message):
+ super(StatsInternalException,self).__init__(
+ 'Stats internal error: \"%s\"' % str(message))
+
+class StatsCreateColumnFamilyException(StatsException):
+ def __init__(self, name):
+ super(StatsCreateColumnFamilyException,self).__init__(
+ 'Error creating column family; name = ' % name)
+
+# The following code is a hack to get the stats code to use a freshly
+# created test keyspace when we're running the unit tests. I'm guessing
+# there must be some way to detect if we're running under the Django
+# (or Python) unit test framework, but I couldn't find any info about
+# how to do that. So for now, we just provide this function that the
+# unit tests must call before they make any stats call to get the stats
+# code to use the test keyspace instead of the normal production
+# keyspace. Bother.
+
+use_test_keyspace = False
+
+def set_use_test_keyspace():
+ global use_test_keyspace
+ use_test_keyspace = True
+
+
+stats_db_connection = None
+
+# FIXME: Should ideally create the column families on demand as the data
+# is added, so the different target types don't need to be predefined here.
+# Probably not a big problem for right now, though.
+COLUMN_FAMILY_INFO_LIST = (
+ {'name': TARGET_INDEX_NAME,
+ 'column_type': 'Super',
+ 'comparator_type': 'UTF8Type',
+ 'subcomparator_type': 'UTF8Type'},
+ {'name': STATS_TYPE_INDEX_NAME,
+ 'column_type': 'Super',
+ 'comparator_type': 'UTF8Type',
+ 'subcomparator_type': 'UTF8Type'},
+ {'name': CONTROLLER_STATS_NAME + STATS_COLUMN_FAMILY_NAME_SUFFIX,
+ 'comparator_type': 'UTF8Type'},
+ {'name': SWITCH_STATS_NAME + STATS_COLUMN_FAMILY_NAME_SUFFIX,
+ 'comparator_type': 'UTF8Type'},
+ {'name': PORT_STATS_NAME + STATS_COLUMN_FAMILY_NAME_SUFFIX,
+ 'comparator_type': 'UTF8Type'},
+ {'name': EVENTS_COLUMN_FAMILY_NAME,
+ 'column_type': 'Super',
+ 'comparator_type': 'UTF8Type',
+ 'subcomparator_type': 'UTF8Type'},
+)
+
+
+def init_stats_db_connection(host, port, keyspace, user, password,
+ replication_factor, column_family_def_default_settings):
+ global stats_db_connection
+ if not stats_db_connection:
+ if use_test_keyspace:
+ keyspace = "test_" + keyspace
+
+ try:
+ stats_db_connection = CassandraConnection(host, port, keyspace, user, password)
+ stats_db_connection.connect()
+ except Exception:
+ stats_db_connection = None
+ raise StatsException("Error connecting to Cassandra daemon")
+
+ if use_test_keyspace:
+ try:
+ stats_db_connection.get_client().system_drop_keyspace(keyspace)
+ except Exception:
+ pass
+
+ try:
+ stats_db_connection.set_keyspace()
+ create_keyspace = False
+ except Exception:
+ create_keyspace = True
+
+ if create_keyspace:
+ keyspace_def = KsDef(name=keyspace,
+ strategy_class='org.apache.cassandra.locator.SimpleStrategy',
+ replication_factor=replication_factor,
+ cf_defs=[])
+ try:
+ stats_db_connection.get_client().system_add_keyspace(keyspace_def)
+ stats_db_connection.set_keyspace()
+ except Exception, _e:
+ stats_db_connection = None
+ raise StatsException("Error creating stats keyspace")
+
+ for column_family_info in COLUMN_FAMILY_INFO_LIST:
+ try:
+ column_family_def_settings = column_family_def_default_settings.copy()
+ column_family_def_settings.update(column_family_info)
+ column_family_def_settings['keyspace'] = keyspace
+ # pylint: disable=W0142
+ stats_db_connection.get_client().system_add_column_family(
+ CfDef(**column_family_def_settings))
+ except InvalidRequestException, _e:
+ # Assume this is because the column family already exists.
+ # FIXME. Could check exception message for specific string
+ pass
+ except Exception, _e:
+ stats_db_connection = None
+ raise StatsCreateColumnFamilyException(column_family_info.get('name'))
+
+
+def get_stats_db_connection():
+ return stats_db_connection
+
+
+# The following function is mainly intended to be used by the unit tests. It lets
+# you clear out all of the data from the database. Note that since the stats DB
+# is not managed by the normal Django DB mechanism you don't get the automatic
+# DB flushing from the Django TestCase code, so it has to be done explicitly.
+# There's a StatsTestCase subclass of TestCase in the stats unit tests that
+# implements the tearDown method to call flush_stats_db after each test.
+def flush_stats_db():
+ if stats_db_connection is not None:
+ for column_family_info in COLUMN_FAMILY_INFO_LIST:
+ stats_db_connection.get_client().truncate(column_family_info['name'])
+
+def call_cassandra_with_reconnect(fn, *args, **kwargs):
+ try:
+ try:
+ results = fn(*args, **kwargs)
+ except TTransport.TTransportException:
+ stats_db_connection.reconnect()
+ results = fn(*args, **kwargs)
+ except TTransport.TTransportException, _e:
+ raise StatsDatabaseConnectionException()
+ except Exception, _e:
+ raise StatsDatabaseAccessException()
+
+ return results
+
+
+def get_stats_padded_column_part(column_part):
+ """
+ For the columns to be sorted correctly by time we need to pad with
+ leading zeroes up to the maximum range of the bucket
+ """
+ column_part = str(column_part)
+ leading_zeroes = ('0'*(STATS_PADDED_COLUMN_TIME_LENGTH-len(column_part)))
+ column_part = leading_zeroes + column_part
+ return column_part
+
+
+def split_stats_timestamp(timestamp):
+ key_part = timestamp / STATS_BUCKET_PERIOD
+ column_part = timestamp % STATS_BUCKET_PERIOD
+ return (key_part, column_part)
+
+
+def construct_stats_key(cluster, target_id, stats_type, timestamp_key_part):
+ """
+ Constructs the keys for the controller or switch stats.
+ For the controller stats the target_id is the controller node id.
+ For switch stats the target_id is the dpid of the switch.
+ """
+ return cluster + '|' + target_id + '|' + stats_type + '|' + str(timestamp_key_part)
+
+
+def append_stats_results(get_results, values, timestamp_key_part):
+ shifted_timestamp_key_part = int(timestamp_key_part) * STATS_BUCKET_PERIOD
+ for item in get_results:
+ timestamp_column_part = int(item.column.name)
+ value = item.column.value
+ timestamp = shifted_timestamp_key_part + timestamp_column_part
+ values.append((timestamp, value))
+
+
+def get_stats_slice_predicate(column_start, column_end):
+ if column_start != '':
+ column_start = get_stats_padded_column_part(column_start)
+ if column_end != '':
+ column_end = get_stats_padded_column_part(column_end)
+ slice_predicate = SlicePredicate(slice_range=SliceRange(
+ start=column_start, finish=column_end, count=1000000))
+ return slice_predicate
+
+
+def check_time_range(start_time, end_time):
+ if int(end_time) < int(start_time):
+ raise StatsInvalidStatsTimeRangeException(start_time, end_time)
+
+
+def check_valid_data_format(data_format):
+ if data_format != VALUE_DATA_FORMAT and data_format != RATE_DATA_FORMAT:
+ raise StatsInvalidDataFormatException(data_format)
+
+
+def get_window_range(raw_stats_values, index, window):
+
+ if window == 0:
+ return (index, index)
+
+ # Get start index
+ timestamp = raw_stats_values[index][0]
+ start_timestamp = timestamp - (window / 2)
+ end_timestamp = timestamp + (window / 2)
+
+ start_index = index
+ while start_index > 0:
+ next_timestamp = raw_stats_values[start_index - 1][0]
+ if next_timestamp < start_timestamp:
+ break
+ start_index -= 1
+
+ end_index = index
+ while end_index < len(raw_stats_values) - 1:
+ next_timestamp = raw_stats_values[end_index + 1][0]
+ if next_timestamp > end_timestamp:
+ break
+ end_index += 1
+
+ return (start_index, end_index)
+
+
+def convert_stat_string_to_value(stat_string):
+ try:
+ stat_value = int(stat_string)
+ except ValueError:
+ try:
+ stat_value = float(stat_string)
+ except ValueError:
+ stat_value = stat_string
+ return stat_value
+
+
+def get_rate_over_stats_values(stats_values):
+
+ if len(stats_values) < 2:
+ return None
+
+ start_stat = stats_values[0]
+ end_stat = stats_values[-1]
+
+ timestamp_delta = end_stat[0] - start_stat[0]
+ # NOTE: In computing the value_delta here it's safe to assume floats
+ # rather than calling convert_stat_string_to_value because we're going
+ # to be converting to float anyway when we do the rate calculation later.
+ # So there's no point in trying to differentiate between int and float
+ # and rate doesn't make sense for any other type of stat data (e.g. string)
+ value_delta = float(end_stat[1]) - float(start_stat[1])
+ if timestamp_delta == 0:
+ rate = float('inf' if value_delta > 0 else '-inf')
+ else:
+ rate = value_delta / timestamp_delta
+
+ return rate
+
+
+def get_rate_over_window(raw_stats_values, index, window):
+ if len(raw_stats_values) < 2:
+ return None
+
+ if window == 0:
+ if index == 0:
+ start_index = 0
+ end_index = 1
+ else:
+ start_index = index - 1
+ end_index = index
+ else:
+ start_index, end_index = get_window_range(raw_stats_values, index, window)
+
+ return get_rate_over_stats_values(raw_stats_values[start_index:end_index + 1])
+
+
+def get_average_over_stats_values(stats_values):
+
+ total = 0
+ count = 0
+ for stat_value in stats_values:
+ # FIXME: Should we just always convert to float here?
+ # This would give a more accurate result for the average calculation
+ # but would mean that the data type is different for a
+ # zero vs. non-zero window size.
+ value = convert_stat_string_to_value(stat_value[1])
+ if type(value) not in (int,float):
+ raise StatsNonnumericValueException(value)
+ total += value
+ count += 1
+
+ return (total / count) if count > 0 else None
+
+
+def get_average_value_over_window(raw_stats_values, index, window):
+ start_index, end_index = get_window_range(raw_stats_values, index, window)
+ stats_values = raw_stats_values[start_index:end_index + 1]
+ return get_average_over_stats_values(stats_values)
+
+
+def reverse_stats_data_generator(cluster, target_type, target_id, stats_type,
+ start_time=None, end_time=None,
+ chunk_interval=3600000):
+ if start_time is None:
+ start_time = int(time.time() * 1000)
+ if end_time is None:
+ # By default, don't go back past 1/1/2011. This was before we had stats support
+ # in the controller, so we shouldn't have any data earlier than that (except if
+ # the clock on the controller was set incorrectly).
+ end_time = int(time.mktime(datetime.datetime(2011,1,1).timetuple()) * 1000)
+ end_key_part, _end_column_part = split_stats_timestamp(end_time)
+ key_part, column_part = split_stats_timestamp(start_time)
+ column_family = target_type + STATS_COLUMN_FAMILY_NAME_SUFFIX
+ column_parent = ColumnParent(column_family)
+ # FIXME: Should add support for chunk_interval to be either iterable or a
+ # list/tuple to give a sequence of chunk intervals to use. The last available
+ # chunk interval from the list/tuple/iterator would then be used for any
+ # subsequent cassandra calls
+ #chunk_interval_iter = (chunk_interval if isinstance(chunk_interval, list) or
+ # isinstance(chunk_interval, tuple) else [chunk_interval])
+ while key_part >= 0:
+ key = construct_stats_key(cluster, target_id, stats_type, key_part)
+
+ while True:
+ column_start = column_part - chunk_interval
+ if column_start < 0:
+ column_start = 0
+ slice_predicate = get_stats_slice_predicate(column_start, column_part)
+ for attempt in (1,2):
+ try:
+ get_results = stats_db_connection.get_client().get_slice(key,
+ column_parent, slice_predicate, ConsistencyLevel.ONE)
+ for item in reversed(get_results):
+ timestamp = (key_part * STATS_BUCKET_PERIOD) + int(item.column.name)
+ value = item.column.value
+ yield (timestamp, value)
+ break
+ except TTransport.TTransportException:
+ # Only retry once, so if it's the second time through,
+ # propagate the exception
+ if attempt == 2:
+ raise StatsDatabaseConnectionException()
+ stats_db_connection.reconnect()
+ except Exception:
+ raise StatsDatabaseAccessException()
+
+ column_part = column_start
+ if column_part == 0:
+ break
+
+ if key_part == end_key_part:
+ break
+ key_part -= 1
+ column_part = STATS_BUCKET_PERIOD - 1
+
+
+def get_latest_stat_data(cluster, target_type, target_id, stats_type,
+ window=0, data_format=VALUE_DATA_FORMAT):
+
+ check_valid_data_format(data_format)
+
+ minimum_data_points = 2 if data_format == RATE_DATA_FORMAT else 1
+ stats_data_window = []
+ latest_stat_timestamp = None
+
+ start_time = int(time.time() * 1000)
+ # Limit how far back we'll look for the latest stat value.
+ # 86400000 is 1 day in ms
+ end_time = start_time - 86400000
+ for stat_data_point in reverse_stats_data_generator(cluster,
+ target_type, target_id, stats_type, start_time, end_time):
+ current_stat_timestamp = stat_data_point[0]
+ if latest_stat_timestamp is None:
+ latest_stat_timestamp = current_stat_timestamp
+
+ # NOTE: For stats operations we treat the window for the rate or
+ # average calculation to be centered around the current timestamp.
+ # For the latest stat case there is no data after the current point.
+ # We could extend the window back further so that the current timestamp
+ # is the end of the window range instead of the middle, but then that
+ # would be inconsistent with the other cases, so instead we just go
+ # back to half the window size. I haven't been able to convince myself
+ # strongly one way or the other which is better (or how much it matters)
+ outside_window = (latest_stat_timestamp - current_stat_timestamp) > (window / 2)
+ if len(stats_data_window) >= minimum_data_points and outside_window:
+ break
+
+ stats_data_window.insert(0, stat_data_point)
+
+ if (window == 0) and (len(stats_data_window) >= minimum_data_points):
+ break
+
+ stat_data_point = None
+
+ if latest_stat_timestamp is not None:
+ if data_format == VALUE_DATA_FORMAT:
+ value = get_average_over_stats_values(stats_data_window)
+ else:
+ assert data_format == RATE_DATA_FORMAT, "Invalid data format"
+ value = get_rate_over_stats_values(stats_data_window)
+ if value is not None:
+ stat_data_point = (latest_stat_timestamp, value)
+
+ return stat_data_point
+
+
+def get_stats_data(cluster, target_type, target_id, stats_type,
+ start_time, end_time, sample_interval=0, window=0,
+ data_format=VALUE_DATA_FORMAT, limit=None):
+
+ check_time_range(start_time, end_time)
+ check_valid_data_format(data_format)
+ # FIXME: Add validation of other arguments
+
+ start_key_part, start_column_part = split_stats_timestamp(int(start_time))
+ end_key_part, end_column_part = split_stats_timestamp(int(end_time))
+
+ raw_stats_values = []
+ column_family = target_type + STATS_COLUMN_FAMILY_NAME_SUFFIX
+ column_parent = ColumnParent(column_family)
+
+ for key_part in range(start_key_part, end_key_part+1):
+ current_start = start_column_part if key_part == start_key_part else ''
+ current_end = end_column_part if key_part == end_key_part else ''
+ # FIXME: How big can the count be?
+ slice_predicate = get_stats_slice_predicate(current_start, current_end)
+ key = construct_stats_key(cluster, target_id, stats_type, key_part)
+ for attempt in (1,2):
+ try:
+ get_results = stats_db_connection.get_client().get_slice(key,
+ column_parent, slice_predicate, ConsistencyLevel.ONE)
+ break
+ except TTransport.TTransportException:
+ # Only retry once, so if it's the second time through,
+ # propagate the exception
+ if attempt == 2:
+ raise StatsDatabaseConnectionException()
+ stats_db_connection.reconnect()
+ except Exception:
+ raise StatsDatabaseAccessException()
+
+ append_stats_results(get_results, raw_stats_values, key_part)
+
+ # FIXME: This logic to handle the limit argument isn't complete.
+ # It doesn't account for a non-zero window or dpwnsampling.
+ if (limit != None and sample_interval == 0 and window == 0 and
+ len(raw_stats_values) > limit):
+ raw_stats_values = raw_stats_values[:limit]
+ break
+
+ stats_values = []
+ last_downsample_index = None
+ for i in range(0, len(raw_stats_values)):
+ # Handle downsampling
+ if sample_interval != 0:
+ downsample_index = raw_stats_values[i][0] / sample_interval
+ if downsample_index == last_downsample_index:
+ continue
+ last_downsample_index = downsample_index
+
+ # Get the value based for the specified data format
+ if data_format == VALUE_DATA_FORMAT:
+ if sample_interval == 0:
+ value = convert_stat_string_to_value(raw_stats_values[i][1])
+ else:
+ value = get_average_value_over_window(raw_stats_values, i, window)
+ else:
+ assert data_format == RATE_DATA_FORMAT, "Invalid data format"
+ value = get_rate_over_window(raw_stats_values, i, window)
+
+ if value is not None:
+ stats_values.append((raw_stats_values[i][0], value))
+
+ return stats_values
+
+
+def delete_stats_data(cluster, target_type, target_id, stats_type,
+ start_time, end_time):
+
+ check_time_range(start_time, end_time)
+ # FIXME: Add validation of other arguments
+
+ start_key_part, start_column_part = split_stats_timestamp(int(start_time))
+ end_key_part, end_column_part = split_stats_timestamp(int(end_time))
+
+ column_family = target_type + STATS_COLUMN_FAMILY_NAME_SUFFIX
+ column_parent = ColumnParent(column_family)
+ # The Cassandra timestamps are in microseconds, not milliseconds,
+ # so we convert to microseconds. The Cassandra timestamp is derived
+ # from the stat timestamp (i.e. same time converted to microseconds),
+ # so we use the end_time + 1, since that's guaranteed to be greater
+ # than any of the timestamps for the sample points we're deleting.
+ timestamp = (int(end_time) * 1000) + 1
+ for key_part in range(start_key_part, end_key_part+1):
+ key = construct_stats_key(cluster, target_id, stats_type, key_part)
+ current_start = start_column_part if key_part == start_key_part else ''
+ current_end = end_column_part if key_part == end_key_part else ''
+ if current_start == '' and current_end == '':
+ call_cassandra_with_reconnect(stats_db_connection.get_client().remove,
+ key, ColumnPath(column_family=column_family), timestamp,
+ ConsistencyLevel.ONE)
+ else:
+ # grrr. Cassandra currently doesn't support doing deletions via a
+ # slice range (i.e. a column start and end). You need to give it a
+ # list of columns. So we do a get_slice with the slice range and then
+ # extract the individual column names from the result of that and
+ # build up the column list that we can use to delete the column
+ # using batch_mutate.
+ slice_predicate = get_stats_slice_predicate(current_start, current_end)
+ get_results = call_cassandra_with_reconnect(
+ stats_db_connection.get_client().get_slice,
+ key, column_parent, slice_predicate, ConsistencyLevel.ONE)
+ column_names = []
+ for item in get_results:
+ column_names.append(item.column.name)
+
+ deletion = Deletion(timestamp=timestamp, predicate=SlicePredicate(column_names=column_names))
+ mutation_map = {key: {column_family: [Mutation(deletion=deletion)]}}
+ call_cassandra_with_reconnect(stats_db_connection.get_client().batch_mutate,
+ mutation_map, ConsistencyLevel.ONE)
+
+
+STATS_METADATA_VARIABLE_NAME = 'STATS_METADATA'
+
+stats_metadata = None
+
+def init_stats_metadata(_cluster):
+ """
+ Initialize the dictionary of stats metadata. Currently this is initialized
+ from a directory of metadata files that contain the metadata. Currently,
+ there is no differentiation in the stats types that are supported across
+ clusters, so we ignore the cluster argument and we just maintain a
+ global map of stat type metadata.
+ """
+ global stats_metadata
+ if not stats_metadata:
+ stats_metadata = {}
+ for module_name in settings.STATS_METADATA_MODULES:
+ metadata_module = __import__(module_name,
+ fromlist=[STATS_METADATA_VARIABLE_NAME])
+ if not metadata_module:
+ # FIXME: log error
+ continue
+
+ if STATS_METADATA_VARIABLE_NAME not in dir(metadata_module):
+ # FIXME: log error
+ continue
+
+ metadata_list = getattr(metadata_module, STATS_METADATA_VARIABLE_NAME)
+
+ if type(metadata_list) is dict:
+ metadata_list = [metadata_list]
+
+ if type(metadata_list) is not list and type(metadata_list) is not tuple:
+ raise StatsInvalidStatsMetadataException(module_name)
+
+ for metadata in metadata_list:
+ if type(metadata) is not dict:
+ raise StatsInvalidStatsMetadataException(module_name)
+
+ name = metadata.get('name')
+ if not name:
+ raise StatsInvalidStatsMetadataException(module_name)
+ name = str(name)
+
+ # Auto-set the verbose_name to the name if it's not set explicitly
+ verbose_name = metadata.get('verbose_name')
+ if not verbose_name:
+ metadata['verbose_name'] = name
+
+ # FIXME: Validate other contents of metadata.
+ # e.g. flag name conflicts between files.
+
+ stats_metadata[name] = metadata
+
+def get_stats_metadata(cluster, stats_type=None):
+ init_stats_metadata(cluster)
+ # If no stat_type is specified return the entire dictionary of stat types
+ metadata = stats_metadata.get(stats_type) if stats_type else stats_metadata
+ if metadata is None:
+ raise StatsInvalidStatsTypeException(stats_type)
+ return metadata
+
+
+STATS_INDEX_ATTRIBUTE_TYPES = {
+ 'last-updated': int
+}
+def stats_type_slice_to_index_data(stats_type_slice):
+ index_data = {}
+ for super_column in stats_type_slice:
+ name = super_column.super_column.name
+ column_list = super_column.super_column.columns
+ if name == 'base':
+ insert_map = index_data
+ elif name.startswith('param:'):
+ colon_index = name.find(':')
+ parameter_name = name[colon_index+1:]
+ parameter_map = index_data.get('parameters')
+ if not parameter_map:
+ parameter_map = {}
+ index_data['parameters'] = parameter_map
+ insert_map = {}
+ parameter_map[parameter_name] = insert_map
+ else:
+ raise StatsInternalException('Invalid stats type index name: ' + str(name))
+
+ for column in column_list:
+ value = column.value
+ attribute_type = STATS_INDEX_ATTRIBUTE_TYPES.get(column.name)
+ if attribute_type is not None:
+ value = attribute_type(value)
+ insert_map[column.name] = value
+
+ return index_data
+
+
+def get_stats_type_index(cluster, target_type, target_id, stats_type=None):
+ column_parent = ColumnParent(STATS_TYPE_INDEX_NAME)
+ slice_predicate = SlicePredicate(slice_range=SliceRange(
+ start='', finish='', count=1000000))
+ key_prefix = cluster + ':' + target_type + ':' + target_id
+ if stats_type is None:
+ key_range = KeyRange(start_key=key_prefix+':', end_key=key_prefix+';', count=100000)
+ key_slice_list = call_cassandra_with_reconnect(
+ stats_db_connection.get_client().get_range_slices,
+ column_parent, slice_predicate, key_range, ConsistencyLevel.ONE)
+ stats_index_data = {}
+ for key_slice in key_slice_list:
+ key = key_slice.key
+ colon_index = key.rfind(':')
+ if colon_index < 0:
+ raise StatsInternalException('Invalid stats type index key: ' + str(key))
+ stats_type = key[colon_index+1:]
+ stats_index_data[stats_type] = stats_type_slice_to_index_data(key_slice.columns)
+ else:
+ key = key_prefix + ':' + stats_type
+ stats_type_slice = call_cassandra_with_reconnect(
+ stats_db_connection.get_client().get_slice, key, column_parent,
+ slice_predicate, ConsistencyLevel.ONE)
+ stats_index_data = stats_type_slice_to_index_data(stats_type_slice)
+
+ return stats_index_data
+
+
+def get_stats_target_types(cluster):
+ column_parent = ColumnParent(TARGET_INDEX_NAME)
+ slice_predicate = SlicePredicate(column_names=[])
+ key_range = KeyRange(start_key=cluster+':', end_key=cluster+';', count=100000)
+ key_slice_list = call_cassandra_with_reconnect(
+ stats_db_connection.get_client().get_range_slices,
+ column_parent, slice_predicate, key_range, ConsistencyLevel.ONE)
+
+ target_types = {}
+ for key_slice in key_slice_list:
+ target_type = key_slice.key[len(cluster)+1:]
+
+ target_types[target_type] = {}
+
+ return target_types
+
+
+STATS_TARGET_ATTRIBUTE_TYPES = {
+ 'last-updated': int
+}
+
+def get_stats_targets(cluster, target_type):
+ key = cluster + ':' + target_type
+ column_parent = ColumnParent(TARGET_INDEX_NAME)
+ slice_predicate = SlicePredicate(slice_range=SliceRange(
+ start='', finish='', count=1000000))
+ super_column_list = call_cassandra_with_reconnect(
+ stats_db_connection.get_client().get_slice, key, column_parent,
+ slice_predicate, ConsistencyLevel.ONE)
+ target_list = {}
+ for item in super_column_list:
+ target = {}
+ for column in item.super_column.columns:
+ value = column.value
+ attribute_type = STATS_TARGET_ATTRIBUTE_TYPES.get(column.name)
+ if attribute_type is not None:
+ value = attribute_type(value)
+ target[column.name] = value
+ target_list[item.super_column.name] = target
+
+ return target_list
+
+
+# FIXME: Should update the code below to use these constants
+# instead of string literals
+LAST_UPDATED_ATTRIBUTE_NAME = 'last-updated'
+CONTROLLER_ATTRIBUTE_NAME = 'controller'
+BASE_SUPER_COLUMN_NAME = 'base'
+PARAMETERS_SUPER_COLUMN_NAME = 'parameters'
+PARAM_SUPER_COLUMN_NAME_PREFIX = 'param:'
+
+def append_attributes_to_mutation_list(attributes, supercolumn_name, mutation_list):
+ column_list = []
+ for name, info in attributes.iteritems():
+ timestamp, value = info
+ column = Column(name=name, value=str(value), timestamp=timestamp*1000)
+ column_list.append(column)
+ mutation = Mutation(column_or_supercolumn=ColumnOrSuperColumn(
+ super_column=SuperColumn(name=supercolumn_name, columns=column_list)))
+ mutation_list.append(mutation)
+
+
+def add_stat_type_index_info_to_mutation_map(cluster, target_type,
+ stats_type_index, mutation_map):
+ for key, stats_type_info in stats_type_index.iteritems():
+ separator_index = key.find(':')
+ assert separator_index >= 0
+ base_stat_type_name = key[:separator_index]
+ target_id = key[separator_index + 1:]
+ stats_type_base_attributes, stats_type_params = stats_type_info
+ mutation_list = []
+ append_attributes_to_mutation_list(stats_type_base_attributes,
+ 'base', mutation_list)
+ for name, attributes in stats_type_params.iteritems():
+ append_attributes_to_mutation_list(attributes,
+ 'param:' + name, mutation_list)
+ mutation_key = cluster + ':' + target_type + ':' + target_id + ':' + base_stat_type_name
+ mutation_map[mutation_key] = {STATS_TYPE_INDEX_NAME: mutation_list}
+
+
+def add_target_id_list_to_mutation_map(cluster, target_type,
+ target_id_list, mutation_map):
+ mutation_list = []
+ for target_id, attributes in target_id_list:
+ append_attributes_to_mutation_list(attributes, target_id, mutation_list)
+ key = cluster + ':' + target_type
+ mutation_map[key] = {TARGET_INDEX_NAME: mutation_list}
+
+
+def _put_stats_data(cluster, target_type, stats_data):
+ try:
+ controller_id = get_local_controller_id()
+ mutation_map = {}
+ target_id_list = []
+ stats_type_index = {}
+ column_family = target_type + STATS_COLUMN_FAMILY_NAME_SUFFIX
+ for (target_id, target_id_stats) in stats_data.iteritems():
+ # Map 'localhost' controller to the actual ID for the local controller
+ # FIXME: Eventually we should fix up the other components (e.g. statd)
+ # that invoke this REST API to not use localhost and instead use the
+ # REST API to obtain the real ID for the local controller, but for now
+ # this works to ensure we're not using localhost in any of the data we
+ # store in the DB (unless, of course, the uuid version of the controller
+ # ID hasn't been written to the boot-config file, in which case it will
+ # default to the old localhost value).
+ if target_type == 'controller' and target_id == 'localhost':
+ target_id = controller_id
+ latest_id_timestamp = None
+ for (stats_type, stats_data_array) in target_id_stats.iteritems():
+ # Check if it's a parameterized type and extract the base
+ # stat type and parameter name.
+ parameter_separator = stats_type.find('__')
+ if parameter_separator >= 0:
+ stats_type_base = stats_type[:parameter_separator]
+ stats_type_parameter = stats_type[parameter_separator+2:]
+ else:
+ stats_type_base = stats_type
+ stats_type_parameter = None
+
+ latest_stat_type_timestamp = None
+
+ # Add the stats values to the mutation map
+ for stats_value in stats_data_array:
+ timestamp = int(stats_value['timestamp'])
+ if latest_stat_type_timestamp is None or timestamp > latest_stat_type_timestamp:
+ latest_stat_type_timestamp = timestamp
+ if latest_id_timestamp is None or timestamp > latest_id_timestamp:
+ latest_id_timestamp = timestamp
+ value = stats_value['value']
+ timestamp_key_part, timestamp_column_part = split_stats_timestamp(timestamp)
+ key = construct_stats_key(cluster, target_id, stats_type, timestamp_key_part)
+ key_entry = mutation_map.get(key)
+ if not key_entry:
+ mutation_list = []
+ mutation_map[key] = {column_family: mutation_list}
+ else:
+ mutation_list = key_entry[column_family]
+
+ # Note: convert the Cassandra timestamp value to microseconds to
+ # be consistent with standard Cassandra timestamp format.
+ mutation = Mutation(column_or_supercolumn=ColumnOrSuperColumn(
+ column=Column(name=get_stats_padded_column_part(timestamp_column_part),
+ value=str(value), timestamp=timestamp*1000)))
+ mutation_list.append(mutation)
+
+ # Update the stat type index info.
+ # There can be multiple parameterized types for each base stats type,
+ # so we need to be careful about checking for existing data for
+ # the index_entry. Because of the dictionary nature of the put data
+ # and the way this is serialized into a Python dictionary, though,
+ # we are guaranteed that there won't be multiple entries for a
+ # specific parameters stats type or the base stats type, so we don't
+ # need to handle duplicates for those.
+ if latest_stat_type_timestamp is not None:
+ stats_type_index_key = stats_type_base + ':' + target_id
+ stats_type_info = stats_type_index.get(stats_type_index_key)
+ if not stats_type_info:
+ # This is a tuple of two dictionaries: the attributes for
+ # the base stat type and a dictionary of the parameterized
+ # types that have been seen for that stat type. The
+ # parameterized type dictionary is keyed by the name of
+ # the parameterized type and the value is the associated
+ # attribute dictionary.
+ stats_type_info = ({},{})
+ stats_type_index[stats_type_index_key] = stats_type_info
+ stats_type_base_attributes, stats_type_params = stats_type_info
+ if stats_type_parameter is None:
+ attributes = stats_type_base_attributes
+ else:
+ attributes = stats_type_params.get(stats_type_parameter)
+ if attributes is None:
+ attributes = {}
+ stats_type_params[stats_type_parameter] = attributes
+ last_updated_entry = attributes.get('last-updated')
+ if last_updated_entry is None or latest_stat_type_timestamp > last_updated_entry[0]:
+ attributes['last-updated'] = (latest_stat_type_timestamp, latest_stat_type_timestamp)
+
+ # Update the target index
+ if latest_id_timestamp is not None:
+ # FIXME: Always set the controller attributes for now.
+ # This could/should be optimized to not set this for stats
+ # whose target type is 'controller' since those will
+ # always be coming from the same controller (i.e. itself).
+ # But that change requires some changes (albeit minor) to
+ # syncd to work correctly which I don't want to mess with
+ # right now.
+ #attributes = {'last-updated': (latest_id_timestamp, latest_id_timestamp)}
+ #if target_type != 'controller':
+ # attributes['controller'] = controller_id
+ attributes = {'last-updated': (latest_id_timestamp, latest_id_timestamp),
+ 'controller': (latest_id_timestamp, controller_id)}
+ target_id_list.append((target_id, attributes))
+ except Exception, _e:
+ raise StatsInvalidStatsDataException()
+
+ add_stat_type_index_info_to_mutation_map(cluster, target_type, stats_type_index, mutation_map)
+ add_target_id_list_to_mutation_map(cluster, target_type, target_id_list, mutation_map)
+
+ call_cassandra_with_reconnect(stats_db_connection.get_client().batch_mutate,
+ mutation_map, ConsistencyLevel.ONE)
+
+
+def put_stats_data(cluster, stats_data):
+ for target_type, target_stats_data in stats_data.items():
+ if target_type.endswith(STATS_TARGET_TYPE_PUT_DATA_SUFFIX):
+ # Strip off the '-stats' suffix
+ target_type = target_type[:-len(STATS_TARGET_TYPE_PUT_DATA_SUFFIX)]
+ _put_stats_data(cluster, target_type, target_stats_data)
+
+
+def get_events_padded_column_part(column_part):
+ """
+ For the columns to be sorted correctly by time we need to pad with
+ leading zeroes up to the maximum range of the bucket
+ """
+ column_part = str(column_part)
+ leading_zeroes = ('0'*(EVENTS_PADDED_COLUMN_TIME_LENGTH-len(column_part)))
+ column_part = leading_zeroes + column_part
+ return column_part
+
+
+def split_events_timestamp(timestamp):
+ key_part = timestamp / EVENTS_BUCKET_PERIOD
+ column_part = timestamp % EVENTS_BUCKET_PERIOD
+ return (key_part, column_part)
+
+
+def construct_log_events_key(cluster, node_id, timestamp_key_part):
+ return cluster + '|' + node_id + '|' + str(timestamp_key_part)
+
+
+def get_events_slice_predicate(column_start, column_end):
+ if column_start != '':
+ column_start = get_events_padded_column_part(column_start)
+ if column_end != '':
+ # For the final key in the range of keys we want all of the
+ # supercolumns whose name starts with end_column_part.
+ # If the event has includes a pk tag then the format of the
+ # supercolumn name is <timestamp-part>:<pk-tag>.
+ # Otherwise it's just the <timestamp-part>. To get both of these
+ # cases we set the column end value to be the <timestamp-part>
+ # suffixed with ';' (which has an ordinal value 1 greater than
+ # ':'. So this will get all of the events with the given column
+ # end regardless of whether or not they include the pk tag.
+ column_end = get_events_padded_column_part(column_end) + ';'
+ slice_predicate = SlicePredicate(slice_range=SliceRange(
+ start=column_start, finish=column_end, count=1000000))
+ return slice_predicate
+
+
+def append_log_events_results(event_results, event_list, timestamp_key_part, include_pk_tag=False):
+ shifted_timestamp_key_part = int(timestamp_key_part) * EVENTS_BUCKET_PERIOD
+ for item in event_results:
+ event = {}
+ super_column_name = item.super_column.name
+ colon_index = super_column_name.find(":")
+ if colon_index >= 0:
+ if include_pk_tag:
+ pk_tag = super_column_name[colon_index:]
+ event['pk-tag'] = pk_tag
+ timestamp_column_part = super_column_name[:colon_index]
+ else:
+ timestamp_column_part = super_column_name
+ timestamp = shifted_timestamp_key_part + int(timestamp_column_part)
+ event['timestamp'] = timestamp
+ for column in item.super_column.columns:
+ event[column.name] = column.value
+ event_list.append(event)
+
+
+def get_log_event_data(cluster, node_id, start_time, end_time, include_pk_tag=False):
+ # FIXME: Add some validation of arguments
+ start_key_part, start_column_part = split_events_timestamp(int(start_time))
+ end_key_part, end_column_part = split_events_timestamp(int(end_time))
+
+ event_list = []
+ column_parent = ColumnParent(column_family=EVENTS_COLUMN_FAMILY_NAME)
+ for key_part in range(start_key_part, end_key_part+1):
+ current_start = start_column_part if key_part == start_key_part else ''
+ current_end = end_column_part if key_part == end_key_part else ''
+ # FIXME: How big can the count be?
+ slice_predicate = get_events_slice_predicate(current_start, current_end)
+ key = construct_log_events_key(cluster, node_id, key_part)
+ for attempt in (1,2):
+ try:
+ results = stats_db_connection.get_client().get_slice(key,
+ column_parent, slice_predicate, ConsistencyLevel.ONE)
+ break
+ except TTransport.TTransportException:
+ # Only retry once, so if it's the second time through,
+ # propagate the exception
+ if attempt == 2:
+ raise StatsDatabaseConnectionException()
+ stats_db_connection.reconnect()
+ except Exception:
+ raise StatsDatabaseAccessException()
+ append_log_events_results(results, event_list, key_part, include_pk_tag)
+
+ return event_list
+
+
+def put_log_event_data(cluster, log_events_data):
+ try:
+ mutation_map = {}
+ for (node_id, node_events) in log_events_data.iteritems():
+ for event in node_events:
+ timestamp = event['timestamp']
+ pk_tag = event.get('pk-tag')
+ # If the entry in the put data does not specify a tag, then generate a random one.
+ # This is so that we can have multiple events with the same timestamp.
+ # FIXME: Is there something better we can do here?
+ if not pk_tag:
+ pk_tag = random.randint(0,10000000000)
+ timestamp = int(timestamp)
+ timestamp_key_part, timestamp_column_part = split_events_timestamp(timestamp)
+ key = construct_log_events_key(cluster, node_id, timestamp_key_part)
+ key_entry = mutation_map.get(key)
+ if not key_entry:
+ mutation_list = []
+ mutation_map[key] = {EVENTS_COLUMN_FAMILY_NAME: mutation_list}
+ else:
+ mutation_list = key_entry[EVENTS_COLUMN_FAMILY_NAME]
+ supercolumn_name = get_events_padded_column_part(timestamp_column_part)
+ if pk_tag is not None:
+ supercolumn_name += (':' + str(pk_tag))
+ # Build the list of columns in the supercolumn
+ column_list = []
+ for (name, value) in event.iteritems():
+ if name != 'timestamp':
+ column_list.append(Column(name=name, value=str(value),
+ timestamp=timestamp*1000))
+ mutation = Mutation(column_or_supercolumn=ColumnOrSuperColumn(
+ super_column=SuperColumn(name=supercolumn_name,
+ columns=column_list)))
+ mutation_list.append(mutation)
+ except Exception:
+ raise StatsInvalidStatsDataException()
+
+ call_cassandra_with_reconnect(stats_db_connection.get_client().batch_mutate,
+ mutation_map, ConsistencyLevel.ONE)
+
+def delete_log_event_data(cluster, node_id, start_time, end_time):
+ start_key_part, start_column_part = split_events_timestamp(int(start_time))
+ end_key_part, end_column_part = split_events_timestamp(int(end_time))
+ # The Cassandra timestamps are in microseconds, not milliseconds,
+ # so we convert to microseconds. The Cassandra timestamp is derived
+ # from the event timestamp (i.e. same time converted to microseconds),
+ # so we use the end_time + 1, since that's guaranteed to be greater
+ # than any of the timestamps for the sample points we're deleting.
+ timestamp = (int(end_time) * 1000) + 1
+ column_path = ColumnPath(column_family=EVENTS_COLUMN_FAMILY_NAME)
+ column_parent = ColumnParent(column_family=EVENTS_COLUMN_FAMILY_NAME)
+ for key_part in range(start_key_part, end_key_part+1):
+ key = construct_log_events_key(cluster, node_id, key_part)
+ current_start = start_column_part if key_part == start_key_part else ''
+ current_end = end_column_part if key_part == end_key_part else ''
+ if current_start == '' and current_end == '':
+ call_cassandra_with_reconnect(stats_db_connection.get_client().remove,
+ key, column_path, timestamp, ConsistencyLevel.ONE)
+ else:
+ # grrr. Cassandra currently doesn't support doing deletions via a
+ # slice range (i.e. a column start and end). You need to give it a
+ # list of columns. So we do a get_slice with the slice range and then
+ # extract the individual column names from the result of that and
+ # build up the column list that we can use to delete the column
+ # using batch_mutate.
+ slice_predicate = get_events_slice_predicate(current_start, current_end)
+ get_results = call_cassandra_with_reconnect(
+ stats_db_connection.get_client().get_slice,
+ key, column_parent, slice_predicate, ConsistencyLevel.ONE)
+ column_names = []
+ for item in get_results:
+ column_names.append(item.super_column.name)
+
+ deletion = Deletion(timestamp=timestamp, predicate=SlicePredicate(column_names=column_names))
+ mutation_map = {key: {EVENTS_COLUMN_FAMILY_NAME: [Mutation(deletion=deletion)]}}
+ call_cassandra_with_reconnect(stats_db_connection.get_client().batch_mutate,
+ mutation_map, ConsistencyLevel.ONE)
+
+
+def get_closest_sample_interval(requested_sample_interval):
+ for i in range(0, len(DOWNSAMPLE_INTERVALS)):
+ if DOWNSAMPLE_INTERVALS[i] > requested_sample_interval:
+ if i == 0:
+ return requested_sample_interval
+ downsample_interval = DOWNSAMPLE_INTERVALS[i - 1]
+ break
+ else:
+ downsample_interval = DOWNSAMPLE_INTERVALS[-1]
+ # Return the closest multiple of the downsampled interval
+ return downsample_interval * (requested_sample_interval // downsample_interval)
+
+
+def get_closest_window_interval(requested_window):
+ for i in range(0, len(WINDOW_INTERVALS)):
+ if WINDOW_INTERVALS[i] > requested_window:
+ return WINDOW_INTERVALS[i - 1] if i > 0 else 0
+ return WINDOW_INTERVALS[-1]
diff --git a/cli/sdncon/stats/models.py b/cli/sdncon/stats/models.py
new file mode 100755
index 0000000..7b5c2b6
--- /dev/null
+++ b/cli/sdncon/stats/models.py
@@ -0,0 +1,39 @@
+#
+# Copyright (c) 2013 Big Switch Networks, Inc.
+#
+# Licensed under the Eclipse Public License, Version 1.0 (the
+# "License"); you may not use this file except in compliance with the
+# License. You may obtain a copy of the License at
+#
+# http://www.eclipse.org/legal/epl-v10.html
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+# implied. See the License for the specific language governing
+# permissions and limitations under the License.
+#
+
+from django.db import models
+
+class StatdConfig(models.Model):
+ stat_type = models.CharField(
+ primary_key=True,
+ verbose_name='Stat Type',
+ max_length=64)
+
+ sampling_period = models.PositiveIntegerField(
+ verbose_name='Sampling Period',
+ default=15)
+
+ reporting_period = models.PositiveIntegerField(
+ verbose_name='Reporting Period',
+ default=60)
+
+ class Rest:
+ NAME = 'statd-config'
+ FIELD_INFO = (
+ {'name': 'stat_type', 'rest_name': 'stat-type'},
+ {'name': 'sampling_period', 'rest_name': 'sampling-period'},
+ {'name': 'reporting_period', 'rest_name': 'reporting-period'},
+ )
diff --git a/cli/sdncon/stats/statsFiller.py b/cli/sdncon/stats/statsFiller.py
new file mode 100755
index 0000000..89f202f
--- /dev/null
+++ b/cli/sdncon/stats/statsFiller.py
@@ -0,0 +1,484 @@
+#!/usr/bin/env python
+#
+# Copyright (c) 2013 Big Switch Networks, Inc.
+#
+# Licensed under the Eclipse Public License, Version 1.0 (the
+# "License"); you may not use this file except in compliance with the
+# License. You may obtain a copy of the License at
+#
+# http://www.eclipse.org/legal/epl-v10.html
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+# implied. See the License for the specific language governing
+# permissions and limitations under the License.
+#
+
+import json
+import logging
+import sys
+import time
+import random
+import urllib2
+import re
+import urllib
+from optparse import OptionParser, OptionGroup
+
+DEFAULT_CONTROLLER_ID = 'localhost'
+
+DEFAULT_BATCH = 5
+DEFAULT_SAMPLING_INTERVAL = 5
+
+STATS_CPU = 'cpu'
+STATS_MEM = 'mem'
+STATS_SWAP = 'swap'
+STATS_OF = 'of'
+STATS_LOG = 'log'
+
+STATS = [STATS_CPU, STATS_MEM, STATS_SWAP, STATS_OF, STATS_LOG]
+
+BASE_TIME = 1297278987000
+SECONDS_CONVERT = 1000
+MINUTES_CONVERT = 60 * SECONDS_CONVERT
+HOURS_CONVERT = 60 * MINUTES_CONVERT
+DAYS_CONVERT = 24 * HOURS_CONVERT
+
+numSamples = 0
+samplingInterval = 0
+batchSize = 0
+statsTypes = []
+numSwitches = 0
+switchIDs = []
+seed = 0
+controller = ''
+logger = 0
+debugLevel = logging.INFO
+logfile = 'filler.log'
+
+def initLogger():
+ # create a statFiller logger
+ logger = logging.getLogger("filler")
+ logger.setLevel(debugLevel)
+
+ formatter = logging.Formatter("%(asctime)s [%(name)s] %(levelname)s %(message)s")
+ # Add a file handler
+ rf_handler = logging.FileHandler(logfile)
+ rf_handler.setFormatter(formatter)
+ logger.addHandler(rf_handler)
+
+ # Add a console handler
+ co_handler = logging.StreamHandler()
+ co_handler.setFormatter(formatter)
+ co_handler.setLevel(logging.WARNING)
+ logger.addHandler(co_handler)
+
+ return logger
+
+def intToDpid(intValue):
+ import string
+ ret = []
+ if intValue > (2**128-1):
+ intValue = 2**128-1
+ for i in range(16):
+ mask = 0x0f<<(i*4)
+ tmp = (intValue&mask)>>(i*4)
+ ret.insert(0,string.hexdigits[tmp])
+ if i != 15 and i%2 == 1:
+ ret.insert(0, ':')
+
+ return ''.join(ret)
+
+def getRandomInc():
+ """
+ Randomly create an integer from 1 to 100
+ """
+ return random.randint(1,100)
+
+def getRandomInt(max):
+ """
+ Randomly create an integer from 1 to max
+ """
+ if max <= 1:
+ return 1
+ else:
+ return random.randint(1,max)
+
+def getRandomPercentage(max):
+ """
+ Randomly create a two-decimal percentage
+ """
+ percentMax = int(max * 100)
+ if percentMax < 2:
+ return 0.01
+ else:
+ try:
+ v = float(random.randint(1, percentMax))
+ return v/100
+ except ValueError as e:
+ logger.error ("error: %s, percentMax=%d"%(e, percentMax))
+ return 0.01
+
+class StatsFiller():
+
+ statsData = {}
+ logData = {}
+
+ of_packetin = 0
+ of_flowmod = 0
+ of_activeflow = 0
+ hostID = 'localhost'
+
+ def __init__(self, numSamples, samplingInterval, batchSize, switchIDs, statsTypes, hostID, cluster, components, seed, logger):
+ self.numSamples = numSamples
+ self.samplingInterval = samplingInterval
+ self.batchSize = batchSize
+ self.switchIDs = switchIDs
+ self.statsTypes = statsTypes
+ self.controllerID = hostID
+ self.cluster = cluster
+ self.components = components
+ self.seed = seed
+ self.logger = logger
+
+ def repr(self):
+ return str(self.statsData)
+
+ def initStatsData(self):
+ if STATS_CPU in self.statsTypes or STATS_MEM in self.statsTypes or STATS_SWAP in self.statsTypes:
+ self.statsData['controller-stats'] = {}
+ self.statsData['controller-stats'][self.hostID] = {}
+
+ if STATS_CPU in self.statsTypes:
+ self.statsData['controller-stats'][self.hostID]['cpu-idle'] = []
+ self.statsData['controller-stats'][self.hostID]['cpu-nice'] = []
+ self.statsData['controller-stats'][self.hostID]['cpu-user'] = []
+ self.statsData['controller-stats'][self.hostID]['cpu-system'] = []
+ if STATS_MEM in self.statsTypes:
+ self.statsData['controller-stats'][self.hostID]['mem-used'] = []
+ self.statsData['controller-stats'][self.hostID]['mem-free'] = []
+ if STATS_SWAP in self.statsTypes:
+ self.statsData['controller-stats'][self.hostID]['swap-used'] = []
+
+ if STATS_OF in self.statsTypes:
+ self.statsData['switch-stats'] = {}
+ for dpid in switchIDs:
+ self.statsData['switch-stats'][dpid] = {}
+
+ if STATS_OF in self.statsTypes:
+ for dpid in switchIDs:
+ self.statsData['switch-stats'][dpid]['OFPacketIn'] = []
+ self.statsData['switch-stats'][dpid]['OFFlowMod'] = []
+ self.statsData['switch-stats'][dpid]['OFActiveFlow'] = []
+
+ if STATS_LOG in self.statsTypes:
+ self.logData[self.hostID] = []
+
+ def generate_a_sw_stat(self, timestamp, dpid, statsTypes, value):
+ sample = {'timestamp':timestamp, 'value':value}
+ self.statsData['switch-stats'][dpid][statsTypes].append(sample)
+
+ def generate_a_controller_stat(self, timestamp, statsTypes, value):
+ sample = {'timestamp':timestamp, 'value':value}
+ self.statsData['controller-stats'][self.hostID][statsTypes].append(sample)
+
+ def generate_log_event(self, timestamp, component, log_level, message):
+ event = {'timestamp':timestamp, 'component':component, 'log-level':log_level,'message':message}
+ self.logData[self.hostID].append(event)
+
+ def generate_a_batch(self, startTime, batchSize):
+ for i in range(batchSize):
+ # Get the sample timestamp in ms
+ ts = int(startTime + i * self.samplingInterval)*1000
+ # controller stats
+ if STATS_CPU in self.statsTypes:
+ max = 100.00
+ v = getRandomPercentage(max)
+ self.generate_a_controller_stat(ts, 'cpu-idle', round(v, 2))
+ max -= v
+ v = getRandomPercentage(max)
+ self.generate_a_controller_stat(ts, 'cpu-nice', round(v, 2))
+ max -= v
+ v = getRandomPercentage(max)
+ self.generate_a_controller_stat(ts, 'cpu-user', round(v, 2))
+ max -= v
+ self.generate_a_controller_stat(ts, 'cpu-system', round(v, 2))
+ if STATS_MEM in self.statsTypes:
+ max = getRandomInt(1000000000)
+ v = getRandomInt(max)
+ self.generate_a_controller_stat(ts, 'mem-used', v)
+ max -= v
+ self.generate_a_controller_stat(ts, 'mem-free', max)
+ if STATS_SWAP in self.statsTypes:
+ max = getRandomInt(1000000000)
+ v = getRandomInt(max)
+ self.generate_a_controller_stat(ts, 'swap-used', v)
+
+ # switch stats
+ if STATS_OF in self.statsTypes:
+ for dpid in self.switchIDs:
+ #print "add stats for %s"%dpid
+ self.of_packetin = getRandomInt(100)
+ self.generate_a_sw_stat(ts, dpid, 'OFPacketIn', self.of_packetin)
+ self.of_flowmod = getRandomInt(100)
+ self.generate_a_sw_stat(ts, dpid, 'OFFlowMod', self.of_flowmod)
+ self.of_activeflow = getRandomInt(100)
+ self.generate_a_sw_stat(ts, dpid, 'OFActiveFlow', self.of_activeflow)
+
+ if STATS_LOG in self.statsTypes:
+ for component in components:
+ self.generate_log_event(ts, component, 'Error', 'Another log message')
+
+ def constructRestRrls(self):
+ """
+ Construct the REST URL for the given host/statsPath, including
+ the items in the query_params dictionary as URL-encoded query parameters
+ """
+ self.statsUrl = 'http://%s:8000/rest/v1/stats/data/%s'%(self.controllerID, self.cluster)
+ self.logUrl = 'http://%s:8000/rest/v1/events/data/%s'%(self.controllerID, self.cluster)
+
+ def printRestErrorInfo(self, e):
+ """
+ Extract the error information and print it.
+ This is mainly intended to demonstrate how to extract the
+ error info from the exception. It may or may not make sense
+ to print out this information, depending on the application.
+ """
+ # Extract the info from the exception
+ error_code = e.getcode()
+ response_text = e.read()
+ try:
+ # Print the error info
+ logger.error('HTTP error: code = %d, %s'%(error_code, response_text))
+
+ obj = json.loads(response_text)
+ error_type = obj.get('error_type')
+ description = obj.get('description')
+
+ # Print the error info
+ logger.error('HTTP error code = %d; error_type = %s; description = %s'%(error_code, str(error_type), description))
+
+ # Print the optional validation error info
+ model_error = obj.get('model_error')
+ if model_error:
+ logger.error('model_error = %s'%str(model_error))
+ field_errors = obj.get('field_errors')
+ if field_errors:
+ logger.error('field_errors = %s'%str(field_errors))
+ except ValueError as e:
+ logger.error(e)
+
+
+ def putRestData(self, url, obj):
+ """
+ Put the given object data to the given type/id/params at the given host.
+ If both the id and query_param_dict are empty, then a new item is created.
+ Otherwise, existing data items are updated with the object data.
+ """
+
+ logger.debug("URL: %s"%url)
+ logger.debug("Sending: %s"%obj)
+ request = urllib2.Request(url, obj, {'Content-Type':'application/json'})
+ request.get_method = lambda: 'PUT'
+ try:
+ response = urllib2.urlopen(request)
+ ret = response.read()
+ logger.debug("Got response: %s"%str(ret))
+ return ret
+ except urllib2.HTTPError as e:
+ logger.error("Got Exception: %s"%str(e))
+ self.printRestErrorInfo(e)
+
+
+ def postData(self):
+ """
+ Put the given object data to the given type/id/params at the given host.
+ """
+
+ self.constructRestRrls()
+
+ if self.statsData:
+ output = json.JSONEncoder().encode(self.statsData)
+ retMsg = self.putRestData(self.statsUrl, output)
+ logger.info("Put rest call for stats data returns: %s"%retMsg)
+ if self.logData:
+ output = json.JSONEncoder().encode(self.logData)
+ retMsg = self.putRestData(self.logUrl, output)
+ logger.info("Put rest call for log data returns: %s"%retMsg)
+
+ def fill(self):
+ endTime = time.time()
+ startTime = endTime - self.numSamples * self.samplingInterval
+ remainingSamples = self.numSamples
+ batchSize = 0
+ while remainingSamples > 0:
+ logger.info("starttime = %s(%d), endtime = %s(%d)"%(time.ctime(startTime),startTime,time.ctime(endTime),endTime))
+ self.initStatsData()
+ if remainingSamples < self.batchSize:
+ batchSize = remainingSamples
+ else:
+ batchSize = self.batchSize
+ remainingSamples -= batchSize
+ self.generate_a_batch(startTime, batchSize)
+ startTime += self.samplingInterval * batchSize
+ self.postData()
+ sys.stdout.write("%0.2f%%\r"%(float(self.numSamples-remainingSamples)*100/self.numSamples))
+
+def parseLogLevel(level):
+ if 'debug'.startswith(level):
+ return logging.DEBUG
+ elif 'info'.startswith(level):
+ return logging.INFO
+ elif 'warning'.startswith(level):
+ return logging.WARNING
+ elif 'error'.startswith(level):
+ return logging.ERROR
+ elif 'critical'.startswith(level):
+ return logging.CRITICAL
+ else:
+ return None
+
+def processOptions(options):
+ """
+ Process the command line arguments
+ """
+
+ global numSamples
+ global samplingInterval
+ global batchSize
+ global statsTypes
+ global numSwitches
+ global switchIDs
+ global seed
+ global controller
+ global cluster
+ global components
+ global debugLevel
+ global logfile
+
+ if options.numSamples:
+ numSamples = options.numSamples
+
+ if options.period:
+ m = re.search("([0-9]*)([A-Za-z]*)$", options.period)
+ (value, unit) = m.groups()
+ if value:
+ value = int(value)
+ if unit:
+ if 'minutes'.startswith(unit):
+ value = 60*value
+ elif 'hours'.startswith(unit):
+ value = 60*60*value
+ elif 'days'.startswith(unit):
+ value = 24*60*60*value
+ elif not 'seconds'.startswith(unit):
+ raise ValueError("Invalid period: %s"%options.period)
+ numSamples = value
+
+ if options.sampleInterval:
+ samplingInterval = options.sampleInterval
+ else:
+ samplingInterval = DEFAULT_SAMPLING_INTERVAL
+
+ numSamples /= samplingInterval
+
+ if options.batchSize:
+ batchSize = options.batchSize
+ else:
+ batchSize = DEFAULT_BATCH
+
+ if options.numSwitches:
+ numSwitches = options.numSwitches
+
+ if options.statsTypes:
+ statsTypes = options.statsTypes.split(',')
+ for stat in statsTypes:
+ if stat not in STATS:
+ raise ValueError("Invalid stat: %s"%stat)
+
+ if options.seed:
+ seed = options.seed
+ else:
+ seed = random.random()
+
+ if options.controller:
+ controller = options.controller
+ else:
+ controller = 'localhost'
+
+ if options.cluster:
+ cluster = options.cluster
+ else:
+ cluster = 'default'
+
+ components = options.components.split(',')
+
+ if options.debugLevel:
+ debugLevel = parseLogLevel(options.debugLevel)
+ else:
+ debugLevel = logging.INFO
+
+ if not debugLevel:
+ raise ValueError("Incorrect debug level, %s."%options.debugLevel)
+
+ if options.logfile:
+ logfile = options.logfile
+ else:
+ logfile = 'filler.log'
+
+
+ if len(statsTypes) == 0:
+ raise ValueError("statsTypes is required.")
+
+ if STATS_OF in statsTypes:
+ if numSwitches == 0:
+ raise ValueError("numSwitches must be nonzero to generate of stats.")
+ else:
+ for i in range(numSwitches):
+ switchIDs.append(intToDpid(i))
+
+ if numSamples == 0:
+ raise ValueError("numSamples or period is required")
+
+
+
+if __name__ == '__main__':
+ parser = OptionParser()
+ group = OptionGroup(parser, "Commonly Used Options")
+ group.add_option("-n", "--numSamples", dest="numSamples", type="long", help="Number of samples to be generated. Can NOT be used with timePeriod option.")
+ group.add_option("-p", "--timePeriod", dest="period", help="The time period to fill the stats data. "
+ "The format can be in seconds, minutes, hours, or days. e.g. 100s(econds), 15m(inutes), 2h(ours), 3d(ays). "
+ "Can NOT be used with numSamples option.")
+ group.add_option("-t", "--samplingInterval", dest="sampleInterval", type = "int", help="The sampling interval in seconds")
+ group.add_option("-b", "--batchSize", dest="batchSize", type = "int", help="The number of samples for each rest put")
+ group.add_option("-s", "--numSwitches", dest="numSwitches", type = "int", help="The number of switches for OF stats. The dpids start with "
+ "00:00:00:00:00:00:00:01 and increment to the number of switches.")
+ group.add_option("-m", "--statsTypes", dest="statsTypes", help="A comma separated statsTypes, Options are cpu, mem, swap, of, and log."
+ " e.g. cpu,mem")
+ group.add_option("-c", "--controller", dest="controller", help="The IP address of the controller")
+ group.add_option("-u", "--cluster", dest="cluster", help="cluster ID")
+ group.add_option("-z", "--components", dest="components", default="sdnplatform,cassandra", help="A comma-separated list of component names for log events")
+ parser.add_option_group(group)
+
+ lc_group = OptionGroup(parser, "Less Commonly Used Options")
+ lc_group.add_option("-r", "--seed", dest="seed", type = "int", help="Same data can be recreated by setting the same seed for the randomizer")
+ lc_group.add_option("-d", "--debugLevel", dest="debugLevel", help="Set the log level for logging: debug, info, warning, critical, error")
+ lc_group.add_option("-f", "--logfile", dest="logfile", help="The logfile that keeps the logs. Default is filler.log")
+ parser.add_option_group(lc_group)
+
+ (options, args) = parser.parse_args()
+ if len(args) != 0:
+ parser.error("incorrect number of arguments: %s"%args)
+
+
+ try:
+ processOptions(options)
+ logger = initLogger()
+ logger.debug("numSample:%d, samplingInterval:%d, batchSize=%d, statsTypes=%s, numSwitches=%d switchIDs=%s seed=%f cluster=%s components=%s"%
+ (numSamples, samplingInterval, batchSize, statsTypes, numSwitches, switchIDs, seed, cluster, components))
+ except ValueError as e:
+ print("Error: %s"%e)
+ sys.exit()
+
+ filler = StatsFiller(numSamples, samplingInterval, batchSize, switchIDs, statsTypes, controller, cluster, components, seed, logger)
+ filler.fill()
diff --git a/cli/sdncon/stats/tests.py b/cli/sdncon/stats/tests.py
new file mode 100755
index 0000000..f2c69e1
--- /dev/null
+++ b/cli/sdncon/stats/tests.py
@@ -0,0 +1,394 @@
+#
+# Copyright (c) 2013 Big Switch Networks, Inc.
+#
+# Licensed under the Eclipse Public License, Version 1.0 (the
+# "License"); you may not use this file except in compliance with the
+# License. You may obtain a copy of the License at
+#
+# http://www.eclipse.org/legal/epl-v10.html
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+# implied. See the License for the specific language governing
+# permissions and limitations under the License.
+#
+
+from django.test import TestCase, Client
+import urllib
+from django.utils import simplejson
+from .data import set_use_test_keyspace, flush_stats_db
+import time
+from sdncon.controller.config import get_local_controller_id
+
+def setUpModule():
+ set_use_test_keyspace()
+
+def test_construct_rest_url(path, query_params=None):
+ url = '/rest/%s' % path
+ if query_params:
+ url += '?'
+ url += urllib.urlencode(query_params)
+ return url
+
+def test_get_rest_data(path, query_params=None):
+ url = test_construct_rest_url(path, query_params)
+ c = Client()
+ response = c.get(url)
+ return response
+
+def test_put_rest_data(obj, path, query_params=None):
+ url = test_construct_rest_url(path, query_params)
+ data = simplejson.dumps(obj)
+ c = Client()
+ response = c.put(url , data, 'application/json')
+ return response
+
+def test_delete_rest_data(path, query_params=None):
+ url = test_construct_rest_url(path, query_params)
+ c = Client()
+ response = c.delete(url)
+ return response
+
+BASE_TIME = 1297278987000
+SECONDS_CONVERT = 1000
+MINUTES_CONVERT = 60 * SECONDS_CONVERT
+HOURS_CONVERT = 60 * MINUTES_CONVERT
+DAYS_CONVERT = 24 * HOURS_CONVERT
+
+def make_timestamp(day, hour=0, minute=0, second=0, milliseconds=0):
+ timestamp = BASE_TIME + (day * DAYS_CONVERT) + (hour * HOURS_CONVERT) + \
+ (minute * MINUTES_CONVERT) + (second * SECONDS_CONVERT) + milliseconds
+ return timestamp
+
+class StatsTestCase(TestCase):
+ def tearDown(self):
+ flush_stats_db()
+
+class BasicStatsTest(StatsTestCase):
+
+ STATS_DATA = {
+ 'controller-stats': {
+ '192.168.1.1': {
+ 'cpu-system': [
+ {'timestamp':make_timestamp(1,0),'value':1},
+ {'timestamp':make_timestamp(1,1),'value':2},
+ {'timestamp':make_timestamp(1,2),'value':3},
+ {'timestamp':make_timestamp(1,3),'value':4},
+ {'timestamp':make_timestamp(1,4),'value':5},
+ {'timestamp':make_timestamp(2,1),'value':6},
+ {'timestamp':make_timestamp(2,2),'value':7},
+ {'timestamp':make_timestamp(3,5),'value':8},
+ {'timestamp':make_timestamp(3,8),'value':9},
+ {'timestamp':make_timestamp(4,10),'value':10},
+ {'timestamp':make_timestamp(4,11),'value':11},
+ {'timestamp':make_timestamp(10,11),'value':12},
+ ],
+ 'cpu-idle': [
+ {'timestamp':make_timestamp(1,1),'value':80},
+ {'timestamp':make_timestamp(1,2),'value':83},
+ {'timestamp':make_timestamp(1,3),'value':82},
+ {'timestamp':make_timestamp(1,4),'value':79},
+ {'timestamp':make_timestamp(1,5),'value':85},
+ ]
+ }
+ },
+ 'switch-stats': {
+ '00:01:02:03:04:05': {
+ 'flow-count': [
+ {'timestamp':make_timestamp(1,0),'value':60},
+ {'timestamp':make_timestamp(1,1),'value':88},
+ {'timestamp':make_timestamp(1,2),'value':102},
+ ],
+ 'packet-count': [
+ {'timestamp':make_timestamp(1,0),'value':100},
+ {'timestamp':make_timestamp(1,1),'value':120},
+ {'timestamp':make_timestamp(1,2),'value':160},
+ ],
+ 'packet-count__arp': [
+ {'timestamp':make_timestamp(1,0),'value':20},
+ {'timestamp':make_timestamp(1,3),'value':25},
+ ],
+ 'packet-count__lldp': [
+ {'timestamp':make_timestamp(1,0),'value':30},
+ {'timestamp':make_timestamp(1,4),'value':15},
+ ]
+ }
+ }
+ }
+
+ def check_stats_results(self, returned_results, expected_results, message=None):
+ self.assertEqual(len(returned_results), len(expected_results), message)
+ for i in range(len(returned_results)):
+ expected_timestamp = expected_results[i]['timestamp']
+ returned_timestamp = returned_results[i][0]
+ expected_value = expected_results[i]['value']
+ returned_value = returned_results[i][1]
+ self.assertEqual(expected_timestamp, returned_timestamp, message)
+ self.assertEqual(expected_value, returned_value, message)
+
+ def setUp(self):
+ response = test_put_rest_data(self.STATS_DATA, 'v1/stats/data/local')
+ self.assertEqual(response.status_code, 200)
+
+ def test_get_stats(self):
+ # Get all of the cpu-system stat data
+ response = test_get_rest_data('v1/stats/data/local/controller/192.168.1.1/cpu-system', {'start-time':make_timestamp(1,0),'end-time':make_timestamp(10,11), 'sample-interval':0})
+ self.assertEqual(response.status_code, 200)
+ results = simplejson.loads(response.content)
+ self.check_stats_results(results, self.STATS_DATA['controller-stats']['192.168.1.1']['cpu-system'])
+
+ # Get just one days data of the cpu-system stat
+ response = test_get_rest_data('v1/stats/data/local/controller/192.168.1.1/cpu-system', {'start-time':make_timestamp(1,0),'end-time':make_timestamp(1,4), 'sample-interval':0})
+ self.assertEqual(response.status_code, 200)
+ results = simplejson.loads(response.content)
+ self.check_stats_results(results, self.STATS_DATA['controller-stats']['192.168.1.1']['cpu-system'][:5])
+
+ # Get two day range for cpu-system
+ response = test_get_rest_data('v1/stats/data/local/controller/192.168.1.1/cpu-system', {'start-time':make_timestamp(1,2,10),'end-time':make_timestamp(2,2,20), 'sample-interval':0})
+ self.assertEqual(response.status_code, 200)
+ results = simplejson.loads(response.content)
+ self.check_stats_results(results, self.STATS_DATA['controller-stats']['192.168.1.1']['cpu-system'][3:7])
+
+ # Get all of the flow-count switch stat data
+ response = test_get_rest_data('v1/stats/data/local/switch/00:01:02:03:04:05/flow-count', {'start-time':make_timestamp(1,0),'end-time':make_timestamp(2,0), 'sample-interval':0})
+ self.assertEqual(response.status_code, 200)
+ results = simplejson.loads(response.content)
+ self.check_stats_results(results, self.STATS_DATA['switch-stats']['00:01:02:03:04:05']['flow-count'])
+
+ # Get part of the packet-count switch stat data
+ response = test_get_rest_data('v1/stats/data/local/switch/00:01:02:03:04:05/packet-count', {'start-time':make_timestamp(1,0),'end-time':make_timestamp(1,1), 'sample-interval':0})
+ self.assertEqual(response.status_code, 200)
+ results = simplejson.loads(response.content)
+ self.check_stats_results(results, self.STATS_DATA['switch-stats']['00:01:02:03:04:05']['packet-count'][:2])
+
+ def test_delete_stats(self):
+ # Delete all but the first 2 and last 2 sample points
+ response = test_delete_rest_data('v1/stats/data/local/controller/192.168.1.1/cpu-system', {
+ 'start-time': self.STATS_DATA['controller-stats']['192.168.1.1']['cpu-system'][2]['timestamp'],
+ 'end-time':self.STATS_DATA['controller-stats']['192.168.1.1']['cpu-system'][-3]['timestamp']})
+ self.assertEquals(response.status_code, 200)
+
+ response = test_get_rest_data('v1/stats/data/local/controller/192.168.1.1/cpu-system', {'start-time':make_timestamp(1,0),'end-time':make_timestamp(10,11), 'sample-interval':0})
+ self.assertEqual(response.status_code, 200)
+ results = simplejson.loads(response.content)
+ self.check_stats_results(results, self.STATS_DATA['controller-stats']['192.168.1.1']['cpu-system'][:2] + self.STATS_DATA['controller-stats']['192.168.1.1']['cpu-system'][-2:])
+
+ def test_stats_target_types(self):
+
+ local_controller_id = get_local_controller_id()
+
+ # Check getting the list of all target types
+ response = test_get_rest_data('v1/stats/target/local/')
+ self.assertEqual(response.status_code, 200)
+ results = simplejson.loads(response.content)
+ self.assertEqual(len(results.keys()), 2)
+ self.assertTrue('controller' in results)
+ self.assertTrue('switch' in results)
+
+ # Check getting the info for the controller target type
+ response = test_get_rest_data('v1/stats/target/local/controller')
+ self.assertEqual(response.status_code, 200)
+ results = simplejson.loads(response.content)
+ self.assertEqual(len(results.keys()), 1)
+ self.assertTrue('192.168.1.1' in results)
+ controller_info = results['192.168.1.1']
+ #self.assertEqual(controller_info['controller'], local_controller_id)
+ self.assertEqual(controller_info['last-updated'], make_timestamp(10,11))
+
+ # Check getting the info for the switch target type
+ response = test_get_rest_data('v1/stats/target/local/switch')
+ self.assertEqual(response.status_code, 200)
+ results = simplejson.loads(response.content)
+ self.assertEqual(len(results.keys()), 1)
+ self.assertTrue('00:01:02:03:04:05' in results)
+ switch_info = results['00:01:02:03:04:05']
+ self.assertEqual(switch_info['controller'], local_controller_id)
+ self.assertEqual(switch_info['last-updated'], make_timestamp(1,4))
+
+ def check_stats_type_attributes(self, attributes, expected_last_updated,
+ expected_target_type):
+ last_updated = attributes.get('last-updated')
+ self.assertEqual(last_updated, expected_last_updated)
+ target_type = attributes.get('target-type')
+ self.assertEqual(target_type, expected_target_type)
+
+ def test_stats_type_index(self):
+ response = test_get_rest_data('v1/stats/index/local/controller/192.168.1.1')
+ self.assertEqual(response.status_code, 200)
+ results = simplejson.loads(response.content)
+ self.assertEqual(len(results), 2)
+ attributes = results['cpu-system']
+ self.assertEqual(len(attributes), 1)
+ self.assertEqual(attributes['last-updated'], make_timestamp(10,11))
+ attributes = results['cpu-idle']
+ self.assertEqual(len(attributes), 1)
+ self.assertEqual(attributes['last-updated'], make_timestamp(1,5))
+
+ response = test_get_rest_data('v1/stats/index/local/switch/00:01:02:03:04:05')
+ self.assertEqual(response.status_code, 200)
+ results = simplejson.loads(response.content)
+ self.assertEqual(len(results), 2)
+ attributes = results['flow-count']
+ self.assertEqual(len(attributes), 1)
+ self.assertEqual(attributes['last-updated'], make_timestamp(1,2))
+ attributes = results['packet-count']
+ self.assertEqual(len(attributes), 2)
+ self.assertEqual(attributes['last-updated'], make_timestamp(1,2))
+ parameters = attributes['parameters']
+ self.assertEqual(len(parameters), 2)
+ attributes = parameters['arp']
+ self.assertEqual(len(attributes), 1)
+ self.assertEqual(attributes['last-updated'], make_timestamp(1,3))
+ attributes = parameters['lldp']
+ self.assertEqual(len(attributes), 1)
+ self.assertEqual(attributes['last-updated'], make_timestamp(1,4))
+
+ response = test_get_rest_data('v1/stats/index/local/controller/192.168.1.1/cpu-system')
+ self.assertEqual(response.status_code, 200)
+ attributes = simplejson.loads(response.content)
+ self.assertEqual(len(attributes), 1)
+ self.assertEqual(attributes['last-updated'], make_timestamp(10,11))
+
+class StatsMetadataTest(StatsTestCase):
+
+ def check_metadata(self, stats_type, stats_metadata):
+ # The name in the metadata should match the stats_type
+ name = stats_metadata.get('name')
+ self.assertEqual(stats_type, name)
+
+ # The target_type is a required value, so it should be present in the metadata
+ target_type = stats_metadata.get('target_type')
+ self.assertNotEqual(target_type, None)
+
+ # NOTE: The following assertion works for now, since we only support these
+ # two target types. Eventually we may support other target types (in which
+ # case we'd need to add the new ones to the tuple) or else custom target
+ # types can be added (in which case we'd maybe want to remove this assertion.
+ self.assertTrue(target_type in ('controller','switch'))
+
+ # verbose_name is set automatically by the REST code if it isn't set
+ # explicitly in the metadata, so it should always be present.
+ verbose_name = stats_metadata.get('verbose_name')
+ self.assertNotEqual(verbose_name, None)
+
+ def test_stats_metadata(self):
+ response = test_get_rest_data('v1/stats/metadata/default')
+ self.assertEqual(response.status_code, 200)
+ stats_metadata_dict = simplejson.loads(response.content)
+ self.assertEqual(type(stats_metadata_dict), dict)
+ for stats_type, stats_metadata in stats_metadata_dict.items():
+ # Check that the metadata looks reasonable
+ self.check_metadata(stats_type, stats_metadata)
+
+ # Fetch the metadata for the individual stats type and check that it matches
+ response2 = test_get_rest_data('v1/stats/metadata/default/' + stats_type)
+ self.assertEqual(response2.status_code, 200)
+ stats_metadata2 = simplejson.loads(response2.content)
+ self.assertEqual(stats_metadata, stats_metadata2)
+
+ def test_invalid_stats_type(self):
+ # Try getting a stats type that doesn't exist
+ response = test_get_rest_data('v1/stats/metadata/default/foobar')
+ self.assertEqual(response.status_code, 404)
+ error_result = simplejson.loads(response.content)
+ self.assertEqual(error_result.get('error_type'), 'RestResourceNotFoundException')
+
+class LatestStatTest(StatsTestCase):
+
+ def do_latest_stat(self, target_type, target_id):
+ current_timestamp = int(time.time() * 1000)
+ for i in range(23,-1,-1):
+ # Try with different offsets. Sort of arbitrary list here. Potentially add
+ # new offsets to test specific edge cases
+ offset_list = [0, 3599999, 100, 30000, 400000, 3000000]
+ timestamp = current_timestamp - (i * 3600000) - offset_list[(i+1)%len(offset_list)]
+ stats_data = {target_type + '-stats': {target_id: {'test-stat': [{'timestamp': timestamp, 'value': i}]}}}
+ response = test_put_rest_data(stats_data, 'v1/stats/data/local')
+ self.assertEqual(response.status_code, 200)
+ response = test_get_rest_data('v1/stats/data/local/%s/%s/test-stat' % (target_type, target_id))
+ self.assertEqual(response.status_code, 200)
+ results = simplejson.loads(response.content)
+ self.assertEqual(timestamp, results[0])
+ self.assertEqual(i, results[1])
+
+ def test_controller_latest_stat(self):
+ self.do_latest_stat('controller', '192.168.1.1')
+
+ def test_switch_latest_stat(self):
+ self.do_latest_stat('switch', '00:01:02:03:04:05')
+
+class BasicEventsTest(StatsTestCase):
+
+ EVENTS_DATA = {
+ '192.168.1.1': [
+ {'timestamp': make_timestamp(1,0), 'component': 'sdnplatform', 'log-level': 'Error', 'message': 'Something bad happened'},
+ {'timestamp': make_timestamp(1,1), 'component': 'sdnplatform', 'log-level': 'Info', 'message': 'Something else happened', 'package': 'net.sdnplatformcontroller.core'},
+ {'timestamp': make_timestamp(1,4), 'component': 'sdnplatform', 'log-level': 'Info', 'message': 'Switch connected: 01:02:03:04:45:56', 'package': 'net.sdnplatformcontroller.core', 'dpid': '01:02:03:04:45:56'},
+ {'timestamp': make_timestamp(2,4), 'component': 'django', 'log-level': 'Info', 'message': 'GET: /rest/v1/model/foo'},
+ {'timestamp': make_timestamp(4,10), 'component': 'cassandra', 'log-level': 'Info', 'message': 'Compaction occurred'},
+ {'timestamp': make_timestamp(4,11), 'component': 'cassandra', 'log-level': 'Info', 'message': 'One more compaction occurred'},
+ {'timestamp': make_timestamp(7,10), 'component': 'cassandra', 'log-level': 'Info', 'message': 'Another compaction occurred'},
+ ]
+ }
+
+ TAGGED_EVENTS_DATA = {
+ '192.168.1.1': [
+ {'timestamp': make_timestamp(10,0), 'pk-tag':'1234', 'component': 'sdnplatform', 'log-level': 'Error', 'message': 'Something bad happened'},
+ {'timestamp': make_timestamp(10,1), 'pk-tag':'5678', 'component': 'sdnplatform', 'log-level': 'Info', 'message': 'Something else happened', 'package': 'net.sdnplatformcontroller.core'},
+ ]
+ }
+
+
+ def check_events_results(self, returned_results, expected_results, message=None):
+ self.assertEqual(expected_results, returned_results, message)
+ #self.assertEqual(len(returned_results), len(expected_results), message)
+ #for i in range(len(returned_results)):
+ # self.assertEqual(returned_results[i], expected_results[i])
+
+ def test_events(self):
+ response = test_put_rest_data(self.EVENTS_DATA, 'v1/events/data/default')
+ self.assertEqual(response.status_code, 200)
+
+ # Get all of the data
+ response = test_get_rest_data('v1/events/data/default/192.168.1.1', {'start-time':make_timestamp(1,0),'end-time':make_timestamp(7,10)})
+ self.assertEqual(response.status_code, 200)
+ results = simplejson.loads(response.content)
+ self.check_events_results(results, self.EVENTS_DATA['192.168.1.1'])
+
+ # Get just one days data
+ response = test_get_rest_data('v1/events/data/default/192.168.1.1', {'start-time':make_timestamp(1,0),'end-time':make_timestamp(1,4)})
+ self.assertEqual(response.status_code, 200)
+ results = simplejson.loads(response.content)
+ self.check_events_results(results, self.EVENTS_DATA['192.168.1.1'][:3])
+
+ # Get two day range
+ response = test_get_rest_data('v1/events/data/default/192.168.1.1', {'start-time':make_timestamp(1,2),'end-time':make_timestamp(4,11)})
+ self.assertEqual(response.status_code, 200)
+ results = simplejson.loads(response.content)
+ self.check_events_results(results, self.EVENTS_DATA['192.168.1.1'][2:6])
+
+ def test_tagged_events(self):
+ response = test_put_rest_data(self.TAGGED_EVENTS_DATA, 'v1/events/data/default')
+ self.assertEqual(response.status_code, 200)
+
+ response = test_get_rest_data('v1/events/data/default/192.168.1.1', {'start-time':make_timestamp(10,0),'end-time':make_timestamp(10,2),'include-pk-tag':'true'})
+ self.assertEqual(response.status_code, 200)
+ results = simplejson.loads(response.content)
+ self.check_events_results(results, self.TAGGED_EVENTS_DATA['192.168.1.1'])
+
+ def test_delete_events(self):
+ response = test_put_rest_data(self.EVENTS_DATA, 'v1/events/data/default')
+ self.assertEqual(response.status_code, 200)
+
+ # Delete all but the first 2 and last 2 events
+ response = test_delete_rest_data('v1/events/data/default/192.168.1.1', {
+ 'start-time': self.EVENTS_DATA['192.168.1.1'][2]['timestamp'],
+ 'end-time':self.EVENTS_DATA['192.168.1.1'][-3]['timestamp']})
+ self.assertEquals(response.status_code, 200)
+
+ response = test_get_rest_data('v1/events/data/default/192.168.1.1', {'start-time':make_timestamp(1,0),'end-time':make_timestamp(7,10)})
+ self.assertEqual(response.status_code, 200)
+ results = simplejson.loads(response.content)
+ self.check_events_results(results, self.EVENTS_DATA['192.168.1.1'][:2] + self.EVENTS_DATA['192.168.1.1'][-2:])
+
diff --git a/cli/sdncon/stats/utils.py b/cli/sdncon/stats/utils.py
new file mode 100755
index 0000000..d227a58
--- /dev/null
+++ b/cli/sdncon/stats/utils.py
@@ -0,0 +1,85 @@
+#
+# Copyright (c) 2013 Big Switch Networks, Inc.
+#
+# Licensed under the Eclipse Public License, Version 1.0 (the
+# "License"); you may not use this file except in compliance with the
+# License. You may obtain a copy of the License at
+#
+# http://www.eclipse.org/legal/epl-v10.html
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+# implied. See the License for the specific language governing
+# permissions and limitations under the License.
+#
+
+from thrift.transport import TTransport
+from thrift.transport import TSocket
+from thrift.protocol import TBinaryProtocol
+from cassandra import Cassandra
+from cassandra.ttypes import *
+
+# FIXME: This class is derived from the django_cassandra backend.
+# Should refactor this code better.
+
+class CassandraConnection(object):
+ def __init__(self, host, port, keyspace, user, password):
+ self.host = host
+ self.port = port
+ self.keyspace = keyspace
+ self.user = user
+ self.password = password
+ self.transport = None
+ self.client = None
+ self.keyspace_set = False
+ self.logged_in = False
+
+ def set_keyspace(self):
+ if not self.keyspace_set:
+ self.client.set_keyspace(self.keyspace)
+ self.keyspace_set = True
+
+ def login(self):
+ # TODO: This user/password auth code hasn't been tested
+ if not self.logged_in:
+ if self.user:
+ credentials = {'username': self.user, 'password': self.password}
+ self.client.login(AuthenticationRequest(credentials))
+ self.logged_in = True
+
+ def connect(self, set_keyspace=False, login=False):
+ if self.transport == None:
+ # Create the client connection to the Cassandra daemon
+ socket = TSocket.TSocket(self.host, int(self.port))
+ transport = TTransport.TFramedTransport(TTransport.TBufferedTransport(socket))
+ protocol = TBinaryProtocol.TBinaryProtocolAccelerated(transport)
+ transport.open()
+ self.transport = transport
+ self.client = Cassandra.Client(protocol)
+
+ if login:
+ self.login()
+
+ if set_keyspace:
+ self.set_keyspace()
+
+ def disconnect(self):
+ if self.transport != None:
+ self.transport.close()
+ self.transport = None
+ self.client = None
+ self.keyspace_set = False
+ self.logged_in = False
+
+ def is_connected(self):
+ return self.transport != None
+
+ def reconnect(self):
+ self.disconnect()
+ self.connect(True, True)
+
+ def get_client(self):
+ if self.client == None:
+ self.connect(True,True)
+ return self.client
diff --git a/cli/sdncon/stats/views.py b/cli/sdncon/stats/views.py
new file mode 100755
index 0000000..5172cc1
--- /dev/null
+++ b/cli/sdncon/stats/views.py
@@ -0,0 +1,364 @@
+#
+# Copyright (c) 2013 Big Switch Networks, Inc.
+#
+# Licensed under the Eclipse Public License, Version 1.0 (the
+# "License"); you may not use this file except in compliance with the
+# License. You may obtain a copy of the License at
+#
+# http://www.eclipse.org/legal/epl-v10.html
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+# implied. See the License for the specific language governing
+# permissions and limitations under the License.
+#
+
+from cassandra.ttypes import *
+from django.conf import settings
+from django.http import HttpResponse
+from django.utils import simplejson
+from functools import wraps
+import time
+from .data import StatsException, StatsInvalidStatsDataException, \
+ StatsInvalidStatsTypeException, \
+ get_stats_db_connection, init_stats_db_connection, \
+ get_stats_metadata, get_stats_type_index, \
+ get_stats_target_types, get_stats_targets, delete_stats_data, \
+ get_stats_data, get_latest_stat_data, put_stats_data, \
+ get_closest_sample_interval, get_closest_window_interval, \
+ get_log_event_data, put_log_event_data, delete_log_event_data, \
+ VALUE_DATA_FORMAT
+
+from sdncon.rest.views import RestException, \
+ RestInvalidPutDataException, RestMissingRequiredQueryParamException,\
+ RestInvalidMethodException, RestDatabaseConnectionException,\
+ RestInternalException, RestResourceNotFoundException, \
+ safe_rest_view, JSON_CONTENT_TYPE, get_successful_response
+from sdncon.controller.config import get_local_controller_id
+
+
+class RestStatsException(RestException):
+ def __init__(self, stats_exception):
+ super(RestStatsException,self).__init__('Error accessing stats: ' + str(stats_exception))
+
+class RestStatsInvalidTimeDurationUnitsException(RestException):
+ def __init__(self, units):
+ super(RestStatsInvalidTimeDurationUnitsException,self).__init__('Invalid time duration units: ' + str(units))
+
+
+class RestStatsInvalidTimeRangeException(RestException):
+ def __init__(self):
+ super(RestStatsInvalidTimeRangeException,self).__init__('Invalid time range specified in stats REST API. '
+ '2 out of 3 of start-time, end-time, and duration params must be specified.')
+
+
+@safe_rest_view
+def safe_stats_rest_view(func, *args, **kwargs):
+ try:
+ request = args[0]
+ response = func(*args, **kwargs)
+ except RestException:
+ raise
+ except StatsInvalidStatsDataException:
+ raise RestInvalidPutDataException()
+ except StatsInvalidStatsTypeException:
+ raise RestResourceNotFoundException(request.path)
+ except StatsException, e:
+ raise RestStatsException(e)
+ except Exception, e:
+ raise RestInternalException(e)
+ return response
+
+
+def safe_stats_view(func):
+ """
+ This is a decorator that takes care of exception handling for the
+ stats views so that stats exceptions are converted to the appropriate
+ REST exception.
+ """
+ @wraps(func)
+ def _func(*args, **kwargs):
+ response = safe_stats_rest_view(func, *args, **kwargs)
+ return response
+
+ return _func
+
+
+def init_db_connection():
+ db_connection = get_stats_db_connection()
+ if not db_connection:
+ try:
+ stats_db_settings = settings.STATS_DATABASE
+ except Exception:
+ stats_db_settings = {}
+
+ host = stats_db_settings.get('HOST', 'localhost')
+ port = stats_db_settings.get('PORT', 9160)
+ keyspace = stats_db_settings.get('NAME', 'sdnstats')
+ user = stats_db_settings.get('USER')
+ password = stats_db_settings.get('PASSWORD')
+ replication_factor = stats_db_settings.get('CASSANDRA_REPLICATION_FACTOR', 1)
+ column_family_def_default_settings = stats_db_settings.get('CASSANDRA_COLUMN_FAMILY_DEF_DEFAULT_SETTINGS', {})
+
+ init_stats_db_connection(host, port, keyspace, user, password, replication_factor, column_family_def_default_settings)
+
+ db_connection = get_stats_db_connection()
+ assert(db_connection is not None)
+
+START_TIME_QUERY_PARAM = 'start-time'
+END_TIME_QUERY_PARAM = 'end-time'
+DURATION_QUERY_PARAM = 'duration'
+SAMPLE_INTERVAL_QUERY_PARAM = 'sample-interval'
+SAMPLE_COUNT_QUERY_PARAM = 'sample-count'
+SAMPLE_WINDOW_QUERY_PARAM = 'sample-window'
+DATA_FORMAT_QUERY_PARAM = 'data-format'
+LIMIT_QUERY_PARAM = 'limit'
+INCLUDE_PK_TAG_QUERY_PARAM = 'include-pk-tag'
+
+DEFAULT_SAMPLE_COUNT = 50
+
+def convert_time_point(time_point):
+
+ if time_point is None:
+ return None
+
+ if time_point:
+ time_point = time_point.lower()
+ if time_point in ('now', 'current'):
+ time_point = int(time.time() * 1000)
+ else:
+ time_point = int(time_point)
+
+ return time_point
+
+
+UNIT_CONVERSIONS = (
+ (('h', 'hour', 'hours'), 3600000),
+ (('d', 'day', 'days'), 86400000),
+ (('w', 'week', 'weeks'), 604800000),
+ (('m', 'min', 'mins', 'minute', 'minutes'), 60000),
+ (('s', 'sec', 'secs', 'second', 'seconds'), 1000),
+ (('ms', 'millisecond', 'milliseconds'), 1)
+)
+
+def convert_time_duration(duration):
+
+ if duration is None:
+ return None
+
+ value = ""
+ for c in duration:
+ if not c.isdigit():
+ break
+ value += c
+
+ units = duration[len(value):].lower()
+ value = int(value)
+
+ if units:
+ converted_value = None
+ for conversion in UNIT_CONVERSIONS:
+ if units in conversion[0]:
+ converted_value = value * conversion[1]
+ break
+ if converted_value is None:
+ raise RestStatsInvalidTimeDurationUnitsException(units)
+
+ value = converted_value
+
+ return value
+
+
+def get_time_range(start_time, end_time, duration):
+
+ if not start_time and not end_time and not duration:
+ return (None, None)
+
+ start_time = convert_time_point(start_time)
+ end_time = convert_time_point(end_time)
+ duration = convert_time_duration(duration)
+
+ if start_time:
+ if not end_time and duration:
+ end_time = start_time + duration
+ elif end_time and duration:
+ start_time = end_time - duration
+
+ if not start_time or not end_time:
+ raise RestStatsInvalidTimeRangeException()
+
+ return (start_time, end_time)
+
+
+def get_time_range_from_request(request):
+ start_time = request.GET.get(START_TIME_QUERY_PARAM)
+ end_time = request.GET.get(END_TIME_QUERY_PARAM)
+ duration = request.GET.get(DURATION_QUERY_PARAM)
+
+ return get_time_range(start_time, end_time, duration)
+
+#def get_stats_time_range(request):
+# start_time = request.GET.get(START_TIME_QUERY_PARAM)
+# end_time = request.GET.get(END_TIME_QUERY_PARAM)
+#
+# if not start_time and not end_time:
+# return None
+#
+# if not start_time:
+# raise RestMissingRequiredQueryParamException(START_TIME_QUERY_PARAM)
+# if not end_time:
+# raise RestMissingRequiredQueryParamException(END_TIME_QUERY_PARAM)
+#
+# return (start_time, end_time)
+
+
+@safe_stats_view
+def do_get_stats(request, cluster, target_type, target_id, stats_type):
+
+ # FIXME: Hack to handle the old hard-coded controller id value
+ if target_type == 'controller' and target_id == 'localhost':
+ target_id = get_local_controller_id()
+
+ # Get the time range over which we're getting the stats
+ start_time, end_time = get_time_range_from_request(request)
+
+ init_db_connection()
+
+ if request.method == 'GET':
+ window = request.GET.get(SAMPLE_WINDOW_QUERY_PARAM, 0)
+ if window:
+ window = convert_time_duration(window)
+ if window != 0:
+ window = get_closest_window_interval(int(window))
+ # FIXME: Error checking on window value
+
+ data_format = request.GET.get(DATA_FORMAT_QUERY_PARAM, VALUE_DATA_FORMAT)
+ # FIXME: Error checking on data_format value
+
+ limit = request.GET.get(LIMIT_QUERY_PARAM)
+ if limit:
+ limit = int(limit)
+ # FIXME: Error checking on limit value
+
+ if start_time is not None and end_time is not None:
+ # FIXME: Error checking on start_time and end_time values
+ sample_interval = request.GET.get(SAMPLE_INTERVAL_QUERY_PARAM)
+ if not sample_interval:
+ # FIXME: Error checking on sample_period value
+ sample_count = request.GET.get(SAMPLE_COUNT_QUERY_PARAM, DEFAULT_SAMPLE_COUNT)
+ # FIXME: Error checking on sample_count value
+
+ sample_interval = (end_time - start_time) / int(sample_count)
+ else:
+ sample_interval = convert_time_duration(sample_interval)
+
+ if sample_interval != 0:
+ sample_interval = get_closest_sample_interval(sample_interval)
+
+ stats_data = get_stats_data(cluster, target_type, target_id,
+ stats_type, start_time, end_time, sample_interval, window, data_format, limit)
+ else:
+ stats_data = get_latest_stat_data(cluster, target_type, target_id, stats_type, window, data_format)
+
+ response_data = simplejson.dumps(stats_data)
+ response = HttpResponse(response_data, JSON_CONTENT_TYPE)
+
+ elif request.method == 'DELETE':
+ delete_stats_data(cluster, target_type, target_id, stats_type,
+ start_time, end_time)
+ response = get_successful_response()
+ else:
+ raise RestInvalidMethodException()
+
+ return response
+
+
+@safe_stats_view
+def do_get_stats_metadata(request, cluster, stats_type=None):
+ metadata = get_stats_metadata(cluster, stats_type)
+ response_data = simplejson.dumps(metadata)
+ return HttpResponse(response_data, JSON_CONTENT_TYPE)
+
+
+@safe_stats_view
+def do_get_stats_type_index(request, cluster, target_type, target_id, stats_type=None):
+ # FIXME: Hack to handle the old hard-coded controller id value
+ if target_type == 'controller' and target_id == 'localhost':
+ target_id = get_local_controller_id()
+ init_db_connection()
+ index_data = get_stats_type_index(cluster, target_type, target_id, stats_type)
+ response_data = simplejson.dumps(index_data)
+ return HttpResponse(response_data, JSON_CONTENT_TYPE)
+
+
+@safe_stats_view
+def do_get_stats_target_types(request, cluster):
+ init_db_connection()
+ target_type_data = get_stats_target_types(cluster)
+ response_data = simplejson.dumps(target_type_data)
+ return HttpResponse(response_data, JSON_CONTENT_TYPE)
+
+
+@safe_stats_view
+def do_get_stats_targets(request, cluster, target_type=None):
+ init_db_connection()
+ target_data = get_stats_targets(cluster, target_type)
+ response_data = simplejson.dumps(target_data)
+ return HttpResponse(response_data, JSON_CONTENT_TYPE)
+
+
+@safe_stats_view
+def do_put_stats(request, cluster):
+ if request.method != 'PUT':
+ raise RestInvalidMethodException()
+
+ init_db_connection()
+
+ stats_data = simplejson.loads(request.raw_post_data)
+ put_stats_data(cluster, stats_data)
+
+ response = get_successful_response()
+
+ return response
+
+
+@safe_stats_view
+def do_get_events(request, cluster, node_id):
+ # FIXME: Hack to handle the old hard-coded controller id value
+ if node_id == 'localhost':
+ node_id = get_local_controller_id()
+
+ # Get the time range over which we're getting the events
+ start_time, end_time = get_time_range_from_request(request)
+
+ init_db_connection()
+
+ if request.method == 'GET':
+ include_pk_tag_param = request.GET.get(INCLUDE_PK_TAG_QUERY_PARAM, 'false')
+ include_pk_tag = include_pk_tag_param.lower() == 'true'
+ events_list = get_log_event_data(cluster, node_id, start_time, end_time, include_pk_tag)
+ response_data = simplejson.dumps(events_list)
+ response = HttpResponse(response_data, JSON_CONTENT_TYPE)
+ elif request.method == 'DELETE':
+ delete_log_event_data(cluster, node_id, start_time, end_time)
+ response = get_successful_response()
+ else:
+ raise RestInvalidMethodException()
+
+ return response
+
+@safe_stats_view
+def do_put_events(request, cluster):
+ if request.method != 'PUT':
+ raise RestInvalidMethodException()
+
+ init_db_connection()
+
+ events_data = simplejson.loads(request.raw_post_data)
+ put_log_event_data(cluster, events_data)
+
+ response = get_successful_response()
+
+ return response
+