LoadTest update to use multiple counters in parallel for load generation

Change-Id: I7d7f13024372c8c998dc427cf30fdc2e2c68c5f9
diff --git a/apps/test/loadtest/src/main/java/org/onosproject/loadtest/DistributedConsensusLoadTest.java b/apps/test/loadtest/src/main/java/org/onosproject/loadtest/DistributedConsensusLoadTest.java
index b142282..48a51e5 100644
--- a/apps/test/loadtest/src/main/java/org/onosproject/loadtest/DistributedConsensusLoadTest.java
+++ b/apps/test/loadtest/src/main/java/org/onosproject/loadtest/DistributedConsensusLoadTest.java
@@ -20,12 +20,17 @@
 import static org.slf4j.LoggerFactory.getLogger;
 
 import java.util.Dictionary;
-import java.util.concurrent.ExecutorService;
+import java.util.List;
 import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.Semaphore;
+import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.atomic.AtomicLong;
+import java.util.stream.Collectors;
 
+import org.apache.commons.lang.math.RandomUtils;
 import org.apache.felix.scr.annotations.Activate;
 import org.apache.felix.scr.annotations.Component;
 import org.apache.felix.scr.annotations.Deactivate;
@@ -34,6 +39,7 @@
 import org.apache.felix.scr.annotations.Reference;
 import org.apache.felix.scr.annotations.ReferenceCardinality;
 import org.apache.felix.scr.annotations.Service;
+import org.onlab.util.Tools;
 import org.onosproject.cfg.ComponentConfigService;
 import org.onosproject.core.ApplicationId;
 import org.onosproject.core.CoreService;
@@ -42,6 +48,7 @@
 import org.osgi.service.component.ComponentContext;
 import org.slf4j.Logger;
 
+import com.google.common.collect.Lists;
 import com.google.common.util.concurrent.RateLimiter;
 
 /**
@@ -69,24 +76,46 @@
     protected CoreService coreService;
 
     private static final int DEFAULT_RATE = 100;
+    private static final int TOTAL_COUNTERS = 50;
 
     @Property(name = "rate", intValue = DEFAULT_RATE,
             label = "Total number of increments per second to the atomic counter")
     protected int rate = 0;
 
-    private AtomicLong lastValue = new AtomicLong(0);
-    private AtomicLong lastLoggedTime = new AtomicLong(0);
-    private AsyncAtomicCounter counter;
-    private ExecutorService testExecutor = Executors.newSingleThreadExecutor();
+    private final AtomicLong previousReportTime = new AtomicLong(0);
+    private final AtomicLong previousCount = new AtomicLong(0);
+    private final AtomicInteger increments = new AtomicInteger(0);
+    private final List<AsyncAtomicCounter> counters = Lists.newArrayList();
+    private final ScheduledExecutorService runner = Executors.newSingleThreadScheduledExecutor();
+    private final ScheduledExecutorService reporter = Executors.newSingleThreadScheduledExecutor();
 
     @Activate
     public void activate(ComponentContext context) {
         configService.registerProperties(getClass());
         appId = coreService.registerApplication("org.onosproject.loadtest");
         log.info("Started with {}", appId);
-        counter = storageService.atomicCounterBuilder()
-                                .withName("onos-app-loadtest-counter")
+        for (int i = 0; i < TOTAL_COUNTERS; ++i) {
+            AsyncAtomicCounter counter = storageService.atomicCounterBuilder()
+                                .withName(String.format("onos-app-loadtest-counter-%d", i))
                                 .build();
+            counters.add(counter);
+        }
+        reporter.scheduleWithFixedDelay(() -> {
+            Tools.allOf(counters.stream()
+                    .map(AsyncAtomicCounter::get)
+                    .collect(Collectors.toList()))
+                    .whenComplete((r, e) -> {
+                        if (e == null) {
+                            long newCount = r.stream().reduce(Long::sum).get();
+                            long currentTime = System.currentTimeMillis();
+                            long delta = currentTime - previousReportTime.getAndSet(currentTime);
+                            long rate = (newCount - previousCount.getAndSet(newCount)) * 1000 / delta;
+                            log.info("{} updates per second", rate);
+                        } else {
+                            log.warn(e.getMessage());
+                        }
+            });
+        }, 5, 5, TimeUnit.SECONDS);
         modified(null);
     }
 
@@ -97,16 +126,10 @@
         while (!stopped.get()) {
             limiter.acquire();
             s.acquireUninterruptibly();
-            counter.incrementAndGet().whenComplete((r, e) -> {
+            counters.get(RandomUtils.nextInt(TOTAL_COUNTERS)).incrementAndGet().whenComplete((r, e) -> {
                 s.release();
-                long delta = System.currentTimeMillis() - lastLoggedTime.get();
                 if (e == null) {
-                    if (delta > 1000) {
-                        long tps = (long) ((r - lastValue.get()) * 1000.0) / delta;
-                        lastValue.set(r);
-                        lastLoggedTime.set(System.currentTimeMillis());
-                        log.info("Rate: {}", tps);
-                    }
+                    increments.incrementAndGet();
                 }
             });
         }
@@ -120,7 +143,8 @@
     public void deactivate(ComponentContext context) {
         configService.unregisterProperties(getClass(), false);
         stopTest();
-        testExecutor.shutdown();
+        runner.shutdown();
+        reporter.shutdown();
         log.info("Stopped");
     }
 
@@ -138,10 +162,10 @@
             }
         }
         if (newRate != rate) {
-            log.info("Rate changed to {}", newRate);
+            log.info("Per node rate changed to {}", newRate);
             rate = newRate;
             stopTest();
-            testExecutor.execute(this::startTest);
+            runner.execute(this::startTest);
         }
     }
 }