blob: 039ca4ad5d37b134cfb24d292fe1377696de0ac1 [file] [log] [blame]
#!/usr/bin/env python
'''
Created on 23-Oct-2012
Copyright 2012 Open Networking Foundation (ONF)
Please refer questions to either the onos test mailing list at <onos-test@onosproject.org>,
the System Testing Plans and Results wiki page at <https://wiki.onosproject.org/x/voMg>,
or the System Testing Guide page at <https://wiki.onosproject.org/x/WYQg>
TestON is free software: you can redistribute it and/or modify
it under the terms of the GNU General Public License as published by
the Free Software Foundation, either version 2 of the License, or
(at your option) any later version.
TestON is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU General Public License for more details.
You should have received a copy of the GNU General Public License
along with TestON. If not, see <http://www.gnu.org/licenses/>.
Utilities will take care about the basic functions like :
* Extended assertion,
* parse_args for key-value pair handling
* Parsing the params or topology file.
'''
import re
from configobj import ConfigObj
from core import ast as ast
import smtplib
import email
import os
import email.mime.application
import time
import random
class Utilities:
'''
Utilities will take care about the basic functions like :
* Extended assertion,
* parse_args for key-value pair handling
* Parsing the params or topology file.
'''
def __init__( self ):
self.wrapped = sys.modules[ __name__ ]
def __getattr__( self, name ):
'''
This will invoke, if the attribute wasn't found the usual ways.
Here it will look for assert_attribute and will execute when AttributeError occurs.
It will return the result of the assert_attribute.
'''
try:
return getattr( self.wrapped, name )
except AttributeError:
def assertHandling( **kwargs ):
nameVar = re.match( "^assert", name, flags=0 )
matchVar = re.match( "assert(_not_|_)(equals|matches|greater|lesser)", name, flags=0 )
notVar = 0
operators = ""
try:
if matchVar.group( 1 ) == "_not_" and matchVar.group( 2 ):
notVar = 1
operators = matchVar.group( 2 )
elif matchVar.group( 1 ) == "_" and matchVar.group( 2 ):
operators = matchVar.group( 2 )
except AttributeError:
if matchVar is None and nameVar:
operators = 'equals'
result = self._assert( NOT=notVar, operator=operators, **kwargs )
if result == main.TRUE:
main.log.info( "Assertion Passed" )
main.STEPRESULT = main.TRUE
elif result == main.FALSE:
main.log.warn( "Assertion Failed" )
main.STEPRESULT = main.FALSE
else:
main.log.error( "There is an Error in Assertion" )
main.STEPRESULT = main.ERROR
return result
return assertHandling
def _assert( self, **assertParam ):
'''
It will take the arguments :
expect:'Expected output'
actual:'Actual output'
onpass:'Action or string to be triggered or displayed respectively when the assert passed'
onfail:'Action or string to be triggered or displayed respectively when the assert failed'
not:'optional argument to specify the negation of the each assertion type'
operator:'assertion type will be defined by using operator. Like equal , greater, lesser, matches.'
onfailFunc: Function to run when the assert fails. Will default to a noop function.
onfailFuncArgs: Arguments for onfailFunc
onfailFuncKwargs: Keyword Arguments for onfailFunc
It will return the assertion result.
'''
def noop( *args, **kwargs ):
pass
arguments = self.parse_args( [ "EXPECT", "ACTUAL", "ONPASS", "ONFAIL", "NOT", "OPERATOR" ], **assertParam )
onfailFunc = noop
onfailFunc = assertParam.get( "onfailFunc", noop )
onfailFuncArgs = assertParam.get( "onfailFuncArgs", [] )
onfailFuncKwargs = assertParam.get( "onfailFuncKwargs", {} )
result = 0
valuetype = ''
operation = "not " + str( arguments[ "OPERATOR" ] ) if arguments[ 'NOT' ] and arguments[ 'NOT' ] == 1 else arguments[ "OPERATOR" ]
operators = { 'equals': { 'STR': '==', 'NUM': '==' }, 'matches': '=~', 'greater': '>', 'lesser': '<' }
expectMatch = re.match( '^\s*[+-]?0(e0)?\s*$', str( arguments[ "EXPECT" ] ), re.I + re.M )
if not( ( not expectMatch ) and ( arguments[ "EXPECT" ] == 0 ) ):
valuetype = 'NUM'
else:
if arguments[ "OPERATOR" ] == 'greater' or arguments[ "OPERATOR" ] == 'lesser':
main.log.error( "Numeric comparison on strings is not possibele" )
return main.ERROR
valuetype = 'STR'
arguments[ "ACTUAL" ] = str( arguments[ "ACTUAL" ] )
if arguments[ "OPERATOR" ] != 'matches':
arguments[ "EXPECT" ] = str( arguments[ "EXPECT" ] )
try:
opcode = operators[ str( arguments[ "OPERATOR" ] ) ][ valuetype ] if arguments[ "OPERATOR" ] == 'equals' else operators[ str( arguments[ "OPERATOR" ] ) ]
except KeyError:
main.log.exeception( "Key Error in assertion" )
return main.FALSE
if opcode == '=~':
try:
assert re.search( str( arguments[ "EXPECT" ] ), str( arguments[ "ACTUAL" ] ) )
result = main.TRUE
except AssertionError:
try:
assert re.match( str( arguments[ "EXPECT" ] ), str( arguments[ "ACTUAL" ] ) )
result = main.TRUE
except AssertionError:
main.log.error( "Assertion Failed" )
result = main.FALSE
else:
try:
if str( opcode ) == "==":
main.log.info( "Verifying the Expected is equal to the actual or not using assert_equal" )
if arguments[ "EXPECT" ] == arguments[ "ACTUAL" ]:
result = main.TRUE
else:
result = main.FALSE
elif str( opcode ) == ">":
main.log.info( "Verifying the Expected is Greater than the actual or not using assert_greater" )
if ast.literal_eval( arguments[ "EXPECT" ] ) > ast.literal_eval( arguments[ "ACTUAL" ] ):
result = main.TRUE
else:
result = main.FALSE
elif str( opcode ) == "<":
main.log.info( "Verifying the Expected is Lesser than the actual or not using assert_lesser" )
if ast.literal_eval( arguments[ "EXPECT" ] ) < ast.literal_eval( arguments[ "ACTUAL" ] ):
result = main.TRUE
else:
result = main.FALSE
except AssertionError:
main.log.error( "Assertion Failed" )
result = main.FALSE
result = result if result else 0
result = not result if arguments[ "NOT" ] and arguments[ "NOT" ] == 1 else result
resultString = ""
if result:
resultString = str( resultString ) + "PASS"
main.log.info( arguments[ "ONPASS" ] )
else:
resultString = str( resultString ) + "FAIL"
if not isinstance( arguments[ "ONFAIL" ], str ):
eval( str( arguments[ "ONFAIL" ] ) )
else:
main.log.error( arguments[ "ONFAIL" ] )
main.log.report( arguments[ "ONFAIL" ] )
main.onFailMsg = arguments[ 'ONFAIL' ]
msg = arguments[ "ON" + str( resultString ) ]
if not isinstance( msg, str ):
try:
eval( str( msg ) )
except SyntaxError:
main.log.exception( "function definition is not right" )
main.last_result = result
if main.stepResults[ 2 ]:
main.stepResults[ 2 ][ -1 ] = result
try:
main.stepResults[ 3 ][ -1 ] = arguments[ 'ONFAIL' ]
except AttributeError:
pass
else:
main.log.warn( "Assertion called before a test step" )
try:
if result != main.TRUE:
onfailFunc( *onfailFuncArgs, **onfailFuncKwargs )
except SkipCase:
raise
except Exception:
main.log.exception( "Error calling onfailFunc: %s" % onfailFunc )
return False
else:
return result
def parse_args( self, args, **kwargs ):
'''
It will accept the (key,value) pair and will return the (key,value) pairs with keys in uppercase.
'''
newArgs = {}
for key, value in kwargs.iteritems():
if isinstance( args, list ) and str.upper( key ) in args:
for each in args:
if each == str.upper( key ):
newArgs[ str( each ) ] = value
elif each != str.upper( key ) and str( each ) not in newArgs:
newArgs[ str( each ) ] = None
return newArgs
def send_mail( self ):
# Create a text/plain message
msg = email.mime.Multipart.MIMEMultipart()
try:
if main.test_target:
sub = "Result summary of \"" + main.TEST + "\" run on component \"" + \
main.test_target + "\" Version \"" + \
vars( main )[ main.test_target ].get_version() + "\": " + \
str( main.TOTAL_TC_SUCCESS_PERCENT ) + "% Passed"
else:
sub = "Result summary of \"" + main.TEST + "\": " + \
str( main.TOTAL_TC_SUCCESS_PERCENT ) + "% Passed"
except( KeyError, AttributeError ):
sub = "Result summary of \"" + main.TEST + "\": " + \
str( main.TOTAL_TC_SUCCESS_PERCENT ) + "% Passed"
msg[ 'Subject' ] = sub
msg[ 'From' ] = main.sender
msg[ 'To' ] = main.mail
# The main body is just another attachment
body = email.mime.Text.MIMEText( main.logHeader + "\n" +
main.testResult )
msg.attach( body )
# Attachments
for filename in os.listdir( main.logdir ):
filepath = main.logdir + "/" + filename
fp = open( filepath, 'rb' )
att = email.mime.application.MIMEApplication( fp.read(),
_subtype="" )
fp.close()
att.add_header( 'Content-Disposition',
'attachment',
filename=filename )
msg.attach( att )
try:
smtp = smtplib.SMTP( main.smtp )
smtp.starttls()
smtp.login( main.sender, main.senderPwd )
smtp.sendmail( msg[ 'From' ], [ msg[ 'To' ]], msg.as_string() )
smtp.quit()
except Exception:
main.log.exception( "Error sending email" )
return main.TRUE
def send_warning_email( self, subject=None ):
try:
if not subject:
subject = main.TEST + " PAUSED!"
# Create a text/plain message
msg = email.mime.Multipart.MIMEMultipart()
msg[ 'Subject' ] = subject
msg[ 'From' ] = main.sender
msg[ 'To' ] = main.mail
smtp = smtplib.SMTP( main.smtp )
smtp.starttls()
smtp.login( main.sender, main.senderPwd )
smtp.sendmail( msg[ 'From' ], [ msg[ 'To' ]], msg.as_string() )
smtp.quit()
except Exception:
main.log.exception( "" )
return main.FALSE
return main.TRUE
def parse( self, fileName ):
'''
This will parse the params or topo or cfg file and return content in the file as Dictionary
'''
self.fileName = fileName
matchFileName = re.match( r'(.*)\.(cfg|params|topo)', self.fileName, re.M | re.I )
if matchFileName:
try:
parsedInfo = ConfigObj( self.fileName )
return parsedInfo
except StandardError:
print "There is no such file to parse " + fileName
else:
return 0
def retry( self, f, retValue, args=(), kwargs={},
sleep=1, attempts=2, randomTime=False,
getRetryingTime=False ):
"""
Given a function and bad return values, retry will retry a function
until successful or give up after a certain number of attempts.
Arguments:
f - a callable object
retValue - Return value(s) of f to retry on. This can be a list or an
object.
args - A tuple containing the arguments of f.
kwargs - A dictionary containing the keyword arguments of f.
sleep - Time in seconds to sleep between retries. If random is True,
this is the max time to wait. Defaults to 1 second.
attempts - Max number of attempts before returning. If set to 1,
f will only be called once. Defaults to 2 trys.
random - Boolean indicating if the wait time is random between 0
and sleep or exactly sleep seconds. Defaults to False.
"""
# TODO: be able to pass in a conditional statement(s). For example:
# retCondition = "< 7"
# Then we do something like 'if eval( "ret " + retCondition ):break'
try:
assert attempts > 0, "attempts must be more than 1"
assert sleep >= 0, "sleep must be >= 0"
if not isinstance( retValue, list ):
retValue = [ retValue ]
if getRetryingTime:
startTime = time.time()
for i in range( 0, attempts ):
ret = f( *args, **kwargs )
if ret not in retValue: # NOTE that False in [ 0 ] == True
break
if randomTime:
sleeptime = random.randint( 0, sleep )
else:
sleeptime = sleep
time.sleep( sleeptime )
if i > 0:
main.log.debug( str( f ) + " was retried " + str( i ) + " times." )
if getRetryingTime:
main.log.debug( "Took " + str( time.time() - startTime ) + " seconds for retrying." )
return ret
except AssertionError:
main.log.exception( "Invalid arguements for retry: " )
main.cleanAndExit()
except Exception:
main.log.exception( "Uncaught exception in retry: " )
main.cleanAndExit()
if __name__ != "__main__":
import sys
sys.modules[ __name__ ] = Utilities()