#!/usr/bin/env python2
'''
Created on 20-Dec-2012
Copyright 2012 Open Networking Foundation

Please refer questions to either the onos test mailing list at <onos-test@onosproject.org>,
the System Testing Plans and Results wiki page at <https://wiki.onosproject.org/x/voMg>,
or the System Testing Guide page at <https://wiki.onosproject.org/x/WYQg>

    TestON is free software: you can redistribute it and/or modify
    it under the terms of the GNU General Public License as published by
    the Free Software Foundation, either version 2 of the License, or
    (at your option) any later version.

    TestON is distributed in the hope that it will be useful,
    but WITHOUT ANY WARRANTY; without even the implied warranty of
    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
    GNU General Public License for more details.

    You should have received a copy of the GNU General Public License
    along with TestON.  If not, see <http://www.gnu.org/licenses/>.


'''


"""
cli will provide the CLI shell for teston framework.

A simple command-line interface for TestON.

The TestON CLI provides a simple console which
makes it easy to launch the test. For example, the command run will execute the test.

teston> run test DpctlTest
Several useful commands are provided.
"""

from subprocess import call
from cmd import Cmd
from os import isatty
import sys
import re
import os
import time
import threading
import __builtin__
import pprint
dump = pprint.PrettyPrinter( indent=4 )
__builtin__.testthread = False
introduction = "TestON is the testing framework \nDeveloped by Paxterra Solutions (www.paxterrasolutions.com)"
__builtin__.COLORS = False

path = re.sub( "/bin$", "", sys.path[ 0 ] )
sys.path.insert( 1, path )
from core.teston import *

