blob: f6417a6535ad1c51a0bb318075b6ec568a124315 [file] [log] [blame]
adminbae64d82013-08-01 10:50:15 -07001#!/usr/bin/env python
2'''
3Created on 20-Dec-2012
Jon Hall5f15fef2015-07-17 14:22:14 -07004
adminbae64d82013-08-01 10:50:15 -07005@author: Anil Kumar (anilkumar.s@paxterrasolutions.com)
6
7
8
9 TestON is free software: you can redistribute it and/or modify
10 it under the terms of the GNU General Public License as published by
11 the Free Software Foundation, either version 2 of the License, or
12 (at your option) any later version.
13
14 TestON is distributed in the hope that it will be useful,
15 but WITHOUT ANY WARRANTY; without even the implied warranty of
16 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
17 GNU General Public License for more details.
18
19 You should have received a copy of the GNU General Public License
20 along with TestON. If not, see <http://www.gnu.org/licenses/>.
21
22
23'''
24
25
26"""
27cli will provide the CLI shell for teston framework.
28
29A simple command-line interface for TestON.
30
31The TestON CLI provides a simple console which
32makes it easy to launch the test. For example, the command run will execute the test.
33
34teston> run test DpctlTest
35Several useful commands are provided.
36"""
37
38from subprocess import call
39from cmd import Cmd
40from os import isatty
41import sys
42import re
43import os
44import time
45import threading
46import __builtin__
47import pprint
48dump = pprint.PrettyPrinter(indent=4)
49__builtin__.testthread = False
50introduction = "TestON is the testing framework \nDeveloped by Paxterra Solutions (www.paxterrasolutions.com)"
Jon Hall0bde9ba2015-03-19 11:32:57 -070051__builtin__.COLORS = False
adminbae64d82013-08-01 10:50:15 -070052
Jon Hall1dd5a0a2015-07-08 10:49:26 -070053path = re.sub( "/bin$", "", sys.path[0] )
54sys.path.insert( 1, path )
Jon Hall0bde9ba2015-03-19 11:32:57 -070055from core.teston import *
adminbae64d82013-08-01 10:50:15 -070056
57class CLI( threading.Thread,Cmd,object ):
58 "command-line interface to execute the test."
59
60 prompt = 'teston> '
61
62 def __init__( self, teston, stdin=sys.stdin ):
63 self.teston = teston
Jon Hall0bde9ba2015-03-19 11:32:57 -070064
adminbae64d82013-08-01 10:50:15 -070065 self._mainevent = threading.Event()
66 threading.Thread.__init__(self)
67 self.main_stop = False
68 self.locals = { 'test': teston }
69 self.stdin = stdin
70 Cmd.__init__( self )
71 self.pause = False
72 self.stop = False
73 __builtin__.cli = self
74
75 def emptyline( self ):
76 "Don't repeat last command when you hit return."
77 pass
78
79 helpStr = (
80 " teston help"
81 )
82
83 def do_help( self, line ):
84 "Describe available CLI commands."
85 Cmd.do_help( self, line )
86 if line is '':
87 output( self.helpStr )
88 def do_run(self,args):
89 '''
90 run command will execute the test with following optional command line arguments
91 logdir <directory to store logs in>
92 testcases <list of testcases separated by comma or range of testcases separated by hypen>
93 mail <mail-id or list of mail-ids seperated by comma>
94 example 1, to execute the examples specified in the ~/examples diretory.
95 '''
96 args = args.split()
97 options = {}
98 options = self.parseArgs(args,options)
99 options = dictToObj(options)
100 if not testthread:
101 test = TestThread(options)
102 test.start()
103 else :
104 print main.TEST+ " test execution paused, please resume that before executing to another test"
105
106 def do_resume(self, line):
107 '''
108 resume command will continue the execution of paused test.
109 teston>resume
110 [2013-01-07 23:03:44.640723] [PoxTest] [STEP] 1.1: Checking the host reachability using pingHost
111 2013-01-07 23:03:44,858 - PoxTest - INFO - Expected Prompt Found
112 ....
113 '''
114 if testthread:
115 testthread.play()
116 else :
117 print "There is no test to resume"
118
119 def do_nextstep(self,line):
120 '''
121 nextstep will execute the next-step of the paused test and
122 it will pause the test after finishing of step.
123
124 teston> nextstep
125 Will pause the test's execution, after completion of this step.....
126
127 teston> [2013-01-07 21:24:26.286601] [PoxTest] [STEP] 1.8: Checking the host reachability using pingHost
128 2013-01-07 21:24:26,455 - PoxTest - INFO - Expected Prompt Found
129 .....
130 teston>
131
132 '''
133 if testthread:
134 main.log.info("Executing the nextstep, Will pause test execution, after completion of the step")
135 testthread.play()
136 time.sleep(.1)
137 testthread.pause()
138 else:
139 print "There is no paused test "
140
141 def do_dumpvar(self,line):
142 '''
143 dumpvar will print all the test data in raw format.
144 usgae :
145 teston>dumpvar main
146 Here 'main' will be the test object.
147
148 teston>dumpvar params
149 here 'params' will be the parameters specified in the params file.
150
151 teston>dumpvar topology
152 here 'topology' will be topology specification of the test specified in topo file.
153 '''
154 if testthread:
155 if line == "main":
156 dump.pprint(vars(main))
157 else :
158 try :
159 dump.pprint(vars(main)[line])
160 except KeyError,e:
161 print e
162 else :
163 print "There is no paused test "
164
165 def do_currentcase(self,line):
166 '''
167 currentcase will return the current case in the test execution.
168
169 teston>currentcase
170 Currently executing test case is: 2
171
172 '''
173 if testthread:
174 print "Currently executing test case is: "+str(main.CurrentTestCaseNumber)
175 else :
176 print "There is no paused test "
177
178
179 def do_currentstep(self,line):
180 '''
181 currentstep will return the current step in the test execution.
182
183 teston>currentstep
184 Currently executing test step is: 2.3
185 '''
186 if testthread:
187 print "Currently executing test step is: "+str(main.CurrentTestCaseNumber)+'.'+str(main.stepCount)
188 else :
189 print "There is no paused test "
190
191
192 def do_stop(self,line):
193 '''
194 Will stop the paused test, if any !
195 '''
196 if testthread:
197 testthread.stop()
198
199 return 'exited by user command'
200
201 def do_gettest(self,line):
202 '''
203 gettest will return the test name which is under execution or recently executed.
204
205 Test under execution:
206 teston>gettest
207 Currently executing Test is: PoxTest
208
209 Test recently executed:
210 Recently executed test is: MininetTest
211 '''
212 try :
213 if testthread :
214 print "Currently executing Test is: "+main.TEST
215 else :
216 print "Recently executed test is: "+main.TEST
217
218 except NameError:
219 print "There is no previously executed Test"
220
221 def do_showlog(self,line):
222 '''
223 showlog will show the test's Log
224 teston>showlog
225 Last executed test's log is : //home/openflow/TestON/logs/PoxTest_07_Jan_2013_21_42_11/PoxTest_07_Jan_2013_21_42_11.log
226 .....
227 teston>showlog
228 Currently executing Test's log is: /home/openflow/TestON/logs/PoxTest_07_Jan_2013_21_46_58/PoxTest_07_Jan_2013_21_46_58.log
229 .....
230 '''
231 try :
232 if testthread :
233 print "Currently executing Test's log is: "+main.LogFileName
234
235 else :
236 print "Last executed test's log is : "+main.LogFileName
237
238 logFile = main.LogFileName
239 logFileHandler = open(logFile, 'r')
240 for msg in logFileHandler.readlines() :
241 print msg,
242
243 logFileHandler.close()
244
245 except NameError:
246 print "There is no previously executed Test"
247
248
249
250 def parseArgs(self,args,options):
251 '''
252 This will parse the command line arguments.
253 '''
254 options = self.initOptions(options)
255 try :
256 for index, option in enumerate(args):
257 if index > 0 :
Hari Krishna03f530e2015-07-10 17:28:27 -0700258 if re.match("logdir|mail|example|testdir|testcases|onoscell", option, flags = 0):
adminbae64d82013-08-01 10:50:15 -0700259 index = index+1
260 options[option] = args[index]
261 options = self.testcasesInRange(index,option,args,options)
262 else :
263 options['testname'] = option
264 except IndexError,e:
265 print e
266
267 return options
268
269 def initOptions(self,options):
270 '''
271 This will initialize the commandline options.
272 '''
273 options['logdir'] = None
274 options['mail'] = None
275 options['example'] = None
276 options['testdir'] = None
277 options['testcases'] = None
Hari Krishna03f530e2015-07-10 17:28:27 -0700278 options['onoscell'] = None
adminbae64d82013-08-01 10:50:15 -0700279 return options
280
281 def testcasesInRange(self,index,option,args,options):
282 '''
283 This method will handle testcases list,specified in range [1-10].
284 '''
285 if re.match("testcases",option,1):
286 testcases = []
287 args[index] = re.sub("\[|\]","",args[index],0)
288 m = re.match("(\d+)\-(\d+)",args[index],flags=0)
289 if m:
290 start_case = eval(m.group(1))
291 end_case = eval(m.group(2))
292 if (start_case <= end_case):
293 i = start_case
294 while i <= end_case:
295 testcases.append(i)
296 i= i+1
297 else :
298 print "Please specify testcases properly like 1-5"
299 else :
300 options[option] = args[index]
301 return options
302 options[option] = str(testcases)
303
304 return options
305
306 def cmdloop(self, intro=introduction):
307 print introduction
308 while True:
309 try:
310 super(CLI, self).cmdloop(intro="")
311 self.postloop()
312 except KeyboardInterrupt:
313 testthread.pause()
314
315 def do_echo( self, line ):
316 '''
317 Echoing of given input.
318 '''
319 output(line)
320
321 def do_sh( self, line ):
322 '''
323 Run an external shell command
324 sh pwd
325 sh ifconfig etc.
326 '''
327 call( line, shell=True )
328
329
330 def do_py( self, line ):
331 '''
332 Evaluate a Python expression.
333
334 py main.log.info("Sample Log Information")
335 2013-01-07 12:07:26,804 - PoxTest - INFO - Sample Log Information
336
337 '''
338 try:
339 exec( line )
340 except Exception, e:
341 output( str( e ) + '\n' )
342
343 def do_interpret(self,line):
344 '''
345 interpret will translate the single line openspeak statement to equivalent python script.
346
347 teston> interpret ASSERT result EQUALS main.TRUE ONPASS "Ping executed successfully" ONFAIL "Ping failed"
348 utilities.assert_equals(expect=main.TRUE,actual=result,onpass="Ping executed successfully",onfail="Ping failed")
349
350 '''
351 from core import openspeak
352 ospk = openspeak.OpenSpeak()
353 try :
354 translated_code = ospk.interpret(text=line)
355 print translated_code
356 except AttributeError, e:
357 print 'Dynamic params are not allowed in single statement translations'
358
359 def do_do (self,line):
360 '''
361 Do will translate and execute the openspeak statement for the paused test.
362 do <OpenSpeak statement>
363 '''
364 if testthread:
365 from core import openspeak
366 ospk = openspeak.OpenSpeak()
367 try :
368 translated_code = ospk.interpret(text=line)
369 eval(translated_code)
370 except (AttributeError,SyntaxError), e:
371 print 'Dynamic params are not allowed in single statement translations'
372 else :
373 print "Do will translate and execute the openspeak statement for the paused test.\nPlease use interpret to translate the OpenSpeak statement."
374
375 def do_compile(self,line):
376 '''
377 compile will translate the openspeak (.ospk) file into TestON test script (python).
378 It will receive the openspeak file path as input and will generate
379 equivalent test-script file in the same directory.
380
381 usage:
382 -----
383 teston>compile /home/openflow/TestON/PoxTest.ospk
384
385 Auto-generated test-script file is /home/openflow/TestON/PoxTest.py
386 '''
387 from core import openspeak
388 openspeak = openspeak.OpenSpeak()
389 openspeakfile = line
390 if os.path.exists(openspeakfile) :
391 openspeak.compiler(openspeakfile=openspeakfile,writetofile=1)
392 print "Auto-generated test-script file is "+ re.sub("ospk","py",openspeakfile,0)
393 else:
394 print 'There is no such file : '+line
395
396 def do_exit( self, _line ):
397 "Exit"
398 if testthread:
399 testthread.stop()
400
401 sys.exit()
402
403 return 'exited by user command'
404
405 def do_quit( self, line ):
406 "Exit"
407 return self.do_exit( line )
408
409 def do_EOF( self, line ):
410 "Exit"
411 output( '\n' )
412 return self.do_exit( line )
413
414 def isatty( self ):
415 "Is our standard input a tty?"
416 return isatty( self.stdin.fileno() )
417
418 def do_source( self, line ):
419 '''
420 Read shell commands from an input file and execute them sequentially.
421 cmdsource.txt :
422
423 "pwd
424 ls "
425
426 teston>source /home/openflow/cmdsource.txt
427 /home/openflow/TestON/bin/
428 cli.py __init__.py
429
430 '''
431
432 args = line.split()
433 if len(args) != 1:
434 error( 'usage: source <file>\n' )
435 return
436 try:
437 self.inputFile = open( args[ 0 ] )
438 while True:
439 line = self.inputFile.readline()
440 if len( line ) > 0:
441 call( line, shell=True )
442 else:
443 break
444 except IOError:
445 error( 'error reading file %s\n' % args[ 0 ] )
446
447 def do_updatedriver(self,line):
448 '''
449 updatedriver will update the given driver name which exists into mentioned config file.
450 It will receive two optional arguments :
451
452 1. Config File Path
453 2. Drivers List to be updated.
454
455 Default : config file = "~/TestON/config/updatedriver" ,
456 Driver List = all drivers specified in config file .
457 '''
458 args = line.split()
459 config = ''
460 drivers = ''
461 try :
462 for index, option in enumerate(args):
463 if option == 'config':
464 index = index + 1
465 config = args[index]
466 elif option == 'drivers' :
467 index = index + 1
468 drivers = args[index]
469 except IndexError:
470 pass
471 import updatedriver
472 converter = updatedriver.UpdateDriver()
473
474 if config == '':
475 path = re.sub("(bin)$", "", os.getcwd())
476 config = path + "/config/updatedriver.cfg"
477 configDict = converter.configparser(config)
478
479 else :
480 converter.configparser(config)
481 configDict = converter.configparser(config)
482
483
484 converter.writeDriver(drivers)
485
486
487
488
489 def do_time( self, line ):
490 "Measure time taken for any command in TestON."
491 start = time.time()
492 self.onecmd(line)
493 elapsed = time.time() - start
494 self.stdout.write("*** Elapsed time: %0.6f secs\n" % elapsed)
495
496 def default( self, line ):
497 """Called on an input line when the command prefix is not recognized."""
498 first, args, line = self.parseline( line )
499 if not args:
500 return
501 if args and len(args) > 0 and args[ -1 ] == '\n':
502 args = args[ :-1 ]
503 rest = args.split( ' ' )
504
505 error( '*** Unknown command: %s\n' % first )
506
507
508
509class TestThread(threading.Thread):
510 '''
511 TestThread class will handle the test execution and will communicate with the thread in the do_run.
512 '''
513 def __init__(self,options):
514 self._stopevent = threading.Event()
515 threading.Thread.__init__(self)
516 self.is_stop = False
517 self.options = options
518 __builtin__.testthread = self
519
520 def run(self):
521 '''
522 Will execute the test.
523 '''
524 while not self.is_stop :
525 if not self._stopevent.isSet():
526 self.test_on = TestON(self.options)
527 try :
528 if self.test_on.init_result:
529 result = self.test_on.run()
530 if not self.is_stop :
531 result = self.test_on.cleanup()
532 self.is_stop = True
533 except(KeyboardInterrupt):
534 print "Recevied Interrupt,cleaning-up the logs and drivers before exiting"
535 result = self.test_on.cleanup()
536 self.is_stop = True
537
538 __builtin__.testthread = False
539
540 def pause(self):
541 '''
542 Will pause the test.
543 '''
544 print "Will pause the test's execution, after completion of this step.....\n\n\n\n"
545 cli.pause = True
546 self._stopevent.set()
547
548 def play(self):
549 '''
550 Will resume the paused test.
551 '''
552 self._stopevent.clear()
553 cli.pause = False
554
555 def stop(self):
556 '''
557 Will stop the test execution.
558 '''
559
560 print "Stopping the test"
561 self.is_stop = True
562 cli.stop = True
563 __builtin__.testthread = False
564
565def output(msg):
566 '''
567 Simply, print the message in console
568 '''
569 print msg
570
571def error(msg):
572 '''
573 print the error message.
574 '''
575 print msg
576
577def dictToObj(dictionary):
578 '''
579 This will facilitates the converting of the dictionary to the object.
580 This method will help to send options as object format to the test.
581 '''
582 if isinstance(dictionary, list):
583 dictionary = [dictToObj(x) for x in dictionary]
584 if not isinstance(dictionary, dict):
585 return dictionary
586 class Convert(object):
587 pass
588 obj = Convert()
589 for k in dictionary:
590 obj.__dict__[k] = dictToObj(dictionary[k])
591 return obj
592
593
594if __name__ == '__main__':
595 if len(sys.argv) > 1:
Jon Hall0bde9ba2015-03-19 11:32:57 -0700596 __builtin__.COLORS = True
adminbae64d82013-08-01 10:50:15 -0700597 CLI("test").onecmd(' '.join(sys.argv[1:]))
598 else:
Jon Hall0bde9ba2015-03-19 11:32:57 -0700599 __builtin__.COLORS = False
adminbae64d82013-08-01 10:50:15 -0700600 CLI("test").cmdloop()