| #!/usr/bin/env python |
| ''' |
| Created on 23-Oct-2012 |
| Copyright 2012 Open Networking Foundation (ONF) |
| |
| Please refer questions to either the onos test mailing list at <onos-test@onosproject.org>, |
| the System Testing Plans and Results wiki page at <https://wiki.onosproject.org/x/voMg>, |
| or the System Testing Guide page at <https://wiki.onosproject.org/x/WYQg> |
| |
| TestON is free software: you can redistribute it and/or modify |
| it under the terms of the GNU General Public License as published by |
| the Free Software Foundation, either version 2 of the License, or |
| (at your option) any later version. |
| |
| TestON is distributed in the hope that it will be useful, |
| but WITHOUT ANY WARRANTY; without even the implied warranty of |
| MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the |
| GNU General Public License for more details. |
| |
| You should have received a copy of the GNU General Public License |
| along with TestON. If not, see <http://www.gnu.org/licenses/>. |
| |
| |
| Utilities will take care about the basic functions like : |
| * Extended assertion, |
| * parse_args for key-value pair handling |
| * Parsing the params or topology file. |
| |
| ''' |
| import re |
| from configobj import ConfigObj |
| from core import ast as ast |
| import smtplib |
| |
| import email |
| import os |
| import email.mime.application |
| import time |
| import random |
| |
| |
| class Utilities: |
| ''' |
| Utilities will take care about the basic functions like : |
| * Extended assertion, |
| * parse_args for key-value pair handling |
| * Parsing the params or topology file. |
| ''' |
| |
| def __init__( self ): |
| self.wrapped = sys.modules[ __name__ ] |
| |
| def __getattr__( self, name ): |
| ''' |
| This will invoke, if the attribute wasn't found the usual ways. |
| Here it will look for assert_attribute and will execute when AttributeError occurs. |
| It will return the result of the assert_attribute. |
| ''' |
| try: |
| return getattr( self.wrapped, name ) |
| except AttributeError: |
| def assertHandling( **kwargs ): |
| nameVar = re.match( "^assert", name, flags=0 ) |
| matchVar = re.match( "assert(_not_|_)(equals|matches|greater|lesser)", name, flags=0 ) |
| notVar = 0 |
| operators = "" |
| |
| try: |
| if matchVar.group( 1 ) == "_not_" and matchVar.group( 2 ): |
| notVar = 1 |
| operators = matchVar.group( 2 ) |
| elif matchVar.group( 1 ) == "_" and matchVar.group( 2 ): |
| operators = matchVar.group( 2 ) |
| except AttributeError: |
| if matchVar is None and nameVar: |
| operators = 'equals' |
| result = self._assert( NOT=notVar, operator=operators, **kwargs ) |
| if result == main.TRUE: |
| main.log.info( "Assertion Passed" ) |
| main.STEPRESULT = main.TRUE |
| elif result == main.FALSE: |
| main.log.warn( "Assertion Failed" ) |
| main.STEPRESULT = main.FALSE |
| else: |
| main.log.error( "There is an Error in Assertion" ) |
| main.STEPRESULT = main.ERROR |
| return result |
| return assertHandling |
| |
| def _assert( self, **assertParam ): |
| ''' |
| It will take the arguments : |
| expect:'Expected output' |
| actual:'Actual output' |
| onpass:'Action or string to be triggered or displayed respectively when the assert passed' |
| onfail:'Action or string to be triggered or displayed respectively when the assert failed' |
| not:'optional argument to specify the negation of the each assertion type' |
| operator:'assertion type will be defined by using operator. Like equal , greater, lesser, matches.' |
| onfailFunc: Function to run when the assert fails. Will default to a noop function. |
| onfailFuncArgs: Arguments for onfailFunc |
| onfailFuncKwargs: Keyword Arguments for onfailFunc |
| |
| It will return the assertion result. |
| |
| ''' |
| def noop( *args, **kwargs ): |
| pass |
| |
| arguments = self.parse_args( [ "EXPECT", "ACTUAL", "ONPASS", "ONFAIL", "NOT", "OPERATOR" ], **assertParam ) |
| onfailFunc = noop |
| onfailFunc = assertParam.get( "onfailFunc", noop ) |
| onfailFuncArgs = assertParam.get( "onfailFuncArgs", [] ) |
| onfailFuncKwargs = assertParam.get( "onfailFuncKwargs", {} ) |
| |
| result = 0 |
| valuetype = '' |
| operation = "not " + str( arguments[ "OPERATOR" ] ) if arguments[ 'NOT' ] and arguments[ 'NOT' ] == 1 else arguments[ "OPERATOR" ] |
| operators = { 'equals': { 'STR': '==', 'NUM': '==' }, 'matches': '=~', 'greater': '>', 'lesser': '<' } |
| |
| expectMatch = re.match( '^\s*[+-]?0(e0)?\s*$', str( arguments[ "EXPECT" ] ), re.I + re.M ) |
| if not( ( not expectMatch ) and ( arguments[ "EXPECT" ] == 0 ) ): |
| valuetype = 'NUM' |
| else: |
| if arguments[ "OPERATOR" ] == 'greater' or arguments[ "OPERATOR" ] == 'lesser': |
| main.log.error( "Numeric comparison on strings is not possibele" ) |
| return main.ERROR |
| |
| valuetype = 'STR' |
| arguments[ "ACTUAL" ] = str( arguments[ "ACTUAL" ] ) |
| if arguments[ "OPERATOR" ] != 'matches': |
| arguments[ "EXPECT" ] = str( arguments[ "EXPECT" ] ) |
| |
| try: |
| opcode = operators[ str( arguments[ "OPERATOR" ] ) ][ valuetype ] if arguments[ "OPERATOR" ] == 'equals' else operators[ str( arguments[ "OPERATOR" ] ) ] |
| |
| except KeyError: |
| main.log.exeception( "Key Error in assertion" ) |
| return main.FALSE |
| |
| if opcode == '=~': |
| try: |
| assert re.search( str( arguments[ "EXPECT" ] ), str( arguments[ "ACTUAL" ] ) ) |
| result = main.TRUE |
| except AssertionError: |
| try: |
| assert re.match( str( arguments[ "EXPECT" ] ), str( arguments[ "ACTUAL" ] ) ) |
| result = main.TRUE |
| except AssertionError: |
| main.log.error( "Assertion Failed" ) |
| result = main.FALSE |
| else: |
| try: |
| if str( opcode ) == "==": |
| main.log.info( "Verifying the Expected is equal to the actual or not using assert_equal" ) |
| if arguments[ "EXPECT" ] == arguments[ "ACTUAL" ]: |
| result = main.TRUE |
| else: |
| result = main.FALSE |
| elif str( opcode ) == ">": |
| main.log.info( "Verifying the Expected is Greater than the actual or not using assert_greater" ) |
| if ast.literal_eval( arguments[ "EXPECT" ] ) > ast.literal_eval( arguments[ "ACTUAL" ] ): |
| result = main.TRUE |
| else: |
| result = main.FALSE |
| elif str( opcode ) == "<": |
| main.log.info( "Verifying the Expected is Lesser than the actual or not using assert_lesser" ) |
| if ast.literal_eval( arguments[ "EXPECT" ] ) < ast.literal_eval( arguments[ "ACTUAL" ] ): |
| result = main.TRUE |
| else: |
| result = main.FALSE |
| except AssertionError: |
| main.log.error( "Assertion Failed" ) |
| result = main.FALSE |
| result = result if result else 0 |
| result = not result if arguments[ "NOT" ] and arguments[ "NOT" ] == 1 else result |
| resultString = "" |
| if result: |
| resultString = str( resultString ) + "PASS" |
| main.log.info( arguments[ "ONPASS" ] ) |
| else: |
| resultString = str( resultString ) + "FAIL" |
| if not isinstance( arguments[ "ONFAIL" ], str ): |
| eval( str( arguments[ "ONFAIL" ] ) ) |
| else: |
| main.log.error( arguments[ "ONFAIL" ] ) |
| main.log.report( arguments[ "ONFAIL" ] ) |
| main.onFailMsg = arguments[ 'ONFAIL' ] |
| |
| msg = arguments[ "ON" + str( resultString ) ] |
| |
| if not isinstance( msg, str ): |
| try: |
| eval( str( msg ) ) |
| except SyntaxError: |
| main.log.exception( "function definition is not right" ) |
| |
| main.last_result = result |
| if main.stepResults[ 2 ]: |
| main.stepResults[ 2 ][ -1 ] = result |
| try: |
| main.stepResults[ 3 ][ -1 ] = arguments[ 'ONFAIL' ] |
| except AttributeError: |
| pass |
| else: |
| main.log.warn( "Assertion called before a test step" ) |
| try: |
| if result != main.TRUE: |
| onfailFunc( *onfailFuncArgs, **onfailFuncKwargs ) |
| except SkipCase: |
| raise |
| except Exception: |
| main.log.exception( "Error calling onfailFunc: %s" % onfailFunc ) |
| return False |
| else: |
| return result |
| |
| def parse_args( self, args, **kwargs ): |
| ''' |
| It will accept the (key,value) pair and will return the (key,value) pairs with keys in uppercase. |
| ''' |
| newArgs = {} |
| for key, value in kwargs.iteritems(): |
| if isinstance( args, list ) and str.upper( key ) in args: |
| for each in args: |
| if each == str.upper( key ): |
| newArgs[ str( each ) ] = value |
| elif each != str.upper( key ) and str( each ) not in newArgs: |
| newArgs[ str( each ) ] = None |
| |
| return newArgs |
| |
| def send_mail( self ): |
| # Create a text/plain message |
| msg = email.mime.Multipart.MIMEMultipart() |
| try: |
| if main.test_target: |
| sub = "Result summary of \"" + main.TEST + "\" run on component \"" + \ |
| main.test_target + "\" Version \"" + \ |
| vars( main )[ main.test_target ].get_version() + "\": " + \ |
| str( main.TOTAL_TC_SUCCESS_PERCENT ) + "% Passed" |
| else: |
| sub = "Result summary of \"" + main.TEST + "\": " + \ |
| str( main.TOTAL_TC_SUCCESS_PERCENT ) + "% Passed" |
| except( KeyError, AttributeError ): |
| sub = "Result summary of \"" + main.TEST + "\": " + \ |
| str( main.TOTAL_TC_SUCCESS_PERCENT ) + "% Passed" |
| |
| msg[ 'Subject' ] = sub |
| msg[ 'From' ] = main.sender |
| msg[ 'To' ] = main.mail |
| |
| # The main body is just another attachment |
| body = email.mime.Text.MIMEText( main.logHeader + "\n" + |
| main.testResult ) |
| msg.attach( body ) |
| |
| # Attachments |
| for filename in os.listdir( main.logdir ): |
| filepath = main.logdir + "/" + filename |
| fp = open( filepath, 'rb' ) |
| att = email.mime.application.MIMEApplication( fp.read(), |
| _subtype="" ) |
| fp.close() |
| att.add_header( 'Content-Disposition', |
| 'attachment', |
| filename=filename ) |
| msg.attach( att ) |
| try: |
| smtp = smtplib.SMTP( main.smtp ) |
| smtp.starttls() |
| smtp.login( main.sender, main.senderPwd ) |
| smtp.sendmail( msg[ 'From' ], [ msg[ 'To' ]], msg.as_string() ) |
| smtp.quit() |
| except Exception: |
| main.log.exception( "Error sending email" ) |
| return main.TRUE |
| |
| def send_warning_email( self, subject=None ): |
| try: |
| if not subject: |
| subject = main.TEST + " PAUSED!" |
| # Create a text/plain message |
| msg = email.mime.Multipart.MIMEMultipart() |
| |
| msg[ 'Subject' ] = subject |
| msg[ 'From' ] = main.sender |
| msg[ 'To' ] = main.mail |
| |
| smtp = smtplib.SMTP( main.smtp ) |
| smtp.starttls() |
| smtp.login( main.sender, main.senderPwd ) |
| smtp.sendmail( msg[ 'From' ], [ msg[ 'To' ]], msg.as_string() ) |
| smtp.quit() |
| except Exception: |
| main.log.exception( "" ) |
| return main.FALSE |
| return main.TRUE |
| |
| def parse( self, fileName ): |
| ''' |
| This will parse the params or topo or cfg file and return content in the file as Dictionary |
| ''' |
| self.fileName = fileName |
| matchFileName = re.match( r'(.*)\.(cfg|params|topo)', self.fileName, re.M | re.I ) |
| if matchFileName: |
| try: |
| parsedInfo = ConfigObj( self.fileName ) |
| return parsedInfo |
| except StandardError: |
| print "There is no such file to parse " + fileName |
| else: |
| return 0 |
| |
| def retry( self, f, retValue, args=(), kwargs={}, |
| sleep=1, attempts=2, randomTime=False, |
| getRetryingTime=False ): |
| """ |
| Given a function and bad return values, retry will retry a function |
| until successful or give up after a certain number of attempts. |
| |
| Arguments: |
| f - a callable object |
| retValue - Return value(s) of f to retry on. This can be a list or an |
| object. |
| args - A tuple containing the arguments of f. |
| kwargs - A dictionary containing the keyword arguments of f. |
| sleep - Time in seconds to sleep between retries. If random is True, |
| this is the max time to wait. Defaults to 1 second. |
| attempts - Max number of attempts before returning. If set to 1, |
| f will only be called once. Defaults to 2 trys. |
| random - Boolean indicating if the wait time is random between 0 |
| and sleep or exactly sleep seconds. Defaults to False. |
| """ |
| # TODO: be able to pass in a conditional statement(s). For example: |
| # retCondition = "< 7" |
| # Then we do something like 'if eval( "ret " + retCondition ):break' |
| try: |
| assert attempts > 0, "attempts must be more than 1" |
| assert sleep >= 0, "sleep must be >= 0" |
| if not isinstance( retValue, list ): |
| retValue = [ retValue ] |
| if getRetryingTime: |
| startTime = time.time() |
| for i in range( 0, attempts ): |
| ret = f( *args, **kwargs ) |
| if ret not in retValue: # NOTE that False in [ 0 ] == True |
| break |
| if randomTime: |
| sleeptime = random.randint( 0, sleep ) |
| else: |
| sleeptime = sleep |
| time.sleep( sleeptime ) |
| if i > 0: |
| main.log.debug( str( f ) + " was retried " + str( i ) + " times." ) |
| if getRetryingTime: |
| main.log.debug( "Took " + str( time.time() - startTime ) + " seconds for retrying." ) |
| return ret |
| except AssertionError: |
| main.log.exception( "Invalid arguements for retry: " ) |
| main.cleanAndExit() |
| except Exception: |
| main.log.exception( "Uncaught exception in retry: " ) |
| main.cleanAndExit() |
| |
| |
| if __name__ != "__main__": |
| import sys |
| |
| sys.modules[ __name__ ] = Utilities() |