Finished test Clean up
diff --git a/TestON/tests/OnosCHO/OnosCHO.py b/TestON/tests/OnosCHO/OnosCHO.py
index 228843e..3be5cbb 100644
--- a/TestON/tests/OnosCHO/OnosCHO.py
+++ b/TestON/tests/OnosCHO/OnosCHO.py
@@ -9,7 +9,7 @@
class OnosCHO:
- def __init__(self):
+ def __init__( self ):
self.default = ''
global deviceDPIDs
global hostMACs
@@ -19,8 +19,8 @@
global installedIntents
global randomLink1, randomLink2, randomLink3, numSwitches, numLinks
- def CASE1(self, main):
- '''
+ def CASE1( self, main ):
+ """
Startup sequence:
git pull
mvn clean install
@@ -29,168 +29,187 @@
onos-verify-cell
onos-install -f
onos-wait-for-start
- '''
+ """
import time
- cell_name = main.params['ENV']['cellName']
- git_pull = main.params['GIT']['autoPull']
- numCtrls = main.params['CTRL']['numCtrl']
- git_branch = main.params['GIT']['branch']
+ cell_name = main.params[ 'ENV' ][ 'cellName' ]
+ git_pull = main.params[ 'GIT' ][ 'autoPull' ]
+ numCtrls = main.params[ 'CTRL' ][ 'numCtrl' ]
+ git_branch = main.params[ 'GIT' ][ 'branch' ]
- main.case("Set up test environment")
- main.log.report("Set up test environment")
- main.log.report("_______________________")
-
- main.step("Git checkout and pull "+git_branch)
+ main.case( "Set up test environment" )
+ main.log.report( "Set up test environment" )
+ main.log.report( "_______________________" )
+
+ main.step( "Git checkout and pull " + git_branch )
if git_pull == 'on':
- checkout_result = main.ONOSbench.git_checkout(git_branch)
+ checkout_result = main.ONOSbench.git_checkout( git_branch )
pull_result = main.ONOSbench.git_pull()
- cp_result = (checkout_result and pull_result)
+ cp_result = ( checkout_result and pull_result )
else:
checkout_result = main.TRUE
pull_result = main.TRUE
- main.log.info("Skipped git checkout and pull")
- cp_result = (checkout_result and pull_result)
- utilities.assert_equals(expect=main.TRUE, actual=cp_result,
- onpass="Test step PASS",
- onfail="Test step FAIL")
-
- main.step("mvn clean & install")
+ main.log.info( "Skipped git checkout and pull" )
+ cp_result = ( checkout_result and pull_result )
+ utilities.assert_equals( expect=main.TRUE, actual=cp_result,
+ onpass="Test step PASS",
+ onfail="Test step FAIL" )
+
+ main.step( "mvn clean & install" )
mvn_result = main.ONOSbench.clean_install()
- utilities.assert_equals(expect=main.TRUE, actual=mvn_result,
- onpass="Test step PASS",
- onfail="Test step FAIL")
+ utilities.assert_equals( expect=main.TRUE, actual=mvn_result,
+ onpass="Test step PASS",
+ onfail="Test step FAIL" )
- main.ONOSbench.get_version(report=True)
+ main.ONOSbench.get_version( report=True )
- main.step("Apply Cell environment for ONOS")
- cell_result = main.ONOSbench.set_cell(cell_name)
- utilities.assert_equals(expect=main.TRUE, actual=cell_result,
- onpass="Test step PASS",
- onfail="Test step FAIL")
+ main.step( "Apply Cell environment for ONOS" )
+ cell_result = main.ONOSbench.set_cell( cell_name )
+ utilities.assert_equals( expect=main.TRUE, actual=cell_result,
+ onpass="Test step PASS",
+ onfail="Test step FAIL" )
- main.step("Create ONOS package")
+ main.step( "Create ONOS package" )
packageResult = main.ONOSbench.onos_package()
- utilities.assert_equals(expect=main.TRUE, actual=packageResult,
- onpass="Test step PASS",
- onfail="Test step FAIL")
+ utilities.assert_equals( expect=main.TRUE, actual=packageResult,
+ onpass="Test step PASS",
+ onfail="Test step FAIL" )
- main.step("Uninstall ONOS package on all Nodes")
- uninstallResult=main.TRUE
- for i in range(1,int(numCtrls)+1):
- ONOS_ip = main.params['CTRL']['ip'+str(i)]
- main.log.info("Unintsalling package on ONOS Node IP: "+ONOS_ip)
- u_result= main.ONOSbench.onos_uninstall(ONOS_ip)
- utilities.assert_equals(expect=main.TRUE, actual=u_result,
- onpass="Test step PASS",
- onfail="Test step FAIL")
- uninstallResult=(uninstallResult and u_result)
+ main.step( "Uninstall ONOS package on all Nodes" )
+ uninstallResult = main.TRUE
+ for i in range( 1, int( numCtrls ) + 1 ):
+ ONOS_ip = main.params[ 'CTRL' ][ 'ip' + str( i ) ]
+ main.log.info( "Unintsalling package on ONOS Node IP: " + ONOS_ip )
+ u_result = main.ONOSbench.onos_uninstall( ONOS_ip )
+ utilities.assert_equals( expect=main.TRUE, actual=u_result,
+ onpass="Test step PASS",
+ onfail="Test step FAIL" )
+ uninstallResult = ( uninstallResult and u_result )
- main.step("Removing copy-cat logs from ONOS nodes")
+ main.step( "Removing copy-cat logs from ONOS nodes" )
main.ONOSbench.onos_remove_raft_logs()
- main.step("Install ONOS package on all Nodes")
- installResult=main.TRUE
- for i in range(1,int(numCtrls)+1):
- ONOS_ip = main.params['CTRL']['ip'+str(i)]
- main.log.info("Intsalling package on ONOS Node IP: "+ONOS_ip)
- i_result= main.ONOSbench.onos_install(node=ONOS_ip)
- utilities.assert_equals(expect=main.TRUE, actual=i_result,
- onpass="Test step PASS",
- onfail="Test step FAIL")
- installResult=(installResult and i_result)
+ main.step( "Install ONOS package on all Nodes" )
+ installResult = main.TRUE
+ for i in range( 1, int( numCtrls ) + 1 ):
+ ONOS_ip = main.params[ 'CTRL' ][ 'ip' + str( i ) ]
+ main.log.info( "Intsalling package on ONOS Node IP: " + ONOS_ip )
+ i_result = main.ONOSbench.onos_install( node=ONOS_ip )
+ utilities.assert_equals( expect=main.TRUE, actual=i_result,
+ onpass="Test step PASS",
+ onfail="Test step FAIL" )
+ installResult = ( installResult and i_result )
- main.step("Verify ONOS nodes UP status")
- statusResult=main.TRUE
- for i in range(1,int(numCtrls)+1):
- ONOS_ip = main.params['CTRL']['ip'+str(i)]
- main.log.info("ONOS Node "+ONOS_ip+" status:")
- onos_status = main.ONOSbench.onos_status(node=ONOS_ip)
- utilities.assert_equals(expect=main.TRUE, actual=onos_status,
- onpass="Test step PASS",
- onfail="Test step FAIL")
- statusResult=(statusResult and onos_status)
+ main.step( "Verify ONOS nodes UP status" )
+ statusResult = main.TRUE
+ for i in range( 1, int( numCtrls ) + 1 ):
+ ONOS_ip = main.params[ 'CTRL' ][ 'ip' + str( i ) ]
+ main.log.info( "ONOS Node " + ONOS_ip + " status:" )
+ onos_status = main.ONOSbench.onos_status( node=ONOS_ip )
+ utilities.assert_equals( expect=main.TRUE, actual=onos_status,
+ onpass="Test step PASS",
+ onfail="Test step FAIL" )
+ statusResult = ( statusResult and onos_status )
- main.step("Start ONOS CLI on all nodes")
+ main.step( "Start ONOS CLI on all nodes" )
cliResult = main.TRUE
karafTimeout = "3600000"
- time.sleep(15) # need to wait here for sometime. This will be removed once ONOS is stable enough
- for i in range(1,int(numCtrls)+1):
- ONOS_ip = main.params['CTRL']['ip'+str(i)]
- ONOScli = 'ONOScli'+str(i)
- main.log.info("ONOS Node "+ONOS_ip+" cli start:")
- exec "startcli=main."+ONOScli+".start_onos_cli(ONOS_ip, karafTimeout=karafTimeout)"
- utilities.assert_equals(expect=main.TRUE, actual=startcli,
- onpass="Test step PASS",
- onfail="Test step FAIL")
- cliResult = (cliResult and startcli)
+ # need to wait here for sometime. This will be removed once ONOS is
+ # stable enough
+ time.sleep( 15 )
+ for i in range( 1, int( numCtrls ) + 1 ):
+ ONOS_ip = main.params[ 'CTRL' ][ 'ip' + str( i ) ]
+ ONOScli = 'ONOScli' + str( i )
+ main.log.info( "ONOS Node " + ONOS_ip + " cli start:" )
+ exec "startcli=main." + ONOScli + \
+ ".start_onos_cli(ONOS_ip, karafTimeout=karafTimeout)"
+ utilities.assert_equals( expect=main.TRUE, actual=startcli,
+ onpass="Test step PASS",
+ onfail="Test step FAIL" )
+ cliResult = ( cliResult and startcli )
- case1Result = (cp_result and cell_result
- and packageResult and installResult and statusResult and cliResult)
- utilities.assert_equals(expect=main.TRUE, actual=case1Result,
- onpass="Set up test environment PASS",
- onfail="Set up test environment FAIL")
+ case1Result = ( cp_result and cell_result
+ and packageResult and installResult and statusResult and cliResult )
+ utilities.assert_equals( expect=main.TRUE, actual=case1Result,
+ onpass="Set up test environment PASS",
+ onfail="Set up test environment FAIL" )
- def CASE2(self, main):
- '''
+ def CASE2( self, main ):
+ """
This test script still needs more refactoring
- '''
+ """
import re
import time
import copy
- numCtrls = main.params['CTRL']['numCtrl']
- ONOS1_ip = main.params['CTRL']['ip1']
- ONOS2_ip = main.params['CTRL']['ip2']
- ONOS3_ip = main.params['CTRL']['ip3']
- ONOS4_ip = main.params['CTRL']['ip4']
- ONOS5_ip = main.params['CTRL']['ip5']
- ONOS1_port = main.params['CTRL']['port1']
- ONOS2_port = main.params['CTRL']['port2']
- ONOS3_port = main.params['CTRL']['port3']
- ONOS4_port = main.params['CTRL']['port4']
- ONOS5_port = main.params['CTRL']['port5']
+ numCtrls = main.params[ 'CTRL' ][ 'numCtrl' ]
+ ONOS1_ip = main.params[ 'CTRL' ][ 'ip1' ]
+ ONOS2_ip = main.params[ 'CTRL' ][ 'ip2' ]
+ ONOS3_ip = main.params[ 'CTRL' ][ 'ip3' ]
+ ONOS4_ip = main.params[ 'CTRL' ][ 'ip4' ]
+ ONOS5_ip = main.params[ 'CTRL' ][ 'ip5' ]
+ ONOS1_port = main.params[ 'CTRL' ][ 'port1' ]
+ ONOS2_port = main.params[ 'CTRL' ][ 'port2' ]
+ ONOS3_port = main.params[ 'CTRL' ][ 'port3' ]
+ ONOS4_port = main.params[ 'CTRL' ][ 'port4' ]
+ ONOS5_port = main.params[ 'CTRL' ][ 'port5' ]
- numCtrls = main.params['CTRL']['numCtrl']
- main.log.report("Assign and Balance all Mininet switches across controllers")
- main.log.report("_________________________________________________________")
- time.sleep(15) # need to wait here for sometime. This will be removed once ONOS is stable enough
- main.case("Assign and Balance all Mininet switches across controllers")
- main.step("Assign switches to controllers")
- for i in range(1,26): #1 to (num of switches +1)
- main.Mininet1.assign_sw_controller(sw=str(i),count=int(numCtrls),
- ip1=ONOS1_ip, port1=ONOS1_port,
- ip2=ONOS2_ip, port2=ONOS2_port,
- ip3=ONOS3_ip, port3=ONOS3_port, ip4=ONOS4_ip, port4=ONOS4_port,
- ip5=ONOS5_ip, port5=ONOS5_port)
+ numCtrls = main.params[ 'CTRL' ][ 'numCtrl' ]
+ main.log.report(
+ "Assign and Balance all Mininet switches across controllers" )
+ main.log.report(
+ "_________________________________________________________" )
+ # need to wait here for sometime. This will be removed once ONOS is
+ # stable enough
+ time.sleep( 15 )
+ main.case(
+ "Assign and Balance all Mininet switches across controllers" )
+ main.step( "Assign switches to controllers" )
+ for i in range( 1, 26 ): # 1 to ( num of switches +1 )
+ main.Mininet1.assign_sw_controller(
+ sw=str( i ),
+ count=int( numCtrls ),
+ ip1=ONOS1_ip,
+ port1=ONOS1_port,
+ ip2=ONOS2_ip,
+ port2=ONOS2_port,
+ ip3=ONOS3_ip,
+ port3=ONOS3_port,
+ ip4=ONOS4_ip,
+ port4=ONOS4_port,
+ ip5=ONOS5_ip,
+ port5=ONOS5_port )
switch_mastership = main.TRUE
- for i in range (1,26):
- response = main.Mininet1.get_sw_controller("s"+str(i))
- print("Response is " + str(response))
- if re.search("tcp:"+ONOS1_ip,response):
+ for i in range( 1, 26 ):
+ response = main.Mininet1.get_sw_controller( "s" + str( i ) )
+ print( "Response is " + str( response ) )
+ if re.search( "tcp:" + ONOS1_ip, response ):
switch_mastership = switch_mastership and main.TRUE
else:
switch_mastership = main.FALSE
if switch_mastership == main.TRUE:
- main.log.report("Controller assignment successfull")
+ main.log.report( "Controller assignment successfull" )
else:
- main.log.report("Controller assignment failed")
- time.sleep(5)
+ main.log.report( "Controller assignment failed" )
+ time.sleep( 5 )
- main.step("Balance devices across controllers")
- for i in range(int(numCtrls)):
+ main.step( "Balance devices across controllers" )
+ for i in range( int( numCtrls ) ):
balanceResult = main.ONOScli1.balance_masters()
- time.sleep(3) # giving some breathing time for ONOS to complete re-balance
+ # giving some breathing time for ONOS to complete re-balance
+ time.sleep( 3 )
- utilities.assert_equals(expect=main.TRUE, actual=balanceResult,
- onpass="Assign and Balance devices test PASS",
- onfail="Assign and Balance devices test FAIL")
+ utilities.assert_equals(
+ expect=main.TRUE,
+ actual=balanceResult,
+ onpass="Assign and Balance devices test PASS",
+ onfail="Assign and Balance devices test FAIL" )
- def CASE3(self,main) :
- '''
+ def CASE3( self, main ):
+ """
This Test case will be extended to collect and store more data related
ONOS state.
- '''
+ """
import re
import copy
deviceDPIDs = []
@@ -199,118 +218,136 @@
deviceActiveLinksCount = []
devicePortsEnabledCount = []
- main.log.report("Collect and Store topology details from ONOS before running any Tests")
- main.log.report("____________________________________________________________________")
- main.case ("Collect and Store Topology Deatils from ONOS")
+ main.log.report(
+ "Collect and Store topology details from ONOS before running any Tests" )
+ main.log.report(
+ "____________________________________________________________________" )
+ main.case( "Collect and Store Topology Deatils from ONOS" )
- main.step("Collect and store current number of switches and links")
+ main.step( "Collect and store current number of switches and links" )
topology_output = main.ONOScli1.topology()
- topology_result = main.ONOSbench.get_topology(topology_output)
- numSwitches = topology_result['devices']
- numLinks = topology_result['links']
- main.log.info("Currently there are %s switches and %s links" %(str(numSwitches), str(numLinks)))
+ topology_result = main.ONOSbench.get_topology( topology_output )
+ numSwitches = topology_result[ 'devices' ]
+ numLinks = topology_result[ 'links' ]
+ main.log.info(
+ "Currently there are %s switches and %s links" %
+ ( str( numSwitches ), str( numLinks ) ) )
- main.step("Store Device DPIDs")
- for i in range(1,26):
- deviceDPIDs.append("of:00000000000000"+format(i, '02x'))
- print "Device DPIDs in Store: \n", str(deviceDPIDs)
+ main.step( "Store Device DPIDs" )
+ for i in range( 1, 26 ):
+ deviceDPIDs.append( "of:00000000000000" + format( i, '02x' ) )
+ print "Device DPIDs in Store: \n", str( deviceDPIDs )
- main.step("Store Host MACs")
- for i in range(1,26):
- hostMACs.append("00:00:00:00:00:"+format(i, '02x')+"/-1")
- print "Host MACs in Store: \n", str(hostMACs)
+ main.step( "Store Host MACs" )
+ for i in range( 1, 26 ):
+ hostMACs.append( "00:00:00:00:00:" + format( i, '02x' ) + "/-1" )
+ print "Host MACs in Store: \n", str( hostMACs )
- main.step("Collect and store all Devices Links")
- linksResult = main.ONOScli1.links(json_format=False)
- ansi_escape = re.compile(r'\x1b[^m]*m')
- linksResult = ansi_escape.sub('', linksResult)
- linksResult = linksResult.replace(" links","").replace("\r\r","")
- linksResult=linksResult.splitlines()
- linksResult = linksResult[1:]
- deviceLinks = copy.copy(linksResult)
- print "Device Links Stored: \n", str(deviceLinks)
- print "Length of Links Store", len(deviceLinks) # this will be asserted to check with the params provided count of links
+ main.step( "Collect and store all Devices Links" )
+ linksResult = main.ONOScli1.links( json_format=False )
+ ansi_escape = re.compile( r'\x1b[^m]*m' )
+ linksResult = ansi_escape.sub( '', linksResult )
+ linksResult = linksResult.replace( " links", "" ).replace( "\r\r", "" )
+ linksResult = linksResult.splitlines()
+ linksResult = linksResult[ 1: ]
+ deviceLinks = copy.copy( linksResult )
+ print "Device Links Stored: \n", str( deviceLinks )
+ # this will be asserted to check with the params provided count of
+ # links
+ print "Length of Links Store", len( deviceLinks )
- main.step("Collect and store each Device ports enabled Count")
- for i in range(1,26):
- portResult = main.ONOScli1.getDevicePortsEnabledCount("of:00000000000000"+format(i, '02x'))
- portTemp = re.split(r'\t+', portResult)
- portCount = portTemp[1].replace("\r\r\n\x1b[32m","")
- devicePortsEnabledCount.append(portCount)
- print "Device Enabled Port Counts Stored: \n", str(devicePortsEnabledCount)
+ main.step( "Collect and store each Device ports enabled Count" )
+ for i in range( 1, 26 ):
+ portResult = main.ONOScli1.getDevicePortsEnabledCount(
+ "of:00000000000000" +
+ format(
+ i,
+ '02x' ) )
+ portTemp = re.split( r'\t+', portResult )
+ portCount = portTemp[ 1 ].replace( "\r\r\n\x1b[32m", "" )
+ devicePortsEnabledCount.append( portCount )
+ print "Device Enabled Port Counts Stored: \n", str( devicePortsEnabledCount )
- main.step("Collect and store each Device active links Count")
- for i in range(1,26):
- linkCountResult = main.ONOScli1.getDeviceLinksActiveCount("of:00000000000000"+format(i, '02x'))
- linkCountTemp = re.split(r'\t+', linkCountResult)
- linkCount = linkCountTemp[1].replace("\r\r\n\x1b[32m","")
- deviceActiveLinksCount.append(linkCount)
- print "Device Active Links Count Stored: \n", str(deviceActiveLinksCount)
+ main.step( "Collect and store each Device active links Count" )
+ for i in range( 1, 26 ):
+ linkCountResult = main.ONOScli1.getDeviceLinksActiveCount(
+ "of:00000000000000" +
+ format(
+ i,
+ '02x' ) )
+ linkCountTemp = re.split( r'\t+', linkCountResult )
+ linkCount = linkCountTemp[ 1 ].replace( "\r\r\n\x1b[32m", "" )
+ deviceActiveLinksCount.append( linkCount )
+ print "Device Active Links Count Stored: \n", str( deviceActiveLinksCount )
- caseResult = main.TRUE # just returning TRUE for now as this one just collects data
- utilities.assert_equals(expect=main.TRUE, actual=case1Result,
- onpass="Saving ONOS topology data test PASS",
- onfail="Saving ONOS topology data test FAIL")
+ # just returning TRUE for now as this one just collects data
+ caseResult = main.TRUE
+ utilities.assert_equals( expect=main.TRUE, actual=case1Result,
+ onpass="Saving ONOS topology data test PASS",
+ onfail="Saving ONOS topology data test FAIL" )
- def CASE4(self,main) :
- '''
- Enable onos-app-fwd, Verify Reactive forwarding through ping all and Disable it
- '''
+ def CASE4( self, main ):
+ """
+ Enable onos-app-fwd, Verify Reactive forwarding through ping all and Disable it
+ """
import re
import copy
import time
- numCtrls = main.params['CTRL']['numCtrl']
- main.log.report("Enable Reactive forwarding and Verify ping all")
- main.log.report("______________________________________________")
- main.case ("Enable Reactive forwarding and Verify ping all")
- main.step("Enable Reactive forwarding")
+ numCtrls = main.params[ 'CTRL' ][ 'numCtrl' ]
+ main.log.report( "Enable Reactive forwarding and Verify ping all" )
+ main.log.report( "______________________________________________" )
+ main.case( "Enable Reactive forwarding and Verify ping all" )
+ main.step( "Enable Reactive forwarding" )
installResult = main.TRUE
- for i in range(1,int(numCtrls)+1):
+ for i in range( 1, int( numCtrls ) + 1 ):
onosFeature = 'onos-app-fwd'
- ONOS_ip = main.params['CTRL']['ip'+str(i)]
- ONOScli = 'ONOScli'+str(i)
- main.log.info("Enabling Reactive mode on ONOS Node "+ONOS_ip)
- exec "inResult=main."+ONOScli+".feature_install(onosFeature)"
- time.sleep(3)
+ ONOS_ip = main.params[ 'CTRL' ][ 'ip' + str( i ) ]
+ ONOScli = 'ONOScli' + str( i )
+ main.log.info( "Enabling Reactive mode on ONOS Node " + ONOS_ip )
+ exec "inResult=main." + ONOScli + ".feature_install(onosFeature)"
+ time.sleep( 3 )
installResult = inResult and installResult
- time.sleep(5)
+ time.sleep( 5 )
- main.step("Verify Pingall")
+ main.step( "Verify Pingall" )
ping_result = main.FALSE
time1 = time.time()
ping_result = main.Mininet1.pingall()
time2 = time.time()
- timeDiff = round((time2-time1),2)
- main.log.report("Time taken for Ping All: "+str(timeDiff)+" seconds")
+ timeDiff = round( ( time2 - time1 ), 2 )
+ main.log.report(
+ "Time taken for Ping All: " +
+ str( timeDiff ) +
+ " seconds" )
if ping_result == main.TRUE:
- main.log.report("Pingall Test in Reactive mode successful")
+ main.log.report( "Pingall Test in Reactive mode successful" )
else:
- main.log.report("Pingall Test in Reactive mode failed")
+ main.log.report( "Pingall Test in Reactive mode failed" )
- main.step("Disable Reactive forwarding")
+ main.step( "Disable Reactive forwarding" )
uninstallResult = main.TRUE
- for i in range(1,int(numCtrls)+1):
+ for i in range( 1, int( numCtrls ) + 1 ):
onosFeature = 'onos-app-fwd'
- ONOS_ip = main.params['CTRL']['ip'+str(i)]
- ONOScli = 'ONOScli'+str(i)
- main.log.info("Disabling Reactive mode on ONOS Node "+ONOS_ip)
- exec "unResult=main."+ONOScli+".feature_uninstall(onosFeature)"
+ ONOS_ip = main.params[ 'CTRL' ][ 'ip' + str( i ) ]
+ ONOScli = 'ONOScli' + str( i )
+ main.log.info( "Disabling Reactive mode on ONOS Node " + ONOS_ip )
+ exec "unResult=main." + ONOScli + ".feature_uninstall(onosFeature)"
uninstallResult = unResult and uninstallResult
- #Waiting for reative flows to be cleared.
- time.sleep(10)
+ # Waiting for reative flows to be cleared.
+ time.sleep( 10 )
case3Result = installResult and ping_result and uninstallResult
- utilities.assert_equals(expect=main.TRUE, actual=case3Result,
- onpass="Reactive Mode Pingall test PASS",
- onfail="Reactive Mode Pingall test FAIL")
+ utilities.assert_equals( expect=main.TRUE, actual=case3Result,
+ onpass="Reactive Mode Pingall test PASS",
+ onfail="Reactive Mode Pingall test FAIL" )
- def CASE5(self,main) :
- '''
+ def CASE5( self, main ):
+ """
Compare current ONOS topology with reference data
- '''
+ """
import re
devicesDPID_tmp = []
hostMACs_tmp = []
@@ -318,378 +355,529 @@
deviceActiveLinksCount_tmp = []
devicePortsEnabledCount_tmp = []
- main.log.report("Compare ONOS topology with reference data in Stores")
- main.log.report("__________________________________________________")
- main.case ("Compare ONOS topology with reference data")
-
- main.step("Compare current Device ports enabled with reference")
- for i in range(1,26):
- portResult = main.ONOScli1.getDevicePortsEnabledCount("of:00000000000000"+format(i, '02x'))
- portTemp = re.split(r'\t+', portResult)
- portCount = portTemp[1].replace("\r\r\n\x1b[32m","")
- devicePortsEnabledCount_tmp.append(portCount)
- time.sleep(2)
- print ("Device Enabled ports EXPECTED: \n"+ str(devicePortsEnabledCount))
- print ("Device Enabled ports ACTUAL: \n"+ str(devicePortsEnabledCount_tmp))
- if (cmp(devicePortsEnabledCount,devicePortsEnabledCount_tmp)==0):
+ main.log.report(
+ "Compare ONOS topology with reference data in Stores" )
+ main.log.report( "__________________________________________________" )
+ main.case( "Compare ONOS topology with reference data" )
+
+ main.step( "Compare current Device ports enabled with reference" )
+ for i in range( 1, 26 ):
+ portResult = main.ONOScli1.getDevicePortsEnabledCount(
+ "of:00000000000000" +
+ format(
+ i,
+ '02x' ) )
+ portTemp = re.split( r'\t+', portResult )
+ portCount = portTemp[ 1 ].replace( "\r\r\n\x1b[32m", "" )
+ devicePortsEnabledCount_tmp.append( portCount )
+ time.sleep( 2 )
+ print (
+ "Device Enabled ports EXPECTED: \n" +
+ str( devicePortsEnabledCount ) )
+ print (
+ "Device Enabled ports ACTUAL: \n" +
+ str( devicePortsEnabledCount_tmp ) )
+ if ( cmp( devicePortsEnabledCount,
+ devicePortsEnabledCount_tmp ) == 0 ):
stepResult1 = main.TRUE
else:
stepResult1 = main.FALSE
- main.step("Compare Device active links with reference")
- for i in range(1,26):
- linkResult = main.ONOScli1.getDeviceLinksActiveCount("of:00000000000000"+format(i, '02x'))
- linkTemp = re.split(r'\t+', linkResult)
- linkCount = linkTemp[1].replace("\r\r\n\x1b[32m","")
- deviceActiveLinksCount_tmp.append(linkCount)
- time.sleep(3)
- print ("Device Active links EXPECTED: \n"+str(deviceActiveLinksCount))
- print ("Device Active links ACTUAL: \n"+str(deviceActiveLinksCount_tmp))
- if (cmp(deviceActiveLinksCount,deviceActiveLinksCount_tmp)==0):
+ main.step( "Compare Device active links with reference" )
+ for i in range( 1, 26 ):
+ linkResult = main.ONOScli1.getDeviceLinksActiveCount(
+ "of:00000000000000" +
+ format(
+ i,
+ '02x' ) )
+ linkTemp = re.split( r'\t+', linkResult )
+ linkCount = linkTemp[ 1 ].replace( "\r\r\n\x1b[32m", "" )
+ deviceActiveLinksCount_tmp.append( linkCount )
+ time.sleep( 3 )
+ print (
+ "Device Active links EXPECTED: \n" +
+ str( deviceActiveLinksCount ) )
+ print (
+ "Device Active links ACTUAL: \n" +
+ str( deviceActiveLinksCount_tmp ) )
+ if ( cmp( deviceActiveLinksCount, deviceActiveLinksCount_tmp ) == 0 ):
stepResult2 = main.TRUE
else:
stepResult2 = main.FALSE
- '''
- place holder for comparing devices, hosts and paths if required.
+ """
+ place holder for comparing devices, hosts and paths if required.
Links and ports data would be incorrect with out devices anyways.
- '''
-
- caseResult=(stepResult1 and stepResult2)
- utilities.assert_equals(expect=main.TRUE, actual=case1Result,
- onpass="Compare Topology test PASS",
- onfail="Compare Topology test FAIL")
+ """
+ caseResult = ( stepResult1 and stepResult2 )
+ utilities.assert_equals( expect=main.TRUE, actual=case1Result,
+ onpass="Compare Topology test PASS",
+ onfail="Compare Topology test FAIL" )
if caseResult == main.TRUE:
- main.log.report("Compare Topology test Pass")
+ main.log.report( "Compare Topology test Pass" )
- def CASE6(self):
- '''
+ def CASE6( self ):
+ """
Install 300 host intents and verify ping all
- '''
- main.log.report("Add 300 host intents and verify pingall")
- main.log.report("_______________________________________")
+ """
+ main.log.report( "Add 300 host intents and verify pingall" )
+ main.log.report( "_______________________________________" )
import itertools
- main.case("Install 300 host intents")
- main.step("Add host Intents")
- intentResult=main.TRUE
- hostCombos = list(itertools.combinations(hostMACs, 2))
- for i in range(len(hostCombos)):
- iResult = main.ONOScli1.add_host_intent(hostCombos[i][0],hostCombos[i][1])
- intentResult=(intentResult and iResult)
+ main.case( "Install 300 host intents" )
+ main.step( "Add host Intents" )
+ intentResult = main.TRUE
+ hostCombos = list( itertools.combinations( hostMACs, 2 ) )
+ for i in range( len( hostCombos ) ):
+ iResult = main.ONOScli1.add_host_intent(
+ hostCombos[ i ][ 0 ],
+ hostCombos[ i ][ 1 ] )
+ intentResult = ( intentResult and iResult )
- main.step("Verify Ping across all hosts")
+ main.step( "Verify Ping across all hosts" )
pingResult = main.FALSE
time1 = time.time()
pingResult = main.Mininet1.pingall()
time2 = time.time()
- timeDiff = round((time2-time1),2)
- main.log.report("Time taken for Ping All: "+str(timeDiff)+" seconds")
- utilities.assert_equals(expect=main.TRUE, actual=pingResult,
- onpass="PING ALL PASS",
- onfail="PING ALL FAIL")
+ timeDiff = round( ( time2 - time1 ), 2 )
+ main.log.report(
+ "Time taken for Ping All: " +
+ str( timeDiff ) +
+ " seconds" )
+ utilities.assert_equals( expect=main.TRUE, actual=pingResult,
+ onpass="PING ALL PASS",
+ onfail="PING ALL FAIL" )
- case4Result=(intentResult and pingResult)
- utilities.assert_equals(expect=main.TRUE, actual=case4Result,
- onpass="Install 300 Host Intents and Ping All test PASS",
- onfail="Install 300 Host Intents and Ping All test FAIL")
+ case4Result = ( intentResult and pingResult )
+ utilities.assert_equals(
+ expect=main.TRUE,
+ actual=case4Result,
+ onpass="Install 300 Host Intents and Ping All test PASS",
+ onfail="Install 300 Host Intents and Ping All test FAIL" )
- def CASE70(self,main):
- '''
- Randomly bring some core links down and verify ping all (Host Intents Scenario)
- '''
+ def CASE70( self, main ):
+ """
+ Randomly bring some core links down and verify ping all ( Host Intents Scenario )
+ """
import random
- ONOS1_ip = main.params['CTRL']['ip1']
- link1End1 = main.params['CORELINKS']['linkS3a']
- link1End2 = main.params['CORELINKS']['linkS3b'].split(',')
- link2End1 = main.params['CORELINKS']['linkS14a']
- link2End2 = main.params['CORELINKS']['linkS14b'].split(',')
- link3End1 = main.params['CORELINKS']['linkS18a']
- link3End2 = main.params['CORELINKS']['linkS18b'].split(',')
- switchLinksToToggle = main.params['CORELINKS']['toggleLinks']
- link_sleep = int(main.params['timers']['LinkDiscovery'])
+ ONOS1_ip = main.params[ 'CTRL' ][ 'ip1' ]
+ link1End1 = main.params[ 'CORELINKS' ][ 'linkS3a' ]
+ link1End2 = main.params[ 'CORELINKS' ][ 'linkS3b' ].split( ',' )
+ link2End1 = main.params[ 'CORELINKS' ][ 'linkS14a' ]
+ link2End2 = main.params[ 'CORELINKS' ][ 'linkS14b' ].split( ',' )
+ link3End1 = main.params[ 'CORELINKS' ][ 'linkS18a' ]
+ link3End2 = main.params[ 'CORELINKS' ][ 'linkS18b' ].split( ',' )
+ switchLinksToToggle = main.params[ 'CORELINKS' ][ 'toggleLinks' ]
+ link_sleep = int( main.params[ 'timers' ][ 'LinkDiscovery' ] )
- main.log.report("Host intents - Randomly bring some core links down and verify ping all")
- main.log.report("_________________________________________________________________")
- main.case("Host intents - Randomly bring some core links down and verify ping all")
- main.step("Verify number of Switch links to toggle on each Core Switch are between 1 - 5")
- if (int(switchLinksToToggle) == 0 or int(switchLinksToToggle) > 5):
- main.log.info("Please check you PARAMS file. Valid range for number of switch links to toggle is between 1 to 5")
+ main.log.report(
+ "Host intents - Randomly bring some core links down and verify ping all" )
+ main.log.report(
+ "_________________________________________________________________" )
+ main.case(
+ "Host intents - Randomly bring some core links down and verify ping all" )
+ main.step(
+ "Verify number of Switch links to toggle on each Core Switch are between 1 - 5" )
+ if ( int( switchLinksToToggle ) ==
+ 0 or int( switchLinksToToggle ) > 5 ):
+ main.log.info(
+ "Please check you PARAMS file. Valid range for number of switch links to toggle is between 1 to 5" )
main.cleanup()
main.exit()
else:
- main.log.info("User provided Core switch links range to toggle is correct, proceeding to run the test")
+ main.log.info(
+ "User provided Core switch links range to toggle is correct, proceeding to run the test" )
- main.step("Cut links on Core devices using user provided range")
- randomLink1 = random.sample(link1End2,int(switchLinksToToggle))
- randomLink2 = random.sample(link2End2,int(switchLinksToToggle))
- randomLink3 = random.sample(link3End2,int(switchLinksToToggle))
- for i in range(int(switchLinksToToggle)):
- main.Mininet1.link(END1=link1End1,END2=randomLink1[i],OPTION="down")
- main.Mininet1.link(END1=link2End1,END2=randomLink2[i],OPTION="down")
- main.Mininet1.link(END1=link3End1,END2=randomLink3[i],OPTION="down")
- time.sleep(link_sleep)
+ main.step( "Cut links on Core devices using user provided range" )
+ randomLink1 = random.sample( link1End2, int( switchLinksToToggle ) )
+ randomLink2 = random.sample( link2End2, int( switchLinksToToggle ) )
+ randomLink3 = random.sample( link3End2, int( switchLinksToToggle ) )
+ for i in range( int( switchLinksToToggle ) ):
+ main.Mininet1.link(
+ END1=link1End1,
+ END2=randomLink1[ i ],
+ OPTION="down" )
+ main.Mininet1.link(
+ END1=link2End1,
+ END2=randomLink2[ i ],
+ OPTION="down" )
+ main.Mininet1.link(
+ END1=link3End1,
+ END2=randomLink3[ i ],
+ OPTION="down" )
+ time.sleep( link_sleep )
topology_output = main.ONOScli2.topology()
- linkDown = main.ONOSbench.check_status(topology_output,numSwitches,str(int(numLinks)-int(switchLinksToToggle)*6))
- utilities.assert_equals(expect=main.TRUE,actual=linkDown,
- onpass="Link Down discovered properly",
- onfail="Link down was not discovered in "+ str(link_sleep) + " seconds")
+ linkDown = main.ONOSbench.check_status(
+ topology_output, numSwitches, str(
+ int( numLinks ) - int( switchLinksToToggle ) * 6 ) )
+ utilities.assert_equals(
+ expect=main.TRUE,
+ actual=linkDown,
+ onpass="Link Down discovered properly",
+ onfail="Link down was not discovered in " +
+ str( link_sleep ) +
+ " seconds" )
- main.step("Verify Ping across all hosts")
+ main.step( "Verify Ping across all hosts" )
pingResultLinkDown = main.FALSE
time1 = time.time()
pingResultLinkDown = main.Mininet1.pingall()
time2 = time.time()
- timeDiff = round((time2-time1),2)
- main.log.report("Time taken for Ping All: "+str(timeDiff)+" seconds")
- utilities.assert_equals(expect=main.TRUE, actual=pingResultLinkDown,
- onpass="PING ALL PASS",
- onfail="PING ALL FAIL")
+ timeDiff = round( ( time2 - time1 ), 2 )
+ main.log.report(
+ "Time taken for Ping All: " +
+ str( timeDiff ) +
+ " seconds" )
+ utilities.assert_equals( expect=main.TRUE, actual=pingResultLinkDown,
+ onpass="PING ALL PASS",
+ onfail="PING ALL FAIL" )
caseResult7 = linkDown and pingResultLinkDown
- utilities.assert_equals(expect=main.TRUE, actual=caseResult7,
- onpass="Random Link cut Test PASS",
- onfail="Random Link cut Test FAIL")
+ utilities.assert_equals( expect=main.TRUE, actual=caseResult7,
+ onpass="Random Link cut Test PASS",
+ onfail="Random Link cut Test FAIL" )
- def CASE80(self,main):
- '''
- Bring the core links up that are down and verify ping all (Host Intents Scenario)
- '''
+ def CASE80( self, main ):
+ """
+ Bring the core links up that are down and verify ping all ( Host Intents Scenario )
+ """
import random
- ONOS1_ip = main.params['CTRL']['ip1']
- link1End1 = main.params['CORELINKS']['linkS3a']
- link2End1 = main.params['CORELINKS']['linkS14a']
- link3End1 = main.params['CORELINKS']['linkS18a']
- link_sleep = int(main.params['timers']['LinkDiscovery'])
- switchLinksToToggle = main.params['CORELINKS']['toggleLinks']
+ ONOS1_ip = main.params[ 'CTRL' ][ 'ip1' ]
+ link1End1 = main.params[ 'CORELINKS' ][ 'linkS3a' ]
+ link2End1 = main.params[ 'CORELINKS' ][ 'linkS14a' ]
+ link3End1 = main.params[ 'CORELINKS' ][ 'linkS18a' ]
+ link_sleep = int( main.params[ 'timers' ][ 'LinkDiscovery' ] )
+ switchLinksToToggle = main.params[ 'CORELINKS' ][ 'toggleLinks' ]
- main.log.report("Host intents - Bring the core links up that are down and verify ping all")
- main.log.report("__________________________________________________________________")
- main.case("Host intents - Bring the core links up that are down and verify ping all")
- main.step("Bring randomly cut links on Core devices up")
- for i in range(int(switchLinksToToggle)):
- main.Mininet1.link(END1=link1End1,END2=randomLink1[i],OPTION="up")
- main.Mininet1.link(END1=link2End1,END2=randomLink2[i],OPTION="up")
- main.Mininet1.link(END1=link3End1,END2=randomLink3[i],OPTION="up")
- time.sleep(link_sleep)
+ main.log.report(
+ "Host intents - Bring the core links up that are down and verify ping all" )
+ main.log.report(
+ "__________________________________________________________________" )
+ main.case(
+ "Host intents - Bring the core links up that are down and verify ping all" )
+ main.step( "Bring randomly cut links on Core devices up" )
+ for i in range( int( switchLinksToToggle ) ):
+ main.Mininet1.link(
+ END1=link1End1,
+ END2=randomLink1[ i ],
+ OPTION="up" )
+ main.Mininet1.link(
+ END1=link2End1,
+ END2=randomLink2[ i ],
+ OPTION="up" )
+ main.Mininet1.link(
+ END1=link3End1,
+ END2=randomLink3[ i ],
+ OPTION="up" )
+ time.sleep( link_sleep )
topology_output = main.ONOScli2.topology()
- linkUp = main.ONOSbench.check_status(topology_output,numSwitches,str(numLinks))
- utilities.assert_equals(expect=main.TRUE,actual=linkUp,
- onpass="Link up discovered properly",
- onfail="Link up was not discovered in "+ str(link_sleep) + " seconds")
+ linkUp = main.ONOSbench.check_status(
+ topology_output,
+ numSwitches,
+ str( numLinks ) )
+ utilities.assert_equals(
+ expect=main.TRUE,
+ actual=linkUp,
+ onpass="Link up discovered properly",
+ onfail="Link up was not discovered in " +
+ str( link_sleep ) +
+ " seconds" )
- main.step("Verify Ping across all hosts")
+ main.step( "Verify Ping across all hosts" )
pingResultLinkUp = main.FALSE
time1 = time.time()
pingResultLinkUp = main.Mininet1.pingall()
time2 = time.time()
- timeDiff = round((time2-time1),2)
- main.log.report("Time taken for Ping All: "+str(timeDiff)+" seconds")
- utilities.assert_equals(expect=main.TRUE, actual=pingResultLinkUp,
- onpass="PING ALL PASS",
- onfail="PING ALL FAIL")
+ timeDiff = round( ( time2 - time1 ), 2 )
+ main.log.report(
+ "Time taken for Ping All: " +
+ str( timeDiff ) +
+ " seconds" )
+ utilities.assert_equals( expect=main.TRUE, actual=pingResultLinkUp,
+ onpass="PING ALL PASS",
+ onfail="PING ALL FAIL" )
caseResult8 = linkUp and pingResultLinkUp
- utilities.assert_equals(expect=main.TRUE, actual=caseResult8,
- onpass="Link Up Test PASS",
- onfail="Link Up Test FAIL")
+ utilities.assert_equals( expect=main.TRUE, actual=caseResult8,
+ onpass="Link Up Test PASS",
+ onfail="Link Up Test FAIL" )
- def CASE71(self,main):
- '''
- Randomly bring some core links down and verify ping all (Point Intents Scenario)
- '''
+ def CASE71( self, main ):
+ """
+ Randomly bring some core links down and verify ping all ( Point Intents Scenario )
+ """
import random
- ONOS1_ip = main.params['CTRL']['ip1']
- link1End1 = main.params['CORELINKS']['linkS3a']
- link1End2 = main.params['CORELINKS']['linkS3b'].split(',')
- link2End1 = main.params['CORELINKS']['linkS14a']
- link2End2 = main.params['CORELINKS']['linkS14b'].split(',')
- link3End1 = main.params['CORELINKS']['linkS18a']
- link3End2 = main.params['CORELINKS']['linkS18b'].split(',')
- switchLinksToToggle = main.params['CORELINKS']['toggleLinks']
- link_sleep = int(main.params['timers']['LinkDiscovery'])
+ ONOS1_ip = main.params[ 'CTRL' ][ 'ip1' ]
+ link1End1 = main.params[ 'CORELINKS' ][ 'linkS3a' ]
+ link1End2 = main.params[ 'CORELINKS' ][ 'linkS3b' ].split( ',' )
+ link2End1 = main.params[ 'CORELINKS' ][ 'linkS14a' ]
+ link2End2 = main.params[ 'CORELINKS' ][ 'linkS14b' ].split( ',' )
+ link3End1 = main.params[ 'CORELINKS' ][ 'linkS18a' ]
+ link3End2 = main.params[ 'CORELINKS' ][ 'linkS18b' ].split( ',' )
+ switchLinksToToggle = main.params[ 'CORELINKS' ][ 'toggleLinks' ]
+ link_sleep = int( main.params[ 'timers' ][ 'LinkDiscovery' ] )
- main.log.report("Point Intents - Randomly bring some core links down and verify ping all")
- main.log.report("__________________________________________________________________")
- main.case("Point Intents - Randomly bring some core links down and verify ping all")
- main.step("Verify number of Switch links to toggle on each Core Switch are between 1 - 5")
- if (int(switchLinksToToggle) == 0 or int(switchLinksToToggle) > 5):
- main.log.info("Please check you PARAMS file. Valid range for number of switch links to toggle is between 1 to 5")
+ main.log.report(
+ "Point Intents - Randomly bring some core links down and verify ping all" )
+ main.log.report(
+ "__________________________________________________________________" )
+ main.case(
+ "Point Intents - Randomly bring some core links down and verify ping all" )
+ main.step(
+ "Verify number of Switch links to toggle on each Core Switch are between 1 - 5" )
+ if ( int( switchLinksToToggle ) ==
+ 0 or int( switchLinksToToggle ) > 5 ):
+ main.log.info(
+ "Please check you PARAMS file. Valid range for number of switch links to toggle is between 1 to 5" )
main.cleanup()
main.exit()
else:
- main.log.info("User provided Core switch links range to toggle is correct, proceeding to run the test")
+ main.log.info(
+ "User provided Core switch links range to toggle is correct, proceeding to run the test" )
- main.step("Cut links on Core devices using user provided range")
- randomLink1 = random.sample(link1End2,int(switchLinksToToggle))
- randomLink2 = random.sample(link2End2,int(switchLinksToToggle))
- randomLink3 = random.sample(link3End2,int(switchLinksToToggle))
- for i in range(int(switchLinksToToggle)):
- main.Mininet1.link(END1=link1End1,END2=randomLink1[i],OPTION="down")
- main.Mininet1.link(END1=link2End1,END2=randomLink2[i],OPTION="down")
- main.Mininet1.link(END1=link3End1,END2=randomLink3[i],OPTION="down")
- time.sleep(link_sleep)
+ main.step( "Cut links on Core devices using user provided range" )
+ randomLink1 = random.sample( link1End2, int( switchLinksToToggle ) )
+ randomLink2 = random.sample( link2End2, int( switchLinksToToggle ) )
+ randomLink3 = random.sample( link3End2, int( switchLinksToToggle ) )
+ for i in range( int( switchLinksToToggle ) ):
+ main.Mininet1.link(
+ END1=link1End1,
+ END2=randomLink1[ i ],
+ OPTION="down" )
+ main.Mininet1.link(
+ END1=link2End1,
+ END2=randomLink2[ i ],
+ OPTION="down" )
+ main.Mininet1.link(
+ END1=link3End1,
+ END2=randomLink3[ i ],
+ OPTION="down" )
+ time.sleep( link_sleep )
topology_output = main.ONOScli2.topology()
- linkDown = main.ONOSbench.check_status(topology_output,numSwitches,str(int(numLinks)-int(switchLinksToToggle)*6))
- utilities.assert_equals(expect=main.TRUE,actual=linkDown,
- onpass="Link Down discovered properly",
- onfail="Link down was not discovered in "+ str(link_sleep) + " seconds")
+ linkDown = main.ONOSbench.check_status(
+ topology_output, numSwitches, str(
+ int( numLinks ) - int( switchLinksToToggle ) * 6 ) )
+ utilities.assert_equals(
+ expect=main.TRUE,
+ actual=linkDown,
+ onpass="Link Down discovered properly",
+ onfail="Link down was not discovered in " +
+ str( link_sleep ) +
+ " seconds" )
- main.step("Verify Ping across all hosts")
+ main.step( "Verify Ping across all hosts" )
pingResultLinkDown = main.FALSE
time1 = time.time()
pingResultLinkDown = main.Mininet1.pingall()
time2 = time.time()
- timeDiff = round((time2-time1),2)
- main.log.report("Time taken for Ping All: "+str(timeDiff)+" seconds")
- utilities.assert_equals(expect=main.TRUE, actual=pingResultLinkDown,
- onpass="PING ALL PASS",
- onfail="PING ALL FAIL")
+ timeDiff = round( ( time2 - time1 ), 2 )
+ main.log.report(
+ "Time taken for Ping All: " +
+ str( timeDiff ) +
+ " seconds" )
+ utilities.assert_equals( expect=main.TRUE, actual=pingResultLinkDown,
+ onpass="PING ALL PASS",
+ onfail="PING ALL FAIL" )
caseResult7 = linkDown and pingResultLinkDown
- utilities.assert_equals(expect=main.TRUE, actual=caseResult7,
- onpass="Random Link cut Test PASS",
- onfail="Random Link cut Test FAIL")
+ utilities.assert_equals( expect=main.TRUE, actual=caseResult7,
+ onpass="Random Link cut Test PASS",
+ onfail="Random Link cut Test FAIL" )
- def CASE81(self,main):
- '''
- Bring the core links up that are down and verify ping all (Point Intents Scenario)
- '''
+ def CASE81( self, main ):
+ """
+ Bring the core links up that are down and verify ping all ( Point Intents Scenario )
+ """
import random
- ONOS1_ip = main.params['CTRL']['ip1']
- link1End1 = main.params['CORELINKS']['linkS3a']
- link2End1 = main.params['CORELINKS']['linkS14a']
- link3End1 = main.params['CORELINKS']['linkS18a']
- link_sleep = int(main.params['timers']['LinkDiscovery'])
- switchLinksToToggle = main.params['CORELINKS']['toggleLinks']
+ ONOS1_ip = main.params[ 'CTRL' ][ 'ip1' ]
+ link1End1 = main.params[ 'CORELINKS' ][ 'linkS3a' ]
+ link2End1 = main.params[ 'CORELINKS' ][ 'linkS14a' ]
+ link3End1 = main.params[ 'CORELINKS' ][ 'linkS18a' ]
+ link_sleep = int( main.params[ 'timers' ][ 'LinkDiscovery' ] )
+ switchLinksToToggle = main.params[ 'CORELINKS' ][ 'toggleLinks' ]
- main.log.report("Point intents - Bring the core links up that are down and verify ping all")
- main.log.report("___________________________________________________________________")
- main.case("Point intents - Bring the core links up that are down and verify ping all")
- main.step("Bring randomly cut links on Core devices up")
- for i in range(int(switchLinksToToggle)):
- main.Mininet1.link(END1=link1End1,END2=randomLink1[i],OPTION="up")
- main.Mininet1.link(END1=link2End1,END2=randomLink2[i],OPTION="up")
- main.Mininet1.link(END1=link3End1,END2=randomLink3[i],OPTION="up")
- time.sleep(link_sleep)
+ main.log.report(
+ "Point intents - Bring the core links up that are down and verify ping all" )
+ main.log.report(
+ "___________________________________________________________________" )
+ main.case(
+ "Point intents - Bring the core links up that are down and verify ping all" )
+ main.step( "Bring randomly cut links on Core devices up" )
+ for i in range( int( switchLinksToToggle ) ):
+ main.Mininet1.link(
+ END1=link1End1,
+ END2=randomLink1[ i ],
+ OPTION="up" )
+ main.Mininet1.link(
+ END1=link2End1,
+ END2=randomLink2[ i ],
+ OPTION="up" )
+ main.Mininet1.link(
+ END1=link3End1,
+ END2=randomLink3[ i ],
+ OPTION="up" )
+ time.sleep( link_sleep )
topology_output = main.ONOScli2.topology()
- linkUp = main.ONOSbench.check_status(topology_output,numSwitches,str(numLinks))
- utilities.assert_equals(expect=main.TRUE,actual=linkUp,
- onpass="Link up discovered properly",
- onfail="Link up was not discovered in "+ str(link_sleep) + " seconds")
+ linkUp = main.ONOSbench.check_status(
+ topology_output,
+ numSwitches,
+ str( numLinks ) )
+ utilities.assert_equals(
+ expect=main.TRUE,
+ actual=linkUp,
+ onpass="Link up discovered properly",
+ onfail="Link up was not discovered in " +
+ str( link_sleep ) +
+ " seconds" )
- main.step("Verify Ping across all hosts")
+ main.step( "Verify Ping across all hosts" )
pingResultLinkUp = main.FALSE
time1 = time.time()
pingResultLinkUp = main.Mininet1.pingall()
time2 = time.time()
- timeDiff = round((time2-time1),2)
- main.log.report("Time taken for Ping All: "+str(timeDiff)+" seconds")
- utilities.assert_equals(expect=main.TRUE, actual=pingResultLinkUp,
- onpass="PING ALL PASS",
- onfail="PING ALL FAIL")
+ timeDiff = round( ( time2 - time1 ), 2 )
+ main.log.report(
+ "Time taken for Ping All: " +
+ str( timeDiff ) +
+ " seconds" )
+ utilities.assert_equals( expect=main.TRUE, actual=pingResultLinkUp,
+ onpass="PING ALL PASS",
+ onfail="PING ALL FAIL" )
caseResult8 = linkUp and pingResultLinkUp
- utilities.assert_equals(expect=main.TRUE, actual=caseResult8,
- onpass="Link Up Test PASS",
- onfail="Link Up Test FAIL")
+ utilities.assert_equals( expect=main.TRUE, actual=caseResult8,
+ onpass="Link Up Test PASS",
+ onfail="Link Up Test FAIL" )
- def CASE9(self):
- '''
+ def CASE9( self ):
+ """
Install 114 point intents and verify Ping all works
- '''
+ """
import copy
- main.log.report("Install 114 point intents and verify Ping all")
- main.log.report("___________________________________________")
- main.case("Install 114 point intents and Ping all")
- deviceLinks_copy = copy.copy(deviceLinks)
- main.step("Install 114 point intents")
- for i in range(len(deviceLinks_copy)):
- pointLink = str(deviceLinks_copy[i]).replace("src=","").replace("dst=","").split(',')
- point1 = pointLink[0].split('/')
- point2 = pointLink[1].split('/')
- installResult = main.ONOScli1.add_point_intent(point1[0],point2[0],int(point1[1]),int(point2[1]))
+ main.log.report( "Install 114 point intents and verify Ping all" )
+ main.log.report( "___________________________________________" )
+ main.case( "Install 114 point intents and Ping all" )
+ deviceLinks_copy = copy.copy( deviceLinks )
+ main.step( "Install 114 point intents" )
+ for i in range( len( deviceLinks_copy ) ):
+ pointLink = str(
+ deviceLinks_copy[ i ] ).replace(
+ "src=",
+ "" ).replace(
+ "dst=",
+ "" ).split( ',' )
+ point1 = pointLink[ 0 ].split( '/' )
+ point2 = pointLink[ 1 ].split( '/' )
+ installResult = main.ONOScli1.add_point_intent(
+ point1[ 0 ], point2[ 0 ], int(
+ point1[ 1 ] ), int(
+ point2[ 1 ] ) )
if installResult == main.TRUE:
- print "Installed Point intent between :",point1[0], int(point1[1]), point2[0], int(point2[1])
+ print "Installed Point intent between :", point1[ 0 ], int( point1[ 1 ] ), point2[ 0 ], int( point2[ 1 ] )
- main.step("Obtain the intent id's")
+ main.step( "Obtain the intent id's" )
intentsList = main.ONOScli1.getAllIntentIds()
- ansi_escape = re.compile(r'\x1b[^m]*m')
- intentsList = ansi_escape.sub('', intentsList)
- intentsList = intentsList.replace(" onos:intents | grep id=","").replace("id=","").replace("\r\r","")
- intentsList=intentsList.splitlines()
- intentsList = intentsList[1:]
+ ansi_escape = re.compile( r'\x1b[^m]*m' )
+ intentsList = ansi_escape.sub( '', intentsList )
+ intentsList = intentsList.replace(
+ " onos:intents | grep id=",
+ "" ).replace(
+ "id=",
+ "" ).replace(
+ "\r\r",
+ "" )
+ intentsList = intentsList.splitlines()
+ intentsList = intentsList[ 1: ]
intentIdList = []
- for i in range(len(intentsList)):
- intentsTemp = intentsList[i].split(',')
- intentIdList.append(intentsTemp[0])
+ for i in range( len( intentsList ) ):
+ intentsTemp = intentsList[ i ].split( ',' )
+ intentIdList.append( intentsTemp[ 0 ] )
print "Intent IDs: ", intentIdList
- print "Total Intents installed: ", len(intentIdList)
+ print "Total Intents installed: ", len( intentIdList )
- main.step("Verify Ping across all hosts")
+ main.step( "Verify Ping across all hosts" )
pingResult = main.FALSE
time1 = time.time()
pingResult = main.Mininet1.pingall()
time2 = time.time()
- timeDiff = round((time2-time1),2)
- main.log.report("Time taken for Ping All: "+str(timeDiff)+" seconds")
- utilities.assert_equals(expect=main.TRUE, actual=pingResult,
- onpass="PING ALL PASS",
- onfail="PING ALL FAIL")
+ timeDiff = round( ( time2 - time1 ), 2 )
+ main.log.report(
+ "Time taken for Ping All: " +
+ str( timeDiff ) +
+ " seconds" )
+ utilities.assert_equals( expect=main.TRUE, actual=pingResult,
+ onpass="PING ALL PASS",
+ onfail="PING ALL FAIL" )
case8_result = installResult and pingResult
- utilities.assert_equals(expect=main.TRUE, actual=case8_result,
- onpass="Ping all test after Point intents addition successful",
- onfail="Ping all test after Point intents addition failed")
+ utilities.assert_equals(
+ expect=main.TRUE,
+ actual=case8_result,
+ onpass="Ping all test after Point intents addition successful",
+ onfail="Ping all test after Point intents addition failed" )
- def CASE10(self):
- '''
+ def CASE10( self ):
+ """
Remove all Intents
- '''
- main.log.report("Remove all intents that were installed previously")
- main.log.report("______________________________________________")
- main.log.info("Remove all intents")
- main.case("Removing intents")
- main.step("Obtain the intent id's first")
+ """
+ main.log.report( "Remove all intents that were installed previously" )
+ main.log.report( "______________________________________________" )
+ main.log.info( "Remove all intents" )
+ main.case( "Removing intents" )
+ main.step( "Obtain the intent id's first" )
intentsList = main.ONOScli1.getAllIntentIds()
- ansi_escape = re.compile(r'\x1b[^m]*m')
- intentsList = ansi_escape.sub('', intentsList)
- intentsList = intentsList.replace(" onos:intents | grep id=","").replace("id=","").replace("\r\r","")
- intentsList=intentsList.splitlines()
- intentsList = intentsList[1:]
+ ansi_escape = re.compile( r'\x1b[^m]*m' )
+ intentsList = ansi_escape.sub( '', intentsList )
+ intentsList = intentsList.replace(
+ " onos:intents | grep id=",
+ "" ).replace(
+ "id=",
+ "" ).replace(
+ "\r\r",
+ "" )
+ intentsList = intentsList.splitlines()
+ intentsList = intentsList[ 1: ]
intentIdList = []
step1Result = main.TRUE
- if (len(intentsList) > 1):
- for i in range(len(intentsList)):
- intentsTemp = intentsList[i].split(',')
- intentIdList.append(intentsTemp[0])
+ if ( len( intentsList ) > 1 ):
+ for i in range( len( intentsList ) ):
+ intentsTemp = intentsList[ i ].split( ',' )
+ intentIdList.append( intentsTemp[ 0 ] )
print "Intent IDs: ", intentIdList
- for id in range(len(intentIdList)):
- print "Removing intent id (round 1) :", intentIdList[id]
- main.ONOScli1.remove_intent(intent_id = intentIdList[id])
- time.sleep(1)
+ for id in range( len( intentIdList ) ):
+ print "Removing intent id (round 1) :", intentIdList[ id ]
+ main.ONOScli1.remove_intent( intent_id=intentIdList[ id ] )
+ time.sleep( 1 )
- main.log.info("Verify all intents are removed and if any leftovers try remove one more time")
+ main.log.info(
+ "Verify all intents are removed and if any leftovers try remove one more time" )
intentsList1 = main.ONOScli1.getAllIntentIds()
- ansi_escape = re.compile(r'\x1b[^m]*m')
- intentsList1 = ansi_escape.sub('', intentsList1)
- intentsList1 = intentsList1.replace(" onos:intents | grep id=","").replace(" state=","").replace("\r\r","")
- intentsList1=intentsList1.splitlines()
- intentsList1 = intentsList1[1:]
+ ansi_escape = re.compile( r'\x1b[^m]*m' )
+ intentsList1 = ansi_escape.sub( '', intentsList1 )
+ intentsList1 = intentsList1.replace(
+ " onos:intents | grep id=",
+ "" ).replace(
+ " state=",
+ "" ).replace(
+ "\r\r",
+ "" )
+ intentsList1 = intentsList1.splitlines()
+ intentsList1 = intentsList1[ 1: ]
print "Round 2 (leftover) intents to remove: ", intentsList1
intentIdList1 = []
- if (len(intentsList1) > 1):
- for i in range(len(intentsList1)):
- intentsTemp1 = intentsList[i].split(',')
- intentIdList1.append(intentsTemp1[0])
+ if ( len( intentsList1 ) > 1 ):
+ for i in range( len( intentsList1 ) ):
+ intentsTemp1 = intentsList[ i ].split( ',' )
+ intentIdList1.append( intentsTemp1[ 0 ] )
print "Leftover Intent IDs: ", intentIdList1
- for id in range(len(intentIdList1)):
- print "Removing intent id (round 2):", intentIdList1[id]
- main.ONOScli1.remove_intent(intent_id = intentIdList1[id])
- time.sleep(2)
+ for id in range( len( intentIdList1 ) ):
+ print "Removing intent id (round 2):", intentIdList1[ id ]
+ main.ONOScli1.remove_intent(
+ intent_id=intentIdList1[ id ] )
+ time.sleep( 2 )
else:
print "There are no more intents that need to be removed"
step1Result = main.TRUE
@@ -698,6 +886,6 @@
step1Result = main.FALSE
caseResult7 = step1Result
- utilities.assert_equals(expect=main.TRUE, actual=caseResult7,
- onpass="Intent removal test successful",
- onfail="Intent removal test failed")
\ No newline at end of file
+ utilities.assert_equals( expect=main.TRUE, actual=caseResult7,
+ onpass="Intent removal test successful",
+ onfail="Intent removal test failed" )