blob: f5854539ebee3f57771eaf7c4a98f9336fbf6d4f [file] [log] [blame]
#
# Copyright (c) 2011,2012,2013 Big Switch Networks, Inc.
#
# Licensed under the Eclipse Public License, Version 1.0 (the
# "License"); you may not use this file except in compliance with the
# License. You may obtain a copy of the License at
#
# http://www.eclipse.org/legal/epl-v10.html
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
# implied. See the License for the specific language governing
# permissions and limitations under the License.
#
import re
import numbers
import collections
import traceback
import types
import json
import time
import sys
import datetime
import os
import c_data_handlers
import c_validations
import c_completions
import c_actions
import utif
import error
import doc
# TODO list
#
# - support for 'min' and 'max' as boundary values in range/length restrictions
# - distinguish between completion help string and syntax/doc string
# - allow multiple validation functions for typedefs/args
# - verify that it handles unambiguous prefixes in subcommands and argument tags
# - support for getting next available sequence number (e.g. ip name-server)
# - test code to handle REST errors
# - figure out how to deal with arguments that are optional for 'no' commands
# - tests for length/range validations, both for strings and integers
# - add validation patterns for domain-name and ip-address-or-domain-name typedefs
# - get syntax help for "no" commands
# - test using unambiguous prefix for command/subcommand names
# - test using unambiguous prefix for argument tags
# - test case insensitivity for command/subcommand/tag names (i.e. any fixed token)
# - Allow/ignore missing command arguments for "no" commands (e.g. "no ip address")
# - support for case-sensitive enum types (do we need this?)
# - clean up 'description' vs. 'descriptor' terminology (i.e. pick one)
# - return "<cr>" as completion if there are no args to complete
# - be consistent about using _xyz function naming convention for private functions
# - better handling of exceptions in do_command_completions. catch CommandErrors,
# distinguish between expected error (e.g. ValidationError) and errors that
# indicate bug (e.g. base Exception raised)
# - sort issues with sequence numbers. They are treated as strings by the Django
# Cassandra backend instead of integers. This means that, for example, the sequence
# number 1,2,10 are incorrectly sorted in the order 1, 10, 2. Need to fix this in
# the Django backend.
# - Shouldn't allow "no" variant of a command if there's no default value for an argument.
# For example, shouldn't allow "no clock set 01:30:00".
# - treat scope stack as a stack with append & pop operations and search in reverse order
# - rename the "data" item in the scopes to be "args" (or maybe not?)
# - clean up syntax help message; includes "Syntax: " twice in message.
# - clean up syntax help messages to not include extraneous white space characters
# For example "test [foo <foo-value>]" instead of "test [ foo <foo-value> ]"
# Or maybe not?
# - support for explicit command abbreviations (e.g. "s" = "show" even if there are
# other available commands that begin with "s".
# - automated generation of running config
# - rename 'config' command-type to 'config-fields'
# - need to fix getting completions with alternation if any of the alternates don't
# match the partial completion text (e.g. "firewall allow op<tab>").
# - allow command description that don't have any args to omit the 'args' setting
# (workaround for now is to just have an empty args tuple/list
# - support "repeatable" attribute for args to support repetition
# - Support for single token arguments with a custom action and/or arg handler
# - allow multiple "name"s for a command description and allow each name to optionally
# set a field in the arg data. Syntax would probably be that the "name" field could
# be a list of names. So the name could either be a simple string, a dict (which
# would support pattern-based names and setting a field with the command word, or a
# list of names (where each name could be either the simple string or the dict)
# - maybe get rid of specifying mode in the command description. Instead a command
# description would be enabled for a mode by adding it to the list of commands that
# are enabled for that mode. This would happen in some data (or code) that's separate
# from the command description. I think this would be better so that we can prune
# the number of command description we need to parse for a given command. Otherwise
# we always need to check against all command descriptions.
# - distinguish between exact match vs. prefix match in a choices block. If there's
# only a single exact match then that should get precedence over other matches
# and not be considered an ambiguous command
# - add 'default' attribute for the default value of an optional field if it's not
# specified. Should this be treated as mutually exclusive with 'optional'?
# - convert reset-fields action to use fields instead of arg_data. If we're using
# new-style hack then we probably need to have it be new-reset-fields or something
# like that.
# - clean up usage of return values for the command handling methods, i.e.
# handle_one_command and all of the other handle_* methods. Should maybe
# just not return anything for any of those and just capture the
# matches/completion in the data member.
# - fix behavior for printing <cr> as a completion at the end of a
# complete command. Right now there's a bug where it will still print out
# the syntax help.
# - support string token argument as just a simple string in the argument list
# - custom completion proc for cidr address. Currently if a command description
# supports either cidr or ip+netmask it will handle either for command execution
# but completion will only handle the ip+netmask variant. With a custom
# completion proc you could combine the ip+netmask from the db and format as
# a cidr address and return that as a completion
command_type_defaults = {}
# We rely on the higher level CLI object for certain operations, so we're initialized
# with a reference that's used to call back to it. This is initialized in init_command.
sdnsh = None
# command_registry is the list of command descriptions that have been registered
# via add_command. The reason this is a list (i.e. instead of a dictionary that's
# keyed by the command name) is that's it's possible that you could have different
# command descriptions with the same name but enabled for different modes.
command_registry = []
# command_submode_tree is a dictionary, where each submode, and its children
# are saved.
#
command_submode_tree = { 'login' : { 'enable' : { 'config' : {} } } }
# validation_registry is the dictionary of validation functions that have been
# registered via add_validation. The key is the name of the function as specified
# in typedef and command description. The value is a 2 item tuple where the first
# item is the validation function to invoke and the second item is a dictionary
# describing the arguments that are used when invoking the validation function.
# See the comments for _call_proc for more information about the format of the
# data describing the args.
# invoke.
validation_registry = {}
# typedef_registry is the dictionary of typedef descriptions. The key is the
# name of the typedef and the value is the typedef description.
# value if the function to invoke
typedef_registry = {}
# action_registry is the dictionary of action functions that have been registered
# via add_action. The key is the name of the function as specified in
# command descriptions. The value is a 2 item tuple where the first
# item is the action function to invoke and the second item is a dictionary
# describing the arguments that are used when invoking the validation function.
# See the comments for _call_proc for more information about the format of the
# data describing the args.
action_registry = {}
# completion_registry is the dictionary of completion functions that have been
# registered via add_completion. The key is the name of the function as specified
# in command descriptions. The value is a 2 item tuple where the first
# item is the completion function to invoke and the second item is a dictionary
# describing the arguments that are used when invoking the validation function.
# See the comments for _call_proc for more information about the format of the
# data describing the args.
completion_registry = {}
# argument_data_handler_registry is the dictionary of custom argument data
# handlers that have been registered via add_argument_data_handler. These are
# used if custom processing is required to convert the argument value to the
# data that is added to the argument data dictionary. The key is the name of
# the function as specified in command descriptions. The value is a 2 item
# tuple where the first item is the handler function to invoke and the second
# item is a dictionary describing the arguments that are used when invoking
# the validation function. See the comments for _call_proc for more information
# about the format of the data describing the args.
argument_data_handler_registry = {}
#
# keep a list of all the command module names
command_added_modules = {}
#
command_syntax_version = {}
def add_command(command):
command_registry.append(command)
def model_obj_type_disable_submode(obj_type):
mi.obj_type_source_set_debug_only(obj_type)
def model_obj_type_enable_cascade_delete(obj_type):
mi.cascade_delete_set_enable(obj_type)
def model_obj_type_weak_with_cascade_delete(obj_type):
mi.cascade_delete_enable_force(obj_type)
def model_obj_type_set_title(obj_type, title):
mi.obj_type_set_title(obj_type, title)
def model_obj_type_disable_edit(obj_type, field):
mi.obj_type_disable_edit(obj_type, field)
def model_obj_type_set_case(obj_type, field, case):
mi.set_obj_type_field_case_sensitive(obj_type, field, case)
def model_obj_type_set_show_this(obj_type, show_this_list):
"""
When an obj-type gets displayed via 'show this', complex
objects may require several different items to be displayed.
The second parameter is a list, where each item has several
fields: the first is the name of the object to display, and
the second is the format to use for that display
"""
mi.obj_type_set_show_this(obj_type, show_this_list)
def command_dict_minus_lec():
# lec -- 'login', 'enable', 'config'.
return [x for x in sdnsh.command_dict.keys()
if x not in ['login', 'enable', 'config', 'config-']]
def command_dict_submode_index(submode):
if submode in ['login', 'enable', 'config', 'config-']:
return submode
if submode.startswith('config-'):
return submode
if sdnsh.description:
print _line(), "unknown submode ", submode
def add_command_mode_recurse(mode_dict, mode, submode):
if mode in mode_dict:
if not submode in mode_dict[mode]:
mode_dict[mode][submode] = {}
else:
for d in mode_dict.keys():
add_command_mode_recurse(mode_dict[d], mode, submode)
def add_command_submode(mode, submode):
add_command_mode_recurse(command_submode_tree, mode, submode)
def show_command_submode_tree_recurse(mode_dict, level):
if len(mode_dict):
for d in mode_dict.keys():
index = command_dict_submode_index(d)
items = [x if _is_string(x) else x.pattern for x in sdnsh.command_dict.get(index, '')]
print " " * level, d, ': ' + ', '.join(items)
show_command_submode_tree_recurse(mode_dict[d], level + 2)
def show_command_submode_tree():
show_command_submode_tree_recurse(command_submode_tree, 0)
def command_dict_append(command, mode, name):
if not mode in sdnsh.command_dict:
sdnsh.command_dict[mode] = []
if type(name) == dict:
# this is a regular expression name, add the re.
if not 'pattern' in name:
print '%s: missing pattern' % command['self']
name['re'] = re.compile(name['pattern'])
sdnsh.command_dict[mode].append(name)
def command_nested_dict_append(command, mode, name):
if not mode in sdnsh.command_nested_dict:
sdnsh.command_nested_dict[mode] = []
if type(name) == dict:
# this is a regular expression name, add the re.
if not 'pattern' in name:
print '%s: missing pattern' % command['self']
name['re'] = re.compile(name['pattern'])
if not name in sdnsh.command_nested_dict[mode]:
sdnsh.command_nested_dict[mode].append(name)
def add_command_to_command_dict(command, mode, name, var):
#
if mode in ['login', 'enable']:
command_nested_dict_append(command, mode, name)
elif mode == 'config' or mode == 'config-':
command_dict_append(command, mode, name)
elif mode.endswith('*'):
index = mode[:-1]
command_nested_dict_append(command, index, name)
elif mode.startswith("config"):
command_dict_append(command, mode, name)
def add_command_using_dict(command_description, value):
"""
"""
if not 'self' in value:
value['self'] = command_description
if not 'name' in value:
print '%s: missing \'name\': in description' % command_description
return
if not 'mode' in value:
print '%s: missing \'mode\': in description' % command_description
return
name = value['name']
mode = value['mode']
command_type = value.get('command-type')
if command_type == 'config-submode':
submode = value.get('submode-name')
# add_command_submode(mode,submode)
# XXX this needs improving
feature = value.get('feature')
if feature and name not in ['show']:
if not name in sdnsh.command_name_feature:
sdnsh.command_name_feature[name] = []
if not feature in sdnsh.command_name_feature[name]:
sdnsh.command_name_feature[name].append(feature)
#
# populate sdnsh.controller_dict based on the modes described for this command
#
if 'mode' in value:
if _is_list(mode):
for m in mode:
add_command_to_command_dict(value, m, name, command_description)
else:
add_command_to_command_dict(value, mode, name, command_description)
add_command(value)
def add_commands_from_class(aclass):
"""
"""
for (var, value) in vars(aclass).items():
if re.match(r'.*COMMAND_DESCRIPTION$', var):
add_command_using_dict(var, value)
def add_command_module_name(version, module):
"""
Save some state about the version/module
"""
if version not in command_syntax_version:
command_syntax_version[version] = [module]
else:
command_syntax_version[version].append(module)
def add_commands_from_module(version, module, dump_syntax = False):
add_command_module_name(version, module)
for name in dir(module):
if re.match(r'.*COMMAND_DESCRIPTION$', name):
if module.__name__ not in command_added_modules:
command_added_modules[module.__name__] = [name]
if name not in command_added_modules[module.__name__]:
command_added_modules[module.__name__].append(name)
command = getattr(module, name)
add_command_using_dict(name, command)
if dump_syntax:
handler = CommandSyntaxer(name)
handler.add_known_command(command)
print handler.handle_command_results()
def add_validation(name, validation_proc, default_args={'kwargs':
{'typedef': '$typedef', 'value': '$value'}}):
validation_registry[name] = (validation_proc, default_args)
def add_typedef(typedef):
typedef_registry[typedef['name']] = typedef
def add_action(name, action_proc, default_args={}):
action_registry[name] = (action_proc, default_args)
def action_invoke(name, action_args):
# no validation to match args.
action = action_registry[name]
action[0](action_args)
def add_completion(name, completion_proc, default_args= {'kwargs':
{'words': '$words', 'text': '$text', 'completions': '$completions'}}):
completion_registry[name] = (completion_proc, default_args)
def add_argument_data_handler(name, handler_proc, default_args =
{'kwargs': {'name': '$name', 'value': '$value',
'data': '$data', 'scopes': '$scopes'}}):
argument_data_handler_registry[name] = (handler_proc, default_args)
def add_command_type(name, command_defaults):
command_type_defaults[name] = command_defaults
def _line():
# pylint: disable=W0212
f = sys._getframe().f_back
return '[%s:%d]' % (f.f_code.co_filename, f.f_lineno)
#
# VALIDATION PROCS
#
def _is_range_boundary(boundary):
"""
Returns whether or not the given value is a valid boundary value.
Valid values are integers or the special strings 'min' or 'max'
(case-insensitive)
"""
return (isinstance(boundary, numbers.Integral) or
(_is_string(boundary) and (boundary.lower() in ('min','max'))))
def _convert_range_boundary(boundary, test_value):
"""
Converts the given boundary value to an appropriate integral
boundary value. This is used to handle the special "min' and
'max' string values. The test_value argument should be the value
which is being tested for whether or not it's in a range. Since
long integral values have an unlimited precision there is no
actual MIN or MAX value, so we just make the value one less or
one more than the test_value, so that the range check in
validate_integer against the test_value will fail or succeed
appropriately. Note that this means that this function must be
called again for each different test_value (i.e. you can't
pre-process the ranges to remove the 'min' and 'max' values).
"""
if _is_string(boundary):
if boundary.lower() == 'min':
boundary = test_value - 1
elif boundary.lower() == 'max':
boundary = test_value + 1
else:
raise error.CommandDescriptionError('Invalid range boundary constant; must be "min", "max" or integer value')
return boundary
def _is_single_range(r):
"""
Returns whether or not the argument is a valid single range.
A valid range is either a single integral value or else a
sequence (tuple or list) with 2 elements, each of which is a valid
range boundary value.
"""
return (isinstance(r, numbers.Integral) or
(isinstance(r, collections.Sequence) and (len(r) == 2) and
_is_range_boundary(r[0]) and _is_range_boundary(r[1])))
def _check_one_range(r):
"""
Checks that the argument is a valid single range as determined
by _is_single_range.
If it is, then the function returns None.
It it's not, then a RangeSyntaxError exception is raised.
"""
if not _is_single_range(r):
raise error.RangeSyntaxError(str(r))
def _check_range(r):
"""
Checks that the argument is a valid range.
If it is, then the function returns None.
It it's not, then a RangeSyntaxError exception is raised.
FIXME: This doesn't seem to be used anymore. Remove?
"""
if _is_single_range(r):
_check_one_range(r)
elif isinstance(r, collections.Sequence):
for r2 in r:
_check_one_range(r2)
else:
raise error.RangeSyntaxError(str(r))
def _lookup_typedef_value(typedef, name):
"""
Look up the given name in a typedef.
If it's not found in the given typedef it recursively searches
for the value in the base typedefs.
Returns None if the value is not found.
FIXME: Note that this means there's currently no way to distinguish
between an explicit None value and the name not being specified.
"""
assert typedef is not None
assert name is not None
# Check if the typedef has the attribute
value = typedef.get(name)
if value:
return value
# Otherwise, see if it's defined in the base type(s)
base_type_name = typedef.get('base-type')
if base_type_name:
base_typedef = typedef_registry.get(base_type_name)
if not base_typedef:
raise error.CommandDescriptionError('Unknown type name: %s' % base_type_name)
return _lookup_typedef_value(base_typedef, name)
return None
def _raise_argument_validation_exception(typedef, value, detail, expected_tokens=None):
"""
Called when an argument doesn't match the expected type. Raises an
ArgumentValidationError exception. The message for the exception
can be customized by specifying a error format string in the
type definition. The format string is called ' validation-error-format'.
The format string can use the following substitution variables:
- 'typedef': The name of the typedef
- 'value': The value that didn't match the typedef restrictions
- 'detail': Additional message for the nature of the type mismatch
The default format string is: 'Invalid %(typedef)s: %(value)s; %(detail)s'.
"""
typedef_name = typedef.get('help-name')
if typedef_name is None:
typedef_name = typedef.get('name')
if typedef_name is None:
typedef_name = typedef.get('field')
if typedef_name is None:
typedef_name = '<unknown-type>'
if detail is None:
detail = ''
validation_error_format = typedef.get('validation-error-format',
'Invalid %(typedef)s: %(value)s; %(detail)s')
validation_error = (validation_error_format %
{'typedef': typedef_name, 'value': str(value), 'detail': detail})
raise error.ArgumentValidationError(validation_error, expected_tokens)
#
# COMMAND MODULE FUNCTIONS
#
def _get_args(item):
"""
Gets the args item from the given item. The item may be a command
or subcommand description or a nested arg list inside a 'choice'
argument. If there's no item names "args", then it returns None.
If the args item is a single argument, then it's converted into
a tuple containing that single argument, so that the caller can
always treat the return value as an iterable sequence of args.
It doesn't do any additional validation of the args value
(e.g. it doesn't check that the individual arguments are dict objects).
"""
args = item.get('args')
if args and not isinstance(args, collections.Sequence):
args = (args,)
return args
def _get_choice_args(choice):
if isinstance(choice, collections.Sequence):
args = choice
else:
args = _get_args(choice)
if not args:
args = (choice,)
return args
def _get_command_args_syntax_help_string(command, is_no_command, args):
"""
Get the syntax help string for an argument list.
"""
syntax_string = ''
if args:
for i, arg in enumerate(args):
if i > 0:
syntax_string += ' '
if _is_string(arg):
syntax_string += arg
continue
if type(arg) == tuple:
if sdnsh.description:
print _line(), command['self'], arg
if is_no_command:
optional = arg.get('optional-for-no', False)
else:
optional = arg.get('optional', False)
if optional:
syntax_string += '['
choices = arg.get('choices')
nested_args = arg.get('args')
if nested_args:
if choices:
raise error.CommandDescriptionError('An argument can\'t have both '
'"choices" and "args" attributes', command)
choices = (nested_args,)
if choices:
# Suppress choice delimiters if we've already emitted the square
# brackets to indicate an optional argument. This is so we get
# something simpler (e.g. "[this | that]" ) instead of getting
# doubled delimiters (e.g. "[{this | that}]").
if not optional:
syntax_string += '{'
for j, choice in enumerate(choices):
if j > 0:
syntax_string += ' | '
choice_args = _get_choice_args(choice)
choice_syntax_string = _get_command_args_syntax_help_string(command,
is_no_command,
choice_args)
syntax_string += choice_syntax_string
if not optional:
syntax_string += '}'
else:
field = arg.get('field')
tag = arg.get('tag')
if tag:
syntax_string += tag + ' '
token = arg.get('token')
if token:
syntax_string += token
if (field != None) and (arg.get('type') != 'boolean'):
help_name = arg.get('help-name')
if help_name:
help_name = '<' + help_name + '>'
else:
if arg.get('type') == 'enum':
values = arg.get('values')
if values:
if _is_string(values):
values = (values,)
help_name = ' | '.join(values)
if len(values) > 1:
help_name = '{' + help_name + '}'
if not help_name:
help_name = '<' + field + '>'
syntax_string += help_name
if optional:
syntax_string += ']'
return syntax_string
def _get_command_syntax_help_string(command, command_prefix_string):
try:
parts = []
if command_prefix_string:
parts.append(command_prefix_string)
command_name = _get_command_title(command)
parts.append(command_name)
args = _get_args(command)
args_syntax_help_string=''
if args:
args_syntax_help_string = _get_command_args_syntax_help_string(command, False, args)
parts.append(args_syntax_help_string)
name = ''
if sdnsh.debug or sdnsh.description:
name = command['self'] + "\n "
if is_no_command_supported(command) and command_prefix_string != 'no':
no_syntax = _get_command_args_syntax_help_string(command, True, args)
if ' '.join(no_syntax) == ' '.join(args_syntax_help_string):
command_syntax_help_string = name + '[no] ' +' '.join(parts)
else:
return [
name + ' '.join(parts),
name + 'no ' + command_name + ' ' + no_syntax
]
else:
command_syntax_help_string = name + ' '.join(parts)
except Exception, e:
if sdnsh.debug or sdnsh.debug_backtrace:
traceback.print_exc()
raise error.CommandDescriptionError(str(e))
return command_syntax_help_string
def _quick_command_syntax(command):
title = _get_command_title(command)
args = _get_args(command)
args_syntax_help_string = _get_command_args_syntax_help_string(command, False, args)
return ' '.join((title, args_syntax_help_string))
def reformat_line(string, width, indent, recurse):
result = []
split_chars = "0123456789"
depth = 0
align = 0
next_mark = False
if recurse > 8 or indent > width:
return [string]
string = ' ' * indent + string
for c in string:
if c in '{([<':
if align == 0:
align = len(result)
depth += 1
result.append(' ')
next_mark = False
elif c in '})]>':
next_mark = False
result.append(' ')
depth -= 1
if depth == 0:
next_mark = True
elif c == '|':
result.append(split_chars[depth])
next_mark = False
else:
if next_mark:
result.append(split_chars[depth])
else:
result.append(' ')
next_mark = False
if len(string) > width:
#split_depth = 0
r = ''.join(result)
splits = None
for s in range(0,9):
if split_chars[s] in r:
splits = ''.join(result).split(split_chars[s])
if len(splits[-1]) > 0:
break
if not splits:
return [string]
parts = []
first = len(splits[0])+1
f = ' ' * align
collect = string[0:len(splits[0])+1]
more = False
for s in splits[1:]:
if len(s) == 0:
continue
if len(collect) + len(s) > width:
parts.append(collect)
collect = f
more = False
# the next split can still be too wide
if len(collect) + len(s) > width:
reformat = reformat_line(string[first:first+len(s)+1], width, align, recurse+1)
first += len(s)+1
parts += reformat
collect = f
more = False
else:
collect += string[first:first+len(s)+1]
first += len(s)+1
more = True
if more:
parts.append(collect)
return parts
return [string]
def isCommandFeatureActive(command, features):
if not features:
return True # this command is usable anywhere
if features:
if not _is_list(features):
features = (features,)
every_features_enabled = True
for feature in features:
if not sdnsh.feature_enabled(feature):
every_features_enabled = False
break
if not every_features_enabled:
return False
return True
def is_no_command_supported(command):
"""
Predicate to provide some indication of whether or not
the command allows the 'no' prefix
"""
command_type = command.get('command-type')
if command_type:
if command_type in ['display-table','display-rest']:
return False
no_supported = command.get('no-supported', True)
if no_supported == False:
return False
return True
class CommandHandler(object):
def __init__(self):
self.actions = None
self.parse_errors = set()
self.prefix = ''
def handle_command_error(self, e):
pass
def handle_exception(self, e):
pass
def handle_unspecified_command(self):
pass
# These are the values to use for the priority argument to
# handle_parse_error
UNEXPECTED_ADDITIONAL_ARGUMENTS_PRIORITY = 10
UNEXPECTED_END_OF_ARGUMENTS_PRIORITY = 20
UNEXPECTED_TOKEN_PRIORITY = 30
VALIDATION_ERROR_PRIORITY = 40
def handle_parse_error(self, error_message, word_index,
priority, expected_tokens=None):
# Need to convert a list to a tuple first to make it hashable so that we
# can add it to the parse_errors set.
if isinstance(expected_tokens, list):
expected_tokens = tuple(expected_tokens)
self.parse_errors.add((error_message, word_index, priority, expected_tokens))
def handle_incomplete_command(self, arg, arg_scopes, arg_data,
fields, parsed_tag, command):
pass
def handle_complete_command(self, prefix_matches, scopes, arg_data,
fields, actions, command):
pass
def handle_command_results(self):
pass
def handle_first_matched_result(self, command):
pass
def handle_matched_result(self, command, result, arg_scopes):
pass
def get_default_value(self, arg):
default = None
if self.is_no_command:
default = arg.get('default-for-no')
if default is None and 'field' in arg:
default = mi.field_current_obj_type_default_value(arg['field'])
if default != None:
default = str(default)
if default is None:
default = arg.get('default')
return default
def get_matching_commands(self, command_word, is_no_command, command_list):
"""
Returns the list of matching command candidates from the command list.
To be a candidate a command description must satisfy three criteria:
1) If it specifies a feature (or a list of features), then all of
those features must be enabled.
2) Its 'mode' value (possibly a list of modes or a mode pattern)
must match the current mode.
3) It's name must begin with the command prefix
4) 'No' or not 'no', some commands allow 'no', other's dont
Each item in the candidate list is a 2 item tuple where the first item
is the command description and the second item is a boolean for whether
the command description was an exact match for the command word
(i.e. as opposed to a prefix match).
"""
candidates = []
current_mode = sdnsh.current_mode()
command_word_lower = command_word.lower()
try:
for command in command_list:
# If this command is tied to a feature then check that the
# feature is enabled.
if isCommandFeatureActive(command, command.get('feature')) == False:
continue
# Check that the command is enabled for the current mode
modes = command.get('mode')
if not modes:
raise error.CommandDescriptionError(
'Command description must specify a mode', command)
if not _is_list(modes):
modes = (modes,)
if not _match_current_modes(command, current_mode, modes):
continue
# If a 'no' command was requested, verify this command
# support 'no' (can we tell from the type?)
if is_no_command:
if not is_no_command_supported(command):
continue
# Check that the name matches the command word
name = command['name']
if _is_string(name):
name = name.lower()
if name.startswith(command_word_lower):
prefix_match = len(command_word) < len(name)
candidates.append((command, prefix_match))
elif isinstance(name, collections.Mapping):
# FIXME: Should support dict-based names that aren't
# patterns. Will be useful when we support lists of names
# for a command description where the arg_data can be set with
# different fields based on which name was matched for the command
if 're' not in name:
command['name']['re'] = re.compile(name['pattern'])
if name['re'].match(command_word):
candidates.append((command, True))
# FIXME: Probably should get rid of the following pattern code
# and just use the above pattern compilation mechanism.
# The following won't work when we require the command
# descriptions to be pure data, e.g. derived from JSON data
# or something like that.
elif type(name) == dict and \
name['re'].match(command_word):
candidates.append((command, False))
else:
raise error.CommandDescriptionError('Command description name '
'must be either string, dict, or pattern', command)
except Exception, _e:
if sdnsh.debug or sdnsh.debug_backtrace:
traceback.print_exc()
raise error.CommandDescriptionError('Missing mode or name:', command)
return candidates
def check_command_type_and_actions(self, current_scope, scopes, actions):
# Push the scopes for the command defaults and the command itself
# onto the scope stack.
command_type = current_scope.get('command-type')
command_defaults = command_type_defaults.get(command_type, {})
if command_defaults:
# Replace the command defaults with the new ones
scopes = scopes[:-1] + [command_defaults]
# Get the actions for the command
action_name = 'no-action' if self.is_no_command else 'action'
current_scope_actions = current_scope.get(action_name)
if current_scope_actions or command_defaults:
actions = _lookup_in_scopes(action_name, scopes)
if actions is None:
actions = []
elif not _is_list(actions):
actions = [actions]
# Update the actions with the scopes that were in effect at the point
# we looked up that action. This is so we can re-create the
# appropriate scopes later when we execute the action.
actions = [(action, scopes) for action in actions]
return scopes, actions
def parse_arguments(self, args, words, start_word_index, scopes,
arg_data, fields, actions, prefix_matches, command):
"""
Parse the given args from the given command words.
"""
if len(args) == 0:
return [[0, [], scopes, arg_data, fields, actions]]
parse_results = []
arg = args[0]
if _is_string(arg):
arg = {'token': arg}
remaining_args = args[1:]
arg_scopes = [arg] + scopes
arg_parse_results = []
# Get the attributes we need from the arg
# FIXME: Should possibly get rid of the 'data' mechanism
# and handle it via a custom dta handler
choices = arg.get('choices')
nested_args = arg.get('args')
if nested_args:
# Convert the nested argument list into a choices argument with
# a single choice, so that we can leverage the code below that
# handles choices
if choices:
raise error.CommandDescriptionError('An argument can\'t have both '
'"choices" and "args" attributes', command)
choices = (nested_args,)
# Determine whether or not this argument is optional.
# Default to making arguments optional for no commands, except for if
# it's a choices argument. In that case it will probably be ambiguous
# about which fields should be reset to the default values, so we just
# don't try to handle that.
optional_name = 'optional-for-no' if self.is_no_command else 'optional'
#optional_default_value = self.is_no_command
#optional_name = 'optional'
optional_default_value = False
#optional = arg.get(optional_name, optional_default_value)
# FIXME: Disabling the special handling of optional arguments for no
# command. That's causing spurious completions to be included. Not sure
# how to fix that right now. Do we really need the special optional
# handling anyway? Does Cisco actually support that.
# For example, being able to use "no ip address" rather than
# "no ip address 192.168.2.2 255.255.255.0". I haven't actually tried
# both forms on a Cisco switch to see what it does.
optional = arg.get(optional_name, optional_default_value)
# Check to see if this arg overrides either the command type or action
# Note that we don't want to set the "actions" variable with the
# updated actions yet until we know that the current argument
# actually matched against the command words and wasn't an optional
# argument that was skipped.
arg_scopes, arg_actions = self.check_command_type_and_actions(
arg, arg_scopes, actions)
if choices:
if not _is_list(choices):
raise error.CommandDescriptionError('"choices" argument must be a list '
'or tuple of argument descriptions from which to choose',
command)
for choice in choices:
choice_args = _get_choice_args(choice)
choice_arg_scopes = arg_scopes
choice_actions = list(arg_actions)
choice_prefix_matches = list(prefix_matches)
if isinstance(choice, collections.Mapping):
choice_arg_scopes = [choice] + choice_arg_scopes
choice_optional = choice.get(optional_name, False)
if choice_optional:
optional = True
choice_arg_scopes, choice_actions = \
self.check_command_type_and_actions(
choice, choice_arg_scopes, choice_actions)
choice_arg_data = dict(arg_data)
choice_fields = list(fields)
choice_parse_results = self.parse_arguments(choice_args,
words, start_word_index, choice_arg_scopes,
choice_arg_data, choice_fields, choice_actions,
choice_prefix_matches, command)
for choice_parse_result in choice_parse_results:
words_matched = choice_parse_result[0]
new_arg_data = choice_parse_result[3]
# FIXME: Not sure if the code below is the best way to
# handle things, but the idea is that we want to detect
# the case where any of the choices in a choice block
# is composed of all optional arguments. In that case
# the overall choice block thus becomes optional. The
# reason we propagate the optional attribute is that if
# there are multiple choices that consist entirely of
# optional arguments then we'd get mlutiple redundant
# matches with exactly the same arg_data and prefix_matches
# which would lead to an ambiguous command when we
# process the results at the end. So by not adding a
# result for each of those cases and instead just adding
# a single result for the overall choice block.
# The other thing we need to do is distinguish between
# optional args and default args which will both lead to
# cases where words_matched == 0. For the default arg
# case though we will add the match in the nested call
# since it will have changes to the arg_data which are
# significant in the processing of the command action.
# Since we've already added a result, we don't want to
# set the overall choice to be optional or else again
# we'll get multiple amibuous results. The way we detect
# that case is if the arg_data from the parse_result is
# different than the arg_data that was passed in. So
# that's why we use the following test.
if words_matched == 0 and new_arg_data == arg_data:
# FIXME: I don't think this will work correctly
# if/when we support default values for args. In that
# case the choice may have matched 0 words, but it
# may have updated the arg_data with some default
# argument values, which we'll if we don't add the
# parse_result at this point. Need to think more
# about this.
optional = True
else:
arg_parse_results.append(choice_parse_result)
else:
token = arg.get('token')
field = arg.get('field')
arg_type = arg.get('type')
tag = arg.get('tag')
default = self.get_default_value(arg)
tag_prefix_match = None
parsed_tag = False
is_match = True
words_matched = 0
results = None
# First try to parse the tag if there is one
if tag and len(words) > 0:
word = words[0]
if tag.lower().startswith(word.lower()):
if tag.lower() != word.lower():
tag_prefix_match = [start_word_index+words_matched, tag]
words_matched += 1
parsed_tag = True
else:
self.handle_parse_error("Unexpected argument at \"%s\"" % word,
start_word_index, CommandHandler.UNEXPECTED_TOKEN_PRIORITY, tag)
is_match = False
# Handle incomplete argument matching
if is_match:
if words_matched < len(words):
word = words[words_matched]
else:
self.handle_incomplete_command(arg, arg_scopes,
arg_data, fields, parsed_tag, command)
if default:
word = default
else:
self.handle_parse_error("Unexpected end of command",
start_word_index + words_matched,
CommandHandler.UNEXPECTED_END_OF_ARGUMENTS_PRIORITY)
is_match = False
# Handle the argument value
if is_match:
if token:
if token.lower().startswith(word.lower()):
value = True if arg_type == 'boolean' else token
results = [(value, token)]
else:
self.handle_parse_error(
"Unexpected argument at \"%s\"" % word,
start_word_index + words_matched,
CommandHandler.UNEXPECTED_TOKEN_PRIORITY, token)
is_match = False
else:
# Check that the argument is valid
try:
results = validate_argument(arg, word, arg_scopes, command)
except error.ArgumentValidationError, e:
expected_tokens = e.expected_tokens
if expected_tokens:
if _is_string(expected_tokens):
expected_tokens = (expected_tokens,)
self.handle_parse_error(str(e),
start_word_index + words_matched,
CommandHandler.UNEXPECTED_TOKEN_PRIORITY,
expected_tokens)
else:
self.handle_parse_error(str(e),
start_word_index + words_matched,
CommandHandler.VALIDATION_ERROR_PRIORITY)
is_match = False
if is_match:
assert results is not None
assert _is_list(results)
assert len(results) > 0
# If we reach here we've successfully matched the word. The word
# may have come from the commands words or it may have come from
# the default value for the argument. We only want to bump the
# words_matched in the former case, which is why we need to check
# against the length of the words array. Note that we don't want
# to bump words_matched in the code above where we get it from
# the command words, because then the word offset we pass to
# handle_parse_error would be off by 1 if the validation fails.
if words_matched < len(words):
words_matched += 1
data = arg.get('data')
arg_data_handler = _lookup_in_scopes('data-handler', arg_scopes)
self.handle_first_matched_result(command)
for result in results:
value, match_token = result
new_arg_data = dict(arg_data)
if data:
new_arg_data.update(data)
# XXX should the mode passed in here to the handler be
# the mode of the command, or the current mode ?
# (mode-of-the-command in case its a higher submode push)
if arg_data_handler:
invocation_scope = {
# FIXME: The 'name' attribute is deprecated. Remove once
# everything's been converted.
'name': field,
'field': field,
'value': value,
'data': new_arg_data,
'is-no-command': self.is_no_command,
'current-mode-obj-type': sdnsh.get_current_mode_obj_type(),
'current-mode-obj-id': sdnsh.get_current_mode_obj()
}
new_arg_scopes = [invocation_scope] + arg_scopes
try:
result = _call_proc(arg_data_handler,
argument_data_handler_registry, new_arg_scopes,
command)
except Exception, e:
# XXX ought to not manage parameter exceptions for _call_proc
if sdnsh.debug or sdnsh.debug_backtrace:
traceback.print_exc()
self.handle_parse_error(str(e),
start_word_index + words_matched,
CommandHandler.VALIDATION_ERROR_PRIORITY)
return parse_results
elif field is not None:
new_arg_data[field] = value
self.handle_matched_result(command, result, arg_scopes)
# FIXME: Do we still need the separate fields dict?
# If so, I don't think this is actually correct, since
# we want fields to not necessarily be kept exactly in
# sync with arg_data. Need to think about this more.
new_fields = new_arg_data.keys()
new_prefix_matches = list(prefix_matches)
if tag_prefix_match:
new_prefix_matches.append(tag_prefix_match)
if len(match_token) > len(word):
new_prefix_matches.append(
[start_word_index+words_matched-1, match_token])
arg_parse_results.append([words_matched, new_prefix_matches,
arg_scopes, new_arg_data, new_fields, arg_actions])
if optional:
arg_parse_results.append([0, prefix_matches, scopes,
arg_data, fields, actions])
for arg_parse_result in arg_parse_results:
(words_matched, prefix_matches, arg_scopes, arg_data,
fields, actions) = arg_parse_result
remaining_words = words[words_matched:]
remaining_parse_results = self.parse_arguments(
remaining_args, remaining_words,
start_word_index + words_matched, scopes, arg_data,
fields, actions, prefix_matches, command)
# The first item in each tuple is the words consumed, but
# that's relative to the remaining args that we passed to
# it. For the parse results from this invocation of
# parse args we also need to include the counts of the args
# that we've already parsed plus the args that were parsed
# for the current choice.
for parse_result in remaining_parse_results:
parse_result[0] += words_matched
# parse_prefix_matches = parse_result[1]
# for match in parse_prefix_matches:
# match[0] += words_matched
parse_result[1] = prefix_matches + parse_result[1]
parse_results.append(parse_result)
return parse_results
def handle_one_command(self, command, prefix_match, words):
assert words is not None
assert len(words) > 0
assert command is not None
fields = {}
command_word = words[0]
command_name = command.get('name')
arg_data = dict(command.get('data', {}))
# The first two elements in the scopes are the command and the command
# defaults. The command defaults will be populated with the real
# defaults when we call check_command_defaults_and_actions
scopes = [command, {}]
actions = []
prefix_matches = []
if prefix_match:
prefix_matches.append([0, command_name])
if isinstance(command_name, collections.Mapping):
field = command_name['field']
if field:
fields[field] = True
arg_data[field] = command_word
elif _is_string(command_name):
assert command_name.lower().startswith(command_word.lower())
else:
raise error.CommandDescriptionError('Command name must be either'
'a string or a dict', command)
# We've checked that the first word matches what we expect from the
# command object and now we just care about the remaining words, so
# we strip off the initial command word. Note that we slice/copy
# the word list instead of deleting the first word in place, so that
# we don't affect the word list of the calling function.
words = words[1:]
scopes, actions = self.check_command_type_and_actions(command, scopes, actions)
# Parse the arguments
args = _get_args(command)
parse_results = self.parse_arguments(args, words, 1, scopes,
arg_data, fields, actions, prefix_matches, command)
if parse_results:
for parse_result in parse_results:
words_consumed, prefix_matches, scopes, arg_data, fields, actions = parse_result
if words_consumed == len(words):
self.handle_complete_command(prefix_matches, scopes, arg_data, fields, actions, command)
else:
word = words[words_consumed]
self.handle_parse_error(
'Unexpected additional arguments at \"%s"' % word,
words_consumed+1,
CommandHandler.UNEXPECTED_ADDITIONAL_ARGUMENTS_PRIORITY)
return None
def handle_command(self, words, text=None):
# Check if it's a no command and, if so, adjust the words
self.is_no_command = len(words) > 0 and words[0].lower() == 'no'
if self.is_no_command:
self.prefix = words[0]
words = words[1:]
self.words = words
self.text = text
result = None
try:
if len(words) > 0:
command_word = words[0]
matching_commands = self.get_matching_commands(command_word,
self.is_no_command,
command_registry)
for matching_command in matching_commands:
command, prefix_match = matching_command
self.handle_one_command(command, prefix_match, words)
else:
result = self.handle_unspecified_command()
result = self.handle_command_results()
except Exception, e:
if sdnsh.debug or sdnsh.debug_backtrace:
traceback.print_exc()
if isinstance(e, error.CommandError):
result = self.handle_command_error(e)
else:
result = self.handle_exception(e)
return result
class CommandExecutor(CommandHandler):
def __init__(self):
super(CommandExecutor, self).__init__()
self.matches = []
#def get_default_attribute_name(self):
# return 'default-for-no' if self.is_no_command else 'default'
def handle_unspecified_command(self):
raise error.CommandInvocationError('No command specified')
def handle_command_error(self, e):
result = sdnsh.error_msg(str(e))
return result
def handle_exception(self, e):
if sdnsh.debug or sdnsh.debug_backtrace:
traceback.print_exc()
raise e
def handle_complete_command(self, prefix_matches, scopes, arg_data, fields, actions, command):
self.matches.append((prefix_matches, scopes, arg_data, fields, actions, command))
def handle_command_results(self):
if len(self.matches) == 0:
# No matching command. We need to run through the parse
# errors we've collected to try to figure out the best error
# message to use. The simple heuristic we use is that we'll
# assume that errors that matched the most command words
# before hitting a parse error are the most relevant, so
# those are the ones we'll print. If it's a parse error
# because it didn't match a literal token (i.e. as opposed
# to a validation error), then we'll collect all of the
# valid tokens and print a single error message for all of
# them. Otherwise we'll print out all of the individual
# error message. This isn't great, because in a lot of
# cases there we'll be multiple error messages, which
# may be either conflicting or almost identical (identical
# messages are eliminated because we use a set when we
# collect the parse errors).
most_words_matched = -1
highest_priority = -1
accumulated_error_messages = []
accumulated_expected_tokens = []
for error_message, words_matched, priority, expected_tokens in self.parse_errors:
if words_matched > most_words_matched:
# We found an error that matched more words than
# the other errors we've seen so far, so reset
# the highest priority, so we'll start looking for
# the new highest priority at the current word match level.
# the variable we're using to collect the error
# messages and expected tokens
most_words_matched = words_matched
highest_priority = -1
if words_matched == most_words_matched:
if priority > highest_priority:
# We found a higher priority error, so clear the
# accumulated errors and expected tokens.
accumulated_error_messages = []
accumulated_expected_tokens = []
highest_priority = priority
if words_matched == most_words_matched and priority == highest_priority:
# Collect the error message and check to see if there's
# an expected token and if all of the errors at this level
# so far also had expected tokens. If there's any error
# that doesn't have an expected token, then we revert
# to the mode where we'll print all of the individual
# error messages.
if expected_tokens is not None:
if _is_string(expected_tokens):
expected_tokens = [expected_tokens]
elif isinstance(expected_tokens, tuple):
expected_tokens = list(expected_tokens)
accumulated_expected_tokens += expected_tokens
else:
accumulated_error_messages.append(error_message)
# We've collected the errors and the expected tokens. Now we
# just need to format the error message to use with the
# CommandSyntaxError exception.
error_message = ''
if accumulated_expected_tokens:
word = self.words[most_words_matched]
error_message += 'Unexpected argument \"%s\"; expected ' % word
expected_tokens = ['"' + expected_token + '"'
for expected_token in accumulated_expected_tokens]
if len(expected_tokens) > 1:
expected_tokens.sort()
error_message += 'one of '
error_message += '(' + ', '.join(expected_tokens) + ')'
for message in accumulated_error_messages:
if error_message:
error_message += '\n'
error_message += message
raise error.CommandSyntaxError(error_message)
# There may be multiple results. We need to figure out it there's
# an unambiguous match. The heuristic we use is to iterate over
# the command words and try to find a single result which has an
# exact match on the word while all the others are prefix matches.
# If there are multiple results that are exact matches, then we
# discard the other results that were prefix matches and continue
# with the next command word. If at an iteration all of the
# results are prefix matches for the current word, then we check
# all of the full tokens that were prefix matched by the command
# word. If all of the results have the same full token, then we
# continue. If there are multiple full tokens, then that's
# indicative of an ambiguous command, so we break out of the loop
# and raise an ambiguous command exception.
unambiguous_match = None
if len(self.matches) > 1:
narrowed_matches = self.matches
for i in range(len(self.words)):
exact_match_matches = []
prefix_match_matches = []
prefix_match_full_tokens = {}
for match in narrowed_matches:
prefix_matches = match[0]
for prefix_match in prefix_matches:
if prefix_match[0] == i:
prefix_match_matches.append(match)
if type(prefix_match[1]) == dict: # regular expression
# similar to _get_command_title()
full_token = prefix_match[1]['title']
else:
full_token = prefix_match[1].lower()
prefix_match_full_tokens[full_token] = None
break
else:
exact_match_matches.append(match)
if len(exact_match_matches) == 1:
unambiguous_match = exact_match_matches[0]
break
elif len(exact_match_matches) > 1:
narrowed_matches = exact_match_matches
else:
# No exact matches, check to see if the prefix matches
# are all prefixes for the same token. If so, then let
# the process continue with the next word in the command.
# Otherwise it's an ambiguous command.
assert len(prefix_match_matches) > 1
if len(prefix_match_full_tokens) > 1:
break
narrowed_matches = prefix_match_matches
else:
# If we reach the end of the words without either finding an
# unambiguous result or the word which was ambiguous then that
# means that multiple command descriptions (or multiple
# choices/path through a single command description) had the
# exact same match tokens, which means that there are
# conflicting command descriptions, which is a bug in the
# command descriptions.
if sdnsh.debug or sdnsh.debug_backtrace or sdnsh.description:
for n in narrowed_matches:
print _line(), n[1][0]['self']
raise error.CommandAmbiguousError('Multiple command description match')
if unambiguous_match is None:
assert prefix_match_full_tokens is not None
assert len(prefix_match_full_tokens) > 1
quoted_full_tokens = ['"' + full_token + '"'
for full_token in prefix_match_full_tokens]
# pylint: disable=W0631
raise error.CommandAmbiguousError(
'Ambiguous command word "%s"; matches [%s]' %
(self.words[i], ', '.join(quoted_full_tokens)))
else:
unambiguous_match = self.matches[0]
prefix_matches, scopes, arg_data, fields, actions, command = unambiguous_match
# XXX Possibly a better method of deciding when to loook deeper
# in the scopes would be a good idea.
if fields == None or len(fields) == 0:
fields = _lookup_in_scopes('fields', scopes)
invocation_scope = {
'data': arg_data,
'fields': fields,
'is-no-command': self.is_no_command,
'current-mode-obj-type': sdnsh.get_current_mode_obj_type(),
'current-mode-obj-id': sdnsh.get_current_mode_obj()
}
#scopes = [invocation_scope] + scopes
assert actions is not None
if len(actions) == 0:
raise error.CommandDescriptionError('No action specified', command)
#if valid_args:
# scopes = valid_args + scopes
if sdnsh.description: # debug details
print "Command Selected: %s" % command['self']
combined_result = None
for action in actions:
action_proc, action_scopes = action
scopes = [invocation_scope] + action_scopes
if sdnsh.description:
print 'Action: ', action_proc
result = _call_proc(action_proc, action_registry, scopes, command)
if result is not None:
combined_result = combined_result + result if combined_result else result
# Since a successful 'no' command may remove an object,
# after a no command complete's all its actions, verify
# the existance of every object in the mode stack.
# Pop any nested modes where the object is missing.
if self.is_no_command:
# starting at the most nested submode level, determine if the
# current submode object still exists
any_removed = False
for index in reversed(range(len(sdnsh.mode_stack))):
mode = sdnsh.mode_stack[index]
if mi.obj_type_in_use_as_related_config_type(mode.get('obj_type')):
if sdnsh.description:
print 'CONFIG TYPE:', \
mode.get('obj_type'), \
mi.obj_type_in_use_as_related_config_type(mode.get('obj_type'))
continue
if mode.get('obj_type') and mode.get('obj'):
try:
sdnsh.get_object_from_store(mode['obj_type'], mode['obj'])
except:
# pop this item
if sdnsh.description:
print 'NO LOST OBJECT ', mode.get('obj_type'), mode.get('obj')
del sdnsh.mode_stack[index]
any_removed = True
if any_removed:
sdnsh.warning("submode exited due to deleted object")
sdnsh.update_prompt()
return combined_result
class CommandCompleter(CommandHandler):
def __init__(self, meta_completion = False):
super(CommandCompleter, self).__init__()
self.meta_completion = meta_completion
self.completions = {}
def get_default_value(self, arg):
default = arg.get('default')
if self.is_no_command and default is None and 'field' in arg:
default = mi.field_current_obj_type_default_value(arg['field'])
if default != None:
default = str(None)
return default
def complete_help(self, arg_scopes):
"""
Select the most appropriate localized text to associate with
the completion-reason for some completion-text
"""
c_help = _lookup_in_scopes('conpletion-text', arg_scopes)
if c_help:
return c_help
c_help = _lookup_in_scopes('syntax-help', arg_scopes)
if c_help:
return c_help
c_help = _lookup_in_scopes('all-help', arg_scopes)
if c_help:
return '!' + c_help
c_help = _lookup_in_scopes('short-help', arg_scopes)
if c_help:
return c_help
return ' <help missing> %s' % _lookup_in_scopes('self', arg_scopes)
def completion_set(self, completions, text, reason):
"""
In situations where multiple commands match for some completion
text, the last command-iterated-over will populate the text for
the completion dictionary. Use the first character of the
completion string for this command to identify when the text
represents the most "base" variant of the command to display
for completion help text.
When the first character of the string is a '!', the printing
procedures must drop the first character.
"""
if completions == None:
completions = {text : reason}
return
current_text = completions.get(text)
# current_text could possibly be None from the lookup
if current_text != None and current_text[0] == '!':
return
completions[text] = reason
def command_help_text(self, command):
c_help = command.get('all-help')
if c_help:
if c_help[0] != '!':
return '!' + c_help
return c_help
c_help = command.get('short-help')
return c_help
def handle_unspecified_command(self):
candidates = self.get_matching_commands(self.text, self.is_no_command, command_registry)
for candidate in candidates:
self.completion_set(self.completions, candidate[0]['name'] + ' ',
self.command_help_text(candidate[0]))
def handle_incomplete_command(self, arg, arg_scopes, arg_data, fields,
parsed_tag, command):
completions = None
tag = arg.get('tag')
lower_text = self.text.lower()
if tag is not None and not parsed_tag:
if tag.lower().startswith(lower_text):
if completions == None:
completions = {}
completions[tag + ' '] = self.complete_help(arg_scopes)
else:
token = arg.get('token')
type_name = arg.get('type')
if type_name == None:
type_name = arg.get('base-type')
typedef = typedef_registry.get(type_name) if type_name else None
# See if this argument is an enumerated value. This can either
# be by a type/values specified inline in the argument or via
# an enum typedef. So first we check for those two cases and
# get the object that contains the values, either the argument
# itself or the typedef.
enum_obj = None
if token is not None:
if token.lower().startswith(lower_text):
if completions == None:
completions = {}
completions[token + ' '] = self.complete_help(arg_scopes)
elif type_name == 'enum':
enum_obj = arg
elif typedef:
# Note: This doesn't recursively check up the type hierarchy,
# but it doesn't seem like a valid use case to have more than
# one level of inheritance for enumerations, so it should be OK.
base_type = typedef.get('base-type')
if base_type == 'enum':
enum_obj = typedef
# If we have an enum object, then get the values and
# build the completion list
if enum_obj:
enum_values = enum_obj.get('values')
if enum_values:
if _is_string(enum_values):
enum_values = (enum_values,)
# enum_values may be either a array/tuple of strings or
# else a dict whose keys are the enum strings. In both
# cases we can use the for loop to iterate over the
# enum string names to be used for completion.
completions = {}
for enum_value in enum_values:
if enum_value.lower().startswith(lower_text):
completions[enum_value + ' '] = \
self.complete_help(arg_scopes)
if completions == None or len(completions) == 0:
# Check to see if there's a custom completion proc
completion_proc = _lookup_in_scopes('completion', arg_scopes)
if completion_proc:
completions = {}
curr_obj_type = sdnsh.get_current_mode_obj_type()
curr_obj_id = sdnsh.get_current_mode_obj()
command_mode = command.get('mode')
if command_mode and command_mode[-1] == '*':
command_mode = command_mode[:-1]
if command_mode and sdnsh.current_mode() != command_mode:
# this is completing a different item on the stack.
# XXX needs better api's here.
found_in_mode_stack = False
for x in sdnsh.mode_stack:
if x['mode_name'] == command_mode:
found_in_mode_stack = True
curr_obj_type = x['obj_type']
curr_obj_id = x['obj']
break
if not found_in_mode_stack:
raise error.CommandDescriptionError(
'Unable to find mode %s' % command_mode,
command)
invocation_scope = {
'words': self.words,
'text': self.text,
'data': arg_data,
'completions': completions,
'is-no-command': self.is_no_command,
'mode' : command_mode,
'current-mode-obj-type': curr_obj_type,
'current-mode-obj-id': curr_obj_id,
}
arg_scopes.insert(0, invocation_scope)
if sdnsh.description: # for deubugging, print command name
print command['self'], completion_proc
try:
_result = _call_proc(completion_proc, completion_registry,
arg_scopes, command)
except TypeError, e:
if sdnsh.debug or sdnsh.debug_backtrace:
print "Issue: ", e
traceback.print_exc()
raise error.CommandDescriptionError("Unable to call completion",
command)
if (completions == None or len(completions) == 0 or self.meta_completion) and not self.text:
# TODO: It's not clear in this case how to detect if a
# partially entered argument can really match this argument,
# so we (conservatively) only include these completion text
# strings when there's no partial text for the argument, which
# is why we have the above check for self.text
# Note: We want to allow None for the value of completion-text
# to mean that completion should be disabled for the argument,
# so we use False for the default value of the get method.
# Then if the completion-text is not set in the arg the
# "if not completion_text" checks will be True, so we'll try to
# set the completion text to the help-name or field value.
completion_text = arg.get('completion-text', False)
if completion_text is not None:
# FIXME: Probably shouldn't use syntax help here. That
# should probably be reserved for printing additional
# syntax help for the argument, duh (presumably some longer
# descriptive text for the argument).
if not completion_text:
completion_text = arg.get('help-name')
if not completion_text:
completion_text = tag if tag else arg.get('field')
syntax_help = arg.get('syntax-help')
if syntax_help == None:
if sdnsh.description:
syntax_help = 'Add syntax-help %s at %s' % (command['self'],
completion_text)
# exclude enum, since the values will included as text
if typedef and type_name != 'enum':
syntax_help = typedef.get('help-name')
if syntax_help == None:
syntax_help = ' <' + type_name + '> ' # add delim
if completion_text and syntax_help:
#and completion_text.startswith(lower_text):
completion_text = ' <' + completion_text + '> ' # add_delim
if completions == None:
completions = {}
# The use of tuple here as he index provides a method
# of identifying the completion values which are
# 'meta-information', and not to be completed against
completions[(completion_text, syntax_help)] = syntax_help
if completions and len(completions) > 0:
for (completion, text) in completions.items():
self.completion_set(self.completions, completion, text)
return None
def handle_complete_command(self, prefix_matches, scopes,
arg_data, fields, actions, command):
if not self.text:
# complete command, intentionally use the short-help associated
# with the top level of this command.
# The use of tuple here as he index provides a method
# of identifying the completion values which are
# 'meta-information', and not to be completed against
self.completions[(' <cr> ', '<cr>' )] = command.get('short-help')
def handle_command_error(self, e):
# For completion ignore all errors exception for ones that are
# indicative of an error in the command description, which will
# (hopefully) be found by the developer while debugging the
# command description.
if isinstance(e, error.CommandDescriptionError):
sdnsh.print_completion_help(sdnsh.error_msg(str(e)))
if sdnsh.debug or sdnsh.debug_backtrace:
traceback.print_exc()
def print_completions(self, completions):
"""
Print the completions ourselves in sorted multiple column format.
We use this when the completions are are pseudo/help completions
that aren't real tokens that we want readline to use for
completions.
"""
meta_completions = [completion[0] if isinstance(completion, tuple)
else completion for completion in completions]
meta_completions.sort()
sdnsh.print_completion_help(sdnsh.choices_text_builder(meta_completions))
def handle_command_results(self):
if self.meta_completion == True:
# two-column display mode
sdnsh.completion_print = False
if len(self.completions):
max_token_len = 0
for completion in self.completions:
item = completion
if isinstance(completion, tuple):
item = completion[0]
max_token_len = max(max_token_len, len(item))
text = []
for completion in sorted(self.completions.keys(),
cmp=utif.completion_trailing_integer_cmp):
help = self.completions[completion]
if help and help[0] == '!':
help = help[1:]
left_side = completion
if isinstance(completion, tuple):
left_side = completion[0]
text.append('%-*s %s\n' % (max_token_len, left_side.strip(), help.strip()))
sdnsh.print_completion_help(''.join(text))
completions = list(self.completions)
if len(completions) == 1:
completion = completions[0]
if isinstance(completion, tuple):
help_text = completion[1]
if not help_text:
help_text = completion[0]
if self.meta_completion == False:
sdnsh.print_completion_help(help_text)
completions = None
elif len(completions) > 1:
# Some of the completion results are really pseudo completions
# that are meant to be more like a help prompt for the user rather
# than an actual token that should be used for completion by readline
# If all of the completions are like that and we return them to
# readline, then it will actually complete the leading prefix, which
# will be at least the leading '<'. To avoid that situation we print
# out the completions ourselves using print_completion_help and then
# return None for the completions.
all_pseudo_completions = True
for completion in completions:
name = completion[0] if isinstance(completion, tuple) else completion
if not name.endswith('> ') and not name.endswith('>'):
all_pseudo_completions = False
break
if all_pseudo_completions:
if self.meta_completion == False:
self.print_completions(completions)
completions = None
else:
completions = [completion[0] if isinstance(completion, tuple)
else completion for completion in completions]
return completions
def do_command_completion(words, text):
completer = CommandCompleter()
completions = completer.handle_command(words, text)
return completions
def do_command_completion_help(words, text):
completer = CommandCompleter(meta_completion = True)
completions = completer.handle_command(words, text)
return completions
def _canonicalize_validation_result(result):
"""
The canonical (i.e. most general) representation of a validation
result is a list of 2-item tuples where the first item in the tuple
is the validated value that should be used to update the arg_data
and the second item is the actual token that was matched, which may
not be the same as the word in the command. For example, in the
results for an enum the word from the command may be a prefix for
one of the allowed value for the enum. In that case the first item
would be the return value for the enum value and the second item
will be the full enum value, i.e. not the prefix that was typed
in the command.
But there are a couple of simpler representations of the validation
results. First, the result can be a simple scalar validated value.
This is converted to the canonical representation by turning it
into [(<value>, <value>)]. Second, the input result can be just
the 2-item tuple. To canonicalize that we just enclose it in a
list: [<tuple-result>].
"""
if result is None:
result = []
elif _is_list(result):
if len(result) > 0 and not _is_list(result[0]):
result = [result]
else:
matching_token = result if _is_string(result) else str(result)
result = [(result, matching_token)]
return result
def _combine_validation_results(result1, result2, typedef, value):
if result1 is not None:
result1 = _canonicalize_validation_result(result1)
if result2 is not None:
result2 = _canonicalize_validation_result(result2)
if result1 is not None:
if result2 is not None:
result = [r for r in result1 if r in result2]
if len(result) == 0:
_raise_argument_validation_exception(typedef, value, "no match")
#elif len(result) == 1:
# result = result[0]
return result
else:
return result1
elif result2 is not None:
return result2
else:
return [(value, value)]
def validate_type(type_name, value, scopes, command):
"""
Validate that the specified value matches the validation in the
specified type definition and any inherited type definitions
"""
# Look up the type definition and perform any validation specified there
typedef = typedef_registry.get(type_name)
if not typedef:
raise error.CommandDescriptionError('Unknown type: %s' % type_name)
type_result = None
validation_result = None
# If it's a subtype, validate the base type first
base_type_name = typedef.get('base-type')
if base_type_name:
type_result = validate_type(base_type_name, value, scopes, command)
# FIXME: Seems like we shouldn't be handling obj-type here, since
# that's just an attribute that's specific to certain validation
# procs. Shouldn't that value already be available in the scopes
# to be able to pass to the proc?
obj_type = _lookup_in_scopes('obj-type', scopes)
# Now validate the subtype
# _call_proc requires that the first scope be an invocation scope that
# it possibly modifies by settings the 'scopes' variable, so we need
# to include an empty invocation scope here
invocation_scope = {}
parameter_scope = {'typedef': typedef, 'value': value}
if obj_type:
parameter_scope['obj_type'] = obj_type
scopes = [invocation_scope, parameter_scope, typedef]
while base_type_name is not None:
base_typedef = typedef_registry.get(base_type_name)
if not base_typedef:
raise error.CommandDescriptionError('Invalid base-type name: %s' % base_type_name)
scopes.append(base_typedef)
base_type_name = base_typedef.get('base-type')
#command_type = command.get('command-type')
#command_defaults = command_type_defaults.get(command_type, {})
#scopes.append([command, command_defaults])
validation = _lookup_in_scopes('validation', scopes)
if validation:
validation_result = _call_proc(validation, validation_registry,
scopes, command)
if validation_result is None:
validation_result = value
result = _combine_validation_results(type_result, validation_result,
typedef, value)
return result
def validate_argument(arg, value, scopes, command):
type_result = None
validation_result = None
# Validate based on the type of the argument
type_name = arg.get('type')
if type_name:
if type_name == 'enum':
#values = arg.get('values')
#result = validate_enum({'values': values}, value)
type_result = c_validations.validate_enum(arg, value)
else:
type_result = validate_type(type_name, value, scopes, command)
else:
# If the argument description specifies a base-type value, then
# we treat it as an anonymous typedef.
# FIXME: Should probably be able to refactor this code a bit to
# reduce code duplication.
base_type_name = arg.get('base-type')
if base_type_name:
base_typedef = typedef_registry.get(base_type_name)
if base_typedef:
validation = _lookup_typedef_value(base_typedef, 'validation')
if validation:
scopes = [{'typedef': arg, 'value': value}] + scopes
type_result = _call_proc(validation, validation_registry,
scopes, command)
if type_result is None:
type_result = value
# Perform any custom validate proc in the argument
validation = arg.get('validation')
if validation:
# Include 'typedef' in the local scope so that we can use
# typedef validation functions inline for arguments (as opposed to
# requiring a typedef definition).
scopes = [{'arg': arg, 'typedef': arg, 'value': value}] + scopes
validation_result = _call_proc(validation, validation_registry,
scopes, command)
if validation_result is None:
validation_result = value
result = _combine_validation_results(type_result, validation_result,
arg, value)
return result
#
# CommandSyntaxer: Collect all the commands which "prefix match" the words
#
class CommandSyntaxer(CommandHandler):
def __init__(self, header):
super(CommandSyntaxer, self).__init__()
if header is None:
header = ''
self.header = header
self.is_no_command = False
self.commands = []
def add_known_command(self, command):
self.commands.append(command)
def handle_incomplete_command(self, arg, arg_scopes, arg_data, fields, parsed_tag, command):
if command not in self.commands:
self.commands.append(command)
def handle_complete_command(self, prefix_matches, scopes, arg_data, fields, actions, command):
if command not in self.commands:
self.commands.append(command)
def handle_command_results(self):
if len(self.commands) == 0:
return 'No applicable command: %s' % ' '.join(self.words)
help_lines = ['']
if self.header:
help_lines.append(self.header)
help_lines.append('')
help_strings = []
for command in self.commands:
command_help_string = _get_command_syntax_help_string(
command, self.prefix)
if type(command_help_string) == list:
help_strings += command_help_string
else:
help_strings.append(command_help_string)
(term_cols, term_rows) = sdnsh.pp.get_terminal_size()
for h in sorted(help_strings):
# use 75% of the width, since reformat_line isn't perfect.
smaller_term_cols = (term_cols * 3) / 4
if len(h) > smaller_term_cols:
help_lines += reformat_line(h, smaller_term_cols, 0, 0)
else:
help_lines.append(h)
if len(self.commands) == 1:
short_help = None
if self.is_no_command:
#
# Cobble together a help string for a no command, build
# a default value for subcommand's, possibly other command
# descriptions
#
short_help = self.commands[0].get('no-help')
if not short_help:
command_type = self.commands[0].get('command-type')
action = self.commands[0].get('action')
if ((command_type and command_type == 'config-submode') or
(action and action == 'push-mode-stack')):
mode_name = self.commands[0].get('name')
short_help = 'Remove %s configuration' % mode_name
else:
short_help = self.commands[0].get('short-help')
if short_help:
#help_lines.append('')
help_lines.append(short_help)
return '\n'.join(help_lines)
def get_command_syntax_help(words, header=None):
handler = CommandSyntaxer(header)
# Hack to make "help no" work
if len(words) == 1 and words[0] == 'no':
words = words + ['']
result = handler.handle_command(words)
return result
def get_command_short_help_for_pattern(word):
# try to find the command based on the pattern.
for command in command_registry:
if type(command['name']) == dict:
if command['name']['title'] == word:
return command.get('short-help')
def get_command_short_help(word):
handler = CommandHandler()
matching_commands = handler.get_matching_commands(word, False,
command_registry)
if len(matching_commands) == 0:
return get_command_short_help_for_pattern(word)
if len(matching_commands) >= 1:
# This will only work once all commands have command descriptions.
#matching_commands[0][0]['self'], _exact_mode_match(current_mode,
#(matching_commands[0][0]).get('mode'))
return (matching_commands[0][0]).get('short-help')
return None
def dumb_formatter(out, text, left, right = None):
"""
Build output lines from the passed in text. If the
text has leading spaces, then leave the spaces intact
(starting at the indent).
"""
if right == None:
(right, line_length) = sdnsh.pp.get_terminal_size()
if right - 20 > left: # XXX needs work
right = right - 20
right = min(right, 120)
left_indent = ' ' * left
out_len = left
out_line = left_indent
for line in text.split('\n'):
if len(line) == 0:
if out_len > left:
out.append(out_line)
out_len = left
out_line = left_indent
out.append('')
elif line[0] == ' ': # leading spaces
if out_len > left:
out.append(out_line)
out_len = left
out_line = left_indent
out.append( left_indent + line )
else: # text formatting
for word in line.split():
sep = ' '
if word.endswith('.'):
sep = ' '
if len(word) + out_len + len(sep) > right:
if out_len > left:
out.append(out_line)
out_len = left + len(word) + len(sep)
out_line = left_indent + word + sep
else:
out_line += word + sep
out_len += len(sep) + len(word)
if out_len > left:
out.append(out_line)
#
# Documentations: Collect all the commands which "prefix match" the words
#
class CommandDocumentor(CommandHandler):
def __init__(self, header, format = None):
super(CommandDocumentor, self).__init__()
if header is None:
header = ''
self.header = header
self.commands = []
self.docs = {}
self.docs_stack = []
self.is_no_command = False
self.words = None
self.format = format
(self.term_cols, self.term_rows) = sdnsh.pp.get_terminal_size()
def add_docs(self, name, value):
if name in sdnsh.reserved_words and value == None:
value = 'reserved|%s' % name
if name in self.docs:
if value != self.docs[name]:
if value != None and None in self.docs[name] and \
len(self.docs[name]) == 1 :
self.docs[name] = [value]
elif value != None and value not in self.docs[name]:
self.docs[name].append(value)
# if the value is already there, don't do nuttn
else:
self.docs[name] = [value]
def add_dict_doc(self, arg):
if 'choices' in arg:
self.add_tuple_docs(arg['choices'])
return
if 'args' in arg:
args_arg = arg['args']
if type(args_arg) == tuple:
self.add_tuple_docs(args_arg)
elif type(args_arg) == dict:
self.add_dict_doc(args_arg)
else:
print 'add_dict_doc ', type(args_arg), args_arg
return
tag = arg.get('tag')
if 'field' in arg:
if tag:
name = '%s <%s>' % (tag, arg['field'])
else:
name = arg['field']
elif 'token' in arg:
name = arg['token']
doc = arg.get('doc')
if doc and 'type' in arg:
values = arg.get('values')
if arg['type'] == 'enum' and values:
if isinstance(values,str):
self.add_docs(values,doc)
else:
for v in values:
if doc and doc[-1] == '+':
self.add_docs(v, doc[:-1] + v)
else:
self.add_docs(v, doc)
return
if doc:
self.add_docs(name, doc)
def add_tuple_docs(self, tuple_arg):
for t in tuple_arg:
if type(t) == tuple:
self.add_tuple_docs(t)
elif type(t) == dict:
self.add_dict_doc(t)
def add_command(self, command):
self.commands.append(command)
args = command.get('args')
self.add_tuple_docs((args,))
def handle_first_matched_result(self, command):
self.docs_stack.append({})
def handle_matched_result(self, command, result, arg_scopes):
matched_doc = _lookup_in_scopes('doc', arg_scopes)
if matched_doc and matched_doc[-1] == '+':
item = result
if type(item) == tuple:
item = item[0]
matched_doc = matched_doc[:-1] + item
self.docs_stack[-1][command['self']] = (result, matched_doc)
def handle_incomplete_command(self, arg, arg_scopes, arg_data,
fields, parsed_tag, command):
tag = arg.get('tag')
doc = arg.get('doc')
doc_any = doc
if doc_any == None:
doc_any = _lookup_in_scopes('doc', arg_scopes)
# note: doc may still be None
token = arg.get('token')
type_name = arg.get('type')
if type_name == None:
type_name = arg.get('base-type')
typedef = typedef_registry.get(type_name) if type_name else None
field = arg.get('field')
help_name = arg.get('help-name', field)
field_doc = arg.get('completion-text', False)
if field_doc == None:
field_doc = arg.get('help-name')
if field_doc == None:
field_doc = arg.get('syntax-help')
def manage_enum(arg, doc):
enum_values = arg.get('values')
if _is_string(enum_values):
enum_values = [enum_values]
for enum in enum_values:
if doc and doc[-1] == '+':
self.add_docs(enum, doc[:-1] + enum)
else:
self.add_docs(enum, doc)
if token != None: # token is a loner.
self.add_docs(token.lower(), doc_any)
elif tag and field and field_doc:
self.add_docs(tag + ' <' + help_name + '>', doc if doc else field_doc)
elif field and typedef and typedef.get('name') == 'enum':
manage_enum(arg, doc)
elif field and doc:
self.add_docs(help_name, doc)
elif tag and field and typedef:
self.add_docs(tag + ' <' + help_name + '>', 'types|' + typedef.get('name'))
elif field and field_doc:
self.add_docs(help_name, field_doc)
elif typedef:
typedef_name = typedef.get('name').lower()
if typedef_name != 'enum':
if field:
# magic reference to doc: types/<typename>
self.add_docs(field, 'types|' + typedef.get('name'))
else:
self.add_docs(typedef.get('name').lower(), doc)
else:
manage_enum(arg, doc)
if command not in self.commands:
self.commands.append(command)
def handle_complete_command(self, prefix_matches, scopes, arg_data,
fields, actions, command):
# only provide help for exact command matches
if len(prefix_matches):
return
if command not in self.commands:
if sdnsh.description:
print 'COMPLETE COMMAND DOC', command['self'], arg_data, prefix_matches
self.commands.append(command)
def handle_command_results_wiki(self):
def handle_plaintext_wiki(inputstr):
inputstr=inputstr.replace("{","\{")
inputstr=inputstr.replace("}","\}")
inputstr=inputstr.replace("[","\[")
inputstr=inputstr.replace("]","\]")
inputstr=inputstr.replace("\\\\","<br>")
inputstr=inputstr.replace("\n\n", "\\@")
inputstr=inputstr.replace("\n"," ")
inputstr=inputstr.replace("\\@", "\n\n")
inputstr=inputstr.replace("<br>", "\n")
return inputstr
if len(self.commands) == 0:
if self.words:
return 'No applicable command: %s' % ' '.join(self.words)
else:
return '\nNo command for ' + self.header
help_lines=[]
help_lines.append('')
last_desc_name = None
last_syntax_name = None
# Keep track of paragraphs displayed to prevent repeated
# display of the same doc tags.
shown_items = []
for command in self.commands:
doc_tag = command.get('doc')
name = _get_command_title(command)
short_help = command.get('short-help')
if short_help:
if self.header:
help_lines.append('\nh2. %s Command' % name.capitalize())
help_lines.append('\nh4. %s' % short_help.capitalize())
cmdmode=command.get('mode')
if isinstance(cmdmode,list):
cmdmodestr=''
for cmdmodemem in cmdmode:
if cmdmodemem[-1]=='*':
cmdmodemem=cmdmodemem[:-1]
cmdmodestr= cmdmodestr +' ' + cmdmodemem
else:
cmdmodestr=cmdmode
if cmdmodestr[-1]=='*':
cmdmodestr=cmdmodestr[:-1]
help_lines.append('\n*Command Mode:* %s mode' % cmdmodestr)
last_syntax_name = None # display syntax header
help_strings = []
command_help_string = _get_command_syntax_help_string(
command, self.prefix)
help_strings.append(command_help_string)
for h in sorted(help_strings):
if isinstance(h,list):
h = handle_plaintext_wiki(h[0])
else:
h = handle_plaintext_wiki(h)
if last_syntax_name != name:
help_lines.append('\n*Command Syntax:* {{%s}}' % h)
last_syntax_name = name
last_desc_name = None # print description header
if doc_tag:
text = doc.get_text(self, doc_tag)
if text != '' and doc_tag not in shown_items:
shown_items.append(doc_tag)
if last_desc_name != name:
help_lines.append('\n*Command Description:*')
last_desc_name = name
text=handle_plaintext_wiki(text)
help_lines.append(text)
#dumb_formatter(help_lines, text, 3)
last_syntax_name = None # print 'Syntax' header
if len(self.commands) == 1:
short_help = None
if self.is_no_command:
#
# Cobble together a help string for a no command, build
# a default value for subcommand's, possibly other command
# descriptions
#
short_help = self.commands[0].get('no-help')
if not short_help:
command_type = self.commands[0].get('command-type')
action = self.commands[0].get('action')
if ((command_type and command_type == 'config-submode') or
(action and action == 'push-mode-stack')):
mode_name = self.commands[0].get('name')
short_help = 'Remove %s configuration' % mode_name
else:
short_help = self.commands[0].get('short-help')
for last_doc in self.docs_stack:
if command['self'] in last_doc:
(keyword, last_doc) = last_doc[command['self']]
text = doc.get_text(self, last_doc)
if type(keyword) == tuple:
keyword = keyword[0]
if sdnsh.description:
help_lines.append("\t%s: %s %s" %
(command['self'], keyword, last_doc))
if text != '' and last_doc not in shown_items:
shown_items.append(last_doc)
help_lines.append('\nKeyword %s Description:' %
keyword.capitalize())
text=handle_plaintext_wiki(text)
help_lines.append(text)
#dumb_formatter(help_lines, text, 4)
if len(self.docs) > 0:
help_lines.append('\n*Next Keyword Descriptions:*')
for (doc_name, doc_tags) in self.docs.items():
if len(doc_tags) == 1 and doc_tags[0] == None:
if sdnsh.description:
help_lines.append("\t%s: %s missing doc attribute" %
(command['self'], doc_name))
help_lines.append('* %s:' % doc_name)
elif len(doc_tags) == 1 and doc_tags[0].split('|')[0] == 'types' and \
not doc_name.startswith(doc_tags[0].split('|')[1]):
type_name = doc_tags[0].split('|')[1].capitalize()
help_lines.append(' %s: type %s' %
(doc_name, type_name))
else:
help_lines.append('* %s:' % doc_name)
for doc_tag in doc_tags:
if sdnsh.description:
help_lines.append("\t%s: %s %s" %
(command['self'], doc_name, doc_tag))
text = doc.get_text(self, doc_tag)
if text == '':
help_lines.append('')
if text != '' and doc_tag not in shown_items:
if len(doc_tags) > 1 and doc_tag.split('|')[0] == 'types':
type_name = doc_tag.split('|')[1].capitalize()
help_lines.append('\tType: %s' % type_name)
shown_items.append(doc_tag)
text=handle_plaintext_wiki(text)
help_lines.append(text)
#dumb_formatter(help_lines, text, 4)
doc_example = command.get('doc-example')
if doc_example:
text = doc.get_text(self, doc_example)
if text != '':
help_lines.append('\n*Command Examples:*')
help_lines.append('{noformat}')
help_lines.append(text)
#dumb_formatter(help_lines, text, 0)
help_lines.append('{noformat}')
return '\n'.join(help_lines)
def handle_command_results(self):
if len(self.commands) == 0:
if self.words:
return 'No applicable command: %s' % ' '.join(self.words)
else:
return '\nNo command for ' + self.header
help_lines = []
# Deal with too much output
if len(self.commands) > 10: # 10 is a magic number,
help_lines.append('Help: Too many commands match (%d), '
'Please choose more detail from item: ' %
len(self.commands))
help_lines.append(sdnsh.choices_text_builder(self.docs))
return '\n'.join(help_lines)
if self.header:
help_lines.append(self.header)
help_lines.append('')
(term_cols, term_rows) = sdnsh.pp.get_terminal_size()
last_desc_name = None
last_syntax_name = None
# all-help in intended to provide an overall short-help for
# a collection of similarly syntax'ed commands.
all_help = {}
for command in self.commands:
if command.get('all-help'):
all_help[command.get('name')] = command.get('all-help')
# when len(all_help) == 1, only ONE command collection group
# is getting described
if len(all_help) == 1:
name = all_help.keys()[0]
help_lines.append('%s Command: %s' %
(name.capitalize(), all_help[name].capitalize()))
all_help = True
else:
all_help = False
# Keep track of paragraphs displayed to prevent repeated
# display of the same doc tags.
shown_items = []
for command in self.commands:
doc_tag = command.get('doc')
name = _get_command_title(command)
short_help = command.get('short-help')
if not all_help and short_help:
help_lines.append('\n%s Command: %s\n' %
(name.capitalize(), short_help.capitalize()))
last_syntax_name = None # display syntax header
help_strings = []
command_help_string = _get_command_syntax_help_string(
command, self.prefix)
if type(command_help_string) == list:
help_strings += command_help_string
else:
help_strings.append(command_help_string)
for h in sorted(help_strings):
if last_syntax_name != name:
if len(self.commands) > 1:
help_lines.append(' Command Syntax:')
else:
help_lines.append('%s Command Syntax:' %
name.capitalize())
last_syntax_name = name
# use 75% of the width, since reformat_line isn't perfect.
smaller_term_cols = (self.term_cols * 3) / 4
if len(h) > smaller_term_cols:
help_lines += reformat_line(h, smaller_term_cols, 2, 0)
else:
help_lines.append(' %s' % h)
last_desc_name = None # print description header
if doc_tag:
text = doc.get_text(self, doc_tag)
if text != '' and doc_tag not in shown_items:
shown_items.append(doc_tag)
if last_desc_name != name:
if self.format == None:
help_lines.append('\n%s Command Description:' %
name.capitalize())
elif self.format == 'clidoc':
help_lines.append('\n Description:')
last_desc_name = name
dumb_formatter(help_lines, text, 4)
last_syntax_name = None # print 'Syntax' header
if len(self.commands) > 1:
for last_doc in self.docs_stack:
if command['self'] in last_doc:
(keyword, cmd_doc) = last_doc[command['self']]
text = doc.get_text(self, cmd_doc)
if text != '' and cmd_doc not in shown_items:
shown_items.append(cmd_doc)
if type(keyword) == tuple:
keyword = keyword[0]
help_lines.append('\nKeyword %s:' % keyword.capitalize())
dumb_formatter(help_lines, text, 4)
doc_example = command.get('doc-example')
if len(self.commands) > 1 and doc_example:
text = doc.get_text(self, doc_example)
if text != '' and doc_example not in shown_items:
help_lines.append('Examples:')
shown_items.append(doc_example)
dumb_formatter(help_lines, text, 4)
if len(self.commands) == 1:
short_help = None
if self.is_no_command:
#
# Cobble together a help string for a no command, build
# a default value for subcommand's, possibly other command
# descriptions
#
short_help = self.commands[0].get('no-help')
if not short_help:
command_type = self.commands[0].get('command-type')
action = self.commands[0].get('action')
if ((command_type and command_type == 'config-submode') or
(action and action == 'push-mode-stack')):
mode_name = self.commands[0].get('name')
short_help = 'Remove %s configuration' % mode_name
else:
short_help = self.commands[0].get('short-help')
for last_doc in self.docs_stack:
if command['self'] in last_doc:
(keyword, last_doc) = last_doc[command['self']]
text = doc.get_text(self, last_doc)
if type(keyword) == tuple:
keyword = keyword[0]
if sdnsh.description:
help_lines.append("\t%s: %s %s" %
(command['self'], keyword, last_doc))
if text != '' and last_doc not in shown_items:
shown_items.append(last_doc)
help_lines.append('\nKeyword %s Description:' %
keyword.capitalize())
dumb_formatter(help_lines, text, 4)
if len(self.docs) > 0:
if self.format == None:
indent = ' '
help_lines.append('\nNext Keyword Descriptions;')
elif self.format == 'clidoc':
indent = ' '
for (doc_name, doc_tags) in sorted(self.docs.items()):
if len(doc_tags) == 1 and doc_tags[0] == None:
if sdnsh.description:
help_lines.append("\t%s: %s missing doc attribute" %
(command['self'], doc_name))
help_lines.append('%s%s:' % (indent, doc_name))
elif len(doc_tags) == 1 and doc_tags[0].split('|')[0] == 'types' and \
not doc_name.startswith(doc_tags[0].split('|')[1]):
type_name = doc_tags[0].split('|')[1].capitalize()
help_lines.append('%s%s: type %s' %
(indent, doc_name, type_name))
else:
help_lines.append('%s%s:' % (indent, doc_name))
for doc_tag in doc_tags:
if sdnsh.description:
help_lines.append("\t%s: %s %s" %
(command['self'], doc_name, doc_tag))
text = doc.get_text(self, doc_tag)
if text == '':
help_lines.append('')
if text != '' and doc_tag not in shown_items:
if len(doc_tags) > 1 and doc_tag.split('|')[0] == 'types':
type_name = doc_tag.split('|')[1].capitalize()
help_lines.append('\tType: %s' % type_name)
shown_items.append(doc_tag)
dumb_formatter(help_lines, text, 8)
doc_example = command.get('doc-example')
if doc_example:
text = doc.get_text(self, doc_example)
if text != '':
help_lines.append('Examples:')
dumb_formatter(help_lines, text, 4)
if len(self.commands) > 1 and len(self.docs) > 0:
if self.format == None:
help_lines.append('\nNext Keyword Descriptions;')
for (doc_name, doc_tags) in sorted(self.docs.items()):
if len(doc_tags) == 1 and doc_tags[0] == None:
if sdnsh.description:
help_lines.append("\t%s: missing doc attribute" %
command['self'])
help_lines.append(' %s:' % doc_name)
elif len(doc_tags) == 1 and doc_tags[0].split('|')[0] == 'types' and \
not doc_name.startswith(doc_tags[0].split('|')[1]):
type_name = doc_tags[0].split('|')[1].capitalize()
help_lines.append(' %s: type %s' %
(doc_name, type_name))
else:
help_lines.append(' %s:' % doc_name)
for doc_tag in doc_tags:
if doc_tag:
if len(doc_tags) > 1 and doc_tag.split('|')[0] == 'types':
type_name = doc_tag.split('|')[1].capitalize()
help_lines.append('\tType: %s' % type_name)
if sdnsh.description:
help_lines.append("\t%s: %s %s" %
(command['self'], doc_name, doc_tag))
text = doc.get_text(self, doc_tag)
if text != '' and doc_tag not in shown_items:
shown_items.append(doc_tag)
dumb_formatter(help_lines, text, 8)
else:
help_lines.append('')
else:
help_lines.append('')
return '\n'.join(help_lines)
def get_command_doc_tag(word):
handler = CommandHandler()
matching_commands = handler.get_matching_commands(word, False,
command_registry)
if len(matching_commands) == 1:
return (matching_commands[0][0]).get('doc')
if len(matching_commands) > 1:
# error? retur value for multiple commands?
pass
return None
def get_command_documentation(words, header=None):
handler = CommandDocumentor(header)
# Hack to make "help no" work
if len(words) == 1 and words[0] == 'no':
words = words + ['']
result = handler.handle_command(words)
return result
def command_submode_dictionary(modes = None):
if modes == None:
modes = sdnsh.command_dict.keys() + sdnsh.command_nested_dict.keys()
#
# try to find all submode commands, build a dictionary
reached_modes = []
mode_nodes = {}
mode_entries = []
for c in command_registry:
c_type = c.get('command-type')
if c_type and c_type == 'config-submode':
from_mode = c.get('mode')
to_mode = c.get('submode-name')
reached_modes.append(to_mode)
if not from_mode in mode_nodes:
mode_nodes[from_mode] = []
mode_nodes[from_mode].append((to_mode, _get_command_title(c)))
mode_entries.append({'mode' : from_mode,
'submode' : to_mode,
'command' : _get_command_title(c)})
if sdnsh.description:
print ', '.join(mode_nodes)
print mode_nodes
if [x for x in modes if not x in reached_modes]:
print 'Missing ', [x for x in modes if not x in reached_modes]
return mode_entries
def get_clidoc(words):
if len(words):
handler = CommandDocumentor(header = None, format = 'clidoc')
for word in words:
for c in command_registry:
if word == c['self']:
handler.add_command(c)
result = handler.handle_command_results()
return result
else:
clidoc = []
def commands_for_mode(mode):
clidoc.append('\n\n\n============================= MODE ' + mode + '\n\n')
for c in command_registry:
if mode == c['mode']:
handler = CommandDocumentor(header = '\n\n\n ---- MODE ' + mode,
format = 'clidoc')
handler.add_command(c)
clidoc.append(handler.handle_command_results())
mode = 'login'
commands_for_mode(mode)
# select commands by submode.
mode_entries = command_submode_dictionary()
for mode_entry in mode_entries:
mode = mode_entry['submode']
if mode[-1] == '*':
mode = mode[:-1]
if mode == 'config-':
mode = 'config'
commands_for_mode(mode)
return ''.join(clidoc)
def get_cliwiki(words):
def wiki_special_character_handling(inputstr):
inputstr=inputstr.replace('{','\{')
inputstr=inputstr.replace('}','\}')
inputstr=inputstr.replace('[','\[')
inputstr=inputstr.replace(']','\]')
return inputstr
cliwikifile=open('cliwiki.txt', 'w')
if not cliwikifile:
print 'File creation failed \n'
return
cliwikifile.write('{toc:printable=true|style=disc|maxLevel=3|minLevel=1|class=bigpink|exclude=[1//2]|type=list|include=.*}')
#write the introduction part
introductionfile=open('documentation/en_US/introduction')
str1=introductionfile.read()
cliwikifile.write('\n')
cliwikifile.write(str1)
str1='\nh1. CLI Commands'
cliwikifile.write(str1)
#generate all commands in login/enable mode except show cmds
mode=['login','enable']
category=['show']
for c in command_registry:
name = c['name']
if name not in category and c['mode'] in mode:
category.append(name)
handler = CommandDocumentor(header = 'login')
for cc in command_registry:
if name==cc['name'] and cc['mode'] in mode:
handler.add_command(cc)
str1 = handler.handle_command_results_wiki()
str1= str1+ '\n'
cliwikifile.write(str1)
handler = CommandDocumentor(header = None)
#generate all configuration commands: exclude internal commands
mode=['config', 'config*']
category=['internal'] #prune out all internal commands
str1='\nh2. Configuration Commands'
cliwikifile.write(str1)
for c in command_registry:
name = c['name']
if name not in category and c['mode'] in mode:
category.append(name)
submode='config-' + name
submode1=c.get('submode-name')
if submode1 is not None:
if submode1.find(submode)==-1:
submode=submode1
if name=='vns-definition':
print submode
str1="%s" % name
str1=wiki_special_character_handling(str1)
str1="\nh3. %s Commands " % str1.capitalize()
cliwikifile.write(str1)
handler = CommandDocumentor(header = None)
handler.add_command(c)
str1 = handler.handle_command_results_wiki()
str1= str1+ '\n'
cliwikifile.write(str1)
for cc in command_registry:
cmdmode=cc['mode']
if (isinstance(cmdmode, str)):
if (cmdmode.find(submode)!=-1) and (cc['name']!='show'):
#special handling: prune vns command from tenant mode
if (name != 'tenant') or ((name == 'tenant') and cmdmode.find('config-tenant-vns')==-1 and cmdmode.find('config-tenant-def-vns')==-1):
handler = CommandDocumentor(header = None)
handler.add_command(cc)
str1 = handler.handle_command_results_wiki()
str1= str1+ '\n'
cliwikifile.write(str1)
#generate all show commands
name='show'
str1='\nh2. Show Commands'
cliwikifile.write(str1)
mode=['config-internal'] #prune out all internal commands
obj_type=[]
for c in command_registry:
if c['name']==name and c['mode'] not in mode:
obj=c.get('obj-type')
if obj and obj not in obj_type:
obj_type.append(obj)
str1="%s" % obj
str1=wiki_special_character_handling(str1)
str1="\nh3. Show %s Commands " % obj.capitalize()
cliwikifile.write(str1)
for cc in command_registry:
if name==cc['name'] and obj==cc.get('obj-type') and cc['mode'] not in mode:
handler = CommandDocumentor(header = None)
handler.add_command(cc)
str1 = handler.handle_command_results_wiki()
str1= str1+ '\n'
cliwikifile.write(str1)
cliwikifile.close()
print 'CLI reference wiki markup file is generated successfully at: cli/cliwiki.txt.\n '
#
# Lint: Verify requested commands
#
class CommandLint(CommandHandler):
def __init__(self):
super(CommandLint, self).__init__()
self.commands = []
def handle_incomplete_command(self, arg, arg_scopes, arg_data, fields, parsed_tag, command):
if command not in self.commands:
self.commands.append(command)
def handle_complete_command(self, prefix_matches, scopes, arg_data, fields, actions, command):
if command not in self.commands:
self.commands.append(command)
def lint_action_query_table(self, c_self, command, c_scope):
# look for items which are not parameters
attrs = ['proc', 'key', 'scoped', 'sort', 'crack', 'obj-type', 'proc']
for a in c_scope:
if not a in attrs:
print '%s: unknown attribute %s for query-table' % (c_self, a)
req = ['obj-type']
for r in req:
if not r in c_scope:
print '%s: missing required attribute %s for query-table' % (
c_self, r)
if 'obj-type' in c_scope:
if not mi.obj_type_exists(c_scope['obj-type']):
print '%s: query-table no such obj-type %s' % (
c_self, c_scope['obj-type'])
def lint_action_query_rest(self, c_self, command, c_scope):
# look for items which are not parameters
attrs = ['proc', 'url', 'rest-type', 'key', 'scoped', 'sort', 'append']
for a in c_scope:
if not a in attrs:
print '%s: unknown attribute %s for query-rest' % (c_self, a)
req = ['url']
for r in req:
if not r in c_scope:
print '%s: missing required attribute %s for query-rest' % (
c_self, r)
if 'append' in c_scope:
if type(c_scope['append']) != dict:
print '%s: append parameter in query_rest' \
'required to be a dict' % (c_self)
def lint_action_join_table(self, c_self, command, c_scope):
# look for items which are not parameters
attrs = ['proc', 'obj-type', 'key', 'key-value', 'add-field', 'join-field', 'crack']
for a in c_scope:
if not a in attrs:
print '%s: unknown attribute %s for join-table' % (c_self, a)
req = ['obj-type', 'key', 'join-field']
for r in req:
if not r in c_scope:
print '%s: missing required attribute %s for query-table' % (
c_self, r)
if 'obj-type' in c_scope:
if not mi.obj_type_exists(c_scope['obj-type']):
print '%s: join-table no such obj-type %s' % (
c_self, c_scope['obj-type'])
def lint_dict_action(self, c_self, command, c_action, c_scope):
if not 'proc' in c_action:
print '%s: "proc" expected for dict in action' % c_self
return
action = c_action['proc']
if not action in action_registry:
print '%s: action %s unknown' % (c_self, c_action)
if c_action == 'display-rest':
if not 'url' in c_action:
print '%s: missing "url" for display-rest' % c_self
if action == 'display-table':
if not 'obj-type' in c_action:
# could be in c_scope, or other nests.
print '%s: missing "obj-type" for display-table' % c_self
if action in ['query-table', 'query-table-append']:
self.lint_action_query_table(c_self, command, c_action)
if action in ['query-rest', 'query-table-rest']:
self.lint_action_query_rest(c_self, command, c_action)
if action == 'join-table':
self.lint_action_join_table(c_self, command, c_action)
if action == 'legacy-cli':
print '%s: LEGACY-CLI, consider reimplementation' % c_self
def lint_action(self, c_self, command, c_action, c_type, c_scope):
if type(c_action) == tuple:
for t in c_action:
if type(t) == list:
print '%s: LIST nost supported as a member of actions' % c_self
elif type(t) == dict:
self.lint_dict_action(c_self, command, t, c_scope)
if type(c_action) == dict:
self.lint_dict_action(c_self, command, c_action, c_scope)
if type(c_action) == str:
if not c_action in action_registry:
print '%s: action %s unknown' % (c_self, c_action)
if c_action == 'display-rest':
if not 'url' in c_scope:
print '%s: missing "url" for display-rest' % c_self
if c_action in ['query-table', 'query-table-append']:
self.lint_action_query_table(c_self, command, c_scope)
if c_action in ['query-rest', 'query-table-rest']:
self.lint_action_query_rest(c_self, command, c_scope)
if c_action == 'join-table':
self.lint_action_join_table(c_self, command, c_scope)
if c_action == 'legacy-cli':
print '%s: LEGACY-CLI, consider reimplementation' % c_self
def lint_choice(self, c_self, command, c_choice, c_type):
for choice in c_choice:
if type(choice) == tuple:
# in order collection of terms, these aren't choices, but
# here e can treat them that way
for t in choice:
if type(t) == list:
print '%s: LIST nost supported as a member of choices' % c_self
elif type(t) == dict:
self.lint_choice(c_self, command, (t,), c_type)
elif type(t) == str:
# token, ought to validate char's in token
pass
else:
print '%s: bad element type -> %s' % (c_self, type(t))
if 'command-type' in choice:
print '%s: CHOICE contains "command-type", only' \
' intended to be used at top level' % c_self
if not choice['command-type'] in command_type_defaults:
print '%s: missing command-type %s' % (c_self, choice['command-type'])
if 'action' in choice:
self.lint_action(c_self, command, choice['action'], c_type, choice)
if 'choices' in choice:
self.lint_choice(c_self, command, choice['choices'], c_type)
def lint_command(self,command):
c_self = command.get('self', 'UNKNOWN SELF NANE')
print command['self']
c_name = command.get('name', None)
if c_name == None:
print '%s: no name defined' % c_self
c_mode = command.get('mode', None)
if c_mode == None:
print '%s: no submode defined' % c_self
c_short_help = command.get('short-help', None)
if c_short_help == None:
print '%s: no short-help defined' % c_self
c_obj_type = command.get('obj-type', None)
c_current_mode_obj_id = command.get('current-mode-obj-id', None)
if 'current-mode-obj-id' in command:
if c_current_mode_obj_id == None:
print '%s: "current-mode-obj-id" not needed at top level' % c_self
c_parent_id = command.get('parent-id', None)
if 'parent-id' in command:
if c_parent_id == None:
print '%s: "parent-id" not needed at top level' % c_self
c_type = command.get('command-type', None)
c_args = command.get('args', None)
if c_args == None:
print '%s: no args defined' % c_args
c_action = command.get('action', None)
if c_action or c_type:
self.lint_action(c_self, command, c_action, c_type, command)
if type(c_args) == dict:
if 'action' in c_args:
self.lint_action(c_self, command, c_args['action'], c_type, c_args)
if 'choices' in c_args:
self.lint_choice(c_self, command, c_args['choices'], c_type)
if 'choice' in c_args:
print '%s: "choices" not "choice"' % c_self
elif type(c_args) == tuple or type(c_args) == list:
for a in c_args:
if 'action' in a:
self.lint_action(c_self, command, a['action'], c_type, a)
if 'choices' in a:
self.lint_choice(c_self, command, a['choices'], c_type)
if 'choice' in a:
print '%s: "choices" not "choice"' % c_self
if 'field' in a:
if c_obj_type:
if not mi.obj_type_has_field(c_obj_type, a['field']):
print '%s: %s MISSING FIELD %s' % (c_self,
c_obj_type, a['field'])
def handle_command_results(self):
if len(self.commands) == 0:
return 'No applicable command: %s' % ' '.join(self.words)
for command in self.commands:
self.lint_command(command)
return '--'
def lint_command(words):
lint = CommandLint()
if len(words) == 0:
for command in command_registry:
lint.lint_command(command)
else:
for command in command_registry:
if command['self'] in words:
lint.lint_command(command)
return ''
class CommandPermutor(CommandHandler):
def __init__(self, qualify = False):
"""
@param qualify Generate qualify version of the permutations
"""
super(CommandPermutor, self).__init__()
self.commands = []
self.collect = []
self.qualify = qualify
self.obj_type = None
self.obj_type_other = None
def collect_complete_command(self, line):
if self.qualify:
# collect together parts, when a token appears,
# replace the token with a procedure call.
if len(line) == 0:
return
new_line = ['"']
quoted = True
if line[0][0] == '+': # unusual
new_line = ['%s ' % line[0][1:]]
line = line[1:]
quoted = False
for item in line:
if quoted:
if item[0] == '+':
new_line.append('" + %s ' % item[1:])
quoted = False
else:
new_line.append('%s ' % item)
else:
if item[0] == '+':
new_line.append(' + " " + %s ' % item[1:])
quoted = False
else:
new_line.append('+ " %s ' % item)
quoted = True
if quoted:
new_line.append('"')
self.collect.append(''.join(new_line))
else:
self.collect.append(' '.join(line))
def field_to_generator(self, field):
"""
Convert the field name to a text field.
When 'qualify' is set, replace the field name
with a likely procedure to call instead
"""
if self.qualify:
# Many of the fields are actually fields in
# the named obj-type. Take a shot at seeing if that's
# the case, and call a sample value collector
if self.obj_type_other:
# These are typically an obj_type|field names.
parts = self.obj_type_other.split('|')
o_field = field
if len(parts) > 1:
o_field = parts[1]
# by using obj_type_has_model() instead of
# obj_type_exists(), 'curl(1)' can be used in
# the testing to find sample objects.
if mi.obj_type_has_model(parts[0]):
sample_obj_type = 'sample_obj_type'
if self.is_no_command:
sample_obj_type = 'sample_no_obj_type'
if mi.obj_type_has_field(parts[0], o_field):
return '+id.%s("%s", "%s")' \
% (sample_obj_type, parts[0], o_field)
python_field = field.replace("-", "_")
if self.obj_type:
sample_obj_type = 'sample_obj_type'
if self.is_no_command:
sample_obj_type = 'sample_no_obj_type'
if mi.obj_type_has_field(self.obj_type, field):
return '+id.%s("%s", "%s")' \
% (sample_obj_type, self.obj_type, field)
else:
return '+id.%s("%s")' % (python_field, self.obj_type)
else:
return '+id.%s()' % python_field
else:
return "<%s>" % field
def permute_item(self, item, line, next_args):
"""
Deals with a single dictionary item. This can be:
a choice, an arg, a token, or a field.
"""
if type(item) == str:
line.append(item)
self.permute_in_order(next_args[0], line, next_args[1:])
return
if self.is_no_command:
optional = item.get('optional-for-no', False)
else:
optional = item.get('optional', False)
skip = item.get('permute') # XXX permute vs qualify skip?
if skip != 'skip' and optional:
if len(next_args):
self.permute_in_order(None, line, next_args)
else:
self.collect_complete_command(line)
choices = item.get('choices')
if choices:
self.permute_choices(choices, list(line), next_args)
return
if skip == 'skip':
return
token = item.get('token')
tag = item.get('tag')
field = item.get('field')
args = item.get('args')
obj_type = item.get('obj-type')
if obj_type:
self.obj_type = obj_type
# must do this after the recursive 'optional' calls.
self.obj_type_other = item.get('other')
if args:
if type(args) == dict:
self.permute_item(args, line, next_args)
elif type(args) == tuple:
self.permute_in_order(args, list(line), next_args)
else:
print 'TYPE ARGS ', type(args)
return
elif token:
line.append(token)
elif field:
item_type = item.get('type')
if item_type == None:
item_type = item.get('base-type')
if item_type == 'enum':
values = item.get('values')
if values:
if tag:
line.append(tag)
# convert the values into a tuple of dicts,
# so that permute_choices can manage it
if type(values) == str:
values = [values]
choices = tuple([{'token' : x} for x in values])
self.permute_choices(choices, list(line), next_args)
return
else:
if tag:
line.append(tag)
line.append(self.field_to_generator(field))
#
# fall-through to this common code, which should only be
# reached when no other recursive descent is performed.
if len(next_args):
self.permute_in_order(None, line, next_args)
else:
self.collect_complete_command(line)
def permute_choices(self, choices, line, next_args):
"""
When a 'choices': is reached, enumerate all the choices,
and continue forward.
"""
for pick in choices:
if type(pick) == dict:
new_line = list(line)
self.permute_item(pick, new_line, next_args)
if next_args == None:
self.collect_complete_command(line)
elif type(pick) == tuple:
self.permute_in_order(pick, list(line), next_args)
else:
print "CHOICE? ", type(pick)
def permute_in_order(self, items, line, next_args):
if items == None and len(next_args):
items = next_args
next_args = []
if len(items) == 0:
# done
self.collect_complete_command(line)
return
# see if the item is optional, if so then don't add the item,
# and go to the next item in a recursive call
while len(items) and type(items[0]) == str:
line.append(items[0])
items = items[1:]
if len(items):
item = items[0]
if len(items) > 1:
if len(next_args) == 0:
next_args = items[1:]
else:
if type(items[1:]) == tuple:
next_args = list(items[1:]) + next_args
else:
next_args = items[1:] + next_args
else:
next_args = []
token = False
if type(item) == tuple:
self.permute_in_order(item, line, list(next_args))
elif type(item) == dict:
self.permute_item(item, list(line), list(next_args))
else:
print 'permute_in_order: need type', type(item)
else:
self.collect_complete_command(line)
return
def permute_command_common(self, command):
args = command.get('args')
if args == None:
if sdnsh.description:
print 'PERMUTE: missing args for %s' % command['self']
return
name = _get_command_title(command)
obj_type = command.get('obj-type')
if obj_type:
self.obj_type = obj_type
if type(command.get('name')) == dict:
if self.qualify:
name = self.field_to_generator(command['name']['field'])
if self.is_no_command:
line = ['no', name]
else:
line = [name]
if type(args) == tuple:
self.permute_in_order(args, line, [])
elif type(args) == dict:
self.permute_item(args, line, [])
else:
print 'PC ', type(args)
return '\n'.join(self.collect)
def permute_command(self, command):
print 'PERMUTE', command['self'], is_no_command_supported(command)
self.is_no_command = False
result = self.permute_command_common(command)
if is_no_command_supported(command):
# Now add the command for 'no'
self.is_no_command = True
result = self.permute_command_common(command)
return result
def handle_command_results(self):
collections = []
for command in self.commands:
collections.append(self.permute_command(command))
return '\n'.join(collections)
def handle_command(self, words):
for word in words:
for c in command_registry:
if word == c['self']:
return self.permute_command(c)
def permute_single_submode(submode_command, qualify):
"""
Permute command for a submode, takes the command, finds
all the related submodesmodes, permutes all the commands
in the submode, then a recursive call to any submodes
found within this one.
@param submode_command command dictionary of the submode command.
Can be set to None as an indication of "root"
@param qualify boolean, describes whether to generate "permute"
output or "qualify" output. Permute output is a human readable
command permutation, while "qualify" is intended for script building
"""
permuted = []
permute_submodes = []
if submode_command == None:
mode = 'login'
else:
# first, the submode. There ought to be only one permutation.
permute = CommandPermutor(qualify)
permuted.append(permute.permute_command(submode_command))
mode = submode_command.get('submode-name')
print 'SUBMODE PERMUTE', submode_command['self'], 'MODE', mode
def submode_match(want, cmd):
if type(cmd) == str:
cmd = [cmd]
if want.startswith('config'):
for c in cmd:
if c.endswith('*'):
c = c[:-1]
if c == 'config-':
c = 'config'
if want == c:
return True
else:
return False
for c in cmd:
if want == c:
return True
return False
# there's no command to enter login.
for c in command_registry:
if submode_match(mode, c['mode']):
# no submode commands.
if c.get('command-type') == 'config-submode':
print 'SUBMODE POSTPONE', c['self']
permute_submodes.append(c)
else:
permute = CommandPermutor(qualify)
permuted.append(permute.permute_command(c))
# now any submodes found
for c in permute_submodes:
permuted.append(permute_single_submode(c, qualify))
return '\n'.join(permuted)
def permute_command(words, qualify):
"""
Permute all the commands (with no parameters), or a
sigle command. The command is named via the name
of the dictionary, for example:
permute COMMAND_DESCRIPTION
"""
if len(words) == 0:
return permute_single_submode(None, qualify)
else:
permute = CommandPermutor(qualify)
return permute.handle_command(words)
def _is_string(arg):
"""
Returns whether or not the argument is a string (either a "str" or "unicode")
"""
return isinstance(arg, types.StringTypes)
def _is_list(arg):
"""
Returns whether or not the argument is a list. This means that it's an
instance of a sequence, but not including the string types
"""
return isinstance(arg, collections.Sequence) and not _is_string(arg)
def _lookup_in_scopes(name, scopes, default=None):
"""
Look up the given name in the given list of scopes.
Returns the default value if the name is not defined in any of the scopes.
"""
assert name
assert scopes is not None
# We iterate over the items in the scope (rather than using 'get')
# so we can do a case-insensitive lookup.
name_lower = name.lower()
for scope in scopes:
for key, value in scope.items():
if key.lower() == name_lower:
return value
return default
def _call_proc(proc_descriptor, proc_registry, scopes, command):
assert proc_descriptor is not None
assert proc_registry is not None
if isinstance(proc_descriptor, collections.Sequence) and not _is_string(proc_descriptor):
proc_descriptors = proc_descriptor
else:
proc_descriptors = (proc_descriptor,)
combined_result = None
saved_scopes = scopes
for proc_descriptor in proc_descriptors:
args_descriptor = None
scopes = saved_scopes
if isinstance(proc_descriptor, collections.Mapping):
# Look up the proc in the specified registry
proc_name = proc_descriptor.get('proc')
if not proc_name:
raise error.CommandDescriptionError('Unspecified proc name: ',
command)
if proc_descriptor.get('args') or proc_descriptor.get('kwargs'):
args_descriptor = proc_descriptor
scopes = [proc_descriptor] + scopes
else:
proc_name = proc_descriptor
# Push an empty scope on the scope stack to hold a variable for the scopes
# FIXME: This should be cleaned up when we change the calling conventions
# for procs.
scopes = [{}] + scopes
scopes[0]['scopes'] = scopes
proc_and_arg = proc_registry.get(proc_name)
if not proc_and_arg:
raise error.CommandDescriptionError('Unknown proc name: %s' % proc_name, command)
proc, default_args_descriptor = proc_and_arg
if not args_descriptor:
args_descriptor = default_args_descriptor
converted_args = []
converted_kwargs = {}
# Convert the positional args
args = _get_args(args_descriptor)
if args:
for arg in args:
if _is_string(arg) and len(arg) > 1 and arg[0] == '$':
name = arg[1:]
for scope in scopes:
if name in scope:
value = scope[name]
break
else:
# FIXME: Disabling treating args that can't be resolved as errors, so that
# they can instead be interpreted as just using the default value for the
#raise error.CommandDescriptionError('Invalid proc argument: %s ' %
# name, command)
continue
converted_args.append(value)
# Convert the keyword args
kwargs = args_descriptor.get('kwargs')
if kwargs:
for key, value in kwargs.iteritems():
if _is_string(value) and len(value) > 1 and value[0] == '$':
name = value[1:]
for scope in scopes:
if name in scope:
value = scope[name]
break
else:
# FIXME: Don't treat args that can't be resolved as errors, so that
# they can instead be interpreted as just using the default value for the
#raise error.CommandDescriptionError('Invalid proc argument: %s ' %
# name, command)
continue
converted_kwargs[key] = value
# Call the proc
# pylint: disable=W0142
result = proc(*converted_args, **converted_kwargs)
if result is not None:
combined_result = combined_result + result if combined_result else result
return combined_result
def _add_applicable_modes(command, mode_dict):
"""
Helper function for _get_applicable_modes to add any modes for this command
"""
feature = command.get('feature')
if feature:
if not sdnsh.feature_enabled(feature):
return
mode = command.get('mode')
if mode:
if type(mode) == list:
for m in mode:
mode_dict[m] = None
else:
mode_dict[mode] = None
def _get_applicable_modes(command):
"""
Returns a list of all of the modes that are specified for this command
"""
mode_dict = {}
_add_applicable_modes(command, mode_dict)
return mode_dict.keys()
def _exact_mode_match(current_mode, command_modes):
"""
Return True when this command_mode is an exact match
for this current mode (used to partition list of commands
into two groups, one exact level, and for for related mode
matches)
"""
if not type(command_modes) == list:
command_modes = [command_modes]
for mode in command_modes:
if mode == current_mode:
return True
if mode.endswith('*') and mode[:-1] == current_mode:
return True
return False
def _match_current_modes(command, current_mode, modes):
"""
Even when the current mode isn't in the list of modes,
there's a few modes in the mode list which are intended to
be a collection of modes, eg: "config*", and "config-*",
For any mode which ends in an ('*'), the current mode
will match when the current mode is prefix of the mode
(without the last '*' character)
'config*' is intended to match any config mode, while
'config-*' is intended to match any config submode.
"""
if current_mode in modes:
return True
#
# if the modes is enable, this works everywhere
#
if 'login' in modes:
return True
#
# if the modes is login, and the mode is anything but login,
# then this is true
#
if 'enable' in modes and current_mode != 'login':
return True
for mode in modes:
if mode.endswith('*') and current_mode.startswith(mode[:-1]):
return True
#if command.get('command-type') == 'config-submode':
# for mode in modes:
# if current_mode.startswith(mode):
# print "_match_current_modes: current command type is config-submode",current_mode,mode
# return True
return False
def _get_command_title(command):
if type(command['name']) == str:
return command['name']
if type(command['name']) == dict:
return command['name']['title']
else:
raise Exception("Command %s has unknown title" % command['name'])
def _lookup_command_candidates(command_prefix, command_list):
"""
Returns the list of command candidates from the given command list.
A candidate must have a 'mode' value that matches the current mode,
and its name must begin with the given command prefix.
"""
candidates = []
current_mode = sdnsh.current_mode()
try:
for command in command_list:
modes = _get_applicable_modes(command)
if _match_current_modes(command, current_mode, modes):
name = command['name']
if (type(name) == str and
name.startswith(command_prefix.lower())):
candidates.append(command)
# should check the type of command_prefix,
# and for str, ame.match(command_prefix):
if type(name) == dict:
if 're' not in name:
command['name']['re'] = re.compile(name['pattern'])
if name['re'].match(command_prefix):
candidates.append(command)
if type(name) == dict and \
name['re'](command_prefix):
candidates.append(command)
except Exception, _e:
raise error.CommandDescriptionError('Missing mode or name: ', command)
return candidates
def do_command(words):
executor = CommandExecutor()
result = executor.handle_command(words)
return result
def init_command(bs, modi):
# FIXME HACK: sdnsh global to access top-level CLI object
global sdnsh, mi
sdnsh = bs
mi = modi
c_actions.init_actions(bs, modi)
c_data_handlers.init_data_handlers(bs, modi)
c_completions.init_completions(bs, modi)
c_validations.init_validations(bs, modi)
add_command_type('config', {
'action': 'write-fields',
'no-action': 'reset-fields'
})
add_command_type('config-object', {
'action': 'write-object',
'no-action': 'delete-objects',
})
add_command_type('config-submode', {
'action': 'push-mode-stack',
#'no-action': 'delete-child-objects',
'no-action': 'delete-objects',
})
add_command_type('create-tunnel', {
'action': 'create-tunnel'
})
add_command_type('remove-tunnel', {
'action': 'remove-tunnel'
})
add_command_type('create-policy', {
'action': 'create-policy'
})
add_command_type('remove-policy', {
'action': 'remove-policy'
})
add_command_type('display-table', {
'action': 'display-table'
})
add_command_type('display-rest', {
'action': 'display-rest'
})
add_command_type('manage-alias', {
'action' : 'create-alias',
'no-action' : 'delete-alias',
})
add_command_type('manage-tag', {
'action' : 'create-tag',
'no-action' : 'delete-tag',
})
add_command_type('update-config', {
'action' : 'update-config',
'no-action' : 'update-config',
})
# Initialize typedefs
add_typedef({
'name': 'string',
'validation': 'validate-string'
})
add_typedef({
'name': 'integer',
'validation': 'validate-integer'
})
add_typedef({
'name': 'hex-or-decimal-integer',
'validation': 'validate-hex-or-dec-integer'
})
add_typedef({
'name': 'date',
'validation': 'validate-date'
})
add_typedef({
'name': 'duration',
'validation': 'validate-duration'
})
add_typedef({
'name': 'enum',
'validation': 'validate-enum'
})
add_typedef({
'name' : 'mac-address',
'help-name' : 'MAC Address',
'base-type' : 'string',
'validation' : 'validate-mac-address',
})
add_typedef({
'name' : 'host',
'help-name' : 'Host Alias or MAC Address',
'base-type' : 'string',
'validation' : 'validate-host',
})
add_typedef({
'name' : 'vlan',
'help-name' : 'Vlan',
'base-type' : 'integer',
'validation' : 'validate-integer',
})
add_typedef({
'name' : 'dpid',
'help-name' : 'switch id (8-hex bytes)',
'base-type' : 'string',
'validation' : 'validate-switch-dpid',
})
add_typedef({
'name' : 'ip-address',
'help-name' : 'IP address (dotted quad)',
'base-type' : 'string',
'pattern' : r'^(([0-9]|[1-9][0-9]|1[0-9][0-9]|2[0-4][0-9]|25[0-5])\.){3}([0-9]|[1-9][0-9]|1[0-9][0-9]|2[0-4][0-9]|25[0-5])$',
})
add_typedef({
'name' : 'ip-address-not-mask',
'help-name' : 'IP Address',
'base-type' : 'string',
'pattern' : r'^(\d{1,3}\.){3}\d{1,3}$',
'validation' : 'validate-ip-address-not-mask'
})
add_typedef({
'name' : 'cidr-range',
'help-name' : 'cidr range (ip-address/int)',
'base-type' : 'string',
'pattern' : r'^(\d{1,3}\.){3}\d{1,3}/\d{1,2}?$',
'validation' : 'validate-cidr-range',
})
add_typedef({
'name' : 'netmask',
'help-name' : 'netmask (eg: 255.255.255.0)',
'base-type' : 'ip-address',
'validation' : 'validate-netmask'
})
add_typedef({
'name' : 'obj-type',
'help-name' : 'configured object',
'base-type' : 'string',
'validation' : 'validate-existing-obj'
})
add_typedef({
'name' : 'inverse-netmask',
'help-name' : 'inverse netmask (eg: 0.0.0.255)',
'base-type' : 'ip-address',
'validation' : 'validate-inverse-netmask'
})
add_typedef({
'name' : 'domain-name',
'help-name' : 'Domain name',
# Simple domain name checking.
# Allows some things that aren't valid domain names, but doesn't
# disallow things liky punycode and fully qualified domain names.
'pattern' : r'^([a-zA-Z0-9-]+.?)+$',
'base-type' : 'string'
})
add_typedef({
'name': 'ip-address-or-domain-name',
'help-name': 'IP address or domain name',
'base-type': 'string',
'pattern': (
r'^([a-zA-Z0-9-]+.?)+$', # simple domain name
r'^(\d{1,3}\.){3}\d{1,3}$' # IP address
),
# for ip addresses, ought to validate non-mask values, ie: 0.0.0.0, 255.255.255.255
# ought to also validate ip addresses which aren't ip address, the dhcp 169.x values.
})
add_typedef({
'name' : 'resolvable-ip-address',
'help-name' : 'resolvable ip address',
'base-type' : 'string',
'pattern' : (
r'^([a-zA-Z0-9-]+.?)+$', # simple domain name
r'^(\d{1,3}\.){3}\d{1,3}$' # IP address
),
'validation' : 'validate-resolvable-ip-address',
})
add_typedef({
'name': 'enable-disable-flag',
'help-name': 'Enter "enable" or "disable"',
'base-type': 'enum',
'values': ('disable', 'enable'),
})
add_typedef({
'name' : 'identifier',
'help-name' : 'Alphabetic character, followed by alphanumerics',
'base-type' : 'string',
'validation' : 'validate-identifier',
})
add_typedef({
'name' : 'config',
'help-name' : 'name of config file',
'base-type' : 'string',
'validation' : 'validate-config',
})