blob: ed1c7963feee2cce905b64655bc37669b3309730 [file] [log] [blame]
adminbae64d82013-08-01 10:50:15 -07001#!/usr/bin/env python
2'''
3Created on 20-Dec-2012
4
5@author: Anil Kumar (anilkumar.s@paxterrasolutions.com)
6
7
8
9 TestON is free software: you can redistribute it and/or modify
10 it under the terms of the GNU General Public License as published by
11 the Free Software Foundation, either version 2 of the License, or
12 (at your option) any later version.
13
14 TestON is distributed in the hope that it will be useful,
15 but WITHOUT ANY WARRANTY; without even the implied warranty of
16 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
17 GNU General Public License for more details.
18
19 You should have received a copy of the GNU General Public License
20 along with TestON. If not, see <http://www.gnu.org/licenses/>.
21
22
23'''
24
25
26"""
27cli will provide the CLI shell for teston framework.
28
29A simple command-line interface for TestON.
30
31The TestON CLI provides a simple console which
32makes it easy to launch the test. For example, the command run will execute the test.
33
34teston> run test DpctlTest
35Several useful commands are provided.
36"""
37
38from subprocess import call
39from cmd import Cmd
40from os import isatty
41import sys
42import re
43import os
44import time
45import threading
46import __builtin__
47import pprint
48dump = pprint.PrettyPrinter(indent=4)
49__builtin__.testthread = False
50introduction = "TestON is the testing framework \nDeveloped by Paxterra Solutions (www.paxterrasolutions.com)"
Jon Hall0bde9ba2015-03-19 11:32:57 -070051__builtin__.COLORS = False
adminbae64d82013-08-01 10:50:15 -070052
53path = re.sub("teston$", "", os.getcwd())
54sys.path.append(path+"/Core")
55sys.path.append("../")
Jon Hall0bde9ba2015-03-19 11:32:57 -070056from core.teston import *
adminbae64d82013-08-01 10:50:15 -070057
58class CLI( threading.Thread,Cmd,object ):
59 "command-line interface to execute the test."
60
61 prompt = 'teston> '
62
63 def __init__( self, teston, stdin=sys.stdin ):
64 self.teston = teston
Jon Hall0bde9ba2015-03-19 11:32:57 -070065
adminbae64d82013-08-01 10:50:15 -070066 self._mainevent = threading.Event()
67 threading.Thread.__init__(self)
68 self.main_stop = False
69 self.locals = { 'test': teston }
70 self.stdin = stdin
71 Cmd.__init__( self )
72 self.pause = False
73 self.stop = False
74 __builtin__.cli = self
75
76 def emptyline( self ):
77 "Don't repeat last command when you hit return."
78 pass
79
80 helpStr = (
81 " teston help"
82 )
83
84 def do_help( self, line ):
85 "Describe available CLI commands."
86 Cmd.do_help( self, line )
87 if line is '':
88 output( self.helpStr )
89 def do_run(self,args):
90 '''
91 run command will execute the test with following optional command line arguments
92 logdir <directory to store logs in>
93 testcases <list of testcases separated by comma or range of testcases separated by hypen>
94 mail <mail-id or list of mail-ids seperated by comma>
95 example 1, to execute the examples specified in the ~/examples diretory.
96 '''
97 args = args.split()
98 options = {}
99 options = self.parseArgs(args,options)
100 options = dictToObj(options)
101 if not testthread:
102 test = TestThread(options)
103 test.start()
104 else :
105 print main.TEST+ " test execution paused, please resume that before executing to another test"
106
107 def do_resume(self, line):
108 '''
109 resume command will continue the execution of paused test.
110 teston>resume
111 [2013-01-07 23:03:44.640723] [PoxTest] [STEP] 1.1: Checking the host reachability using pingHost
112 2013-01-07 23:03:44,858 - PoxTest - INFO - Expected Prompt Found
113 ....
114 '''
115 if testthread:
116 testthread.play()
117 else :
118 print "There is no test to resume"
119
120 def do_nextstep(self,line):
121 '''
122 nextstep will execute the next-step of the paused test and
123 it will pause the test after finishing of step.
124
125 teston> nextstep
126 Will pause the test's execution, after completion of this step.....
127
128 teston> [2013-01-07 21:24:26.286601] [PoxTest] [STEP] 1.8: Checking the host reachability using pingHost
129 2013-01-07 21:24:26,455 - PoxTest - INFO - Expected Prompt Found
130 .....
131 teston>
132
133 '''
134 if testthread:
135 main.log.info("Executing the nextstep, Will pause test execution, after completion of the step")
136 testthread.play()
137 time.sleep(.1)
138 testthread.pause()
139 else:
140 print "There is no paused test "
141
142 def do_dumpvar(self,line):
143 '''
144 dumpvar will print all the test data in raw format.
145 usgae :
146 teston>dumpvar main
147 Here 'main' will be the test object.
148
149 teston>dumpvar params
150 here 'params' will be the parameters specified in the params file.
151
152 teston>dumpvar topology
153 here 'topology' will be topology specification of the test specified in topo file.
154 '''
155 if testthread:
156 if line == "main":
157 dump.pprint(vars(main))
158 else :
159 try :
160 dump.pprint(vars(main)[line])
161 except KeyError,e:
162 print e
163 else :
164 print "There is no paused test "
165
166 def do_currentcase(self,line):
167 '''
168 currentcase will return the current case in the test execution.
169
170 teston>currentcase
171 Currently executing test case is: 2
172
173 '''
174 if testthread:
175 print "Currently executing test case is: "+str(main.CurrentTestCaseNumber)
176 else :
177 print "There is no paused test "
178
179
180 def do_currentstep(self,line):
181 '''
182 currentstep will return the current step in the test execution.
183
184 teston>currentstep
185 Currently executing test step is: 2.3
186 '''
187 if testthread:
188 print "Currently executing test step is: "+str(main.CurrentTestCaseNumber)+'.'+str(main.stepCount)
189 else :
190 print "There is no paused test "
191
192
193 def do_stop(self,line):
194 '''
195 Will stop the paused test, if any !
196 '''
197 if testthread:
198 testthread.stop()
199
200 return 'exited by user command'
201
202 def do_gettest(self,line):
203 '''
204 gettest will return the test name which is under execution or recently executed.
205
206 Test under execution:
207 teston>gettest
208 Currently executing Test is: PoxTest
209
210 Test recently executed:
211 Recently executed test is: MininetTest
212 '''
213 try :
214 if testthread :
215 print "Currently executing Test is: "+main.TEST
216 else :
217 print "Recently executed test is: "+main.TEST
218
219 except NameError:
220 print "There is no previously executed Test"
221
222 def do_showlog(self,line):
223 '''
224 showlog will show the test's Log
225 teston>showlog
226 Last executed test's log is : //home/openflow/TestON/logs/PoxTest_07_Jan_2013_21_42_11/PoxTest_07_Jan_2013_21_42_11.log
227 .....
228 teston>showlog
229 Currently executing Test's log is: /home/openflow/TestON/logs/PoxTest_07_Jan_2013_21_46_58/PoxTest_07_Jan_2013_21_46_58.log
230 .....
231 '''
232 try :
233 if testthread :
234 print "Currently executing Test's log is: "+main.LogFileName
235
236 else :
237 print "Last executed test's log is : "+main.LogFileName
238
239 logFile = main.LogFileName
240 logFileHandler = open(logFile, 'r')
241 for msg in logFileHandler.readlines() :
242 print msg,
243
244 logFileHandler.close()
245
246 except NameError:
247 print "There is no previously executed Test"
248
249
250
251 def parseArgs(self,args,options):
252 '''
253 This will parse the command line arguments.
254 '''
255 options = self.initOptions(options)
256 try :
257 for index, option in enumerate(args):
258 if index > 0 :
259 if re.match("logdir|mail|example|testdir|testcases", option, flags = 0):
260 index = index+1
261 options[option] = args[index]
262 options = self.testcasesInRange(index,option,args,options)
263 else :
264 options['testname'] = option
265 except IndexError,e:
266 print e
267
268 return options
269
270 def initOptions(self,options):
271 '''
272 This will initialize the commandline options.
273 '''
274 options['logdir'] = None
275 options['mail'] = None
276 options['example'] = None
277 options['testdir'] = None
278 options['testcases'] = None
279 return options
280
281 def testcasesInRange(self,index,option,args,options):
282 '''
283 This method will handle testcases list,specified in range [1-10].
284 '''
285 if re.match("testcases",option,1):
286 testcases = []
287 args[index] = re.sub("\[|\]","",args[index],0)
288 m = re.match("(\d+)\-(\d+)",args[index],flags=0)
289 if m:
290 start_case = eval(m.group(1))
291 end_case = eval(m.group(2))
292 if (start_case <= end_case):
293 i = start_case
294 while i <= end_case:
295 testcases.append(i)
296 i= i+1
297 else :
298 print "Please specify testcases properly like 1-5"
299 else :
300 options[option] = args[index]
301 return options
302 options[option] = str(testcases)
303
304 return options
305
306 def cmdloop(self, intro=introduction):
307 print introduction
308 while True:
309 try:
310 super(CLI, self).cmdloop(intro="")
311 self.postloop()
312 except KeyboardInterrupt:
313 testthread.pause()
314
315 def do_echo( self, line ):
316 '''
317 Echoing of given input.
318 '''
319 output(line)
320
321 def do_sh( self, line ):
322 '''
323 Run an external shell command
324 sh pwd
325 sh ifconfig etc.
326 '''
327 call( line, shell=True )
328
329
330 def do_py( self, line ):
331 '''
332 Evaluate a Python expression.
333
334 py main.log.info("Sample Log Information")
335 2013-01-07 12:07:26,804 - PoxTest - INFO - Sample Log Information
336
337 '''
338 try:
339 exec( line )
340 except Exception, e:
341 output( str( e ) + '\n' )
342
343 def do_interpret(self,line):
344 '''
345 interpret will translate the single line openspeak statement to equivalent python script.
346
347 teston> interpret ASSERT result EQUALS main.TRUE ONPASS "Ping executed successfully" ONFAIL "Ping failed"
348 utilities.assert_equals(expect=main.TRUE,actual=result,onpass="Ping executed successfully",onfail="Ping failed")
349
350 '''
351 from core import openspeak
352 ospk = openspeak.OpenSpeak()
353 try :
354 translated_code = ospk.interpret(text=line)
355 print translated_code
356 except AttributeError, e:
357 print 'Dynamic params are not allowed in single statement translations'
358
359 def do_do (self,line):
360 '''
361 Do will translate and execute the openspeak statement for the paused test.
362 do <OpenSpeak statement>
363 '''
364 if testthread:
365 from core import openspeak
366 ospk = openspeak.OpenSpeak()
367 try :
368 translated_code = ospk.interpret(text=line)
369 eval(translated_code)
370 except (AttributeError,SyntaxError), e:
371 print 'Dynamic params are not allowed in single statement translations'
372 else :
373 print "Do will translate and execute the openspeak statement for the paused test.\nPlease use interpret to translate the OpenSpeak statement."
374
375 def do_compile(self,line):
376 '''
377 compile will translate the openspeak (.ospk) file into TestON test script (python).
378 It will receive the openspeak file path as input and will generate
379 equivalent test-script file in the same directory.
380
381 usage:
382 -----
383 teston>compile /home/openflow/TestON/PoxTest.ospk
384
385 Auto-generated test-script file is /home/openflow/TestON/PoxTest.py
386 '''
387 from core import openspeak
388 openspeak = openspeak.OpenSpeak()
389 openspeakfile = line
390 if os.path.exists(openspeakfile) :
391 openspeak.compiler(openspeakfile=openspeakfile,writetofile=1)
392 print "Auto-generated test-script file is "+ re.sub("ospk","py",openspeakfile,0)
393 else:
394 print 'There is no such file : '+line
395
396 def do_exit( self, _line ):
397 "Exit"
398 if testthread:
399 testthread.stop()
400
401 sys.exit()
402
403 return 'exited by user command'
404
405 def do_quit( self, line ):
406 "Exit"
407 return self.do_exit( line )
408
409 def do_EOF( self, line ):
410 "Exit"
411 output( '\n' )
412 return self.do_exit( line )
413
414 def isatty( self ):
415 "Is our standard input a tty?"
416 return isatty( self.stdin.fileno() )
417
418 def do_source( self, line ):
419 '''
420 Read shell commands from an input file and execute them sequentially.
421 cmdsource.txt :
422
423 "pwd
424 ls "
425
426 teston>source /home/openflow/cmdsource.txt
427 /home/openflow/TestON/bin/
428 cli.py __init__.py
429
430 '''
431
432 args = line.split()
433 if len(args) != 1:
434 error( 'usage: source <file>\n' )
435 return
436 try:
437 self.inputFile = open( args[ 0 ] )
438 while True:
439 line = self.inputFile.readline()
440 if len( line ) > 0:
441 call( line, shell=True )
442 else:
443 break
444 except IOError:
445 error( 'error reading file %s\n' % args[ 0 ] )
446
447 def do_updatedriver(self,line):
448 '''
449 updatedriver will update the given driver name which exists into mentioned config file.
450 It will receive two optional arguments :
451
452 1. Config File Path
453 2. Drivers List to be updated.
454
455 Default : config file = "~/TestON/config/updatedriver" ,
456 Driver List = all drivers specified in config file .
457 '''
458 args = line.split()
459 config = ''
460 drivers = ''
461 try :
462 for index, option in enumerate(args):
463 if option == 'config':
464 index = index + 1
465 config = args[index]
466 elif option == 'drivers' :
467 index = index + 1
468 drivers = args[index]
469 except IndexError:
470 pass
471 import updatedriver
472 converter = updatedriver.UpdateDriver()
473
474 if config == '':
475 path = re.sub("(bin)$", "", os.getcwd())
476 config = path + "/config/updatedriver.cfg"
477 configDict = converter.configparser(config)
478
479 else :
480 converter.configparser(config)
481 configDict = converter.configparser(config)
482
483
484 converter.writeDriver(drivers)
485
486
487
488
489 def do_time( self, line ):
490 "Measure time taken for any command in TestON."
491 start = time.time()
492 self.onecmd(line)
493 elapsed = time.time() - start
494 self.stdout.write("*** Elapsed time: %0.6f secs\n" % elapsed)
495
496 def default( self, line ):
497 """Called on an input line when the command prefix is not recognized."""
498 first, args, line = self.parseline( line )
499 if not args:
500 return
501 if args and len(args) > 0 and args[ -1 ] == '\n':
502 args = args[ :-1 ]
503 rest = args.split( ' ' )
504
505 error( '*** Unknown command: %s\n' % first )
506
507
508
509class TestThread(threading.Thread):
510 '''
511 TestThread class will handle the test execution and will communicate with the thread in the do_run.
512 '''
513 def __init__(self,options):
514 self._stopevent = threading.Event()
515 threading.Thread.__init__(self)
516 self.is_stop = False
517 self.options = options
518 __builtin__.testthread = self
519
520 def run(self):
521 '''
522 Will execute the test.
523 '''
524 while not self.is_stop :
525 if not self._stopevent.isSet():
526 self.test_on = TestON(self.options)
527 try :
528 if self.test_on.init_result:
529 result = self.test_on.run()
530 if not self.is_stop :
531 result = self.test_on.cleanup()
532 self.is_stop = True
533 except(KeyboardInterrupt):
534 print "Recevied Interrupt,cleaning-up the logs and drivers before exiting"
535 result = self.test_on.cleanup()
536 self.is_stop = True
537
538 __builtin__.testthread = False
539
540 def pause(self):
541 '''
542 Will pause the test.
543 '''
544 print "Will pause the test's execution, after completion of this step.....\n\n\n\n"
545 cli.pause = True
546 self._stopevent.set()
547
548 def play(self):
549 '''
550 Will resume the paused test.
551 '''
552 self._stopevent.clear()
553 cli.pause = False
554
555 def stop(self):
556 '''
557 Will stop the test execution.
558 '''
559
560 print "Stopping the test"
561 self.is_stop = True
562 cli.stop = True
563 __builtin__.testthread = False
564
565def output(msg):
566 '''
567 Simply, print the message in console
568 '''
569 print msg
570
571def error(msg):
572 '''
573 print the error message.
574 '''
575 print msg
576
577def dictToObj(dictionary):
578 '''
579 This will facilitates the converting of the dictionary to the object.
580 This method will help to send options as object format to the test.
581 '''
582 if isinstance(dictionary, list):
583 dictionary = [dictToObj(x) for x in dictionary]
584 if not isinstance(dictionary, dict):
585 return dictionary
586 class Convert(object):
587 pass
588 obj = Convert()
589 for k in dictionary:
590 obj.__dict__[k] = dictToObj(dictionary[k])
591 return obj
592
593
594if __name__ == '__main__':
595 if len(sys.argv) > 1:
Jon Hall0bde9ba2015-03-19 11:32:57 -0700596 __builtin__.COLORS = True
adminbae64d82013-08-01 10:50:15 -0700597 CLI("test").onecmd(' '.join(sys.argv[1:]))
598 else:
Jon Hall0bde9ba2015-03-19 11:32:57 -0700599 __builtin__.COLORS = False
adminbae64d82013-08-01 10:50:15 -0700600 CLI("test").cmdloop()