blob: 47b027e59fbb06e4b4e76298302ec84ba1c81490 [file] [log] [blame]
adminbae64d82013-08-01 10:50:15 -07001#!/usr/bin/env python
2'''
3Created on 20-Dec-2012
4
5@author: Anil Kumar (anilkumar.s@paxterrasolutions.com)
6
7
8
9 TestON is free software: you can redistribute it and/or modify
10 it under the terms of the GNU General Public License as published by
11 the Free Software Foundation, either version 2 of the License, or
12 (at your option) any later version.
13
14 TestON is distributed in the hope that it will be useful,
15 but WITHOUT ANY WARRANTY; without even the implied warranty of
16 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
17 GNU General Public License for more details.
18
19 You should have received a copy of the GNU General Public License
20 along with TestON. If not, see <http://www.gnu.org/licenses/>.
21
22
23'''
24
25
26"""
27cli will provide the CLI shell for teston framework.
28
29A simple command-line interface for TestON.
30
31The TestON CLI provides a simple console which
32makes it easy to launch the test. For example, the command run will execute the test.
33
34teston> run test DpctlTest
35Several useful commands are provided.
36"""
37
38from subprocess import call
39from cmd import Cmd
40from os import isatty
41import sys
42import re
43import os
44import time
45import threading
46import __builtin__
47import pprint
48dump = pprint.PrettyPrinter(indent=4)
49__builtin__.testthread = False
50introduction = "TestON is the testing framework \nDeveloped by Paxterra Solutions (www.paxterrasolutions.com)"
51
52path = re.sub("teston$", "", os.getcwd())
53sys.path.append(path+"/Core")
54sys.path.append("../")
55from core.teston import *
56
57class CLI( threading.Thread,Cmd,object ):
58 "command-line interface to execute the test."
59
60 prompt = 'teston> '
61
62 def __init__( self, teston, stdin=sys.stdin ):
63 self.teston = teston
64
65 self._mainevent = threading.Event()
66 threading.Thread.__init__(self)
67 self.main_stop = False
68 self.locals = { 'test': teston }
69 self.stdin = stdin
70 Cmd.__init__( self )
71 self.pause = False
72 self.stop = False
73 __builtin__.cli = self
74
75 def emptyline( self ):
76 "Don't repeat last command when you hit return."
77 pass
78
79 helpStr = (
80 " teston help"
81 )
82
83 def do_help( self, line ):
84 "Describe available CLI commands."
85 Cmd.do_help( self, line )
86 if line is '':
87 output( self.helpStr )
88 def do_run(self,args):
89 '''
90 run command will execute the test with following optional command line arguments
91 logdir <directory to store logs in>
92 testcases <list of testcases separated by comma or range of testcases separated by hypen>
93 mail <mail-id or list of mail-ids seperated by comma>
94 example 1, to execute the examples specified in the ~/examples diretory.
95 '''
96 args = args.split()
97 options = {}
98 options = self.parseArgs(args,options)
99 options = dictToObj(options)
100 if not testthread:
101 test = TestThread(options)
102 test.start()
103 else :
104 print main.TEST+ " test execution paused, please resume that before executing to another test"
105
106 def do_resume(self, line):
107 '''
108 resume command will continue the execution of paused test.
109 teston>resume
110 [2013-01-07 23:03:44.640723] [PoxTest] [STEP] 1.1: Checking the host reachability using pingHost
111 2013-01-07 23:03:44,858 - PoxTest - INFO - Expected Prompt Found
112 ....
113 '''
114 if testthread:
115 testthread.play()
116 else :
117 print "There is no test to resume"
118
119 def do_nextstep(self,line):
120 '''
121 nextstep will execute the next-step of the paused test and
122 it will pause the test after finishing of step.
123
124 teston> nextstep
125 Will pause the test's execution, after completion of this step.....
126
127 teston> [2013-01-07 21:24:26.286601] [PoxTest] [STEP] 1.8: Checking the host reachability using pingHost
128 2013-01-07 21:24:26,455 - PoxTest - INFO - Expected Prompt Found
129 .....
130 teston>
131
132 '''
133 if testthread:
134 main.log.info("Executing the nextstep, Will pause test execution, after completion of the step")
135 testthread.play()
136 time.sleep(.1)
137 testthread.pause()
138 else:
139 print "There is no paused test "
140
141 def do_dumpvar(self,line):
142 '''
143 dumpvar will print all the test data in raw format.
144 usgae :
145 teston>dumpvar main
146 Here 'main' will be the test object.
147
148 teston>dumpvar params
149 here 'params' will be the parameters specified in the params file.
150
151 teston>dumpvar topology
152 here 'topology' will be topology specification of the test specified in topo file.
153 '''
154 if testthread:
155 if line == "main":
156 dump.pprint(vars(main))
157 else :
158 try :
159 dump.pprint(vars(main)[line])
160 except KeyError,e:
161 print e
162 else :
163 print "There is no paused test "
164
165 def do_currentcase(self,line):
166 '''
167 currentcase will return the current case in the test execution.
168
169 teston>currentcase
170 Currently executing test case is: 2
171
172 '''
173 if testthread:
174 print "Currently executing test case is: "+str(main.CurrentTestCaseNumber)
175 else :
176 print "There is no paused test "
177
178
179 def do_currentstep(self,line):
180 '''
181 currentstep will return the current step in the test execution.
182
183 teston>currentstep
184 Currently executing test step is: 2.3
185 '''
186 if testthread:
187 print "Currently executing test step is: "+str(main.CurrentTestCaseNumber)+'.'+str(main.stepCount)
188 else :
189 print "There is no paused test "
190
191
192 def do_stop(self,line):
193 '''
194 Will stop the paused test, if any !
195 '''
196 if testthread:
197 testthread.stop()
198
199 return 'exited by user command'
200
201 def do_gettest(self,line):
202 '''
203 gettest will return the test name which is under execution or recently executed.
204
205 Test under execution:
206 teston>gettest
207 Currently executing Test is: PoxTest
208
209 Test recently executed:
210 Recently executed test is: MininetTest
211 '''
212 try :
213 if testthread :
214 print "Currently executing Test is: "+main.TEST
215 else :
216 print "Recently executed test is: "+main.TEST
217
218 except NameError:
219 print "There is no previously executed Test"
220
221 def do_showlog(self,line):
222 '''
223 showlog will show the test's Log
224 teston>showlog
225 Last executed test's log is : //home/openflow/TestON/logs/PoxTest_07_Jan_2013_21_42_11/PoxTest_07_Jan_2013_21_42_11.log
226 .....
227 teston>showlog
228 Currently executing Test's log is: /home/openflow/TestON/logs/PoxTest_07_Jan_2013_21_46_58/PoxTest_07_Jan_2013_21_46_58.log
229 .....
230 '''
231 try :
232 if testthread :
233 print "Currently executing Test's log is: "+main.LogFileName
234
235 else :
236 print "Last executed test's log is : "+main.LogFileName
237
238 logFile = main.LogFileName
239 logFileHandler = open(logFile, 'r')
240 for msg in logFileHandler.readlines() :
241 print msg,
242
243 logFileHandler.close()
244
245 except NameError:
246 print "There is no previously executed Test"
247
248
249
250 def parseArgs(self,args,options):
251 '''
252 This will parse the command line arguments.
253 '''
254 options = self.initOptions(options)
255 try :
256 for index, option in enumerate(args):
257 if index > 0 :
258 if re.match("logdir|mail|example|testdir|testcases", option, flags = 0):
259 index = index+1
260 options[option] = args[index]
261 options = self.testcasesInRange(index,option,args,options)
262 else :
263 options['testname'] = option
264 except IndexError,e:
265 print e
266
267 return options
268
269 def initOptions(self,options):
270 '''
271 This will initialize the commandline options.
272 '''
273 options['logdir'] = None
274 options['mail'] = None
275 options['example'] = None
276 options['testdir'] = None
277 options['testcases'] = None
278 return options
279
280 def testcasesInRange(self,index,option,args,options):
281 '''
282 This method will handle testcases list,specified in range [1-10].
283 '''
284 if re.match("testcases",option,1):
285 testcases = []
286 args[index] = re.sub("\[|\]","",args[index],0)
287 m = re.match("(\d+)\-(\d+)",args[index],flags=0)
288 if m:
289 start_case = eval(m.group(1))
290 end_case = eval(m.group(2))
291 if (start_case <= end_case):
292 i = start_case
293 while i <= end_case:
294 testcases.append(i)
295 i= i+1
296 else :
297 print "Please specify testcases properly like 1-5"
298 else :
299 options[option] = args[index]
300 return options
301 options[option] = str(testcases)
302
303 return options
304
305 def cmdloop(self, intro=introduction):
306 print introduction
307 while True:
308 try:
309 super(CLI, self).cmdloop(intro="")
310 self.postloop()
311 except KeyboardInterrupt:
312 testthread.pause()
313
314 def do_echo( self, line ):
315 '''
316 Echoing of given input.
317 '''
318 output(line)
319
320 def do_sh( self, line ):
321 '''
322 Run an external shell command
323 sh pwd
324 sh ifconfig etc.
325 '''
326 call( line, shell=True )
327
328
329 def do_py( self, line ):
330 '''
331 Evaluate a Python expression.
332
333 py main.log.info("Sample Log Information")
334 2013-01-07 12:07:26,804 - PoxTest - INFO - Sample Log Information
335
336 '''
337 try:
338 exec( line )
339 except Exception, e:
340 output( str( e ) + '\n' )
341
342 def do_interpret(self,line):
343 '''
344 interpret will translate the single line openspeak statement to equivalent python script.
345
346 teston> interpret ASSERT result EQUALS main.TRUE ONPASS "Ping executed successfully" ONFAIL "Ping failed"
347 utilities.assert_equals(expect=main.TRUE,actual=result,onpass="Ping executed successfully",onfail="Ping failed")
348
349 '''
350 from core import openspeak
351 ospk = openspeak.OpenSpeak()
352 try :
353 translated_code = ospk.interpret(text=line)
354 print translated_code
355 except AttributeError, e:
356 print 'Dynamic params are not allowed in single statement translations'
357
358 def do_do (self,line):
359 '''
360 Do will translate and execute the openspeak statement for the paused test.
361 do <OpenSpeak statement>
362 '''
363 if testthread:
364 from core import openspeak
365 ospk = openspeak.OpenSpeak()
366 try :
367 translated_code = ospk.interpret(text=line)
368 eval(translated_code)
369 except (AttributeError,SyntaxError), e:
370 print 'Dynamic params are not allowed in single statement translations'
371 else :
372 print "Do will translate and execute the openspeak statement for the paused test.\nPlease use interpret to translate the OpenSpeak statement."
373
374 def do_compile(self,line):
375 '''
376 compile will translate the openspeak (.ospk) file into TestON test script (python).
377 It will receive the openspeak file path as input and will generate
378 equivalent test-script file in the same directory.
379
380 usage:
381 -----
382 teston>compile /home/openflow/TestON/PoxTest.ospk
383
384 Auto-generated test-script file is /home/openflow/TestON/PoxTest.py
385 '''
386 from core import openspeak
387 openspeak = openspeak.OpenSpeak()
388 openspeakfile = line
389 if os.path.exists(openspeakfile) :
390 openspeak.compiler(openspeakfile=openspeakfile,writetofile=1)
391 print "Auto-generated test-script file is "+ re.sub("ospk","py",openspeakfile,0)
392 else:
393 print 'There is no such file : '+line
394
395 def do_exit( self, _line ):
396 "Exit"
397 if testthread:
398 testthread.stop()
399
400 sys.exit()
401
402 return 'exited by user command'
403
404 def do_quit( self, line ):
405 "Exit"
406 return self.do_exit( line )
407
408 def do_EOF( self, line ):
409 "Exit"
410 output( '\n' )
411 return self.do_exit( line )
412
413 def isatty( self ):
414 "Is our standard input a tty?"
415 return isatty( self.stdin.fileno() )
416
417 def do_source( self, line ):
418 '''
419 Read shell commands from an input file and execute them sequentially.
420 cmdsource.txt :
421
422 "pwd
423 ls "
424
425 teston>source /home/openflow/cmdsource.txt
426 /home/openflow/TestON/bin/
427 cli.py __init__.py
428
429 '''
430
431 args = line.split()
432 if len(args) != 1:
433 error( 'usage: source <file>\n' )
434 return
435 try:
436 self.inputFile = open( args[ 0 ] )
437 while True:
438 line = self.inputFile.readline()
439 if len( line ) > 0:
440 call( line, shell=True )
441 else:
442 break
443 except IOError:
444 error( 'error reading file %s\n' % args[ 0 ] )
445
446 def do_updatedriver(self,line):
447 '''
448 updatedriver will update the given driver name which exists into mentioned config file.
449 It will receive two optional arguments :
450
451 1. Config File Path
452 2. Drivers List to be updated.
453
454 Default : config file = "~/TestON/config/updatedriver" ,
455 Driver List = all drivers specified in config file .
456 '''
457 args = line.split()
458 config = ''
459 drivers = ''
460 try :
461 for index, option in enumerate(args):
462 if option == 'config':
463 index = index + 1
464 config = args[index]
465 elif option == 'drivers' :
466 index = index + 1
467 drivers = args[index]
468 except IndexError:
469 pass
470 import updatedriver
471 converter = updatedriver.UpdateDriver()
472
473 if config == '':
474 path = re.sub("(bin)$", "", os.getcwd())
475 config = path + "/config/updatedriver.cfg"
476 configDict = converter.configparser(config)
477
478 else :
479 converter.configparser(config)
480 configDict = converter.configparser(config)
481
482
483 converter.writeDriver(drivers)
484
485
486
487
488 def do_time( self, line ):
489 "Measure time taken for any command in TestON."
490 start = time.time()
491 self.onecmd(line)
492 elapsed = time.time() - start
493 self.stdout.write("*** Elapsed time: %0.6f secs\n" % elapsed)
494
495 def default( self, line ):
496 """Called on an input line when the command prefix is not recognized."""
497 first, args, line = self.parseline( line )
498 if not args:
499 return
500 if args and len(args) > 0 and args[ -1 ] == '\n':
501 args = args[ :-1 ]
502 rest = args.split( ' ' )
503
504 error( '*** Unknown command: %s\n' % first )
505
506
507
508class TestThread(threading.Thread):
509 '''
510 TestThread class will handle the test execution and will communicate with the thread in the do_run.
511 '''
512 def __init__(self,options):
513 self._stopevent = threading.Event()
514 threading.Thread.__init__(self)
515 self.is_stop = False
516 self.options = options
517 __builtin__.testthread = self
518
519 def run(self):
520 '''
521 Will execute the test.
522 '''
523 while not self.is_stop :
524 if not self._stopevent.isSet():
525 self.test_on = TestON(self.options)
526 try :
527 if self.test_on.init_result:
528 result = self.test_on.run()
529 if not self.is_stop :
530 result = self.test_on.cleanup()
531 self.is_stop = True
532 except(KeyboardInterrupt):
533 print "Recevied Interrupt,cleaning-up the logs and drivers before exiting"
534 result = self.test_on.cleanup()
535 self.is_stop = True
536
537 __builtin__.testthread = False
538
539 def pause(self):
540 '''
541 Will pause the test.
542 '''
543 print "Will pause the test's execution, after completion of this step.....\n\n\n\n"
544 cli.pause = True
545 self._stopevent.set()
546
547 def play(self):
548 '''
549 Will resume the paused test.
550 '''
551 self._stopevent.clear()
552 cli.pause = False
553
554 def stop(self):
555 '''
556 Will stop the test execution.
557 '''
558
559 print "Stopping the test"
560 self.is_stop = True
561 cli.stop = True
562 __builtin__.testthread = False
563
564def output(msg):
565 '''
566 Simply, print the message in console
567 '''
568 print msg
569
570def error(msg):
571 '''
572 print the error message.
573 '''
574 print msg
575
576def dictToObj(dictionary):
577 '''
578 This will facilitates the converting of the dictionary to the object.
579 This method will help to send options as object format to the test.
580 '''
581 if isinstance(dictionary, list):
582 dictionary = [dictToObj(x) for x in dictionary]
583 if not isinstance(dictionary, dict):
584 return dictionary
585 class Convert(object):
586 pass
587 obj = Convert()
588 for k in dictionary:
589 obj.__dict__[k] = dictToObj(dictionary[k])
590 return obj
591
592
593if __name__ == '__main__':
594 if len(sys.argv) > 1:
595 CLI("test").onecmd(' '.join(sys.argv[1:]))
596 else:
597 CLI("test").cmdloop()