add multi-threaded REST load generator

Change-Id: Ia73fe2ba3eaf50846041f73038460270c9802f4e
diff --git a/TestON/tests/COMPflow/COMPflow.py b/TestON/tests/COMPflow/COMPflow.py
index 39246d6..adc86c9 100644
--- a/TestON/tests/COMPflow/COMPflow.py
+++ b/TestON/tests/COMPflow/COMPflow.py
@@ -256,12 +256,11 @@
 
         main.case( "Create a json object for the batched flows" )
 
-        main.step( "Parse batch information" )
+        main.step( "Parse batch creation information" )
         main.batchSize = int(main.params['CASE1000']['batchSize'])
         main.log.info("Number of flows in a batch is:" + str(main.batchSize))
 
         main.flowJsonBatchList = []
-        main.addedBatchList = []
         postTimes = []
         startSw = 1
 
@@ -284,10 +283,21 @@
             main.flowJsonBatchList.append(flowJsonBatch)
 
             startSw += 1
+        main.log.info( "Number of items created in the batch list is: " + str(len(main.flowJsonBatchList)))
 
 
+    def CASE2000(self, main):
+        '''
+        Args:
+            main:
 
+        Returns:
+
+        '''
+        main.case("Using REST API /flows/{} to post flow batch")
         main.step("Using REST API /flows/{} to post flow batch")
+
+        main.addedBatchList = []
         tStartPost = time.time()
         for item in main.flowJsonBatchList:
             ts = time.time()
@@ -319,7 +329,131 @@
                       str(duration))
         main.log.info("Rate of Confirmed Batch Flow ADD is (flows/sec): " + str( main.numFlows / duration))
 
-    def CASE2000(self, main):
+    def CASE2100(self, main):
+        '''
+            Posting flow batches using threads
+        '''
+        main.case("Using REST API /flows/{} to post flow batch - multi-threads")
+        main.step("Using REST API /flows/{} to post flow batch - multi-threads")
+
+        from Queue import Queue
+        from threading import Thread
+        import time
+
+        main.threadID = 0
+        main.addedBatchList = []
+        q = Queue()
+        tAllAdded = 0
+
+        def postWorker(id):
+            while True:
+                item = q.get()
+                status,response = main.ONOSrest.sendFlowBatch(batch = item)
+                main.log.info("Thread {} is working on posting. ".format(id))
+                main.addedBatchList.append(response[1])
+                q.task_done()
+
+        for i in range( int( main.params['CASE2100']['numThreads'])):
+            threadID = "ThreadID-" + str(i)
+            t = Thread(target = postWorker, name = threadID, args=(threadID,) )
+            t.daemon = True
+            t.start()
+
+        tStartPost = time.time()
+        for item in main.flowJsonBatchList:
+            q.put(item)
+
+        q.join()
+        tLastPostEnd = time.time()
+
+        main.step("Check to ensure all flows are in added state.")
+        #pprint(main.addedBatchList)
+        resp = main.FALSE
+        while resp != main.TRUE and ( tAllAdded - tLastPostEnd < int (main.params['CASE2100']['chkFlowTO']) ):
+            resp = main.ONOSrest.checkFlowsState()
+            time.sleep( float(main.params['SLEEP']['chkFlow']) )
+            tAllAdded = time.time()
+
+        if tAllAdded - tLastPostEnd >= int (main.params['CASE2100']['chkFlowTO']):
+            main.log.warn("ONOS Flows still in pending state after: {} seconds.".format(tAllAdded - tLastPostEnd))
+
+        main.numFlows = int(main.params['CASE1000']['batches']) *\
+                                                    int(main.params['CASE1000']['batchSize'])
+        main.log.info("Total number of flows: " + str (main.numFlows) )
+        main.log.info("Sum of each POST elapse time: " + str(numpy.sum(postTimes)) )
+        main.log.info("Total POST elapse time: " + str(tLastPostEnd-tStartPost))
+        main.log.info("Rate of ADD Controller response: " + str(main.numFlows / (tLastPostEnd - tStartPost)))
+
+        duration = tAllAdded - tLastPostEnd
+        main.log.info("Elapse time from end of last REST POST to Flows in ADDED state: " +\
+                      str(duration))
+        main.log.info("Rate of Confirmed Batch Flow ADD is (flows/sec): " + str( main.numFlows / duration))
+        main.log.info("Number of flow Batches in the addedBatchList is: " + str( len(main.addedBatchList)))
+
+
+    def CASE3100(self, main):
+        '''
+            DELETE flow batches using threads
+        '''
+        main.case("Using REST API /flows/{} to delete flow batch - multi-threads")
+        main.step("Using REST API /flows/{} to delete flow batch - multi-threads")
+
+        from Queue import Queue
+        from threading import Thread
+        import time
+        import json
+
+        main.threadID = 0
+        q = Queue()
+        tAllRemoved = 0
+
+        main.log.info("Number of flow batches at start of remove: " + str( len( main.addedBatchList)))
+        def removeWorker(id):
+            while True:
+                item = q.get()
+                response = main.ONOSrest.removeFlowBatch(batch = json.loads(item) )
+                main.log.info("Thread {} is working on deleting. ".format(id))
+                q.task_done()
+
+        for i in range( int( main.params['CASE2100']['numThreads'])):
+            threadID = "ThreadID-" + str(i)
+            t = Thread(target = removeWorker, name = threadID, args=(threadID,) )
+            t.daemon = True
+            t.start()
+
+        tStartDelete = time.time()
+        for item in main.addedBatchList:
+            q.put(item)
+
+        q.join()
+        tLastDeleteEnd = time.time()
+        main.log.info("Number of flow batches at end of remove: " + str( len( main.addedBatchList)))
+
+        main.step("Check to ensure all flows are in added state.")
+        #pprint(main.addedBatchList)
+        resp = main.FALSE
+        while resp != main.TRUE and ( tAllRemoved - tLastDeleteEnd < int (main.params['CASE3100']['chkFlowTO']) ):
+            resp = main.ONOSrest.checkFlowsState()
+            time.sleep( float(main.params['SLEEP']['chkFlow']) )
+            tAllRemoved = time.time()
+
+        if tLastDeleteEnd - tLastDeleteEnd >= int (main.params['CASE2100']['chkFlowTO']):
+            main.log.warn("ONOS Flows still in pending state after: {} seconds.".format(tAllAdded - tLastPostEnd))
+
+        main.numFlows = int(main.params['CASE1000']['batches']) *\
+                                                    int(main.params['CASE1000']['batchSize'])
+        main.log.info("Total number of flows: " + str (main.numFlows) )
+        main.log.info("Sum of each DELETE elapse time: " + str(numpy.sum(postTimes)) )
+        main.log.info("Total DELETE elapse time: " + str(tLastDeleteEnd-tStartDelete))
+        main.log.info("Rate of DELETE Controller response: " + str(main.numFlows / (tLastDeleteEnd-tStartDelete)))
+
+        duration = tAllRemoved - tLastDeleteEnd
+        main.log.info("Elapse time from end of last REST DELETE to Flows in REMOVED state: " +\
+                      str(duration))
+        main.log.info("Rate of Confirmed Batch Flow REMOVED is (flows/sec): " + str( main.numFlows / duration))
+
+
+    def CASE3000(self, main):
         import time
         import numpy
         import json