Support key/value sizes and counts in primitive performance app.
Change-Id: I5c2e00b4b2c30faad23a45adc6b7582fa150b6d1
diff --git a/apps/test/primitive-perf/src/main/java/org/onosproject/primitiveperf/PrimitivePerfApp.java b/apps/test/primitive-perf/src/main/java/org/onosproject/primitiveperf/PrimitivePerfApp.java
index 256edfc..4a36ebe 100644
--- a/apps/test/primitive-perf/src/main/java/org/onosproject/primitiveperf/PrimitivePerfApp.java
+++ b/apps/test/primitive-perf/src/main/java/org/onosproject/primitiveperf/PrimitivePerfApp.java
@@ -15,18 +15,18 @@
*/
package org.onosproject.primitiveperf;
+import java.util.ArrayList;
import java.util.Dictionary;
import java.util.List;
import java.util.Map;
+import java.util.Random;
import java.util.Timer;
import java.util.TimerTask;
import java.util.TreeMap;
-import java.util.UUID;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
-import java.util.function.Consumer;
import com.google.common.collect.Lists;
import org.apache.felix.scr.annotations.Activate;
@@ -35,15 +35,14 @@
import org.apache.felix.scr.annotations.Modified;
import org.apache.felix.scr.annotations.Property;
import org.apache.felix.scr.annotations.Reference;
-import org.apache.felix.scr.annotations.ReferenceCardinality;
import org.apache.felix.scr.annotations.Service;
import org.onosproject.cfg.ComponentConfigService;
import org.onosproject.cluster.ClusterService;
import org.onosproject.cluster.ControllerNode;
import org.onosproject.cluster.NodeId;
-import org.onosproject.store.cluster.messaging.ClusterCommunicationService;
-import org.onosproject.store.cluster.messaging.MessageSubject;
import org.onosproject.store.serializers.KryoNamespaces;
+import org.onosproject.store.service.AtomicValue;
+import org.onosproject.store.service.AtomicValueEventListener;
import org.onosproject.store.service.ConsistentMap;
import org.onosproject.store.service.Serializer;
import org.onosproject.store.service.StorageService;
@@ -66,23 +65,68 @@
private final Logger log = getLogger(getClass());
- private static final int DEFAULT_NUM_CLIENTS = 64;
+ private static final int DEFAULT_NUM_CLIENTS = 8;
private static final int DEFAULT_WRITE_PERCENTAGE = 100;
+ private static final int DEFAULT_NUM_KEYS = 100_000;
+ private static final int DEFAULT_KEY_LENGTH = 32;
+ private static final int DEFAULT_NUM_UNIQUE_VALUES = 100;
+ private static final int DEFAULT_VALUE_LENGTH = 1024;
+ private static final boolean DEFAULT_INCLUDE_EVENTS = false;
+ private static final boolean DEFAULT_DETERMINISTIC = true;
+
private static final int REPORT_PERIOD = 1_000; //ms
- private static final String START = "start";
- private static final String STOP = "stop";
- private static final MessageSubject CONTROL = new MessageSubject("primitive-perf-ctl");
+ private static final char[] CHARS = "abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ0123456789".toCharArray();
- @Property(name = "numClients", intValue = DEFAULT_NUM_CLIENTS,
- label = "Number of clients to use to submit writes")
+ @Property(
+ name = "numClients",
+ intValue = DEFAULT_NUM_CLIENTS,
+ label = "Number of clients to use to submit writes")
private int numClients = DEFAULT_NUM_CLIENTS;
- @Property(name = "writePercentage", intValue = DEFAULT_WRITE_PERCENTAGE,
- label = "Percentage of operations to perform as writes")
+ @Property(
+ name = "writePercentage",
+ intValue = DEFAULT_WRITE_PERCENTAGE,
+ label = "Percentage of operations to perform as writes")
private int writePercentage = DEFAULT_WRITE_PERCENTAGE;
+ @Property(
+ name = "numKeys",
+ intValue = DEFAULT_NUM_KEYS,
+ label = "Number of unique keys to write")
+ private int numKeys = DEFAULT_NUM_KEYS;
+
+ @Property(
+ name = "keyLength",
+ intValue = DEFAULT_KEY_LENGTH,
+ label = "Key length")
+ private int keyLength = DEFAULT_KEY_LENGTH;
+
+ @Property(
+ name = "numValues",
+ intValue = DEFAULT_NUM_UNIQUE_VALUES,
+ label = "Number of unique values to write")
+ private int numValues = DEFAULT_NUM_UNIQUE_VALUES;
+
+ @Property(
+ name = "valueLength",
+ intValue = DEFAULT_VALUE_LENGTH,
+ label = "Value length")
+ private int valueLength = DEFAULT_VALUE_LENGTH;
+
+ @Property(
+ name = "includeEvents",
+ boolValue = DEFAULT_INCLUDE_EVENTS,
+ label = "Whether to include events in test")
+ private boolean includeEvents = DEFAULT_INCLUDE_EVENTS;
+
+ @Property(
+ name = "deterministic",
+ boolValue = DEFAULT_DETERMINISTIC,
+ label = "Whether to deterministically populate entries")
+ private boolean deterministic = DEFAULT_DETERMINISTIC;
+
@Reference(cardinality = MANDATORY_UNARY)
protected ClusterService clusterService;
@@ -95,13 +139,22 @@
@Reference(cardinality = MANDATORY_UNARY)
protected PrimitivePerfCollector sampleCollector;
- @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
- protected ClusterCommunicationService communicationService;
-
private ExecutorService messageHandlingExecutor;
private ExecutorService workers;
- private boolean stopped = true;
+
+ // Tracks whether the test has been started in the cluster.
+ private AtomicValue<Boolean> started;
+ private AtomicValueEventListener<Boolean> startedListener = event -> {
+ if (event.newValue()) {
+ startTestRun();
+ } else {
+ stopTestRun();
+ }
+ };
+
+ // Tracks whether local clients are running.
+ private volatile boolean running;
private Timer reportTimer;
@@ -118,15 +171,18 @@
configService.registerProperties(getClass());
nodeId = clusterService.getLocalNode().id();
+ started = storageService.<Boolean>atomicValueBuilder()
+ .withName("perf-test-started")
+ .withSerializer(Serializer.using(KryoNamespaces.BASIC))
+ .build()
+ .asAtomicValue();
+ started.addListener(startedListener);
reportTimer = new Timer("onos-primitive-perf-reporter");
messageHandlingExecutor = Executors.newSingleThreadExecutor(
groupedThreads("onos/perf", "command-handler"));
- communicationService.addSubscriber(CONTROL, String::new, new InternalControl(),
- messageHandlingExecutor);
-
// TODO: investigate why this seems to be necessary for configs to get picked up on initial activation
modify(context);
}
@@ -137,7 +193,7 @@
configService.unregisterProperties(getClass(), false);
messageHandlingExecutor.shutdown();
- communicationService.removeSubscriber(CONTROL);
+ started.removeListener(startedListener);
if (reportTimer != null) {
reportTimer.cancel();
@@ -145,6 +201,16 @@
}
}
+ private int parseInt(Dictionary<?, ?> properties, String name, int currentValue, int defaultValue) {
+ try {
+ String s = get(properties, name);
+ return isNullOrEmpty(s) ? currentValue : Integer.parseInt(s.trim());
+ } catch (NumberFormatException | ClassCastException e) {
+ log.warn("Malformed configuration detected; using defaults", e);
+ return defaultValue;
+ }
+ }
+
@Modified
public void modify(ComponentContext context) {
if (context == null) {
@@ -153,55 +219,106 @@
}
Dictionary<?, ?> properties = context.getProperties();
- int newNumClients;
- try {
- String s = get(properties, "numClients");
- newNumClients = isNullOrEmpty(s) ? numClients : Integer.parseInt(s.trim());
- } catch (NumberFormatException | ClassCastException e) {
- log.warn("Malformed configuration detected; using defaults", e);
- newNumClients = DEFAULT_NUM_CLIENTS;
- }
+ int newNumClients = parseInt(properties, "numClients", numClients, DEFAULT_NUM_CLIENTS);
+ int newWritePercentage = parseInt(properties, "writePercentage", writePercentage, DEFAULT_WRITE_PERCENTAGE);
+ int newNumKeys = parseInt(properties, "numKeys", numKeys, DEFAULT_NUM_KEYS);
+ int newKeyLength = parseInt(properties, "keyLength", keyLength, DEFAULT_KEY_LENGTH);
+ int newNumValues = parseInt(properties, "numValues", numValues, DEFAULT_NUM_UNIQUE_VALUES);
+ int newValueLength = parseInt(properties, "valueLength", valueLength, DEFAULT_VALUE_LENGTH);
- int newWritePercentage;
- try {
- String s = get(properties, "writePercentage");
- newWritePercentage = isNullOrEmpty(s) ? writePercentage : Integer.parseInt(s.trim());
- } catch (NumberFormatException | ClassCastException e) {
- log.warn("Malformed configuration detected; using defaults", e);
- newWritePercentage = DEFAULT_WRITE_PERCENTAGE;
- }
+ String includeEventsString = get(properties, "includeEvents");
+ boolean newIncludeEvents = isNullOrEmpty(includeEventsString)
+ ? includeEvents
+ : Boolean.parseBoolean(includeEventsString.trim());
- if (newNumClients != numClients || newWritePercentage != writePercentage) {
+ String deterministicString = get(properties, "deterministic");
+ boolean newDeterministic = isNullOrEmpty(deterministicString)
+ ? includeEvents
+ : Boolean.parseBoolean(deterministicString.trim());
+
+ if (newNumClients != numClients
+ || newWritePercentage != writePercentage
+ || newNumKeys != numKeys
+ || newKeyLength != keyLength
+ || newNumValues != numValues
+ || newValueLength != valueLength
+ || newIncludeEvents != includeEvents
+ || newDeterministic != deterministic) {
numClients = newNumClients;
writePercentage = newWritePercentage;
+ numKeys = newNumKeys;
+ keyLength = newKeyLength;
+ numValues = newNumValues;
+ valueLength = newValueLength;
+ includeEvents = newIncludeEvents;
+ deterministic = newDeterministic;
logConfig("Reconfigured");
- if (!stopped) {
+ Boolean started = this.started.get();
+ if (started != null && started) {
stop();
start();
}
+ } else {
+ Boolean started = this.started.get();
+ if (started != null && started) {
+ if (running) {
+ stopTestRun();
+ }
+ startTestRun();
+ }
}
}
+ /**
+ * Starts a new test.
+ */
public void start() {
- if (stopped) {
- stopped = false;
- communicationService.broadcast(START, CONTROL, str -> str.getBytes());
- startTestRun();
+ // To stop the test, we simply update the "started" value. Events from the change will notify all
+ // nodes to start the test.
+ Boolean started = this.started.get();
+ if (started == null || !started) {
+ this.started.set(true);
}
}
+ /**
+ * Stops a test.
+ */
public void stop() {
- if (!stopped) {
- communicationService.broadcast(STOP, CONTROL, str -> str.getBytes());
- stopTestRun();
+ // To stop the test, we simply update the "started" value. Events from the change will notify all
+ // nodes to stop the test.
+ Boolean started = this.started.get();
+ if (started != null && started) {
+ this.started.set(false);
}
}
private void logConfig(String prefix) {
- log.info("{} with numClients = {}; writePercentage = {}", prefix, numClients, writePercentage);
+ log.info("{} with " +
+ "numClients = {}; " +
+ "writePercentage = {}; " +
+ "numKeys = {}; " +
+ "keyLength = {}; " +
+ "numValues = {}; " +
+ "valueLength = {}; " +
+ "includeEvents = {}; " +
+ "deterministic = {}",
+ prefix,
+ numClients,
+ writePercentage,
+ numKeys,
+ keyLength,
+ numValues,
+ valueLength,
+ includeEvents,
+ deterministic);
}
private void startTestRun() {
+ if (running) {
+ return;
+ }
+
sampleCollector.clearSamples();
startTime = System.currentTimeMillis();
@@ -210,11 +327,10 @@
overallCounter = new AtomicLong();
reporterTask = new ReporterTask();
- reportTimer.scheduleAtFixedRate(reporterTask,
- REPORT_PERIOD - currentTimeMillis() % REPORT_PERIOD,
- REPORT_PERIOD);
+ reportTimer.scheduleAtFixedRate(
+ reporterTask, REPORT_PERIOD - currentTimeMillis() % REPORT_PERIOD, REPORT_PERIOD);
- stopped = false;
+ running = true;
Map<String, ControllerNode> nodes = new TreeMap<>();
for (ControllerNode node : clusterService.getNodes()) {
@@ -236,15 +352,57 @@
// Create a worker pool and start the workers for this node.
if (workerCount > 0) {
+ String[] keys = createStrings(keyLength, numKeys);
+ String[] values = createStrings(valueLength, numValues);
+
workers = Executors.newFixedThreadPool(workerCount, groupedThreads("onos/primitive-perf", "worker-%d"));
for (int i = 0; i < workerCount; i++) {
- workers.submit(new Runner(UUID.randomUUID().toString(), UUID.randomUUID().toString()));
+ if (deterministic) {
+ workers.submit(new DeterministicRunner(keys, values));
+ } else {
+ workers.submit(new NonDeterministicRunner(keys, values));
+ }
}
}
log.info("Started test run");
}
+ /**
+ * Creates a deterministic array of strings to write to the cluster.
+ *
+ * @param length the string lengths
+ * @param count the string count
+ * @return a deterministic array of strings
+ */
+ private String[] createStrings(int length, int count) {
+ Random random = new Random(length);
+ List<String> stringsList = new ArrayList<>(count);
+ for (int i = 0; i < count; i++) {
+ stringsList.add(randomString(length, random));
+ }
+ return stringsList.toArray(new String[stringsList.size()]);
+ }
+
+ /**
+ * Creates a deterministic string based on the given seed.
+ *
+ * @param length the seed from which to create the string
+ * @param random the random object from which to create the string characters
+ * @return the string
+ */
+ private String randomString(int length, Random random) {
+ char[] buffer = new char[length];
+ for (int i = 0; i < length; i++) {
+ buffer[i] = CHARS[random.nextInt(CHARS.length)];
+ }
+ return new String(buffer);
+ }
+
private void stopTestRun() {
+ if (!running) {
+ return;
+ }
+
if (reporterTask != null) {
reporterTask.cancel();
reporterTask = null;
@@ -262,26 +420,28 @@
sampleCollector.recordSample(0, 0);
sampleCollector.recordSample(0, 0);
- stopped = true;
+
+ running = false;
log.info("Stopped test run");
}
// Submits primitive operations.
- final class Runner implements Runnable {
- private final String key;
- private final String value;
- private ConsistentMap<String, String> map;
+ abstract class Runner implements Runnable {
+ final String[] keys;
+ final String[] values;
+ final Random random = new Random();
+ ConsistentMap<String, String> map;
- private Runner(String key, String value) {
- this.key = key;
- this.value = value;
+ Runner(String[] keys, String[] values) {
+ this.keys = keys;
+ this.values = values;
}
@Override
public void run() {
setup();
- while (!stopped) {
+ while (running) {
try {
submit();
} catch (Exception e) {
@@ -291,34 +451,55 @@
teardown();
}
- private void setup() {
+ void setup() {
map = storageService.<String, String>consistentMapBuilder()
.withName("perf-test")
.withSerializer(Serializer.using(KryoNamespaces.BASIC))
.build();
+ if (includeEvents) {
+ map.addListener(event -> {
+ });
+ }
}
- private void submit() {
- if (currentCounter.incrementAndGet() % 100 < writePercentage) {
- map.put(key, value);
+ abstract void submit();
+
+ void teardown() {
+ //map.destroy();
+ }
+ }
+
+ private class NonDeterministicRunner extends Runner {
+ NonDeterministicRunner(String[] keys, String[] values) {
+ super(keys, values);
+ }
+
+ @Override
+ void submit() {
+ currentCounter.incrementAndGet();
+ String key = keys[random.nextInt(keys.length)];
+ if (random.nextInt(100) < writePercentage) {
+ map.put(key, values[random.nextInt(values.length)]);
} else {
map.get(key);
}
}
-
- private void teardown() {
- map.destroy();
- }
}
- private class InternalControl implements Consumer<String> {
+ private class DeterministicRunner extends Runner {
+ private int index;
+
+ DeterministicRunner(String[] keys, String[] values) {
+ super(keys, values);
+ }
+
@Override
- public void accept(String cmd) {
- log.info("Received command {}", cmd);
- if (cmd.equals(START)) {
- startTestRun();
+ void submit() {
+ currentCounter.incrementAndGet();
+ if (random.nextInt(100) < writePercentage) {
+ map.put(keys[index++ % keys.length], values[random.nextInt(values.length)]);
} else {
- stopTestRun();
+ map.get(keys[random.nextInt(keys.length)]);
}
}
}