Revert core/ and drivers/ directories to their state before change 15514
This reverts commit 23fb21617769f4320de93b5b1805c6ec3ca9b809.
Change-Id: I0c116f8d7195c75c7ef17f296843924d3e2a0961
diff --git a/TestON/core/utilities.py b/TestON/core/utilities.py
index fee2899..2f7e5bb 100644
--- a/TestON/core/utilities.py
+++ b/TestON/core/utilities.py
@@ -1,7 +1,7 @@
#!/usr/bin/env python
-"""
+'''
Created on 23-Oct-2012
-Copyright 2012 Open Networking Foundation ( ONF )
+Copyright 2012 Open Networking Foundation (ONF)
Please refer questions to either the onos test mailing list at <onos-test@onosproject.org>,
the System Testing Plans and Results wiki page at <https://wiki.onosproject.org/x/voMg>,
@@ -10,7 +10,7 @@
TestON is free software: you can redistribute it and/or modify
it under the terms of the GNU General Public License as published by
the Free Software Foundation, either version 2 of the License, or
- ( at your option ) any later version.
+ (at your option) any later version.
TestON is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
@@ -26,7 +26,7 @@
* parse_args for key-value pair handling
* Parsing the params or topology file.
-"""
+'''
import re
from configobj import ConfigObj
from core import ast as ast
@@ -38,57 +38,56 @@
import time
import random
-
class Utilities:
-
- """
+ '''
Utilities will take care about the basic functions like :
* Extended assertion,
* parse_args for key-value pair handling
* Parsing the params or topology file.
- """
- def __init__( self ):
- self.wrapped = sys.modules[ __name__ ]
+ '''
- def __getattr__( self, name ):
- """
+ def __init__(self):
+ self.wrapped = sys.modules[__name__]
+
+ def __getattr__(self, name):
+ '''
This will invoke, if the attribute wasn't found the usual ways.
Here it will look for assert_attribute and will execute when AttributeError occurs.
It will return the result of the assert_attribute.
- """
+ '''
try:
- return getattr( self.wrapped, name )
+ return getattr(self.wrapped, name)
except AttributeError:
- def assertHandling( **kwargs ):
- nameVar = re.match( "^assert", name, flags=0 )
- matchVar = re.match( "assert(_not_|_)(equals|matches|greater|lesser)", name, flags=0 )
+ def assertHandling(**kwargs):
+ nameVar = re.match("^assert",name,flags=0)
+ matchVar = re.match("assert(_not_|_)(equals|matches|greater|lesser)",name,flags=0)
notVar = 0
operators = ""
- try:
- if matchVar.group( 1 ) == "_not_" and matchVar.group( 2 ):
+ try :
+ if matchVar.group(1) == "_not_" and matchVar.group(2) :
notVar = 1
- operators = matchVar.group( 2 )
- elif matchVar.group( 1 ) == "_" and matchVar.group( 2 ):
- operators = matchVar.group( 2 )
+ operators = matchVar.group(2)
+ elif matchVar.group(1) == "_" and matchVar.group(2):
+ operators = matchVar.group(2)
except AttributeError:
- if matchVar is None and nameVar:
- operators = 'equals'
- result = self._assert( NOT=notVar, operator=operators, **kwargs )
+ if matchVar==None and nameVar:
+ operators ='equals'
+ result = self._assert(NOT=notVar,operator=operators,**kwargs)
if result == main.TRUE:
- main.log.info( "Assertion Passed" )
+ main.log.info("Assertion Passed")
main.STEPRESULT = main.TRUE
elif result == main.FALSE:
- main.log.warn( "Assertion Failed" )
+ main.log.warn("Assertion Failed")
main.STEPRESULT = main.FALSE
else:
- main.log.error( "There is an Error in Assertion" )
+ main.log.error("There is an Error in Assertion")
main.STEPRESULT = main.ERROR
return result
return assertHandling
- def _assert( self, **assertParam ):
- """
+ def _assert (self,**assertParam):
+ '''
It will take the arguments :
expect:'Expected output'
actual:'Actual output'
@@ -99,29 +98,30 @@
It will return the assertion result.
- """
- arguments = self.parse_args( [ "EXPECT", "ACTUAL", "ONPASS", "ONFAIL", "NOT", "OPERATOR" ], **assertParam )
+ '''
+
+ arguments = self.parse_args(["EXPECT","ACTUAL","ONPASS","ONFAIL","NOT","OPERATOR"],**assertParam)
result = 0
valuetype = ''
- operation = "not " + str( arguments[ "OPERATOR" ] ) if arguments[ 'NOT' ] and arguments[ 'NOT' ] == 1 else arguments[ "OPERATOR" ]
- operators = { 'equals': { 'STR': '==', 'NUM': '==' }, 'matches': '=~', 'greater': '>', 'lesser': '<' }
+ operation = "not "+ str(arguments["OPERATOR"]) if arguments['NOT'] and arguments['NOT'] == 1 else arguments["OPERATOR"]
+ operators = {'equals':{'STR':'==','NUM':'=='}, 'matches' : '=~', 'greater':'>' ,'lesser':'<'}
- expectMatch = re.match( '^\s*[+-]?0(e0)?\s*$', str( arguments[ "EXPECT" ] ), re.I + re.M )
- if not ( ( not expectMatch ) and ( arguments[ "EXPECT" ] == 0 ) ):
+ expectMatch = re.match('^\s*[+-]?0(e0)?\s*$', str(arguments["EXPECT"]), re.I+re.M)
+ if not ((not expectMatch) and (arguments["EXPECT"]==0)):
valuetype = 'NUM'
- else:
- if arguments[ "OPERATOR" ] == 'greater' or arguments[ "OPERATOR" ] == 'lesser':
- main.log.error( "Numeric comparison on strings is not possibele" )
+ else :
+ if arguments["OPERATOR"] == 'greater' or arguments["OPERATOR"] == 'lesser':
+ main.log.error("Numeric comparison on strings is not possibele")
return main.ERROR
valuetype = 'STR'
- arguments[ "ACTUAL" ] = str( arguments[ "ACTUAL" ] )
- if arguments[ "OPERATOR" ] != 'matches':
- arguments[ "EXPECT" ] = str( arguments[ "EXPECT" ] )
+ arguments["ACTUAL"] = str(arguments["ACTUAL"])
+ if arguments["OPERATOR"] != 'matches':
+ arguments["EXPECT"] = str(arguments["EXPECT"])
- try:
- opcode = operators[ str( arguments[ "OPERATOR" ] ) ][ valuetype ] if arguments[ "OPERATOR" ] == 'equals' else operators[ str( arguments[ "OPERATOR" ] ) ]
+ try :
+ opcode = operators[str(arguments["OPERATOR"])][valuetype] if arguments["OPERATOR"] == 'equals' else operators[str(arguments["OPERATOR"])]
except KeyError as e:
print "Key Error in assertion"
@@ -130,111 +130,111 @@
if opcode == '=~':
try:
- assert re.search( str( arguments[ "EXPECT" ] ), str( arguments[ "ACTUAL" ] ) )
+ assert re.search(str(arguments["EXPECT"]),str(arguments["ACTUAL"]))
result = main.TRUE
except AssertionError:
- try:
- assert re.match( str( arguments[ "EXPECT" ] ), str( arguments[ "ACTUAL" ] ) )
+ try :
+ assert re.match(str(arguments["EXPECT"]),str(arguments["ACTUAL"]))
result = main.TRUE
except AssertionError:
- main.log.error( "Assertion Failed" )
+ main.log.error("Assertion Failed")
result = main.FALSE
- else:
+ else :
try:
- if str( opcode ) == "==":
- main.log.info( "Verifying the Expected is equal to the actual or not using assert_equal" )
- if ( arguments[ "EXPECT" ] == arguments[ "ACTUAL" ] ):
+ if str(opcode)=="==":
+ main.log.info("Verifying the Expected is equal to the actual or not using assert_equal")
+ if (arguments["EXPECT"] == arguments["ACTUAL"]):
result = main.TRUE
- else:
+ else :
result = main.FALSE
- elif str( opcode ) == ">":
- main.log.info( "Verifying the Expected is Greater than the actual or not using assert_greater" )
- if ( ast.literal_eval( arguments[ "EXPECT" ] ) > ast.literal_eval( arguments[ "ACTUAL" ] ) ):
+ elif str(opcode) == ">":
+ main.log.info("Verifying the Expected is Greater than the actual or not using assert_greater")
+ if (ast.literal_eval(arguments["EXPECT"]) > ast.literal_eval(arguments["ACTUAL"])) :
result = main.TRUE
- else:
+ else :
result = main.FALSE
- elif str( opcode ) == "<":
- main.log.info( "Verifying the Expected is Lesser than the actual or not using assert_lesser" )
- if ( ast.literal_eval( arguments[ "EXPECT" ] ) < ast.literal_eval( arguments[ "ACTUAL" ] ) ):
+ elif str(opcode) == "<":
+ main.log.info("Verifying the Expected is Lesser than the actual or not using assert_lesser")
+ if (ast.literal_eval(arguments["EXPECT"]) < ast.literal_eval(arguments["ACTUAL"])):
result = main.TRUE
- else:
+ else :
result = main.FALSE
except AssertionError:
- main.log.error( "Assertion Failed" )
+ main.log.error("Assertion Failed")
result = main.FALSE
result = result if result else 0
- result = not result if arguments[ "NOT" ] and arguments[ "NOT" ] == 1 else result
+ result = not result if arguments["NOT"] and arguments["NOT"] == 1 else result
resultString = ""
- if result:
- resultString = str( resultString ) + "PASS"
- main.log.info( arguments[ "ONPASS" ] )
- else:
- resultString = str( resultString ) + "FAIL"
- if not isinstance( arguments[ "ONFAIL" ], str ):
- eval( str( arguments[ "ONFAIL" ] ) )
- else:
- main.log.error( arguments[ "ONFAIL" ] )
- main.log.report( arguments[ "ONFAIL" ] )
+ if result :
+ resultString = str(resultString) + "PASS"
+ main.log.info(arguments["ONPASS"])
+ else :
+ resultString = str(resultString) + "FAIL"
+ if not isinstance(arguments["ONFAIL"],str):
+ eval(str(arguments["ONFAIL"]))
+ else :
+ main.log.error(arguments["ONFAIL"])
+ main.log.report(arguments["ONFAIL"])
main.onFailMsg = arguments[ 'ONFAIL' ]
- msg = arguments[ "ON" + str( resultString ) ]
+ msg = arguments["ON" + str(resultString)]
- if not isinstance( msg, str ):
+ if not isinstance(msg,str):
try:
- eval( str( msg ) )
+ eval(str(msg))
except SyntaxError as e:
print "function definition is not right"
print e
main.last_result = result
- if main.stepResults[ 2 ]:
- main.stepResults[ 2 ][ -1 ] = result
+ if main.stepResults[2]:
+ main.stepResults[2][-1] = result
try:
- main.stepResults[ 3 ][ -1 ] = arguments[ 'ONFAIL' ]
+ main.stepResults[3][-1] = arguments[ 'ONFAIL' ]
except AttributeError:
pass
else:
main.log.warn( "Assertion called before a test step" )
return result
- def parse_args( self, args, **kwargs ):
- """
- It will accept the ( key,value ) pair and will return the ( key,value ) pairs with keys in uppercase.
- """
+ def parse_args(self,args, **kwargs):
+ '''
+ It will accept the (key,value) pair and will return the (key,value) pairs with keys in uppercase.
+ '''
newArgs = {}
- for key, value in kwargs.iteritems():
- if isinstance( args, list ) and str.upper( key ) in args:
+ for key,value in kwargs.iteritems():
+ if isinstance(args,list) and str.upper(key) in args:
for each in args:
- if each == str.upper( key ):
- newArgs[ str( each ) ] = value
- elif each != str.upper( key ) and not ( ( str( each ) in newArgs ) ):
- newArgs[ str( each ) ] = None
+ if each==str.upper(key):
+ newArgs [str(each)] = value
+ elif each != str.upper(key) and (newArgs.has_key(str(each)) == False ):
+ newArgs[str(each)] = None
return newArgs
- def send_mail( self ):
+ def send_mail(self):
# Create a text/plain message
msg = email.mime.Multipart.MIMEMultipart()
- try:
+ try :
if main.test_target:
sub = "Result summary of \"" + main.TEST + "\" run on component \"" +\
main.test_target + "\" Version \"" +\
- vars( main )[ main.test_target ].get_version() + "\": " +\
+ vars( main )[main.test_target].get_version() + "\": " +\
str( main.TOTAL_TC_SUCCESS ) + "% Passed"
- else:
+ else :
sub = "Result summary of \"" + main.TEST + "\": " +\
str( main.TOTAL_TC_SUCCESS ) + "% Passed"
except ( KeyError, AttributeError ):
sub = "Result summary of \"" + main.TEST + "\": " +\
str( main.TOTAL_TC_SUCCESS ) + "% Passed"
- msg[ 'Subject' ] = sub
- msg[ 'From' ] = main.sender
- msg[ 'To' ] = main.mail
+ msg['Subject'] = sub
+ msg['From'] = main.sender
+ msg['To'] = main.mail
# The main body is just another attachment
body = email.mime.Text.MIMEText( main.logHeader + "\n" +
- main.testResult )
+ main.testResult)
msg.attach( body )
# Attachments
@@ -252,7 +252,7 @@
smtp = smtplib.SMTP( main.smtp )
smtp.starttls()
smtp.login( main.sender, main.senderPwd )
- smtp.sendmail( msg[ 'From' ], [ msg[ 'To' ] ], msg.as_string() )
+ smtp.sendmail( msg['From'], [msg['To']], msg.as_string() )
smtp.quit()
except Exception:
main.log.exception( "Error sending email" )
@@ -265,32 +265,32 @@
# Create a text/plain message
msg = email.mime.Multipart.MIMEMultipart()
- msg[ 'Subject' ] = subject
- msg[ 'From' ] = main.sender
- msg[ 'To' ] = main.mail
+ msg['Subject'] = subject
+ msg['From'] = main.sender
+ msg['To'] = main.mail
smtp = smtplib.SMTP( main.smtp )
smtp.starttls()
smtp.login( main.sender, main.senderPwd )
- smtp.sendmail( msg[ 'From' ], [ msg[ 'To' ] ], msg.as_string() )
+ smtp.sendmail( msg['From'], [msg['To']], msg.as_string() )
smtp.quit()
except Exception:
main.log.exception( "" )
return main.FALSE
return main.TRUE
- def parse( self, fileName ):
- """
+ def parse(self,fileName):
+ '''
This will parse the params or topo or cfg file and return content in the file as Dictionary
- """
+ '''
self.fileName = fileName
- matchFileName = re.match( r'(.*)\.(cfg|params|topo)', self.fileName, re.M | re.I )
+ matchFileName = re.match(r'(.*)\.(cfg|params|topo)',self.fileName,re.M|re.I)
if matchFileName:
- try:
- parsedInfo = ConfigObj( self.fileName )
+ try :
+ parsedInfo = ConfigObj(self.fileName)
return parsedInfo
- except Exception:
- print "There is no such file to parse " + fileName
+ except StandardError:
+ print "There is no such file to parse "+fileName
else:
return 0
@@ -303,7 +303,7 @@
Arguments:
f - a callable object
- retValue - Return value( s ) of f to retry on. This can be a list or an
+ retValue - Return value(s) of f to retry on. This can be a list or an
object.
args - A tuple containing the arguments of f.
kwargs - A dictionary containing the keyword arguments of f.
@@ -314,7 +314,7 @@
random - Boolean indicating if the wait time is random between 0
and sleep or exactly sleep seconds. Defaults to False.
"""
- # TODO: be able to pass in a conditional statement( s ). For example:
+ # TODO: be able to pass in a conditional statement(s). For example:
# retCondition = "< 7"
# Then we do something like 'if eval( "ret " + retCondition ):break'
try:
@@ -327,7 +327,7 @@
for i in range( 0, attempts ):
ret = f( *args, **kwargs )
if ret not in retValue:
- # NOTE that False in [ 0 ] == True
+ # NOTE that False in [ 0 ] == True
break
if randomTime:
sleeptime = random.randint( 0, sleep )
@@ -350,4 +350,4 @@
if __name__ != "__main__":
import sys
- sys.modules[ __name__ ] = Utilities()
+ sys.modules[__name__] = Utilities()