class CLI( threading.Thread, Cmd, object ):
    "command-line interface to execute the test."

    prompt = 'teston> '

    def __init__( self, teston, stdin=sys.stdin ):
        self.teston = teston

        self._mainevent = threading.Event()
        threading.Thread.__init__( self )
        self.main_stop = False
        self.locals = { 'test': teston }
        self.stdin = stdin
        Cmd.__init__( self )
        self.pause = False
        self.stop = False
        __builtin__.cli = self

    def emptyline( self ):
        "Don't repeat last command when you hit return."
        pass

    helpStr = (
              " teston help"
    )

    def do_help( self, line ):
        "Describe available CLI commands."
        Cmd.do_help( self, line )
        if line is '':
            output( self.helpStr )

    def do_run( self, args ):
        '''
        run command will execute the test with following optional command line arguments
        logdir <directory to store logs in>
        testcases <list of testcases separated by comma or range of testcases separated by hypen>
        mail <mail-id or list of mail-ids seperated by comma>
        example 1, to execute the examples specified in the ~/examples diretory.
        '''
        try:
            args = args.split()
            options = {}
            options = self.parseArgs( args, options )
            options = dictToObj( options )
            if not testthread:
                test = TestThread( options )
                test.start()
                while test.isAlive():
                    test.join( 1 )
            else:
                print main.TEST + " test execution paused, please resume that before executing to another test"
        except KeyboardInterrupt, SystemExit:
            print "Interrupt called, Exiting."
            test._Thread__stop()
            main.cleanup()
            main.exit()

    def do_resume( self, line ):
        '''
        resume command will continue the execution of paused test.
        teston>resume
        [2013-01-07 23:03:44.640723] [PoxTest] [STEP]  1.1: Checking the host reachability using pingHost
        2013-01-07 23:03:44,858 - PoxTest - INFO - Expected Prompt Found
        ....
        '''
        if testthread:
            testthread.play()
        else:
            print "There is no test to resume"

    def do_nextstep( self, line ):
        '''
        nextstep will execute the next-step of the paused test and
        it will pause the test after finishing of step.

        teston> nextstep
        Will pause the test's execution, after completion of this step.....

        teston> [2013-01-07 21:24:26.286601] [PoxTest] [STEP]  1.8: Checking the host reachability using pingHost
        2013-01-07 21:24:26,455 - PoxTest - INFO - Expected Prompt Found
        .....
        teston>

        '''
        if testthread:
            main.log.info( "Executing the nextstep, Will pause test execution, after completion of the step" )
            testthread.play()
            time.sleep( .1 )
            testthread.pause()
        else:
            print "There is no paused test "

    def do_dumpvar( self, line ):
        '''
        dumpvar will print all the test data in raw format.
        usgae :
        teston>dumpvar main
        Here 'main' will be the test object.

        teston>dumpvar params
        here 'params' will be the parameters specified in the params file.

        teston>dumpvar topology
        here 'topology' will be topology specification of the test specified in topo file.
        '''
        if testthread:
            if line == "main":
                dump.pprint( vars( main ) )
            else:
                try:
                    dump.pprint( vars( main )[ line ] )
                except KeyError as e:
                    print e
        else:
            print "There is no paused test "

    def do_currentcase( self, line ):
        '''
        currentcase will return the current case in the test execution.

        teston>currentcase
        Currently executing test case is: 2

        '''
        if testthread:
            print "Currently executing test case is: " + str( main.CurrentTestCaseNumber )
        else:
            print "There is no paused test "

    def do_currentstep( self, line ):
        '''
        currentstep will return the current step in the test execution.

        teston>currentstep
        Currently executing test step is: 2.3
        '''
        if testthread:
            print "Currently executing test step is: " + str( main.CurrentTestCaseNumber ) + '.' + str( main.stepCount )
        else:
            print "There is no paused test "

    def do_stop( self, line ):
        '''
        Will stop the paused test, if any !
        '''
        if testthread:
            testthread.stop()

        return 'exited by user command'

    def do_gettest( self, line ):
        '''
        gettest will return the test name which is under execution or recently executed.

        Test under execution:
        teston>gettest
        Currently executing Test is: PoxTest

        Test recently executed:
        Recently executed test is: MininetTest
        '''
        try:
            if testthread:
                print "Currently executing Test is: " + main.TEST
            else:
                print "Recently executed test is: " + main.TEST

        except NameError:
            print "There is no previously executed Test"

    def do_showlog( self, line ):
        '''
        showlog will show the test's Log
        teston>showlog
        Last executed test's log is : //home/openflow/TestON/logs/PoxTest_07_Jan_2013_21_42_11/PoxTest_07_Jan_2013_21_42_11.log
        .....
        teston>showlog
        Currently executing Test's log is: /home/openflow/TestON/logs/PoxTest_07_Jan_2013_21_46_58/PoxTest_07_Jan_2013_21_46_58.log
        .....
        '''
        try:
            if testthread:
                print "Currently executing Test's log is: " + main.LogFileName

            else:
                print "Last executed test's log is : " + main.LogFileName

            logFile = main.LogFileName
            logFileHandler = open( logFile, 'r' )
            for msg in logFileHandler.readlines():
                print msg,

            logFileHandler.close()

        except NameError:
            print "There is no previously executed Test"

    def parseArgs( self, args, options ):
        '''
        This will parse the command line arguments.
        '''
        options = self.initOptions( options )
        try:
            index = 0
            while index < len( args ):
                option = args[ index ]
                if index > 0:
                    if re.match( "--params-file", option, flags=0 ):
                        # The more specific match must be before --params
                        options[ 'paramsFile' ] = args[ index + 1 ]
                    elif re.match( "--topo-file", option, flags=0 ):
                        options[ 'topoFile' ] = args[ index + 1 ]
                    elif re.match( "--params", option, flags=0 ):
                        # check if there is a params
                        options[ 'params' ].append( args[ index + 1 ] )
                    elif re.match( "logdir|mail|example|testdir|testcases|onoscell",
                                   option, flags=0 ):
                        options[ option ] = args[ index + 1 ]
                        options = self.testcasesInRange( index + 1, option, args, options )
                    index += 2
                else:
                    options[ 'testname' ] = option
                    index += 1
        except IndexError as e:
            print ( e )
            sys.exit()

        return options

    def initOptions( self, options ):
        '''
        This will initialize the commandline options.
        '''
        options[ 'logdir' ] = None
        options[ 'mail' ] = None
        options[ 'example' ] = None
        options[ 'testdir' ] = None
        options[ 'testcases' ] = None
        options[ 'onoscell' ] = None
        # init params as a empty list
        options[ 'params' ] = []
        options[ 'paramsFile' ] = None
        options[ 'topoFile' ] = None
        return options

    def testcasesInRange( self, index, option, args, options ):
        '''
        This method will handle testcases list,specified in range [1-10].
        '''
        if re.match( "testcases", option, 1 ):
            testcases = []
            args[ index ] = re.sub( "\[|\]", "", args[ index ], 0 )
            m = re.match( "(\d+)\-(\d+)", args[ index ], flags=0 )
            if m:
                start_case = eval( m.group( 1 ) )
                end_case = eval( m.group( 2 ) )
                if ( start_case <= end_case ):
                    i = start_case
                    while i <= end_case:
                        testcases.append( i )
                        i = i + 1
                else:
                    print "Please specify testcases properly like 1-5"
            else:
                options[ option ] = args[ index ]
                return options
            options[ option ] = str( testcases )

        return options

    def cmdloop( self, intro=introduction ):
        print introduction
        while True:
            try:
                super( CLI, self ).cmdloop( intro="" )
                self.postloop()
            except KeyboardInterrupt:
                if testthread:
                    testthread.pause()
                else:
                    print "KeyboardInterrupt, Exiting."
                    sys.exit()

    def do_echo( self, line ):
        '''
        Echoing of given input.
        '''
        output( line )

    def do_sh( self, line ):
        '''
        Run an external shell command
        sh pwd
        sh ifconfig etc.
        '''
        call( line, shell=True )

    def do_py( self, line ):
        '''
        Evaluate a Python expression.

        py main.log.info("Sample Log Information")
        2013-01-07 12:07:26,804 - PoxTest - INFO - Sample Log Information

        '''
        try:
            exec( line )
        except Exception as e:
            output( str( e ) + '\n' )

    def do_interpret( self, line ):
        '''
        interpret will translate the single line openspeak statement to equivalent python script.

        teston> interpret ASSERT result EQUALS main.TRUE ONPASS "Ping executed successfully" ONFAIL "Ping failed"
        utilities.assert_equals(expect=main.TRUE,actual=result,onpass="Ping executed successfully",onfail="Ping failed")

        '''
        from core import openspeak
        ospk = openspeak.OpenSpeak()
        try:
            translated_code = ospk.interpret( text=line )
            print translated_code
        except AttributeError as e:
            print 'Dynamic params are not allowed in single statement translations'

    def do_do( self, line ):
        '''
        Do will translate and execute the openspeak statement for the paused test.
        do <OpenSpeak statement>
        '''
        if testthread:
            from core import openspeak
            ospk = openspeak.OpenSpeak()
            try:
                translated_code = ospk.interpret( text=line )
                eval( translated_code )
            except ( AttributeError, SyntaxError ) as e:
                print 'Dynamic params are not allowed in single statement translations:'
                print e
        else:
            print "Do will translate and execute the openspeak statement for the paused test.\nPlease use interpret to translate the OpenSpeak statement."

    def do_compile( self, line ):
        '''
        compile will translate the openspeak (.ospk) file into TestON test script (python).
        It will receive the openspeak file path as input and will generate
        equivalent test-script file in the same directory.

        usage:
        -----
        teston>compile /home/openflow/TestON/PoxTest.ospk

        Auto-generated test-script file is /home/openflow/TestON/PoxTest.py
        '''
        from core import openspeak
        openspeak = openspeak.OpenSpeak()
        openspeakfile = line
        if os.path.exists( openspeakfile ):
            openspeak.compiler( openspeakfile=openspeakfile, writetofile=1 )
            print "Auto-generated test-script file is " + re.sub( "ospk", "py", openspeakfile, 0 )
        else:
            print 'There is no such file : ' + line

    def do_exit( self, _line ):
        "Exit"
        if testthread:
            testthread.stop()

        sys.exit()

        return 'exited by user command'

    def do_quit( self, line ):
        "Exit"
        return self.do_exit( line )

    def do_EOF( self, line ):
        "Exit"
        output( '\n' )
        return self.do_exit( line )

    def isatty( self ):
        "Is our standard input a tty?"
        return isatty( self.stdin.fileno() )

    def do_source( self, line ):
        '''
        Read shell commands from an input file and execute them sequentially.
        cmdsource.txt :

        "pwd
         ls "

        teston>source /home/openflow/cmdsource.txt
        /home/openflow/TestON/bin/
        cli.py  __init__.py

        '''

        args = line.split()
        if len( args ) != 1:
            error( 'usage: source <file>\n' )
            return
        try:
            self.inputFile = open( args[ 0 ] )
            while True:
                line = self.inputFile.readline()
                if len( line ) > 0:
                    call( line, shell=True )
                else:
                    break
        except IOError:
            error( 'error reading file %s\n' % args[ 0 ] )

    def do_time( self, line ):
        "Measure time taken for any command in TestON."
        start = time.time()
        self.onecmd( line )
        elapsed = time.time() - start
        self.stdout.write( "*** Elapsed time: %0.6f secs\n" % elapsed )

    def default( self, line ):
        """Called on an input line when the command prefix is not recognized."""
        first, args, line = self.parseline( line )
        if not args:
            return
        if args and len( args ) > 0 and args[ -1 ] == '\n':
            args = args[:-1 ]
        rest = args.split( ' ' )

        error( '*** Unknown command: %s\n' % first )

