Copyright 2016 Open Networking Foundation ( ONF )
Please refer questions to either the onos test mailing list at <>,
the System Testing Plans and Results wiki page at <>,
or the System Testing Guide page at <>
TestON is free software: you can redistribute it and/or modify
it under the terms of the GNU General Public License as published by
the Free Software Foundation, either version 2 of the License, or
( at your option ) any later version.
TestON is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
GNU General Public License for more details.
You should have received a copy of the GNU General Public License
along with TestON. If not, see <>.
import os
import time
import json
import urllib
import re
import pexpect
from core import utilities
class Testcaselib:
useSSH = True
def initTest( main ):
- Construct tests variables
- GIT ( optional )
- Checkout ONOS master branch
- Pull latest ONOS code
- Building ONOS ( optional )
- Install ONOS package
- Build ONOS package
from tests.dependencies.ONOSSetup import ONOSSetup
main.testSetUp = ONOSSetup()
except ImportError:
main.log.error( "ONOSSetup not found. exiting the test" )
from tests.dependencies.Network import Network
main.Network = Network()
main.physicalNet = False
main.testSetUp.envSetupDescription( False )
stepResult = main.FALSE
# Test variables
main.cellName = main.params[ 'ENV' ][ 'cellName' ]
main.apps = main.params[ 'ENV' ][ 'cellApps' ]
main.path = os.path.dirname( main.testFile )
main.useCommonTopo = main.params[ 'DEPENDENCY' ][ 'useCommonTopo' ] == 'True'
main.topoPath = main.path + ( "/.." if main.useCommonTopo else "" ) + "/dependencies/"
main.useCommonConf = main.params[ 'DEPENDENCY' ][ 'useCommonConf' ] == 'True'
if main.params[ 'DEPENDENCY' ].get( 'useBmv2' ):
main.useBmv2 = main.params[ 'DEPENDENCY' ][ 'useBmv2' ] == 'True'
main.useBmv2 = False
if main.useBmv2:
main.switchType = main.params[ 'DEPENDENCY' ].get( 'bmv2SwitchType', 'stratum' )
main.switchType = "ovs"
main.configPath = main.path + ( "/.." if main.useCommonConf else "" ) + "/dependencies/"
main.bmv2Path = "/tools/dev/mininet/"
main.forJson = "json/"
main.forChart = "chart/"
main.forConfig = "conf/"
main.forHost = "host/"
main.forSwitchFailure = "switchFailure/"
main.forLinkFailure = "linkFailure/"
main.forMulticast = "multicast/"
main.topology = main.params[ 'DEPENDENCY' ][ 'topology' ]
main.topologyLib = main.params[ 'DEPENDENCY' ][ 'lib' ] if 'lib' in main.params[ 'DEPENDENCY' ] else None
main.topologyConf = main.params[ 'DEPENDENCY' ][ 'conf' ] if 'conf' in main.params[ 'DEPENDENCY' ] else None
main.bmv2 = ""
main.stratumRoot = main.params[ 'DEPENDENCY'][ 'stratumRoot'] if 'stratumRoot' in main.params[ 'DEPENDENCY' ] else None
main.scale = ( main.params[ 'SCALE' ][ 'size' ] ).split( "," )
main.maxNodes = int( main.params[ 'SCALE' ][ 'max' ] )
main.trellisOar = main.params[ 'DEPENDENCY' ][ 'trellisOar' ]
main.t3Oar = main.params[ 'DEPENDENCY' ][ 't3Oar' ] if 't3Oar' in main.params[ 'DEPENDENCY' ] else None
stepResult = main.testSetUp.envSetup( False )
except Exception as e:
main.testSetUp.envSetupException( e )
main.testSetUp.evnSetupConclusion( stepResult )
def installOnos( main, vlanCfg=True, skipPackage=False, cliSleep=10,
parallel=True ):
- Set up cell
- Create cell file
- Set cell file
- Verify cell file
- Kill ONOS process
- Uninstall ONOS cluster
- Verify ONOS start up
- Install ONOS cluster
- Connect to cli
# main.scale[ 0 ] determines the current number of ONOS controller
if main.params.get( 'EXTERNAL_APPS' ):
for app, url in main.params[ 'EXTERNAL_APPS' ].iteritems(): "Downloading %s app from %s" )
main.ONOSbench.onosFetchApp( url )
if not main.apps:
main.log.error( "App list is empty" )
except Exception as e:
main.log.debug( e )
main.cleanAndExit() "Cluster size: " + str( main.Cluster.numCtrls ) ) "Cluster ips: " + ', '.join( main.Cluster.getIps() ) )
main.dynamicHosts = [ 'in1', 'out1' ]
main.testSetUp.ONOSSetUp( main.Cluster, newCell=True, cellName=main.cellName,
installParallel=parallel, includeCaseDesc=False )
ready = utilities.retry( 0 ).CLI.summary,
[ None, main.FALSE ],
attempts=10 )
if ready:
ready = main.TRUE
utilities.assert_equals( expect=main.TRUE, actual=ready,
onpass="ONOS summary command succeded",
onfail="ONOS summary command failed" )
if not ready:
main.log.error( "ONOS startup failed!" )
# Install segmentrouting and t3 app
appInstallResult = main.ONOSbench.onosAppInstall( main.Cluster.runningNodes[0].ipAddress, main.trellisOar)
if main.t3Oar:
appInstallResult = appInstallResult and main.ONOSbench.onosAppInstall( main.Cluster.runningNodes[0].ipAddress, main.t3Oar)
utilities.assert_equals( expect=main.TRUE, actual=appInstallResult,
onpass="SR app installation succeded",
onfail="SR app installation failed" )
if not appInstallResult:
# FIXME: move to somewhere else?
switchPrefix = main.params[ 'DEPENDENCY' ].get( 'switchPrefix' )
# TODO: Support other pipeconfs/making this configurable
if switchPrefix == "tofino":
# It seems to take some time for the pipeconfs to be loaded
ctrl =
for i in range( 120 ):
main.log.debug( "Checking to see if pipeconfs are loaded" )
output = ctrl.CLI.sendline( "pipeconfs" )
if "tofino" in output:
main.log.debug( "Took around %s seconds for the pipeconf to be loaded" % i )
time.sleep( 1 )
except Exception as e:
main.log.error( e )
Testcaselib.setOnosLogLevels( main )
Testcaselib.setOnosConfig( main )
def loadCount( main ):
with open( "%s/count/%s.count" % ( main.configPath, main.cfgName ) ) as count:
main.count = json.load( count )
def loadJson( main, suffix='' ):
with open( "%s%s.json%s" % ( main.configPath + main.forJson,
main.cfgName, suffix ) ) as cfg: 0 ).REST.setNetCfg( json.load( cfg ) )
def loadChart( main ):
with open( "%s%s.chart" % ( main.configPath + main.forChart,
main.cfgName ) ) as chart:
main.pingChart = json.load( chart )
except IOError:
main.log.warn( "No chart file found." )
def loadHost( main ):
with open( "" % ( main.configPath + main.forHost,
main.cfgName ) ) as host:
main.expectedHosts = json.load( host )
def loadSwitchFailureChart( main ):
with open( "%s%s.switchFailureChart" % ( main.configPath + main.forSwitchFailure,
main.cfgName ) ) as sfc:
main.switchFailureChart = json.load( sfc )
def loadLinkFailureChart( main ):
with open( "%s%s.linkFailureChart" % ( main.configPath + main.forLinkFailure,
main.cfgName ) ) as lfc:
main.linkFailureChart = json.load( lfc )
def loadMulticastConfig( main ):
with open( "%s%s.multicastConfig" % ( main.configPath + main.forMulticast,
main.cfgName ) ) as cfg:
main.multicastConfig = json.load( cfg )
def startMininet( main, topology, args="" ): "Copying mininet topology file to mininet machine" )
copyResult = main.ONOSbench.scp( main.Mininet1,
main.topoPath + main.topology,
main.Mininet1.home + "custom",
direction="to" )
if main.topologyLib:
for lib in main.topologyLib.split(","):
copyResult = copyResult and main.ONOSbench.scp( main.Mininet1,
main.topoPath + lib,
main.Mininet1.home + "custom",
direction="to" )
if main.topologyConf:
import re
controllerIPs = [ ctrl.ipAddress for ctrl in main.Cluster.runningNodes ]
index = 0
destDir = "~/"
if 'MN_DOCKER' in main.params and main.params['MN_DOCKER']['args']:
destDir = "/tmp/mn_conf/"
# Try to ensure the destination exists "Create folder for network config files" )
handle = main.Mininet1.handle
handle.sendline( "mkdir -p %s" % destDir )
handle.expect( [ main.Mininet1.prompt, main.Mininet1.dockerPrompt ] )
main.log.debug( handle.before + handle.after )
# Make sure permissions are correct
handle.sendline( "sudo chown %s:%s %s" % ( main.Mininet1.user_name, main.Mininet1.user_name, destDir ) )
handle.expect( [ main.Mininet1.prompt, main.Mininet1.dockerPrompt ] )
main.log.debug( handle.before + handle.after )
handle.sendline( "sudo chmod -R a+rwx %s" % ( destDir ) )
handle.expect( [ main.Mininet1.prompt, main.Mininet1.dockerPrompt ] )
main.log.debug( handle.before + handle.after )
for conf in main.topologyConf.split(","):
# Update zebra configurations with correct ONOS instance IP
if conf in [ "zebradbgp1.conf", "zebradbgp2.conf" ]:
ip = controllerIPs[ index ]
index = ( index + 1 ) % len( controllerIPs )
with open( main.configPath + main.forConfig + conf ) as f:
s =
s = re.sub( r"(fpm connection ip).*(port 2620)", r"\1 " + ip + r" \2", s )
with open( main.configPath + main.forConfig + conf, "w" ) as f:
f.write( s )
copyResult = copyResult and main.ONOSbench.scp( main.Mininet1,
main.configPath + main.forConfig + conf,
direction="to" )
copyResult = copyResult and main.ONOSbench.scp( main.Mininet1,
main.ONOSbench.home + main.bmv2Path + main.bmv2,
main.Mininet1.home + "custom",
direction="to" )
stepResult = copyResult
utilities.assert_equals( expect=main.TRUE,
onpass="Successfully copied topo files",
onfail="Failed to copy topo files" )
if main.stratumRoot:
main.Mininet1.handle.sendline( "export STRATUM_ROOT=" + str( main.stratumRoot ) )
main.Mininet1.handle.expect( main.Mininet1.prompt )
main.step( "Starting Mininet Topology" )
arg = "--onos-ip=%s %s" % (",".join([ctrl.ipAddress for ctrl in main.Cluster.runningNodes]), args)
main.topology = topology
topoResult = main.Mininet1.startNet(
topoFile=main.Mininet1.home + "custom/" + main.topology, args=arg )
stepResult = topoResult
utilities.assert_equals( expect=main.TRUE,
onpass="Successfully loaded topology",
onfail="Failed to load topology" )
# Exit if topology did not load properly
if not topoResult:
if main.useBmv2:
# Upload the net-cfg file created for each switch
filename = "onos-netcfg.json"
switchPrefix = main.params[ 'DEPENDENCY' ].get( 'switchPrefix', "bmv2" )
for switch in main.Mininet1.getSwitches( switchRegex=r"(StratumBmv2Switch)|(Bmv2Switch)" ).keys():
path = "/tmp/mn-stratum/%s/" % switch
dstPath = "/tmp/"
dstFileName = "%s-onos-netcfg.json" % switch
main.ONOSbench1.scp( main.Mininet1,
"%s%s" % ( path, filename ),
"%s%s" % ( dstPath, dstFileName ),
"from" )
main.ONOSbench1.handle.sendline( "sudo sed -i 's/localhost/%s/g' %s%s" % ( main.Mininet1.ip_address, dstPath, dstFileName ) )
# Configure managementAddress
main.ONOSbench1.handle.sendline( "sudo sed -i 's/localhost/%s/g' %s%s" % ( main.Mininet1.ip_address, dstPath, dstFileName ) )
main.ONOSbench1.handle.expect( main.ONOSbench1.prompt )
main.log.debug( main.ONOSbench1.handle.before + main.ONOSbench1.handle.after )
# Configure device id
main.ONOSbench1.handle.sendline( "sudo sed -i 's/device:%s/device:%s:%s/g' %s%s" % ( switch, switchPrefix, switch, dstPath, dstFileName ) )
main.ONOSbench1.handle.expect( main.ONOSbench1.prompt )
main.log.debug( main.ONOSbench1.handle.before + main.ONOSbench1.handle.after )
# Configure device name
main.ONOSbench1.handle.sendline( "sudo sed -i '/\"basic\"/a\ \"name\": \"%s\",' %s%s" % ( switch, dstPath, dstFileName ) )
main.ONOSbench1.handle.expect( main.ONOSbench1.prompt )
main.log.debug( main.ONOSbench1.handle.before + main.ONOSbench1.handle.after )
main.ONOSbench1.onosNetCfg( main.ONOSserver1.ip_address, dstPath, dstFileName )
# Make sure hosts make some noise
Testcaselib.discoverHosts( main )
def discoverHosts( main ):
# TODO add option to only select specific hosts
if hasattr( main, "Mininet1" ):
network = main.Mininet1
elif hasattr( main, "NetworkBench" ):
network = main.NetworkBench
main.log.warn( "Could not find component for test network, skipping host discovery" )
def connectToPhysicalNetwork( main ):
main.step( "Connecting to physical netowrk" )
main.physicalNet = True
topoResult = main.NetworkBench.connectToNet()
stepResult = topoResult
utilities.assert_equals( expect=main.TRUE,
onpass="Successfully connected to topology",
onfail="Failed to connect to topology" )
# Exit if topology did not load properly
if not topoResult:
# Perform any optional setup
for switch in main.NetworkBench.switches:
if hasattr( switch, "setup" ):
switch.setup() # We might not need this
main.step( "Assign switches to controllers." )
stepResult = main.TRUE
switches = main.NetworkBench.getSwitches()
pool = []
for name in switches.keys():
# NOTE: although this terminology is ovsdb centric, we can use this function for other switches too
# e.g. push onos net-cfg for stratum switches
thread = main.Thread( target=main.NetworkBench.assignSwController,
args=[ name, main.Cluster.getIps(), '6653' ] )
pool.append( thread )
for thread in pool:
thread.join( 300 )
if not thread.result:
stepResult = main.FALSE
utilities.assert_equals( expect=main.TRUE,
onpass="Successfully assign switches to controllers",
onfail="Failed to assign switches to controllers" )
# Check devices
Testcaselib.checkDevices( main, switches=int( main.params[ 'TOPO' ][ 'switchNum' ] ) )
# Connecting to hosts that only have data plane connectivity
main.step( "Connecting inband hosts" )
stepResult = main.Network.connectInbandHosts()
utilities.assert_equals( expect=main.TRUE,
onpass="Successfully connected inband hosts",
onfail="Failed to connect inband hosts" )
Testcaselib.discoverHosts( main )
def saveOnosDiagnostics( main ):
Get onos-diags.tar.gz and save it to the log directory.
suffix: suffix string of the file name. E.g. onos-diags-case1.tar.gz
""" "Collecting onos-diags..." )
main.ONOSbench.onosDiagnostics( [ctrl.ipAddress for ctrl in main.Cluster.runningNodes],
"-CASE%d" % main.CurrentTestCaseNumber )
def config( main, cfgName ):
main.spines = []
main.failures = int( main.params[ 'failures' ] )
main.cfgName = cfgName
if main.cfgName == '2x2':
spine = {}
spine[ 'name' ] = main.params[ 'switches' ][ 'spine1' ]
spine[ 'dpid' ] = main.params[ 'switches' ][ 'spinedpid1' ]
main.spines.append( spine )
spine = {}
spine[ 'name' ] = main.params[ 'switches' ][ 'spine2' ]
spine[ 'dpid' ] = main.params[ 'switches' ][ 'spinedpid2' ]
main.spines.append( spine )
elif main.cfgName == '4x4':
spine = {}
spine[ 'name' ] = main.params[ 'switches' ][ 'spine1' ]
spine[ 'dpid' ] = main.params[ 'switches' ][ 'spinedpid1' ]
main.spines.append( spine )
spine = {}
spine[ 'name' ] = main.params[ 'switches' ][ 'spine2' ]
spine[ 'dpid' ] = main.params[ 'switches' ][ 'spinedpid2' ]
main.spines.append( spine )
spine = {}
spine[ 'name' ] = main.params[ 'switches' ][ 'spine3' ]
spine[ 'dpid' ] = main.params[ 'switches' ][ 'spinedpid3' ]
main.spines.append( spine )
spine = {}
spine[ 'name' ] = main.params[ 'switches' ][ 'spine4' ]
spine[ 'dpid' ] = main.params[ 'switches' ][ 'spinedpid4' ]
main.spines.append( spine )
main.log.error( "Configuration failed!" )
def addStaticOnosRoute( main, subnet, intf):
Adds an ONOS static route with the use route-add command.
routeResult = 0 ).addStaticRoute(subnet, intf)
def checkGroupsForBuckets( main, deviceId, subnetDict, routingTable=30 ):
Check number of groups for each subnet on device deviceId and matches
it with an expected value. subnetDict is a dictionarty containing values
of the type "" : 5.
main.step( "Checking if number of groups for subnets in device {0} is as expected.".format( deviceId ) )
groups = 0 ).CLI.getGroups( deviceId, groupType="select" )
flows = 0 ).CLI.flows( jsonFormat=False, device=deviceId )
result = main.TRUE
for subnet, numberInSelect in subnetDict.iteritems():
for flow in flows.splitlines():
if "tableId={0}".format( routingTable ) in flow and subnet in flow:
# this will match the group id that this flow entry points to, for example :
# 0x70000041 in flow entry which contains "deferred=[GROUP:0x70000041], transition=TABLE:60,"
groupId = r".*GROUP:(0x.*)], transition.*", flow ).groups()[0]
count = 0
for group in groups.splitlines():
if 'id={0}'.format( groupId ) in group:
count += 1
if count - 1 != numberInSelect:
result = main.FALSE
main.log.warn( "Mismatch in number of buckets of select group, found {0}, expected {1} for subnet {2} on device {3}".format( count - 1, numberInSelect, subnet, deviceId ) )
utilities.assert_equals( expect=main.TRUE, actual=result,
onpass="All bucket numbers are as expected",
onfail="Some bucket numbers are not as expected" )
def checkFlows( main, minFlowCount, tag="", dumpflows=True, sleep=10 ):
"Check whether the flow count is bigger than %s" % minFlowCount )
if tag == "":
tag = 'CASE%d' % main.CurrentTestCaseNumber
count = utilities.retry( 0 ).CLI.checkFlowCount,
kwargs={ 'min': minFlowCount },
sleep=sleep )
if count == main.FALSE:
count = 0 ).CLI.checkFlowCount()
actual=( count > minFlowCount ),
onpass="Flow count looks correct; found %s, expecting at least %s" % ( count, minFlowCount ),
onfail="Flow count looks wrong; found %s, expecting at least %s" % ( count, minFlowCount ) )
main.step( "Check whether all flow status are ADDED" )
flowCheck = utilities.retry( 0 ).CLI.checkFlowsState,
kwargs={ 'isPENDING': False },
sleep=sleep )
onpass="Flow status is correct!",
onfail="Flow status is wrong!" )
if dumpflows:
main.ONOSbench.dumpONOSCmd( 0 ).ipAddress,
tag + "_FlowsBefore" )
main.ONOSbench.dumpONOSCmd( 0 ).ipAddress,
tag + "_GroupsBefore" )
def checkDevices( main, switches, tag="", sleep=10 ):
"Check whether the switches count is equal to %s" % switches )
if tag == "":
tag = 'CASE%d' % main.CurrentTestCaseNumber
result = utilities.retry( 0 ).CLI.checkStatus,
kwargs={ 'numoswitch': switches},
sleep=sleep )
utilities.assert_equals( expect=main.TRUE, actual=result,
onpass="Device up successful",
onfail="Failed to boot up devices?" )
def checkFlowsByDpid( main, dpid, minFlowCount, sleep=10 ):
"Check whether the flow count of device %s is bigger than %s" % ( dpid, minFlowCount ) )
count = utilities.retry( 0 ).CLI.checkFlowAddedCount,
args=( dpid, minFlowCount ),
sleep=sleep )
if count == main.FALSE:
count = 0 ).CLI.checkFlowAddedCount( dpid )
actual=( count > minFlowCount ),
onpass="Flow count looks correct: " + str( count ),
onfail="Flow count looks wrong: " + str( count ) )
def checkFlowEqualityByDpid( main, dpid, flowCount, sleep=10 ):
"Check whether the flow count of device %s is equal to %s" % ( dpid, flowCount ) )
count = utilities.retry( 0 ).CLI.checkFlowAddedCount,
args=( dpid, flowCount, False, 1 ),
sleep=sleep )
if count == main.FALSE:
count = 0 ).CLI.checkFlowAddedCount( dpid )
actual=( count == flowCount ),
onpass="Flow count looks correct: " + str( count ) ,
onfail="Flow count looks wrong. found {}, should be {}.".format( count, flowCount ) )
def checkGroupEqualityByDpid( main, dpid, groupCount, sleep=10):
"Check whether the group count of device %s is equal to %s" % ( dpid, groupCount ) )
count = utilities.retry( 0 ).CLI.checkGroupAddedCount,
args=( dpid, groupCount, False, 1),
sleep=sleep )
if count == main.FALSE:
count = 0 ).CLI.checkGroupAddedCount( dpid )
actual=( count == groupCount ),
onpass="Group count looks correct: " + str( count ) ,
onfail="Group count looks wrong. found {}, should be {}.".format( count, groupCount ) )
def checkFlowsGroupsFromFile( main ):
for dpid, values in main.count.items():
flowCount = values["flows"]
groupCount = values["groups"] "Check flow count for dpid " + str( dpid ) +
", should be " + str( flowCount ) )
Testcaselib.checkFlowEqualityByDpid( main, dpid, flowCount ) "Check group count for dpid " + str( dpid ) +
", should be " + str( groupCount ) )
Testcaselib.checkGroupEqualityByDpid( main, dpid, groupCount )
def pingAll( main, tag="", dumpflows=True, acceptableFailed=0, basedOnIp=False,
sleep=10, retryAttempts=1, skipOnFail=False ):
Verify connectivity between hosts according to the ping chart
acceptableFailed: max number of acceptable failed pings.
basedOnIp: if True, run ping or ping6 based on suffix of host names
retryAttempts: the number of retry ping. Only works for IPv4 hosts.
''' "Check host connectivity" )
main.log.debug( "Ping chart: %s" % main.pingChart )
if tag == "":
tag = 'CASE%d' % main.CurrentTestCaseNumber
for entry in main.pingChart.itervalues():
main.log.debug( "Entry in ping chart: %s" % entry )
expect = entry[ 'expect' ]
if expect == "Unidirectional":
# Verify ping from each src host to each dst host
src = entry[ 'src' ]
dst = entry[ 'dst' ]
expect = main.TRUE
main.step( "Verify unidirectional connectivity from %s to %s with tag %s" % ( str( src ), str( dst ), tag ) )
if basedOnIp:
if ("v4" in src[0]):
pa = main.Network.pingallHostsUnidirectional( src, dst, acceptableFailed=acceptableFailed )
utilities.assert_equals( expect=expect, actual=pa,
onpass="IPv4 connectivity successfully tested",
onfail="IPv4 connectivity failed" )
if ("v6" in src[0]):
pa = main.Network.pingallHostsUnidirectional( src, dst, ipv6=True, acceptableFailed=acceptableFailed )
utilities.assert_equals( expect=expect, actual=pa,
onpass="IPv6 connectivity successfully tested",
onfail="IPv6 connectivity failed" )
elif main.physicalNet:
pa = main.NetworkBench.pingallHostsUnidirectional( src, dst, acceptableFailed=acceptableFailed, useScapy=True )
utilities.assert_equals( expect=expect, actual=pa,
onpass="IP connectivity successfully tested",
onfail="IP connectivity failed" )
pa = main.Network.pingallHostsUnidirectional( src, dst, acceptableFailed=acceptableFailed )
utilities.assert_equals( expect=expect, actual=pa,
onpass="IP connectivity successfully tested",
onfail="IP connectivity failed" )
# Verify ping between each host pair
hosts = entry[ 'hosts' ]
expect = main.TRUE if str(expect).lower() == 'true' else main.FALSE
expect = main.FALSE
main.step( "Verify full connectivity for %s with tag %s" % ( str( hosts ), tag ) )
if basedOnIp:
if ("v4" in hosts[0]):
pa = utilities.retry( main.Network.pingallHosts,
main.FALSE if expect else main.TRUE,
args=(hosts, ),
kwargs={ 'ipv6': False },
sleep=sleep )
utilities.assert_equals( expect=expect, actual=pa,
onpass="IPv4 connectivity successfully tested",
onfail="IPv4 connectivity failed" )
if ("v6" in hosts[0]):
pa = main.Network.pingIpv6Hosts( hosts, acceptableFailed=acceptableFailed )
utilities.assert_equals( expect=expect, actual=pa,
onpass="IPv6 connectivity successfully tested",
onfail="IPv6 connectivity failed" )
elif main.physicalNet:
pa = main.Network.pingallHosts( hosts, ipv6=True, useScapy=True )
utilities.assert_equals( expect=expect, actual=pa,
onpass="IP connectivity successfully tested",
onfail="IP connectivity failed" )
pa = main.Network.pingallHosts( hosts )
utilities.assert_equals( expect=expect, actual=pa,
onpass="IP connectivity successfully tested",
onfail="IP connectivity failed" )
if pa != expect:
Testcaselib.saveOnosDiagnostics( main )
if skipOnFail and pa != expect:
Testcaselib.cleanup( main, copyKarafLog=False )
if dumpflows:
main.ONOSbench.dumpONOSCmd( 0 ).ipAddress,
tag + "_FlowsOn" )
main.ONOSbench.dumpONOSCmd( 0 ).ipAddress,
tag + "_GroupsOn" )
def killLink( main, end1, end2, switches, links, sleep=None ):
end1,end2: identify the switches, ex.: 'leaf1', 'spine1'
switches, links: number of expected switches and links after linkDown, ex.: '4', '6'
Kill a link and verify ONOS can see the proper link change
if sleep is None:
sleep = float( main.params[ 'timers' ][ 'LinkDiscovery' ] )
sleep = float( sleep )
main.step( "Kill link between %s and %s" % ( end1, end2 ) )
linkDown = END1=end1, END2=end2, OPTION="down" )
linkDown = linkDown and END2=end1, END1=end2, OPTION="down" )
# TODO: Can remove this, since in the retry we will wait anyways if topology is incorrect
"Waiting %s seconds for link down to be discovered" % sleep )
time.sleep( sleep )
topology = utilities.retry( 0 ).CLI.checkStatus,
kwargs={ 'numoswitch': switches,
'numolink': links },
sleep=sleep )
result = topology and linkDown
utilities.assert_equals( expect=main.TRUE, actual=result,
onpass="Link down successful",
onfail="Failed to turn off link?" )
def killLinkBatch( main, links, linksAfter, switches, sleep=None ):
links = list of links (src, dst) to bring down.
main.step("Killing a batch of links {0}".format(links))
if sleep is None:
sleep = float( main.params[ 'timers' ][ 'LinkDiscovery' ] )
sleep = float( sleep )
for end1, end2 in links: END1=end1, END2=end2, OPTION="down") END1=end2, END2=end1, OPTION="down")
# TODO: Can remove this, since in the retry we will wait anyways if topology is incorrect
"Waiting %s seconds for links down to be discovered" % sleep )
time.sleep( sleep )
topology = utilities.retry( 0 ).CLI.checkStatus,
kwargs={ 'numoswitch': switches,
'numolink': linksAfter },
sleep=sleep )
utilities.assert_equals( expect=main.TRUE, actual=topology,
onpass="Link batch down successful",
onfail="Link batch down failed" )
def restoreLinkBatch( main, links, linksAfter, switches, sleep=None ):
links = list of link (src, dst) to bring up again.
main.step("Restoring a batch of links {0}".format(links))
if sleep is None:
sleep = float( main.params[ 'timers' ][ 'LinkDiscovery' ] )
sleep = float( sleep )
for end1, end2 in links: END1=end1, END2=end2, OPTION="up") END1=end2, END2=end1, OPTION="up")
"Waiting %s seconds for links up to be discovered" % sleep )
time.sleep( sleep )
topology = utilities.retry( 0 ).CLI.checkStatus,
kwargs={ 'numoswitch': switches,
'numolink': linksAfter },
sleep=sleep )
utilities.assert_equals( expect=main.TRUE, actual=topology,
onpass="Link batch up successful",
onfail="Link batch up failed" )
def disablePortBatch( main, ports, switches=None, links=None, sleep=None ):
Disable a list of switch ports using 'portstate' and verify ONOS can see the proper link change
ports: a list of ports to disable ex. [ [ "of:0000000000000001", 1 ] ]
switches, links: number of expected switches and links after link change, ex.: '4', '6'
if sleep is None:
sleep = float( main.params[ 'timers' ][ 'LinkDiscovery' ] )
sleep = float( sleep )
main.step( "Disable a batch of ports" )
for dpid, port in ports: 0 ).CLI.portstate( dpid=dpid, port=port, state="disable" ) "Waiting {} seconds for port down to be discovered".format( sleep ) )
time.sleep( sleep )
if switches and links:
result = 0 ).CLI.checkStatus( numoswitch=switches,
numolink=links )
utilities.assert_equals( expect=main.TRUE, actual=result,
onpass="Port down successful",
onfail="Port down failed" )
def enablePortBatch( main, ports, switches, links, sleep=None ):
Enable a list of switch ports using 'portstate' and verify ONOS can see the proper link change
ports: a list of ports to enable ex. [ [ "of:0000000000000001", 1 ] ]
switches, links: number of expected switches and links after link change, ex.: '4', '6'
if sleep is None:
sleep = float( main.params[ 'timers' ][ 'LinkDiscovery' ] )
sleep = float( sleep )
main.step( "Enable a batch of ports" )
for dpid, port in ports: 0 ).CLI.portstate( dpid=dpid, port=port, state="enable" ) "Waiting {} seconds for port up to be discovered".format( sleep ) )
time.sleep( sleep )
if switches and links:
result = 0 ).CLI.checkStatus( numoswitch=switches,
numolink=links )
utilities.assert_equals( expect=main.TRUE, actual=result,
onpass="Port up successful",
onfail="Port up failed" )
def restoreLink( main, end1, end2, switches, links,
portUp=False, dpid1='', dpid2='', port1='', port2='', sleep=None ):
end1,end2: identify the end switches, ex.: 'leaf1', 'spine1'
portUp: enable portstate after restoring link
dpid1, dpid2: dpid of the end switches respectively, ex.: 'of:0000000000000002'
port1, port2: respective port of the end switches that connects to the link, ex.:'1'
switches, links: number of expected switches and links after linkDown, ex.: '4', '6'
Kill a link and verify ONOS can see the proper link change
main.step( "Restore link between %s and %s" % ( end1, end2 ) )
if sleep is None:
sleep = float( main.params[ 'timers' ][ 'LinkDiscovery' ] )
sleep = float( sleep )
result = False
count = 0
while True:
count += 1
ctrl = END1=end1, END2=end2, OPTION="up" ) END2=end1, END1=end2, OPTION="up" )
"Waiting %s seconds for link up to be discovered" % sleep )
time.sleep( sleep )
if portUp:
ctrl.CLI.portstate( dpid=dpid1, port=port1, state='Enable' )
ctrl.CLI.portstate( dpid=dpid2, port=port2, state='Enable' )
"Waiting %s seconds for link up to be discovered" % sleep )
time.sleep( sleep )
result = ctrl.CLI.checkStatus( numoswitch=switches,
numolink=links )
if count > 5 or result:
utilities.assert_equals( expect=main.TRUE, actual=result,
onpass="Link up successful",
onfail="Failed to bring link up" )
def killSwitch( main, switch, switches, links, sleep=None ):
Params: switches, links: number of expected switches and links after SwitchDown, ex.: '4', '6'
Completely kill a switch and verify ONOS can see the proper change
if sleep is None:
sleep = float( main.params[ 'timers' ][ 'SwitchDiscovery' ] )
sleep = float( sleep )
switch = switch if isinstance( switch, list ) else [ switch ]
main.step( "Kill " + str( switch ) )
for s in switch: "Stopping " + s )
main.Network.switch( SW=s, OPTION="stop" )
# todo make this repeatable
# TODO: Can remove this, since in the retry we will wait anyways if topology is incorrect "Waiting %s seconds for switch down to be discovered" % (
sleep ) )
time.sleep( sleep )
topology = utilities.retry( 0 ).CLI.checkStatus,
kwargs={ 'numoswitch': switches,
'numolink': links },
sleep=sleep )
utilities.assert_equals( expect=main.TRUE, actual=topology,
onpass="Kill switch successful",
onfail="Failed to kill switch?" )
def recoverSwitch( main, switch, switches, links, rediscoverHosts=False, hostsToDiscover=[], sleep=None ):
Params: switches, links: number of expected switches and links after SwitchUp, ex.: '4', '6'
Recover a switch and verify ONOS can see the proper change
if sleep is None:
sleep = float( main.params[ 'timers' ][ 'SwitchDiscovery' ] )
sleep = float( sleep )
# TODO make this repeatable
switch = switch if isinstance( switch, list ) else [ switch ]
main.step( "Recovering " + str( switch ) )
for s in switch: "Starting " + s )
main.Network.switch( SW=s, OPTION="start" ) "Waiting %s seconds for switch up to be discovered" % (
sleep ) )
time.sleep( sleep )
if rediscoverHosts:
main.Network.discoverHosts( hostList=hostsToDiscover ) "Waiting %s seconds for hosts to get re-discovered" % (
sleep ) )
time.sleep( sleep )
topology = utilities.retry( 0 ).CLI.checkStatus,
kwargs={ 'numoswitch': switches,
'numolink': links },
sleep=sleep )
utilities.assert_equals( expect=main.TRUE, actual=topology,
onpass="Switch recovery successful",
onfail="Failed to recover switch?" )
def killRouter( main, router, sleep=None ):
Kill bgpd process on a quagga router
router: name of the router to be killed. E.g. "bgp1"
sleep = float( sleep )
main.step( "Kill " + str( router ) )
if hasattr( main, 'Mininet1' ):
main.Mininet1.handle.sendline( "px {}.stopProtocols()".format( router ) )
main.Mininet1.handle.expect( "mininet>" )
# TODO: support killing router in physical network
pass "Waiting %s seconds for router down to be discovered" % ( sleep ) )
time.sleep( sleep )
def recoverRouter( main, router, sleep=None ):
Restart bgpd process on a quagga router
router: name of the router to be recovered. E.g. "bgp1"
sleep = float( sleep )
main.step( "Recovering " + str( router ) )
if hasattr( main, 'Mininet1' ):
main.Mininet1.handle.sendline( "px {}.startProtocols()".format( router ) )
main.Mininet1.handle.expect( "mininet>" )
# TODO: support recovering router in physical network
pass "Waiting %s seconds for router up to be discovered" % ( sleep ) )
time.sleep( sleep )
def cleanup( main, copyKarafLog=True, removeHostComponent=False ):
Stop Onos-cluster.
Stops Mininet
Copies ONOS log
from tests.dependencies.utils import Utils
main.utils = Utils()
for ctrl in
ctrl.CLI.log( "\"Ending Test - Shutting down ONOS and Network\"", level="INFO" )
# Clean up scapy hosts
if hasattr( main, "scapyHosts" ):
scapyResult = main.TRUE
for host in main.scapyHosts:
scapyResult = host.stopScapy() and scapyResult "Stopped Scapy Host: {0}".format( ) )
for host in main.scapyHosts:
if hasattr( main, 'Mininet1' ):
scapyResult = main.Scapy.removeHostComponent( ) and scapyResult
scapyResult = main.Network.removeHostComponent( ) and scapyResult "Removed Scapy Host Component: {0}".format( ) )
main.scapyHosts = []
if removeHostComponent:
for host in main.internalIpv4Hosts + main.internalIpv6Hosts + main.externalIpv4Hosts + main.externalIpv6Hosts:
if hasattr( main, host ):
if hasattr( main, 'Mininet1' ):
getattr( main, host ).disconnectInband()
main.Network.removeHostComponent( host )
if hasattr( main, 'Mininet1' ):
main.utils.mininetCleanup( main.Mininet1 )
if copyKarafLog:
main.utils.copyKarafLog( "CASE%d" % main.CurrentTestCaseNumber, before=True, includeCaseDesc=False )
for ctrl in
main.ONOSbench.onosStop( ctrl.ipAddress )
Testcaselib.mnDockerTeardown( main )
def verifyNodes( main ):
Verifies Each active node in the cluster has an accurate view of other node's and their status
nodes, integer array with position of the ONOS nodes in the CLIs array
nodeResults = utilities.retry( main.Cluster.nodesCheck,
sleep=10 )
utilities.assert_equals( expect=True, actual=nodeResults,
onpass="Nodes check successful",
onfail="Nodes check NOT successful" )
if not nodeResults:
for ctrl in main.Cluster.runningNodes:
main.log.debug( "{} components not ACTIVE: \n{}".format(,
ctrl.CLI.sendline( "onos:scr-list | grep -v ACTIVE" ) ) )
main.log.error( "Failed to verify nodes, stopping test" )
def verifyTopology( main, switches, links, expNodes ):
Verifies that the ONOS cluster has an acuurate view of the topology
switches, links, expNodes: number of expected switches, links, and nodes at this point in the test ex.: '4', '6', '2'
main.step( "Check number of topology elements" )
topology = utilities.retry( 0 ).CLI.checkStatus,
kwargs={ 'numoswitch': switches,
'numolink': links,
'numoctrl': expNodes },
sleep=12 )
utilities.assert_equals( expect=main.TRUE, actual=topology,
onpass="Number of topology elements are correct",
onfail="Unexpected number of links, switches, and/or controllers" )
def killOnos( main, nodes, switches, links, expNodes, sleep=None ):
Params: nodes, integer array with position of the ONOS nodes in the CLIs array
switches, links, nodes: number of expected switches, links and nodes after KillOnos, ex.: '4', '6'
Completely Kill an ONOS instance and verify the ONOS cluster can see the proper change
# TODO: We have enough information in the Cluster instance to remove expNodes from here and verifyTopology
main.step( "Killing ONOS instances with index(es): {}".format( nodes ) )
if sleep is None:
sleep = float( main.params[ 'timers' ][ 'OnosDiscovery' ] )
sleep = float( sleep )
for i in nodes:
killResult = main.ONOSbench.onosDie( main.Cluster.runningNodes[ i ].ipAddress )
utilities.assert_equals( expect=main.TRUE, actual=killResult,
onpass="ONOS instance Killed",
onfail="Error killing ONOS instance" )
main.Cluster.runningNodes[ i ].active = False
main.log.debug( "sleeping %i seconds" % ( sleep ) )
time.sleep( sleep )
if len( nodes ) < main.Cluster.numCtrls:
Testcaselib.verifyNodes( main )
Testcaselib.verifyTopology( main, switches, links, expNodes )
def recoverOnos( main, nodes, switches, links, expNodes, sleep=None ):
Params: nodes, integer array with position of the ONOS nodes in the CLIs array
switches, links, nodes: number of expected switches, links and nodes after recoverOnos, ex.: '4', '6'
Recover an ONOS instance and verify the ONOS cluster can see the proper change
main.step( "Recovering ONOS instances with index(es): {}".format( nodes ) )
if sleep is None:
sleep = float( main.params[ 'timers' ][ 'OnosDiscovery' ] )
sleep = float( sleep )
[ main.ONOSbench.onosStart( main.Cluster.runningNodes[ i ].ipAddress ) for i in nodes ]
main.log.debug( "sleeping %i seconds" % ( sleep ) )
time.sleep( sleep )
for i in nodes:
isUp = main.ONOSbench.isup( main.Cluster.runningNodes[ i ].ipAddress )
utilities.assert_equals( expect=main.TRUE, actual=isUp,
onpass="ONOS service is ready",
onfail="ONOS service did not start properly" )
for i in nodes:
main.step( "Checking if ONOS CLI is ready" )
ctrl = main.Cluster.runningNodes[ i ]
# ctrl.CLI.startCellCli()
cliResult = ctrl.CLI.startOnosCli( ctrl.ipAddress,
onosStartTimeout=100 ) = True
utilities.assert_equals( expect=main.TRUE,
onpass="ONOS CLI is ready",
onfail="ONOS CLI is not ready" )
main.step( "Checking ONOS nodes" )
Testcaselib.verifyNodes( main )
Testcaselib.verifyTopology( main, switches, links, expNodes )
ready = utilities.retry( 0 ).CLI.summary,
[ None, main.FALSE ],
sleep=12 )
if ready:
ready = main.TRUE
utilities.assert_equals( expect=main.TRUE, actual=ready,
onpass="ONOS summary command succeded",
onfail="ONOS summary command failed" )
if not ready:
main.log.error( "ONOS startup failed!" )
def addHostCfg( main ):
Adds Host Configuration to ONOS
Updates expected state of the network ( pingChart )
import json
hostCfg = {}
with open( main.configPath + main.forJson + "extra.json" ) as template:
hostCfg = json.load( template )
main.pingChart[ 'ip' ][ 'hosts' ] += [ 'in1' ]
main.step( "Pushing new configuration" )
mac, cfg = hostCfg[ 'hosts' ].popitem() 0 ).REST.setNetCfg( cfg[ 'basic' ],
subjectKey=urllib.quote( mac,
safe='' ),
configKey="basic" )
main.pingChart[ 'ip' ][ 'hosts' ] += [ 'out1' ]
main.step( "Pushing new configuration" )
mac, cfg = hostCfg[ 'hosts' ].popitem() 0 ).REST.setNetCfg( cfg[ 'basic' ],
subjectKey=urllib.quote( mac,
safe='' ),
configKey="basic" )
main.pingChart.update( { 'vlan1': { "expect": "True",
"hosts": [ "olt1", "vsg1" ] } } )
main.pingChart[ 'vlan5' ][ 'expect' ] = 0
main.pingChart[ 'vlan10' ][ 'expect' ] = 0
ports = "[%s,%s]" % ( 5, 6 )
cfg = '{"of:0000000000000001":[{"vlan":1,"ports":%s,"name":"OLT 1"}]}' % ports 0 ).REST.setNetCfg( json.loads( cfg ),
configKey="xconnect" )
def delHostCfg( main ):
Removest Host Configuration from ONOS
Updates expected state of the network ( pingChart )
import json
hostCfg = {}
with open( main.configPath + main.forJson + "extra.json" ) as template:
hostCfg = json.load( template )
main.step( "Removing host configuration" )
main.pingChart[ 'ip' ][ 'expect' ] = 0
mac, cfg = hostCfg[ 'hosts' ].popitem() 0 ).REST.removeNetCfg( subjectClass="hosts",
safe='' ),
configKey="basic" )
main.step( "Removing configuration" )
main.pingChart[ 'ip' ][ 'expect' ] = 0
mac, cfg = hostCfg[ 'hosts' ].popitem() 0 ).REST.removeNetCfg( subjectClass="hosts",
safe='' ),
configKey="basic" )
main.step( "Removing vlan configuration" )
main.pingChart[ 'vlan1' ][ 'expect' ] = 0 0 ).REST.removeNetCfg( subjectClass="apps",
configKey="xconnect" )
def verifyNetworkHostIp( main, attempts=10, sleep=10 ):
Verifies IP address assignment from the hosts
main.step( "Verify IP address assignment from hosts" )
ipResult = main.TRUE
# Find out names of disconnected hosts
disconnectedHosts = []
if hasattr( main, "disconnectedIpv4Hosts" ):
for host in main.disconnectedIpv4Hosts:
disconnectedHosts.append( host )
if hasattr( main, "disconnectedIpv6Hosts" ):
for host in main.disconnectedIpv6Hosts:
disconnectedHosts.append( host )
for hostName, ip in main.expectedHosts[ "network" ].items():
# Exclude disconnected hosts
if hostName in disconnectedHosts:
main.log.debug( "Skip verifying IP for {} as it's disconnected".format( hostName ) )
ipResult = ipResult and utilities.retry( main.Network.verifyHostIp,
kwargs={ 'hostList': [ hostName ],
'prefix': ip,
'update': True },
sleep=sleep )
utilities.assert_equals( expect=main.TRUE, actual=ipResult,
onpass="Verify network host IP succeded",
onfail="Verify network host IP failed" )
def verifyOnosHostIp( main, attempts=10, sleep=10, skipOnFail=True ):
Verifies host IP address assignment from ONOS
main.step( "Verify host IP address assignment in ONOS" )
ipResult = main.TRUE
# Find out IPs of disconnected hosts
disconnectedIps = []
if hasattr( main, "disconnectedIpv4Hosts" ):
for host in main.disconnectedIpv4Hosts:
disconnectedIps.append( main.expectedHosts[ "network" ][ host ] )
if hasattr( main, "disconnectedIpv6Hosts" ):
for host in main.disconnectedIpv6Hosts:
disconnectedIps.append( main.expectedHosts[ "network" ][ host ] )
for hostName, ip in main.expectedHosts[ "onos" ].items():
# Exclude disconnected hosts
if ip in disconnectedIps:
main.log.debug( "Skip verifying IP for {} as it's disconnected".format( ip ) )
ipResult = ipResult and utilities.retry( 0 ).verifyHostIp,
kwargs={ 'hostList': [ hostName ],
'prefix': ip },
sleep=sleep )
utilities.assert_equals( expect=main.TRUE, actual=ipResult,
onpass="Verify ONOS host IP succeded",
onfail="Verify ONOS host IP failed" )
if not ipResult and skipOnFail:
Testcaselib.saveOnosDiagnostics( main )
Testcaselib.cleanup( main, copyKarafLog=False )
def updateIntfCfg( main, connectPoint, ips=[], untagged=0, tagged=[], native=0 ):
Updates interface configuration in ONOS, with given IP and vlan parameters
* connectPoint: connect point to update configuration
* ips: list of IP addresses, combined with '/xx' subnet representation,
corresponding to 'ips' field in the configuration
* untagged: vlan ID as an integer, corresponding to 'vlan-untagged' field in the configuration
* tagged: integer list of vlan IDs, corresponding to 'vlan-tagged' field in the configuration
* native: vlan ID as an integer, corresponding to 'vlan-native' field in the configuration
cfg = dict()
cfg[ "ports" ] = dict()
cfg[ "ports" ][ connectPoint ] = dict()
cfg[ "ports" ][ connectPoint ][ "interfaces" ] = [ dict() ]
cfg[ "ports" ][ connectPoint ][ "interfaces" ][ 0 ][ "ips" ] = ips
if untagged > 0:
cfg[ "ports" ][ connectPoint ][ "interfaces" ][ 0 ][ "vlan-untagged" ] = untagged
cfg[ "ports" ][ connectPoint ][ "interfaces" ][ 0 ][ "vlan-tagged" ] = tagged
if native > 0:
cfg[ "ports" ][ connectPoint ][ "interfaces" ][ 0 ][ "vlan-native" ] = native 0 ).REST.setNetCfg( json.loads( json.dumps( cfg ) ) )
def startScapyHosts( main, scapyNames=[], mininetNames=[] ):
Create host components and start Scapy CLIs
scapyNames: list of names that will be used as component names for scapy hosts
mininetNames: used when scapy host names are different from the host names
in Mininet. E.g. when scapyNames=['h1Scapy'], it's required to specify the
name of the corresponding Mininet host by mininetNames=['h1']
main.step( "Start Scapy CLIs" )
main.scapyNames = scapyNames if scapyNames else main.params[ 'SCAPY' ][ 'HOSTNAMES' ].split( ',' )
main.scapyHosts = [] if not hasattr( main, "scapyHosts" ) else main.scapyHosts
for scapyName in main.scapyNames:
if hasattr( main, 'Mininet1' ):
main.Scapy.createHostComponent( scapyName )
scapyHandle = getattr( main, scapyName )
if mininetNames:
mininetName = mininetNames[ scapyNames.index( scapyName ) ]
mininetName = None
if 'MN_DOCKER' in main.params and main.params['MN_DOCKER']['args']:
scapyHandle.mExecDir = "/tmp"
scapyHandle.hostHome = main.params[ "MN_DOCKER" ][ "home" ]
main.log.debug( "start mn host component in docker" )
scapyHandle.startHostCli( mininetName,
hostHome=main.params[ "MN_DOCKER" ][ "home" ] )
main.log.debug( "start mn host component" )
scapyHandle.startHostCli( mininetName )
main.Network.createHostComponent( scapyName )
scapyHandle = getattr( main, scapyName )
main.scapyHosts.append( scapyHandle )
main.log.debug( )
main.log.debug( scapyHandle.hostIp )
main.log.debug( scapyHandle.hostMac )
def verifyTraffic( main, srcHosts, dstIp, dstHost, dstIntf, ipv6=False, expect=True, skipOnFail=True, maxRetry=2 ):
Verify unicast traffic by pinging from source hosts to the destination IP
and capturing the packets at the destination host using Scapy.
srcHosts: List of host names to send the ping packets
dstIp: destination IP of the ping packets
dstHost: host that runs Scapy to capture the packets
dstIntf: name of the interface on the destination host
expect: use True if the ping is expected to be captured at destination;
Otherwise False
skipOnFail: skip the rest of this test case if result is not expected
maxRetry: number of retries allowed
from tests.dependencies.topology import Topology
except ( NameError, AttributeError ):
main.topo = Topology()
main.step( "Verify traffic to {} by capturing packets on {}".format( dstIp, dstHost ) )
result = main.TRUE
for srcHost in srcHosts:
trafficResult = main.topo.pingAndCapture( srcHost, dstIp, dstHost, dstIntf, ipv6,
expect, maxRetry, True )
if not trafficResult:
result = main.FALSE
main.log.warn( "Scapy result from {} to {} is not as expected".format( srcHost, dstIp ) )
utilities.assert_equals( expect=main.TRUE,
onpass="Verify traffic to {}: Pass".format( dstIp ),
onfail="Verify traffic to {}: Fail".format( dstIp ) )
if skipOnFail and result != main.TRUE:
Testcaselib.saveOnosDiagnostics( main )
Testcaselib.cleanup( main, copyKarafLog=False )
def verifyMulticastTraffic( main, routeName, expect, skipOnFail=True, maxRetry=1 ):
Verify multicast traffic using scapy
from tests.dependencies.topology import Topology
except ( NameError, AttributeError ):
main.topo = Topology()
main.step( "Verify {} multicast traffic".format( routeName ) )
routeData = main.multicastConfig[ routeName ]
srcs = main.mcastRoutes[ routeName ][ "src" ]
dsts = main.mcastRoutes[ routeName ][ "dst" ] "Sending multicast traffic from {} to {}".format( [ routeData[ "src" ][ i ][ "host" ] for i in srcs ],
[ routeData[ "dst" ][ i ][ "host" ] for i in dsts ] ) )
result = main.TRUE
for src in srcs:
srcEntry = routeData[ "src" ][ src ]
for dst in dsts:
dstEntry = routeData[ "dst" ][ dst ]
sender = getattr( main, srcEntry[ "host" ] )
receiver = getattr( main, dstEntry[ "host" ] )
main.Network.addRoute( str( srcEntry[ "host" ] ),
str( routeData[ "group" ] ),
str( srcEntry[ "interface" ] ),
True if routeData[ "ipVersion" ] == 6 else False )
# Build the packet
sender.buildEther( dst=str( srcEntry[ "Ether" ] ) )
if routeData[ "ipVersion" ] == 4:
sender.buildIP( dst=str( routeData[ "group" ] ) )
elif routeData[ "ipVersion" ] == 6:
sender.buildIPv6( dst=str( routeData[ "group" ] ) )
sender.buildUDP( ipVersion=routeData[ "ipVersion" ], dport=srcEntry[ "UDP" ] )
sIface = srcEntry[ "interface" ]
dIface = dstEntry[ "interface" ] if "interface" in dstEntry.keys() else None
pktFilter = srcEntry[ "filter" ]
pkt = srcEntry[ "packet" ]
# Send packet and check received packet
expectedResult = expect.pop( 0 ) if isinstance( expect, list ) else expect
t3Cmd = "t3-troubleshoot -vv -sp {} -et ipv{} -d {} -dm {}".format( srcEntry[ "port" ], routeData[ "ipVersion" ],
routeData[ "group" ], srcEntry[ "Ether" ] )
trafficResult = main.topo.sendScapyPackets( sender, receiver, pktFilter, pkt, sIface, dIface,
expectedResult, maxRetry, True, t3Cmd )
if not trafficResult:
result = main.FALSE
main.log.warn( "Scapy result from {} to {} is not as expected".format( srcEntry[ "host" ],
dstEntry[ "host" ] ) )
utilities.assert_equals( expect=main.TRUE,
onpass="Verify {} multicast traffic: Pass".format( routeName ),
onfail="Verify {} multicast traffic: Fail".format( routeName ) )
if skipOnFail and result != main.TRUE:
Testcaselib.saveOnosDiagnostics( main )
Testcaselib.cleanup( main, copyKarafLog=False )
def verifyPing( main, srcList, dstList, ipv6=False, expect=True, wait=1,
acceptableFailed=0, skipOnFail=True, stepMsg="Verify Ping",
t3Simple=True ):
Verify reachability from each host in srcList to each host in dstList
from tests.dependencies.topology import Topology
except ( NameError, AttributeError ):
main.topo = Topology()
main.step( stepMsg )
pingResult = srcList, dstList, ipv6, expect, wait, acceptableFailed, skipOnFail, t3Simple )
utilities.assert_equals( expect=main.TRUE,
onpass="{}: Pass".format( stepMsg ),
onfail="{}: Fail".format( stepMsg ) )
if not pingResult and skipOnFail:
Testcaselib.saveOnosDiagnostics( main )
Testcaselib.cleanup( main, copyKarafLog=False, removeHostComponent=True )
def verifyHostLocations( main, locationDict, retry=2 ):
Verify if the specified host is discovered by ONOS on the given locations
locationDict: a dictionary that maps host names to expected locations.
locations could be a string or a list.
ex. { "h1v4": ["of:0000000000000005/8"] }
main.TRUE if host is discovered on all locations provided, otherwise main.FALSE
main.step( "Verify locations of hosts {}".format( locationDict.keys() ) )
result = main.TRUE
for hostName, locations in locationDict.items(): "Verify host {} is discovered at {}".format( hostName, locations ) )
hostIp = main.Network.getIPAddress( hostName, proto='IPV4' )
if not hostIp:
hostIp = main.Network.getIPAddress( hostName, proto='IPV6' )
if not hostIp:
main.log.warn( "Failed to find IP address for host {}, skipping location verification".format( hostName ) )
result = main.FALSE
locationResult = utilities.retry( 0 ).CLI.verifyHostLocation,
args=( hostIp, locations ),
attempts=retry + 1,
sleep=10 )
if not locationResult:
result = main.FALSE
main.log.warn( "location verification for host {} failed".format( hostName ) )
utilities.assert_equals( expect=main.TRUE, actual=result,
onpass="Location verification passed",
onfail="Location verification failed" )
def moveHost( main, hostName, srcSw, dstSw, gw, macAddr=None, prefixLen=None, cfg='', ipv6=False, vlan=None ):
Move specified host from srcSw to dstSw.
If srcSw and dstSw are same, the host will be moved from current port to
next available port.
hostName: name of the host. e.g., "h1"
srcSw: name of the switch that the host is attached to. e.g., "leaf1"
dstSw: name of the switch that the host will be moved to. e.g., "leaf2"
gw: ip address of the gateway of the new location
macAddr: if specified, change MAC address of the host to the specified MAC address.
prefixLen: prefix length
cfg: port configuration as JSON string
ipv6: Use True to move IPv6 host
vlan: vlan number of the host
if not hasattr( main, 'Mininet1' ):
main.log.warn( "moveHost is supposed to be used only in Mininet." )
main.step( "Moving {} host {} from {} to {}".format( 'tagged' if vlan else 'untagged', hostName, srcSw, dstSw ) )
main.Mininet1.moveHost( hostName, srcSw, dstSw, macAddr, prefixLen, ipv6, vlan=vlan )
if not ipv6:
main.Mininet1.changeDefaultGateway( hostName, gw )
if cfg: 0 ).REST.setNetCfg( json.loads( cfg ),
subjectClass="ports" )
# Wait for the host to get RA for setting up default gateway
main.log.debug( "sleeping %i seconds" % ( 5 ) )
time.sleep( 5 )
main.Mininet1.discoverHosts( [ hostName, ] )
# Update expectedHost when MAC address is changed.
if macAddr is not None:
ipAddr = main.expectedHosts[ "network" ][ hostName ]
if ipAddr is not None:
for hostName, ip in main.expectedHosts[ "onos" ].items():
if ip == ipAddr:
vlan = hostName.split( "/" )[ -1 ]
del main.expectedHosts[ "onos" ][ hostName ]
main.expectedHosts[ "onos" ][ "{}/{}".format( macAddr.upper(), vlan ) ] = ip
def moveDualHomedHost( main, hostName, srcSw, srcPairSw, dstSw, dstPairSw, gw,
macAddr=None, prefixLen=24, cfg='', ipv6=False, vlan=None ):
Move specified dual-homed host from srcSw-srcPairSw to dstSw-dstPairSw.
If srcSw-srcPairSw and dstSw-dstPairSw are same, the host will be moved from current port
to next available port.
hostName: name of the host. e.g., "h1"
srcSw: name of the switch that the host is attached to. e.g., "leaf1"
srcPairSw: name of the paired-switch that the host is attached to. e.g., "leaf2"
dstSw: name of the switch that the host will be moved to. e.g., "leaf1"
dstPairSw: name of the paired-switch that the host will be moved to. e.g., "leaf2"
gw: ip address of the gateway of the new location
macAddr: if specified, change MAC address of the host to the specified MAC address.
prefixLen: prefix length
cfg: port configurations as JSON string
ipv6: Use True to move IPv6 host
vlan: vlan number of the host
if not hasattr( main, 'Mininet1' ):
main.log.warn( "moveDualHomedHost is supposed to be used only in Mininet." )
main.step( "Moving {} host {} from {} and {} to {} and {}".format( 'tagged' if vlan else 'untagged', hostName,
srcSw, srcPairSw, dstSw, dstPairSw ) )
main.Mininet1.moveDualHomedHost( hostName, srcSw, srcPairSw, dstSw, dstPairSw,
macAddr=macAddr, prefixLen=prefixLen, ipv6=ipv6, vlan=vlan )
if not ipv6:
main.Mininet1.changeDefaultGateway( hostName, gw )
if cfg: 0 ).REST.setNetCfg( json.loads( cfg ),
subjectClass="ports" )
# Wait for the host to get RA for setting up default gateway
main.log.debug( "sleeping %i seconds" % ( 5 ) )
time.sleep( 5 )
main.Mininet1.discoverHosts( [ hostName, ] )
# Update expectedHost when MAC address is changed.
if macAddr is not None:
ipAddr = main.expectedHosts[ "network" ][ hostName ]
if ipAddr is not None:
for hostName, ip in main.expectedHosts[ "onos" ].items():
if ip == ipAddr:
vlan = hostName.split( "/" )[ -1 ]
del main.expectedHosts[ "onos" ][ hostName ]
main.expectedHosts[ "onos" ][ "{}/{}".format( macAddr.upper(), vlan ) ] = ip
def mnDockerSetup( main ):
Optionally start and setup docker image for mininet
if 'MN_DOCKER' in main.params and main.params['MN_DOCKER']['args']: "Creating Mininet Docker" )
handle = main.Mininet1.handle
main.Mininet1.dockerPrompt = '#'
# build docker image
buildOutput = ""
handle.sendline( " docker build -t trellis_mininet %s/../dependencies/" % main.testDir )
handle.expect( "Successfully built", timeout=600 )
buildOutput = handle.before + str( handle.after )
handle.expect( main.Mininet1.prompt )
buildOutput += handle.before
main.log.debug( buildOutput )
except pexpect.TIMEOUT as e:
main.log.error( e )
buildOutput += handle.before
main.log.debug( buildOutput )
confDir = "/tmp/mn_conf/"
# Try to ensure the destination exists "Create folder for network config files" )
handle.sendline( "mkdir -p %s" % confDir )
handle.expect( main.Mininet1.prompt )
main.log.debug( handle.before + handle.after )
# Make sure permissions are correct
handle.sendline( "sudo chown %s:%s %s" % ( main.Mininet1.user_name, main.Mininet1.user_name, confDir ) )
handle.expect( main.Mininet1.prompt )
handle.sendline( "sudo chmod -R a+rwx %s" % ( confDir ) )
handle.expect( main.Mininet1.prompt )
main.log.debug( handle.before + handle.after )
# Start docker container
handle.sendline( "docker run --name trellis_mininet %s %s" % ( main.params[ 'MN_DOCKER' ][ 'args' ], main.params[ 'MN_DOCKER' ][ 'name' ] ) )
i = handle.expect( [ main.Mininet1.bashPrompt, "Error response from daemon: Conflict. The container name" ] )
output = handle.before + handle.after
main.log.debug( repr(output) )
if i == 1:
main.log.error( "Docker container already running, aborting test" )
handle.sendline( "docker attach trellis_mininet" )
handle.expect( main.Mininet1.dockerPrompt )
main.log.debug( handle.before + handle.after )
handle.sendline( "sysctl -w net.ipv4.ip_forward=0" )
handle.sendline( "sysctl -w net.ipv4.conf.all.forwarding=0" )
handle.expect( main.Mininet1.dockerPrompt )
main.log.debug( handle.before + handle.after )
# We should be good to go
main.Mininet1.prompt = main.Mininet1.dockerPrompt
main.Mininet1.sudoRequired = False
# Fow when we create component handles
main.Mininet1.mExecDir = "/tmp"
main.Mininet1.hostHome = main.params[ "MN_DOCKER" ][ "home" ]
main.Mininet1.hostPrompt = "/home/root#"
def mnDockerTeardown( main ):
Optionally stop and cleanup docker image for mininet
if hasattr( main, 'Mininet1' ):
if 'MN_DOCKER' in main.params and main.params['MN_DOCKER']['args']: "Deleting Mininet Docker" )
# Detach from container
handle = main.Mininet1.handle
handle.sendline( "exit" ) # ctrl-p ctrk-q to detach from container
main.log.debug( "sleeping %i seconds" % ( 5 ) )
handle.expect( main.Mininet1.dockerPrompt )
main.log.debug( handle.before + handle.after )
main.Mininet1.prompt = main.Mininet1.bashPrompt
handle.expect( main.Mininet1.prompt )
main.log.debug( handle.before + handle.after )
main.Mininet1.sudoRequired = True
except Exception as e:
main.log.error( e )
def setOnosConfig( main ):
Read and Set onos configurations from the params file
main.step( "Set ONOS configurations" )
config = main.params.get( 'ONOS_Configuration' )
if config:
main.log.debug( config )
checkResult = main.TRUE
for component in config:
for setting in config[ component ]:
value = config[ component ][ setting ]
check = component, setting, value ) "Value was changed? {}".format( main.TRUE == check ) )
checkResult = check and checkResult
utilities.assert_equals( expect=main.TRUE,
onpass="Successfully set config",
onfail="Failed to set config" )
main.log.warn( "No configurations were specified to be changed after startup" )
def setOnosLogLevels( main ):
Read and Set onos log levels from the params file
main.step( 'Set logging levels' )
logging = True
logs = main.params.get( 'ONOS_Logging', False )
if logs:
for namespace, level in logs.items():
for ctrl in
ctrl.CLI.logSet( level, namespace )
except AttributeError:
logging = False
utilities.assert_equals( expect=True, actual=logging,
onpass="Set log levels",
onfail="Failed to set log levels" )