[ONOS-7039] [ONOS-7044] Fix PEP8 Warnings in TestON
Change-Id: Ied79ff9caff5487a6df50466307f757468d7ca3a
diff --git a/TestON/bin/cli.py b/TestON/bin/cli.py
index a7e1297..5a0b3d5 100755
--- a/TestON/bin/cli.py
+++ b/TestON/bin/cli.py
@@ -1,5 +1,5 @@
#!/usr/bin/env python
-'''
+"""
Created on 20-Dec-2012
Copyright 2012 Open Networking Foundation
@@ -10,7 +10,7 @@
TestON is free software: you can redistribute it and/or modify
it under the terms of the GNU General Public License as published by
the Free Software Foundation, either version 2 of the License, or
- (at your option) any later version.
+ ( at your option ) any later version.
TestON is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
@@ -21,9 +21,7 @@
along with TestON. If not, see <http://www.gnu.org/licenses/>.
-'''
-
-
+"""
"""
cli will provide the CLI shell for teston framework.
@@ -35,7 +33,6 @@
teston> run test DpctlTest
Several useful commands are provided.
"""
-
from subprocess import call
from cmd import Cmd
from os import isatty
@@ -46,16 +43,18 @@
import threading
import __builtin__
import pprint
-dump = pprint.PrettyPrinter(indent=4)
+dump = pprint.PrettyPrinter( indent=4 )
__builtin__.testthread = False
introduction = "TestON is the testing framework \nDeveloped by Paxterra Solutions (www.paxterrasolutions.com)"
__builtin__.COLORS = False
-path = re.sub( "/bin$", "", sys.path[0] )
+path = re.sub( "/bin$", "", sys.path[ 0 ] )
sys.path.insert( 1, path )
from core.teston import *
-class CLI( threading.Thread,Cmd,object ):
+
+class CLI( threading.Thread, Cmd, object ):
+
"command-line interface to execute the test."
prompt = 'teston> '
@@ -64,7 +63,7 @@
self.teston = teston
self._mainevent = threading.Event()
- threading.Thread.__init__(self)
+ threading.Thread.__init__( self )
self.main_stop = False
self.locals = { 'test': teston }
self.stdin = stdin
@@ -86,69 +85,70 @@
Cmd.do_help( self, line )
if line is '':
output( self.helpStr )
- def do_run(self,args):
- '''
+
+ def do_run( self, args ):
+ """
run command will execute the test with following optional command line arguments
logdir <directory to store logs in>
testcases <list of testcases separated by comma or range of testcases separated by hypen>
mail <mail-id or list of mail-ids seperated by comma>
example 1, to execute the examples specified in the ~/examples diretory.
- '''
+ """
try:
args = args.split()
options = {}
- options = self.parseArgs(args,options)
- options = dictToObj(options)
+ options = self.parseArgs( args, options )
+ options = dictToObj( options )
if not testthread:
- test = TestThread(options)
+ test = TestThread( options )
test.start()
while test.isAlive():
- test.join(1)
+ test.join( 1 )
else:
- print main.TEST+ " test execution paused, please resume that before executing to another test"
+ print main.TEST + " test execution paused, please resume that before executing to another test"
except KeyboardInterrupt, SystemExit:
print "Interrupt called, Exiting."
test._Thread__stop()
main.cleanup()
main.exit()
- def do_resume(self, line):
- '''
+ def do_resume( self, line ):
+ """
resume command will continue the execution of paused test.
teston>resume
- [2013-01-07 23:03:44.640723] [PoxTest] [STEP] 1.1: Checking the host reachability using pingHost
+ [ 2013-01-07 23:03:44.640723 ] [ PoxTest ] [ STEP ] 1.1: Checking the host reachability using pingHost
2013-01-07 23:03:44,858 - PoxTest - INFO - Expected Prompt Found
....
- '''
+ """
if testthread:
testthread.play()
- else :
+ else:
print "There is no test to resume"
- def do_nextstep(self,line):
- '''
+ def do_nextstep( self, line ):
+ """
nextstep will execute the next-step of the paused test and
it will pause the test after finishing of step.
teston> nextstep
Will pause the test's execution, after completion of this step.....
- teston> [2013-01-07 21:24:26.286601] [PoxTest] [STEP] 1.8: Checking the host reachability using pingHost
+ teston> [ 2013-01-07 21:24:26.286601 ] [ PoxTest ] [ STEP ] 1.8: Checking the host reachability using pingHost
2013-01-07 21:24:26,455 - PoxTest - INFO - Expected Prompt Found
.....
teston>
- '''
+ """
if testthread:
- main.log.info("Executing the nextstep, Will pause test execution, after completion of the step")
+ main.log.info( "Executing the nextstep, Will pause test execution, after completion of the step" )
testthread.play()
- time.sleep(.1)
+ time.sleep( .1 )
testthread.pause()
else:
print "There is no paused test "
- def do_dumpvar(self,line):
- '''
+ def do_dumpvar( self, line ):
+ """
dumpvar will print all the test data in raw format.
usgae :
teston>dumpvar main
@@ -159,56 +159,54 @@
teston>dumpvar topology
here 'topology' will be topology specification of the test specified in topo file.
- '''
+ """
if testthread:
if line == "main":
- dump.pprint(vars(main))
- else :
- try :
- dump.pprint(vars(main)[line])
+ dump.pprint( vars( main ) )
+ else:
+ try:
+ dump.pprint( vars( main )[ line ] )
except KeyError as e:
print e
- else :
+ else:
print "There is no paused test "
- def do_currentcase(self,line):
- '''
+ def do_currentcase( self, line ):
+ """
currentcase will return the current case in the test execution.
teston>currentcase
Currently executing test case is: 2
- '''
+ """
if testthread:
- print "Currently executing test case is: "+str(main.CurrentTestCaseNumber)
- else :
+ print "Currently executing test case is: " + str( main.CurrentTestCaseNumber )
+ else:
print "There is no paused test "
-
- def do_currentstep(self,line):
- '''
+ def do_currentstep( self, line ):
+ """
currentstep will return the current step in the test execution.
teston>currentstep
Currently executing test step is: 2.3
- '''
+ """
if testthread:
- print "Currently executing test step is: "+str(main.CurrentTestCaseNumber)+'.'+str(main.stepCount)
- else :
+ print "Currently executing test step is: " + str( main.CurrentTestCaseNumber ) + '.' + str( main.stepCount )
+ else:
print "There is no paused test "
-
- def do_stop(self,line):
- '''
+ def do_stop( self, line ):
+ """
Will stop the paused test, if any !
- '''
+ """
if testthread:
testthread.stop()
return 'exited by user command'
- def do_gettest(self,line):
- '''
+ def do_gettest( self, line ):
+ """
gettest will return the test name which is under execution or recently executed.
Test under execution:
@@ -217,18 +215,18 @@
Test recently executed:
Recently executed test is: MininetTest
- '''
- try :
- if testthread :
- print "Currently executing Test is: "+main.TEST
- else :
- print "Recently executed test is: "+main.TEST
+ """
+ try:
+ if testthread:
+ print "Currently executing Test is: " + main.TEST
+ else:
+ print "Recently executed test is: " + main.TEST
except NameError:
print "There is no previously executed Test"
- def do_showlog(self,line):
- '''
+ def do_showlog( self, line ):
+ """
showlog will show the test's Log
teston>showlog
Last executed test's log is : //home/openflow/TestON/logs/PoxTest_07_Jan_2013_21_42_11/PoxTest_07_Jan_2013_21_42_11.log
@@ -236,17 +234,17 @@
teston>showlog
Currently executing Test's log is: /home/openflow/TestON/logs/PoxTest_07_Jan_2013_21_46_58/PoxTest_07_Jan_2013_21_46_58.log
.....
- '''
- try :
- if testthread :
- print "Currently executing Test's log is: "+main.LogFileName
+ """
+ try:
+ if testthread:
+ print "Currently executing Test's log is: " + main.LogFileName
- else :
- print "Last executed test's log is : "+main.LogFileName
+ else:
+ print "Last executed test's log is : " + main.LogFileName
logFile = main.LogFileName
- logFileHandler = open(logFile, 'r')
- for msg in logFileHandler.readlines() :
+ logFileHandler = open( logFile, 'r' )
+ for msg in logFileHandler.readlines():
print msg,
logFileHandler.close()
@@ -254,79 +252,77 @@
except NameError:
print "There is no previously executed Test"
-
-
- def parseArgs(self,args,options):
- '''
+ def parseArgs( self, args, options ):
+ """
This will parse the command line arguments.
- '''
- options = self.initOptions(options)
- try :
+ """
+ options = self.initOptions( options )
+ try:
index = 0
while index < len( args ):
- option = args[index]
- if index > 0 :
- if re.match("--params", option, flags=0):
+ option = args[ index ]
+ if index > 0:
+ if re.match( "--params", option, flags=0 ):
# check if there is a params
- options['params'].append(args[index+1])
- elif re.match("logdir|mail|example|testdir|testcases|onoscell", option, flags = 0):
- options[option] = args[index+1]
- options = self.testcasesInRange(index+1,option,args,options)
+ options[ 'params' ].append( args[ index + 1 ] )
+ elif re.match( "logdir|mail|example|testdir|testcases|onoscell", option, flags=0 ):
+ options[ option ] = args[ index + 1 ]
+ options = self.testcasesInRange( index + 1, option, args, options )
index += 2
- else :
- options['testname'] = option
+ else:
+ options[ 'testname' ] = option
index += 1
except IndexError as e:
- print (e)
+ print ( e )
main.cleanup()
main.exit()
return options
- def initOptions(self,options):
- '''
+ def initOptions( self, options ):
+ """
This will initialize the commandline options.
- '''
- options['logdir'] = None
- options['mail'] = None
- options['example'] = None
- options['testdir'] = None
- options['testcases'] = None
- options['onoscell'] = None
+ """
+ options[ 'logdir' ] = None
+ options[ 'mail' ] = None
+ options[ 'example' ] = None
+ options[ 'testdir' ] = None
+ options[ 'testcases' ] = None
+ options[ 'onoscell' ] = None
# init params as a empty list
- options['params'] = []
+ options[ 'params' ] = []
return options
- def testcasesInRange(self,index,option,args,options):
- '''
- This method will handle testcases list,specified in range [1-10].
- '''
- if re.match("testcases",option,1):
+ def testcasesInRange( self, index, option, args, options ):
+ """
+ This method will handle testcases list,specified in range [ 1-10 ].
+ """
+ if re.match( "testcases", option, 1 ):
testcases = []
- args[index] = re.sub("\[|\]","",args[index],0)
- m = re.match("(\d+)\-(\d+)",args[index],flags=0)
+ args[ index ] = re.sub( "\[|\]", "", args[ index ], 0 )
+ m = re.match( "(\d+)\-(\d+)", args[ index ], flags=0 )
if m:
- start_case = eval(m.group(1))
- end_case = eval(m.group(2))
- if (start_case <= end_case):
+ start_case = eval( m.group( 1 ) )
+ end_case = eval( m.group( 2 ) )
+ if ( start_case <= end_case ):
i = start_case
while i <= end_case:
- testcases.append(i)
- i= i+1
- else :
+ testcases.append( i )
+ i = i + 1
+ else:
print "Please specify testcases properly like 1-5"
- else :
- options[option] = args[index]
+ else:
+ options[ option ] = args[ index ]
return options
- options[option] = str(testcases)
+ options[ option ] = str( testcases )
return options
- def cmdloop(self, intro=introduction):
+ def cmdloop( self, intro=introduction ):
print introduction
while True:
try:
- super(CLI, self).cmdloop(intro="")
+ super( CLI, self ).cmdloop( intro="" )
self.postloop()
except KeyboardInterrupt:
if testthread:
@@ -336,69 +332,68 @@
sys.exit()
def do_echo( self, line ):
- '''
+ """
Echoing of given input.
- '''
- output(line)
+ """
+ output( line )
def do_sh( self, line ):
- '''
+ """
Run an external shell command
sh pwd
sh ifconfig etc.
- '''
+ """
call( line, shell=True )
-
def do_py( self, line ):
- '''
+ """
Evaluate a Python expression.
- py main.log.info("Sample Log Information")
+ py main.log.info( "Sample Log Information" )
2013-01-07 12:07:26,804 - PoxTest - INFO - Sample Log Information
- '''
+ """
try:
exec( line )
except Exception as e:
output( str( e ) + '\n' )
- def do_interpret(self,line):
- '''
+ def do_interpret( self, line ):
+ """
interpret will translate the single line openspeak statement to equivalent python script.
teston> interpret ASSERT result EQUALS main.TRUE ONPASS "Ping executed successfully" ONFAIL "Ping failed"
- utilities.assert_equals(expect=main.TRUE,actual=result,onpass="Ping executed successfully",onfail="Ping failed")
+ utilities.assert_equals( expect=main.TRUE,actual=result,onpass="Ping executed successfully",onfail="Ping failed" )
- '''
+ """
from core import openspeak
ospk = openspeak.OpenSpeak()
- try :
- translated_code = ospk.interpret(text=line)
+ try:
+ translated_code = ospk.interpret( text=line )
print translated_code
except AttributeError as e:
print 'Dynamic params are not allowed in single statement translations'
- def do_do (self,line):
- '''
+ def do_do( self, line ):
+ """
Do will translate and execute the openspeak statement for the paused test.
do <OpenSpeak statement>
- '''
+ """
if testthread:
from core import openspeak
ospk = openspeak.OpenSpeak()
- try :
- translated_code = ospk.interpret(text=line)
- eval(translated_code)
+ try:
+ translated_code = ospk.interpret( text=line )
+ eval( translated_code )
except ( AttributeError, SyntaxError ) as e:
print 'Dynamic params are not allowed in single statement translations:'
print e
- else :
+ else:
print "Do will translate and execute the openspeak statement for the paused test.\nPlease use interpret to translate the OpenSpeak statement."
- def do_compile(self,line):
- '''
- compile will translate the openspeak (.ospk) file into TestON test script (python).
+ def do_compile( self, line ):
+ """
+ compile will translate the openspeak ( .ospk ) file into TestON test script ( python ).
It will receive the openspeak file path as input and will generate
equivalent test-script file in the same directory.
@@ -407,15 +402,15 @@
teston>compile /home/openflow/TestON/PoxTest.ospk
Auto-generated test-script file is /home/openflow/TestON/PoxTest.py
- '''
+ """
from core import openspeak
openspeak = openspeak.OpenSpeak()
openspeakfile = line
- if os.path.exists(openspeakfile) :
- openspeak.compiler(openspeakfile=openspeakfile,writetofile=1)
- print "Auto-generated test-script file is "+ re.sub("ospk","py",openspeakfile,0)
+ if os.path.exists( openspeakfile ):
+ openspeak.compiler( openspeakfile=openspeakfile, writetofile=1 )
+ print "Auto-generated test-script file is " + re.sub( "ospk", "py", openspeakfile, 0 )
else:
- print 'There is no such file : '+line
+ print 'There is no such file : ' + line
def do_exit( self, _line ):
"Exit"
@@ -440,7 +435,7 @@
return isatty( self.stdin.fileno() )
def do_source( self, line ):
- '''
+ """
Read shell commands from an input file and execute them sequentially.
cmdsource.txt :
@@ -451,10 +446,9 @@
/home/openflow/TestON/bin/
cli.py __init__.py
- '''
-
+ """
args = line.split()
- if len(args) != 1:
+ if len( args ) != 1:
error( 'usage: source <file>\n' )
return
try:
@@ -471,43 +465,45 @@
def do_time( self, line ):
"Measure time taken for any command in TestON."
start = time.time()
- self.onecmd(line)
+ self.onecmd( line )
elapsed = time.time() - start
- self.stdout.write("*** Elapsed time: %0.6f secs\n" % elapsed)
+ self.stdout.write( "*** Elapsed time: %0.6f secs\n" % elapsed )
def default( self, line ):
- """Called on an input line when the command prefix is not recognized."""
+ "Called on an input line when the command prefix is not recognized."
first, args, line = self.parseline( line )
if not args:
return
- if args and len(args) > 0 and args[ -1 ] == '\n':
+ if args and len( args ) > 0 and args[ -1 ] == '\n':
args = args[ :-1 ]
rest = args.split( ' ' )
error( '*** Unknown command: %s\n' % first )
-class TestThread(threading.Thread):
- '''
+
+class TestThread( threading.Thread ):
+
+ """
TestThread class will handle the test execution and will communicate with the thread in the do_run.
- '''
- def __init__(self,options):
+ """
+ def __init__( self, options ):
self._stopevent = threading.Event()
- threading.Thread.__init__(self)
+ threading.Thread.__init__( self )
self.is_stop = False
self.options = options
__builtin__.testthread = self
- def run(self):
- '''
+ def run( self ):
+ """
Will execute the test.
- '''
- while not self.is_stop :
+ """
+ while not self.is_stop:
if not self._stopevent.isSet():
- self.test_on = TestON(self.options)
- try :
+ self.test_on = TestON( self.options )
+ try:
if self.test_on.init_result:
result = self.test_on.run()
- if not self.is_stop :
+ if not self.is_stop:
result = self.test_on.cleanup()
self.is_stop = True
except KeyboardInterrupt:
@@ -517,10 +513,10 @@
__builtin__.testthread = False
- def pause(self):
- '''
+ def pause( self ):
+ """
Will pause the test.
- '''
+ """
if not cli.pause:
print "Will pause the test's execution, after completion of this step.....\n\n\n\n"
cli.pause = True
@@ -533,56 +529,59 @@
result = self.test_on.cleanup()
self.is_stop = True
- def play(self):
- '''
+ def play( self ):
+ """
Will resume the paused test.
- '''
+ """
self._stopevent.clear()
cli.pause = False
- def stop(self):
- '''
+ def stop( self ):
+ """
Will stop the test execution.
- '''
-
+ """
print "Stopping the test"
self.is_stop = True
cli.stop = True
__builtin__.testthread = False
-def output(msg):
- '''
+
+def output( msg ):
+ """
Simply, print the message in console
- '''
+ """
print msg
-def error(msg):
- '''
+
+def error( msg ):
+ """
print the error message.
- '''
+ """
print msg
-def dictToObj(dictionary):
- '''
+
+def dictToObj( dictionary ):
+ """
This will facilitates the converting of the dictionary to the object.
This method will help to send options as object format to the test.
- '''
- if isinstance(dictionary, list):
- dictionary = [dictToObj(x) for x in dictionary]
- if not isinstance(dictionary, dict):
+ """
+ if isinstance( dictionary, list ):
+ dictionary = [ dictToObj( x ) for x in dictionary ]
+ if not isinstance( dictionary, dict ):
return dictionary
- class Convert(object):
+
+ class Convert( object ):
pass
obj = Convert()
for k in dictionary:
- obj.__dict__[k] = dictToObj(dictionary[k])
+ obj.__dict__[ k ] = dictToObj( dictionary[ k ] )
return obj
if __name__ == '__main__':
- if len(sys.argv) > 1:
+ if len( sys.argv ) > 1:
__builtin__.COLORS = True
- CLI("test").onecmd(' '.join(sys.argv[1:]))
+ CLI( "test" ).onecmd( ' '.join( sys.argv[ 1: ] ) )
else:
__builtin__.COLORS = False
- CLI("test").cmdloop()
+ CLI( "test" ).cmdloop()