[ONOS-6841] Sustained primitive throughput tests
Change-Id: Ibdd05bd868a5d481b8967e57797d6106026ba1ac
diff --git a/apps/test/primitive-perf/src/main/java/org/onosproject/primitiveperf/PrimitivePerfApp.java b/apps/test/primitive-perf/src/main/java/org/onosproject/primitiveperf/PrimitivePerfApp.java
new file mode 100644
index 0000000..7af34af
--- /dev/null
+++ b/apps/test/primitive-perf/src/main/java/org/onosproject/primitiveperf/PrimitivePerfApp.java
@@ -0,0 +1,334 @@
+/*
+ * Copyright 2017-present Open Networking Laboratory
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.onosproject.primitiveperf;
+
+import java.util.Dictionary;
+import java.util.List;
+import java.util.Map;
+import java.util.Timer;
+import java.util.TimerTask;
+import java.util.TreeMap;
+import java.util.UUID;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.function.Consumer;
+
+import com.google.common.collect.Lists;
+import org.apache.felix.scr.annotations.Activate;
+import org.apache.felix.scr.annotations.Component;
+import org.apache.felix.scr.annotations.Deactivate;
+import org.apache.felix.scr.annotations.Modified;
+import org.apache.felix.scr.annotations.Property;
+import org.apache.felix.scr.annotations.Reference;
+import org.apache.felix.scr.annotations.ReferenceCardinality;
+import org.apache.felix.scr.annotations.Service;
+import org.onosproject.cfg.ComponentConfigService;
+import org.onosproject.cluster.ClusterService;
+import org.onosproject.cluster.ControllerNode;
+import org.onosproject.cluster.NodeId;
+import org.onosproject.store.cluster.messaging.ClusterCommunicationService;
+import org.onosproject.store.cluster.messaging.MessageSubject;
+import org.onosproject.store.serializers.KryoNamespaces;
+import org.onosproject.store.service.ConsistentMap;
+import org.onosproject.store.service.Serializer;
+import org.onosproject.store.service.StorageService;
+import org.osgi.service.component.ComponentContext;
+import org.slf4j.Logger;
+
+import static com.google.common.base.Strings.isNullOrEmpty;
+import static java.lang.System.currentTimeMillis;
+import static org.apache.felix.scr.annotations.ReferenceCardinality.MANDATORY_UNARY;
+import static org.onlab.util.Tools.get;
+import static org.onlab.util.Tools.groupedThreads;
+import static org.slf4j.LoggerFactory.getLogger;
+
+/**
+ * Application to test sustained primitive throughput.
+ */
+@Component(immediate = true)
+@Service(value = PrimitivePerfApp.class)
+public class PrimitivePerfApp {
+
+ private final Logger log = getLogger(getClass());
+
+ private static final int DEFAULT_NUM_CLIENTS = 64;
+ private static final int DEFAULT_WRITE_PERCENTAGE = 100;
+
+ private static final int REPORT_PERIOD = 1_000; //ms
+
+ private static final String START = "start";
+ private static final String STOP = "stop";
+ private static final MessageSubject CONTROL = new MessageSubject("primitive-perf-ctl");
+
+ @Property(name = "numClients", intValue = DEFAULT_NUM_CLIENTS,
+ label = "Number of clients to use to submit writes")
+ private int numClients = DEFAULT_NUM_CLIENTS;
+
+ @Property(name = "writePercentage", intValue = DEFAULT_WRITE_PERCENTAGE,
+ label = "Percentage of operations to perform as writes")
+ private int writePercentage = DEFAULT_WRITE_PERCENTAGE;
+
+ @Reference(cardinality = MANDATORY_UNARY)
+ protected ClusterService clusterService;
+
+ @Reference(cardinality = MANDATORY_UNARY)
+ protected StorageService storageService;
+
+ @Reference(cardinality = MANDATORY_UNARY)
+ protected ComponentConfigService configService;
+
+ @Reference(cardinality = MANDATORY_UNARY)
+ protected PrimitivePerfCollector sampleCollector;
+
+ @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
+ protected ClusterCommunicationService communicationService;
+
+ private ExecutorService messageHandlingExecutor;
+
+ private ExecutorService workers;
+ private boolean stopped = true;
+
+ private Timer reportTimer;
+
+ private NodeId nodeId;
+ private TimerTask reporterTask;
+
+ private long startTime;
+ private long currentStartTime;
+ private AtomicLong overallCounter;
+ private AtomicLong currentCounter;
+
+ @Activate
+ public void activate(ComponentContext context) {
+ configService.registerProperties(getClass());
+
+ nodeId = clusterService.getLocalNode().id();
+
+ reportTimer = new Timer("onos-primitive-perf-reporter");
+
+ messageHandlingExecutor = Executors.newSingleThreadExecutor(
+ groupedThreads("onos/perf", "command-handler"));
+
+ communicationService.addSubscriber(CONTROL, String::new, new InternalControl(),
+ messageHandlingExecutor);
+
+ // TODO: investigate why this seems to be necessary for configs to get picked up on initial activation
+ modify(context);
+ }
+
+ @Deactivate
+ public void deactivate() {
+ stopTestRun();
+
+ configService.unregisterProperties(getClass(), false);
+ messageHandlingExecutor.shutdown();
+ communicationService.removeSubscriber(CONTROL);
+
+ if (reportTimer != null) {
+ reportTimer.cancel();
+ reportTimer = null;
+ }
+ }
+
+ @Modified
+ public void modify(ComponentContext context) {
+ if (context == null) {
+ logConfig("Reconfigured");
+ return;
+ }
+
+ Dictionary<?, ?> properties = context.getProperties();
+ int newNumClients;
+ try {
+ String s = get(properties, "numClients");
+ newNumClients = isNullOrEmpty(s) ? numClients : Integer.parseInt(s.trim());
+ } catch (NumberFormatException | ClassCastException e) {
+ log.warn("Malformed configuration detected; using defaults", e);
+ newNumClients = DEFAULT_NUM_CLIENTS;
+ }
+
+ int newWritePercentage;
+ try {
+ String s = get(properties, "writePercentage");
+ newWritePercentage = isNullOrEmpty(s) ? writePercentage : Integer.parseInt(s.trim());
+ } catch (NumberFormatException | ClassCastException e) {
+ log.warn("Malformed configuration detected; using defaults", e);
+ newWritePercentage = DEFAULT_WRITE_PERCENTAGE;
+ }
+
+ if (newNumClients != numClients || newWritePercentage != writePercentage) {
+ numClients = newNumClients;
+ writePercentage = newWritePercentage;
+ logConfig("Reconfigured");
+ if (!stopped) {
+ stop();
+ start();
+ }
+ }
+ }
+
+ public void start() {
+ if (stopped) {
+ stopped = false;
+ communicationService.broadcast(START, CONTROL, str -> str.getBytes());
+ startTestRun();
+ }
+ }
+
+ public void stop() {
+ if (!stopped) {
+ communicationService.broadcast(STOP, CONTROL, str -> str.getBytes());
+ stopTestRun();
+ }
+ }
+
+ private void logConfig(String prefix) {
+ log.info("{} with numClients = {}; writePercentage = {}", prefix, numClients, writePercentage);
+ }
+
+ private void startTestRun() {
+ sampleCollector.clearSamples();
+
+ startTime = System.currentTimeMillis();
+ currentStartTime = startTime;
+ currentCounter = new AtomicLong();
+ overallCounter = new AtomicLong();
+
+ reporterTask = new ReporterTask();
+ reportTimer.scheduleAtFixedRate(reporterTask,
+ REPORT_PERIOD - currentTimeMillis() % REPORT_PERIOD,
+ REPORT_PERIOD);
+
+ stopped = false;
+
+ Map<String, ControllerNode> nodes = new TreeMap<>();
+ for (ControllerNode node : clusterService.getNodes()) {
+ nodes.put(node.id().id(), node);
+ }
+
+ // Compute the index of the local node in a sorted list of nodes.
+ List<String> sortedNodes = Lists.newArrayList(nodes.keySet());
+ int nodeCount = nodes.size();
+ int index = sortedNodes.indexOf(nodeId.id());
+
+ // Count the number of workers assigned to this node.
+ int workerCount = 0;
+ for (int i = 1; i <= numClients; i++) {
+ if (i % nodeCount == index) {
+ workerCount++;
+ }
+ }
+
+ // Create a worker pool and start the workers for this node.
+ workers = Executors.newFixedThreadPool(workerCount, groupedThreads("onos/primitive-perf", "worker-%d"));
+ for (int i = 0; i < workerCount; i++) {
+ workers.submit(new Runner(UUID.randomUUID().toString(), UUID.randomUUID().toString()));
+ }
+ log.info("Started test run");
+ }
+
+ private void stopTestRun() {
+ if (reporterTask != null) {
+ reporterTask.cancel();
+ reporterTask = null;
+ }
+
+ try {
+ workers.awaitTermination(10, TimeUnit.MILLISECONDS);
+ } catch (InterruptedException e) {
+ log.warn("Failed to stop worker", e);
+ }
+
+ sampleCollector.recordSample(0, 0);
+ sampleCollector.recordSample(0, 0);
+ stopped = true;
+
+ log.info("Stopped test run");
+ }
+
+ // Submits primitive operations.
+ final class Runner implements Runnable {
+ private final String key;
+ private final String value;
+ private ConsistentMap<String, String> map;
+
+ private Runner(String key, String value) {
+ this.key = key;
+ this.value = value;
+ }
+
+ @Override
+ public void run() {
+ setup();
+ while (!stopped) {
+ try {
+ submit();
+ } catch (Exception e) {
+ log.warn("Exception during cycle", e);
+ }
+ }
+ teardown();
+ }
+
+ private void setup() {
+ map = storageService.<String, String>consistentMapBuilder()
+ .withName("perf-test")
+ .withSerializer(Serializer.using(KryoNamespaces.BASIC))
+ .build();
+ }
+
+ private void submit() {
+ if (currentCounter.incrementAndGet() % 100 < writePercentage) {
+ map.put(key, value);
+ } else {
+ map.get(key);
+ }
+ }
+
+ private void teardown() {
+ map.destroy();
+ }
+ }
+
+ private class InternalControl implements Consumer<String> {
+ @Override
+ public void accept(String cmd) {
+ log.info("Received command {}", cmd);
+ if (cmd.equals(START)) {
+ startTestRun();
+ } else {
+ stopTestRun();
+ }
+ }
+ }
+
+ private class ReporterTask extends TimerTask {
+ @Override
+ public void run() {
+ long endTime = System.currentTimeMillis();
+ long overallTime = endTime - startTime;
+ long currentTime = endTime - currentStartTime;
+ long currentCount = currentCounter.getAndSet(0);
+ long overallCount = overallCounter.addAndGet(currentCount);
+ sampleCollector.recordSample(overallTime > 0 ? overallCount / (overallTime / 1000d) : 0,
+ currentTime > 0 ? currentCount / (currentTime / 1000d) : 0);
+ currentStartTime = System.currentTimeMillis();
+ }
+ }
+
+}