add multi-threaded REST load generator
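
Adds CASE2100 and CASE3100, which post and delete the flow batches built
in CASE1000 through the REST /flows/{} endpoint from a pool of daemon
worker threads fed by a shared Queue, then poll flow state until the
flows are confirmed ADDED (or REMOVED) and log the resulting throughput.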
Change-Id: Ia73fe2ba3eaf50846041f73038460270c9802f4e
diff --git a/TestON/tests/COMPflow/COMPflow.py b/TestON/tests/COMPflow/COMPflow.py
index 39246d6..adc86c9 100644
--- a/TestON/tests/COMPflow/COMPflow.py
+++ b/TestON/tests/COMPflow/COMPflow.py
@@ -256,12 +256,11 @@
main.case( "Create a json object for the batched flows" )
- main.step( "Parse batch information" )
+ main.step( "Parse batch creation information" )
main.batchSize = int(main.params['CASE1000']['batchSize'])
main.log.info("Number of flows in a batch is:" + str(main.batchSize))
main.flowJsonBatchList = []
- main.addedBatchList = []
postTimes = []
startSw = 1
@@ -284,10 +283,21 @@
main.flowJsonBatchList.append(flowJsonBatch)
startSw += 1
+ main.log.info( "Number of items created in the batch list is: " + str(len(main.flowJsonBatchList)))
+ def CASE2000(self, main):
+ '''
+ Posting flow batches using a single thread
+ '''
+ main.case("Using REST API /flows/{} to post flow batch")
main.step("Using REST API /flows/{} to post flow batch")
+
+ main.addedBatchList = []
tStartPost = time.time()
for item in main.flowJsonBatchList:
ts = time.time()
@@ -319,7 +329,131 @@
str(duration))
main.log.info("Rate of Confirmed Batch Flow ADD is (flows/sec): " + str( main.numFlows / duration))
- def CASE2000(self, main):
+ def CASE2100(self, main):
+ '''
+ Posting flow batches using threads
+ '''
+ main.case("Using REST API /flows/{} to post flow batch - multi-threads")
+ main.step("Using REST API /flows/{} to post flow batch - multi-threads")
+
+ from Queue import Queue
+ from threading import Thread
+ import time
+ import numpy
+
+ main.threadID = 0
+ main.addedBatchList = []
+ postTimes = []
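+ # Shared work queue; each item is one pre-built flow batch from main.flowJsonBatchList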
+ q = Queue()
+ tAllAdded = 0
+
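+ # Worker loop: take a batch off the queue, POST it via the REST API, and record the response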
+ def postWorker(id):
+ while True:
+ item = q.get()
+ ts = time.time()
+ status,response = main.ONOSrest.sendFlowBatch(batch = item)
+ postTimes.append(time.time() - ts)
+ main.log.info("Thread {} is working on posting. ".format(id))
+ main.addedBatchList.append(response[1])
+ q.task_done()
+
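+ # Start the configured number of daemon POST worker threads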
+ for i in range( int( main.params['CASE2100']['numThreads'])):
+ threadID = "ThreadID-" + str(i)
+ t = Thread(target = postWorker, name = threadID, args=(threadID,) )
+ t.daemon = True
+ t.start()
+
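+ # Enqueue every flow batch, then block until all batches have been posted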
+ tStartPost = time.time()
+ for item in main.flowJsonBatchList:
+ q.put(item)
+
+ q.join()
+ tLastPostEnd = time.time()
+
+ main.step("Check to ensure all flows are in added state.")
+ #pprint(main.addedBatchList)
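+ # Poll ONOS flow state until checkFlowsState() returns TRUE or the chkFlowTO timeout is exceeded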
+ resp = main.FALSE
+ while resp != main.TRUE and ( tAllAdded - tLastPostEnd < int (main.params['CASE2100']['chkFlowTO']) ):
+ resp = main.ONOSrest.checkFlowsState()
+ time.sleep( float(main.params['SLEEP']['chkFlow']) )
+ tAllAdded = time.time()
+
+ if tAllAdded - tLastPostEnd >= int (main.params['CASE2100']['chkFlowTO']):
+ main.log.warn("ONOS Flows still in pending state after: {} seconds.".format(tAllAdded - tLastPostEnd))
+
+ main.numFlows = int(main.params['CASE1000']['batches']) *\
+ int(main.params['CASE1000']['batchSize'])
+ main.log.info("Total number of flows: " + str (main.numFlows) )
+ main.log.info("Sum of each POST elapse time: " + str(numpy.sum(postTimes)) )
+ main.log.info("Total POST elapse time: " + str(tLastPostEnd-tStartPost))
+ main.log.info("Rate of ADD Controller response: " + str(main.numFlows / (tLastPostEnd - tStartPost)))
+
+ duration = tAllAdded - tLastPostEnd
+ main.log.info("Elapse time from end of last REST POST to Flows in ADDED state: " +\
+ str(duration))
+ main.log.info("Rate of Confirmed Batch Flow ADD is (flows/sec): " + str( main.numFlows / duration))
+ main.log.info("Number of flow Batches in the addedBatchList is: " + str( len(main.addedBatchList)))
+
+
+ def CASE3100(self, main):
+ '''
+ DELETE flow batches using threads
+ '''
+ main.case("Using REST API /flows/{} to delete flow batch - multi-threads")
+ main.step("Using REST API /flows/{} to delete flow batch - multi-threads")
+
+ from Queue import Queue
+ from threading import Thread
+ import time
+ import json
+ import numpy
+
+ main.threadID = 0
+ deleteTimes = []
+ q = Queue()
+ tAllRemoved = 0
+
+ main.log.info("Number of flow batches at start of remove: " + str( len( main.addedBatchList)))
+ def removeWorker(id):
+ while True:
+ item = q.get()
+ ts = time.time()
+ response = main.ONOSrest.removeFlowBatch(batch = json.loads(item) )
+ deleteTimes.append(time.time() - ts)
+ main.log.info("Thread {} is working on deleting. ".format(id))
+ q.task_done()
+
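+ # Start the configured number of daemon DELETE worker threads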
+ for i in range( int( main.params['CASE2100']['numThreads'])):
+ threadID = "ThreadID-" + str(i)
+ t = Thread(target = removeWorker, name = threadID, args=(threadID,) )
+ t.daemon = True
+ t.start()
+
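+ # Enqueue every added batch, then block until all DELETE requests have been sent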
+ tStartDelete = time.time()
+ for item in main.addedBatchList:
+ q.put(item)
+
+ q.join()
+ tLastDeleteEnd = time.time()
+ main.log.info("Number of flow batches at end of remove: " + str( len( main.addedBatchList)))
+
+ main.step("Check to ensure all flows are in added state.")
+ #pprint(main.addedBatchList)
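+ # Poll ONOS flow state until checkFlowsState() returns TRUE or the chkFlowTO timeout is exceeded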
+ resp = main.FALSE
+ while resp != main.TRUE and ( tAllRemoved - tLastDeleteEnd < int (main.params['CASE3100']['chkFlowTO']) ):
+ resp = main.ONOSrest.checkFlowsState()
+ time.sleep( float(main.params['SLEEP']['chkFlow']) )
+ tAllRemoved = time.time()
+
+ if tAllRemoved - tLastDeleteEnd >= int (main.params['CASE3100']['chkFlowTO']):
+ main.log.warn("ONOS Flows still in pending state after: {} seconds.".format(tAllRemoved - tLastDeleteEnd))
+
+ main.numFlows = int(main.params['CASE1000']['batches']) *\
+ int(main.params['CASE1000']['batchSize'])
+ main.log.info("Total number of flows: " + str (main.numFlows) )
+ main.log.info("Sum of each DELETE elapse time: " + str(numpy.sum(postTimes)) )
+ main.log.info("Total DELETE elapse time: " + str(tLastDeleteEnd-tStartDelete))
+ main.log.info("Rate of DELETE Controller response: " + str(main.numFlows / (tLastDeleteEnd-tStartDelete)))
+
+ duration = tAllRemoved - tLastDeleteEnd
+ main.log.info("Elapse time from end of last REST DELETE to Flows in REMOVED state: " +\
+ str(duration))
+ main.log.info("Rate of Confirmed Batch Flow REMOVED is (flows/sec): " + str( main.numFlows / duration))
+
+
+ def CASE3000(self, main):
import time
import numpy
import json