LoadTest update to use multiple counters in parallel for load generation
Change-Id: I7d7f13024372c8c998dc427cf30fdc2e2c68c5f9
diff --git a/apps/test/loadtest/src/main/java/org/onosproject/loadtest/DistributedConsensusLoadTest.java b/apps/test/loadtest/src/main/java/org/onosproject/loadtest/DistributedConsensusLoadTest.java
index b142282..48a51e5 100644
--- a/apps/test/loadtest/src/main/java/org/onosproject/loadtest/DistributedConsensusLoadTest.java
+++ b/apps/test/loadtest/src/main/java/org/onosproject/loadtest/DistributedConsensusLoadTest.java
@@ -20,12 +20,17 @@
import static org.slf4j.LoggerFactory.getLogger;
import java.util.Dictionary;
-import java.util.concurrent.ExecutorService;
+import java.util.List;
import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.Semaphore;
+import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
+import java.util.stream.Collectors;
+import org.apache.commons.lang.math.RandomUtils;
import org.apache.felix.scr.annotations.Activate;
import org.apache.felix.scr.annotations.Component;
import org.apache.felix.scr.annotations.Deactivate;
@@ -34,6 +39,7 @@
import org.apache.felix.scr.annotations.Reference;
import org.apache.felix.scr.annotations.ReferenceCardinality;
import org.apache.felix.scr.annotations.Service;
+import org.onlab.util.Tools;
import org.onosproject.cfg.ComponentConfigService;
import org.onosproject.core.ApplicationId;
import org.onosproject.core.CoreService;
@@ -42,6 +48,7 @@
import org.osgi.service.component.ComponentContext;
import org.slf4j.Logger;
+import com.google.common.collect.Lists;
import com.google.common.util.concurrent.RateLimiter;
/**
@@ -69,24 +76,46 @@
protected CoreService coreService;
private static final int DEFAULT_RATE = 100;
+ private static final int TOTAL_COUNTERS = 50;
@Property(name = "rate", intValue = DEFAULT_RATE,
label = "Total number of increments per second to the atomic counter")
protected int rate = 0;
- private AtomicLong lastValue = new AtomicLong(0);
- private AtomicLong lastLoggedTime = new AtomicLong(0);
- private AsyncAtomicCounter counter;
- private ExecutorService testExecutor = Executors.newSingleThreadExecutor();
+ private final AtomicLong previousReportTime = new AtomicLong(0);
+ private final AtomicLong previousCount = new AtomicLong(0);
+ private final AtomicInteger increments = new AtomicInteger(0);
+ private final List<AsyncAtomicCounter> counters = Lists.newArrayList();
+ private final ScheduledExecutorService runner = Executors.newSingleThreadScheduledExecutor();
+ private final ScheduledExecutorService reporter = Executors.newSingleThreadScheduledExecutor();
@Activate
public void activate(ComponentContext context) {
configService.registerProperties(getClass());
appId = coreService.registerApplication("org.onosproject.loadtest");
log.info("Started with {}", appId);
- counter = storageService.atomicCounterBuilder()
- .withName("onos-app-loadtest-counter")
+ for (int i = 0; i < TOTAL_COUNTERS; ++i) {
+ AsyncAtomicCounter counter = storageService.atomicCounterBuilder()
+ .withName(String.format("onos-app-loadtest-counter-%d", i))
.build();
+ counters.add(counter);
+ }
+ reporter.scheduleWithFixedDelay(() -> {
+ Tools.allOf(counters.stream()
+ .map(AsyncAtomicCounter::get)
+ .collect(Collectors.toList()))
+ .whenComplete((r, e) -> {
+ if (e == null) {
+ long newCount = r.stream().reduce(Long::sum).get();
+ long currentTime = System.currentTimeMillis();
+ long delta = currentTime - previousReportTime.getAndSet(currentTime);
+ long rate = (newCount - previousCount.getAndSet(newCount)) * 1000 / delta;
+ log.info("{} updates per second", rate);
+ } else {
+ log.warn(e.getMessage());
+ }
+ });
+ }, 5, 5, TimeUnit.SECONDS);
modified(null);
}
@@ -97,16 +126,10 @@
while (!stopped.get()) {
limiter.acquire();
s.acquireUninterruptibly();
- counter.incrementAndGet().whenComplete((r, e) -> {
+ counters.get(RandomUtils.nextInt(TOTAL_COUNTERS)).incrementAndGet().whenComplete((r, e) -> {
s.release();
- long delta = System.currentTimeMillis() - lastLoggedTime.get();
if (e == null) {
- if (delta > 1000) {
- long tps = (long) ((r - lastValue.get()) * 1000.0) / delta;
- lastValue.set(r);
- lastLoggedTime.set(System.currentTimeMillis());
- log.info("Rate: {}", tps);
- }
+ increments.incrementAndGet();
}
});
}
@@ -120,7 +143,8 @@
public void deactivate(ComponentContext context) {
configService.unregisterProperties(getClass(), false);
stopTest();
- testExecutor.shutdown();
+ runner.shutdown();
+ reporter.shutdown();
log.info("Stopped");
}
@@ -138,10 +162,10 @@
}
}
if (newRate != rate) {
- log.info("Rate changed to {}", newRate);
+ log.info("Per node rate changed to {}", newRate);
rate = newRate;
stopTest();
- testExecutor.execute(this::startTest);
+ runner.execute(this::startTest);
}
}
}