blob: 0bd3a0aae00b2dcca689a75e51a562ffae8311d7 [file] [log] [blame]
#!/usr/bin/env python
'''
Created on 11-Oct-2012
@authors: Anil Kumar (anilkumar.s@paxterrasolutions.com),
TestON is free software: you can redistribute it and/or modify
it under the terms of the GNU General Public License as published by
the Free Software Foundation, either version 2 of the License, or
(at your option) any later version.
TestON is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU General Public License for more details.
You should have received a copy of the GNU General Public License
along with TestON. If not, see <http://www.gnu.org/licenses/>.
'''
import pexpect
import struct, fcntl, os, sys, signal
import re
from core import xmldict
class GenerateDriver():
'''
This will
'''
def __init__(self):
self.default = ''
self.prompt = '>'
self.LASTRSP =''
self.command_dictionary = {}
self.config_details = {}
self.last_sub_command = None
self.commnads_ordered_list = []
filePath = "generatedriver.cfg"
self.configFile = filePath
try :
xml = open(filePath).read()
self.config_details = xmldict.xml_to_dict(xml)
except :
print "Error : Config file " + self.configFile + " not defined properly or file path error"
sys.exit()
print self.config_details
self.device_name = ''
def connect(self,**connectargs):
'''
Connection will establish to the remote host using ssh.
It will take user_name ,ip_address and password as arguments<br>
and will return the handle.
'''
for key in connectargs:
vars(self)[key] = connectargs[key]
ssh_newkey = 'Are you sure you want to continue connecting'
refused = "ssh: connect to host "+self.ip_address+" port 22: Connection refused"
if self.port:
self.handle =pexpect.spawn('ssh -p '+self.port+' '+self.user_name+'@'+self.ip_address,maxread=50000)
else :
self.handle =pexpect.spawn('ssh '+self.user_name+'@'+self.ip_address,maxread=50000)
self.logfile_handler = open(os.getcwd()+"/GenerateDriver.log","w+")
self.handle.logfile = self.logfile_handler
i=self.handle.expect([ssh_newkey,'password:',pexpect.EOF,pexpect.TIMEOUT,refused],10)
if i==0:
self.handle.sendline('yes')
i=self.handle.expect([ssh_newkey,'password:',pexpect.EOF,pexpect.TIMEOUT])
return self.handle
if i==1:
self.handle.sendline(self.pwd)
self.handle.expect('>|#|$')
return self.handle
elif i==2:
print "ssh: connect to host "+self.ip_address+": Error"
return False
elif i==3: #timeout
print "ssh: connect to host "+self.ip_address+": Connection timed out"
return False
elif i==4:
print "ssh: connect to host "+self.ip_address+": Connection refused"
return False
self.handle.sendline("\r")
return self.handle
def execute(self, **execparams):
'''
This method will execute the command and will check for the expected prompt.
'''
self.LASTRSP = ''
defaultPrompt = '.*[\$>\#]'
for key in execparams:
vars(self)[key] = execparams[key]
self.handle.sendline(self.cmd)
timeoutVar = self.timeout if self.timeout else 10
index = self.handle.expect([self.prompt, "byte\s\d+", 'Command not found.', pexpect.TIMEOUT,"\n:",pexpect.EOF], timeout = timeoutVar)
if index == 0:
self.LASTRSP = self.LASTRSP + self.handle.before
#print "Expected Prompt Found"
elif index == 1:
self.LASTRSP = self.LASTRSP + self.handle.before
self.handle.send("\r")
print("Found More screen to go , Sending a key to proceed")
indexMore = self.handle.expect(["byte\s\d+", self.prompt], timeout = timeoutVar)
while indexMore == 0:
print "Found another More screen to go , Sending a key to proceed"
self.handle.send("\r")
indexMore = self.handle.expect(["byte\s\d+", self.prompt,pexpect.EOF,pexpect.TIMEOUT], timeout = timeoutVar)
self.LASTRSP = self.LASTRSP + self.handle.before
#print self.LASTRSP
elif index ==2:
print "Command not found"
self.LASTRSP = self.LASTRSP + self.handle.before
elif index ==3:
print "Expected Prompt not found , Time Out!!"
return False
elif index == 4:
self.LASTRSP = self.LASTRSP + self.handle.before
self.handle.sendcontrol("D")
#print "AA"*89
indexMore = self.handle.expect(["\n:", self.prompt,pexpect.EOF,pexpect.TIMEOUT], timeout = timeoutVar)
while indexMore == 0:
self.handle.sendcontrol("D")
indexMore = self.handle.expect(["\n:", self.prompt,".*",pexpect.EOF,pexpect.TIMEOUT], timeout = timeoutVar)
self.LASTRSP = self.LASTRSP + self.handle.before
return self.LASTRSP
def configure(self):
'''
Will start the Configure mode of the device.
'''
config_result = self.execute(cmd="configure",prompt='\#',timeout=10)
return config_result
def get_command_help(self,command):
'''
Will get the help of the Command
'''
self.handle.setecho(False)
help_keyword = self.config_details['device'][self.device_name]['help_keyword']
interrupt_key = self.config_details['device'][self.device_name]['interrupt_key']
command_details = self.execute(cmd=command+" "+help_keyword,prompt='\#',timeout=2)
#command_details = self.execute(cmd=command+" "+help_keyword,prompt='\#',timeout=2)
self.handle.sendcontrol(interrupt_key)
#print command_details
return command_details
def get_command_details(self,command):
'''
Will Update the command_dictionary with the available commands details
'''
temp_dictionary = {}
command_resulut = self.get_command_help(command)
try :
words = command_resulut.split("\n")
except AttributeError,e:
print e
return
lines = command_resulut.split("\n")
options_list = []
for line in lines :
value_match = re.search('[\s|\>|\+|\-|\<]{3}(\<(\w+))\s*',line)
if value_match:
print " Enter Value for "+value_match.group(2)
#self.handle.interact()
else:
match = re.search(r"\s\s[\w|-]+\s\s",line)
if match :
match_command = match.group(0)
print match_command
options_list.append(match_command)
temp_dictionary[command] = options_list
self.command_dictionary[command] = options_list
self.print_details(self.command_dictionary)
print "temp dir: --------"
print temp_dictionary
print "-------------"
return temp_dictionary
def print_details(self,command_dictionary):
'''
Will print the details in Tree Format
'''
self.commnads_ordered_list = command_dictionary.keys()
# Sorting the output based on the length of the command string
length = len(self.commnads_ordered_list ) - 1
sorted = False
while not sorted:
sorted = True
for i in range(length):
if len(self.commnads_ordered_list[i]) > len(self.commnads_ordered_list[i+1]):
sorted = False
self.commnads_ordered_list[i], self.commnads_ordered_list[i+1] = self.commnads_ordered_list[i+1], self.commnads_ordered_list[i]
for key in self.commnads_ordered_list:
print key +"\t "+str(command_dictionary[key])
print "\n\n"
def get_details_recursive(self,main_comand):
try :
self.last_sub_command = main_comand.split()[len(main_comand.split())-1]
except :
self.last_sub_command = ''
main_result_dcitionary = self.get_command_details(main_comand)
if main_result_dcitionary :
for key in main_result_dcitionary.keys():
for index, each_option in enumerate(main_result_dcitionary[key]) :
if re.search(self.config_details['device'][self.device_name]['end_pattern']+"|^\.|^\d",str(main_result_dcitionary[key][index])):
print "Reached the last argument for this "+main_comand+" "+str(each_option)+"\n"
main_result_dcitionary[key].remove(each_option)
return
elif self.last_sub_command == str(main_result_dcitionary[key][index]):
print "Same command repeating, So Exiting "+main_comand+" "+str(each_option)+"\n"
main_result_dcitionary[key].remove(each_option)
break
result_dcitionary = self.get_details_recursive(main_comand+" "+str(each_option))
return
def create_driver(self):
name = self.device_name
driver_file_data = 'class '+name +":\n"
driver_file_data = driver_file_data + " def __init__( self ):\n"
driver_file_data = driver_file_data + " self.prompt = '(.*)'\n self.timeout = 60 \n\n"
for index,command in enumerate(self.commnads_ordered_list) :
api_data = ' def '
command_as_api = re.sub(" ","_" , command, 0)
command_as_api = re.sub("\.|\-|\\|\/|\/","" , command_as_api, 0)
current_letter = 0
underscore_count = 0
command_temp = ""
for c in command_as_api:
current_letter = current_letter + 1
if c == "_":
underscore_count = underscore_count+1
else:
underscore_count = 0
if underscore_count > 1:
command_temp = command_temp + ""
else:
command_temp = command_temp + c
if command_temp[len(command_temp)-1] == "_":
command_temp = command_temp[0:len(command_temp)-1]
command_as_api = command_temp
#options = ''
#for option in self.command_dictionary[command]:
#options = options+',' + option
#options = re.sub("^\s*,|,$","" , options, 0)
api_data = api_data + command_as_api+"(self, *options, **def_args ):\n"
api_data = api_data + " '''Possible Options :"+str(self.command_dictionary[command])+"'''\n"
api_data = api_data + " arguments= ''\n"
api_data = api_data + " for option in options:\n"
api_data = api_data + " arguments = arguments + option +' ' \n"
api_data = api_data + " prompt = def_args.setdefault('prompt',self.prompt)\n"
api_data = api_data + " timeout = def_args.setdefault('timeout',self.timeout)\n"
api_data = api_data + " self.execute( cmd= \""+ command + " \"+ arguments, prompt = prompt, timeout = timeout ) \n"
api_data = api_data + " return main.TRUE\n"
driver_file_data = driver_file_data + api_data +"\n"
driver_file = open(os.getcwd()+"/"+name.lower()+".py", 'w')
driver_file.write(driver_file_data)
print driver_file_data
def disconnect(self):
result = True
return result
import pexpect
if __name__ == "__main__":
generate = GenerateDriver()
import sys
device_name = sys.argv[1]
generate.device_name = device_name
ip_address = generate.config_details['device'][device_name]['ip_address']
user_name = generate.config_details['device'][device_name]['user_name']
password = generate.config_details['device'][device_name]['password']
command = generate.config_details['device'][device_name]['command']
commandlist = re.sub("(\[|\])", "", command)
commandlist = list(eval(command+','))
connect_handle = generate.connect(user_name = user_name ,ip_address = ip_address, pwd = password , port = None)
if connect_handle :
# generate.configure()
for root_command in commandlist :
generate.get_details_recursive(root_command)
generate.create_driver()
generate.disconnect()
#generate.get_command_details(main_command)
else :
print "Connection Failed to the host"