class TestThread( threading.Thread ):
    '''
    TestThread class will handle the test execution and will communicate with the thread in the do_run.
    '''
    def __init__( self, options ):
        self._stopevent = threading.Event()
        threading.Thread.__init__( self )
        self.is_stop = False
        self.options = options
        __builtin__.testthread = self

    def run( self ):
        '''
        Will execute the test.
        '''
        while not self.is_stop:
            if not self._stopevent.isSet():
                self.test_on = TestON( self.options )
                try:
                    if self.test_on.init_result:
                        result = self.test_on.run()
                        if not self.is_stop:
                            result = self.test_on.cleanup()
                        self.is_stop = True
                except KeyboardInterrupt:
                    print "Recevied Interrupt, cleaning-up the logs and drivers before exiting"
                    result = self.test_on.cleanup()
                    self.is_stop = True

        __builtin__.testthread = False

    def pause( self ):
        '''
        Will pause the test.
        '''
        if not cli.pause:
            print "Will pause the test's execution, after completion of this step.....\n\n\n\n"
            cli.pause = True
            self._stopevent.set()
        elif cli.pause and self.is_stop:
            print "KeyboardInterrupt, Exiting."
            self.test_on.exit()
        else:
            print "Recevied Interrupt, cleaning-up the logs and drivers before exiting"
            result = self.test_on.cleanup()
            self.is_stop = True

    def play( self ):
        '''
        Will resume the paused test.
        '''
        self._stopevent.clear()
        cli.pause = False

    def stop( self ):
        '''
        Will stop the test execution.
        '''

        print "Stopping the test"
        self.is_stop = True
        cli.stop = True
        __builtin__.testthread = False

def output( msg ):
    '''
    Simply, print the message in console
    '''
    print msg

def error( msg ):
    '''
    print the error message.
    '''
    print msg

def dictToObj( dictionary ):
    '''
    This will facilitates the converting of the dictionary to the object.
    This method will help to send options as object format to the test.
    '''
    if isinstance( dictionary, list ):
        dictionary = [ dictToObj( x ) for x in dictionary ]
    if not isinstance( dictionary, dict ):
        return dictionary

    class Convert( object ):
        pass
    obj = Convert()
    for k in dictionary:
        obj.__dict__[ k ] = dictToObj( dictionary[ k ] )
    return obj


if __name__ == '__main__':
    if len( sys.argv ) > 1:
        __builtin__.COLORS = True
        CLI( "test" ).onecmd( ' '.join( sys.argv[ 1: ] ) )
    else:
        __builtin__.COLORS = False
        CLI( "test" ).cmdloop()
