Support key/value sizes and counts in primitive performance app.

Change-Id: I5c2e00b4b2c30faad23a45adc6b7582fa150b6d1
diff --git a/apps/test/primitive-perf/src/main/java/org/onosproject/primitiveperf/PrimitivePerfApp.java b/apps/test/primitive-perf/src/main/java/org/onosproject/primitiveperf/PrimitivePerfApp.java
index 256edfc..4a36ebe 100644
--- a/apps/test/primitive-perf/src/main/java/org/onosproject/primitiveperf/PrimitivePerfApp.java
+++ b/apps/test/primitive-perf/src/main/java/org/onosproject/primitiveperf/PrimitivePerfApp.java
@@ -15,18 +15,18 @@
  */
 package org.onosproject.primitiveperf;
 
+import java.util.ArrayList;
 import java.util.Dictionary;
 import java.util.List;
 import java.util.Map;
+import java.util.Random;
 import java.util.Timer;
 import java.util.TimerTask;
 import java.util.TreeMap;
-import java.util.UUID;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicLong;
-import java.util.function.Consumer;
 
 import com.google.common.collect.Lists;
 import org.apache.felix.scr.annotations.Activate;
@@ -35,15 +35,14 @@
 import org.apache.felix.scr.annotations.Modified;
 import org.apache.felix.scr.annotations.Property;
 import org.apache.felix.scr.annotations.Reference;
-import org.apache.felix.scr.annotations.ReferenceCardinality;
 import org.apache.felix.scr.annotations.Service;
 import org.onosproject.cfg.ComponentConfigService;
 import org.onosproject.cluster.ClusterService;
 import org.onosproject.cluster.ControllerNode;
 import org.onosproject.cluster.NodeId;
-import org.onosproject.store.cluster.messaging.ClusterCommunicationService;
-import org.onosproject.store.cluster.messaging.MessageSubject;
 import org.onosproject.store.serializers.KryoNamespaces;
+import org.onosproject.store.service.AtomicValue;
+import org.onosproject.store.service.AtomicValueEventListener;
 import org.onosproject.store.service.ConsistentMap;
 import org.onosproject.store.service.Serializer;
 import org.onosproject.store.service.StorageService;
@@ -66,23 +65,68 @@
 
     private final Logger log = getLogger(getClass());
 
-    private static final int DEFAULT_NUM_CLIENTS = 64;
+    private static final int DEFAULT_NUM_CLIENTS = 8;
     private static final int DEFAULT_WRITE_PERCENTAGE = 100;
 
+    private static final int DEFAULT_NUM_KEYS = 100_000;
+    private static final int DEFAULT_KEY_LENGTH = 32;
+    private static final int DEFAULT_NUM_UNIQUE_VALUES = 100;
+    private static final int DEFAULT_VALUE_LENGTH = 1024;
+    private static final boolean DEFAULT_INCLUDE_EVENTS = false;
+    private static final boolean DEFAULT_DETERMINISTIC = true;
+
     private static final int REPORT_PERIOD = 1_000; //ms
 
-    private static final String START = "start";
-    private static final String STOP = "stop";
-    private static final MessageSubject CONTROL = new MessageSubject("primitive-perf-ctl");
+    private static final char[] CHARS = "abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ0123456789".toCharArray();
 
-    @Property(name = "numClients", intValue = DEFAULT_NUM_CLIENTS,
-            label = "Number of clients to use to submit writes")
+    @Property(
+        name = "numClients",
+        intValue = DEFAULT_NUM_CLIENTS,
+        label = "Number of clients to use to submit writes")
     private int numClients = DEFAULT_NUM_CLIENTS;
 
-    @Property(name = "writePercentage", intValue = DEFAULT_WRITE_PERCENTAGE,
-            label = "Percentage of operations to perform as writes")
+    @Property(
+        name = "writePercentage",
+        intValue = DEFAULT_WRITE_PERCENTAGE,
+        label = "Percentage of operations to perform as writes")
     private int writePercentage = DEFAULT_WRITE_PERCENTAGE;
 
+    @Property(
+        name = "numKeys",
+        intValue = DEFAULT_NUM_KEYS,
+        label = "Number of unique keys to write")
+    private int numKeys = DEFAULT_NUM_KEYS;
+
+    @Property(
+        name = "keyLength",
+        intValue = DEFAULT_KEY_LENGTH,
+        label = "Key length")
+    private int keyLength = DEFAULT_KEY_LENGTH;
+
+    @Property(
+        name = "numValues",
+        intValue = DEFAULT_NUM_UNIQUE_VALUES,
+        label = "Number of unique values to write")
+    private int numValues = DEFAULT_NUM_UNIQUE_VALUES;
+
+    @Property(
+        name = "valueLength",
+        intValue = DEFAULT_VALUE_LENGTH,
+        label = "Value length")
+    private int valueLength = DEFAULT_VALUE_LENGTH;
+
+    @Property(
+        name = "includeEvents",
+        boolValue = DEFAULT_INCLUDE_EVENTS,
+        label = "Whether to include events in test")
+    private boolean includeEvents = DEFAULT_INCLUDE_EVENTS;
+
+    @Property(
+        name = "deterministic",
+        boolValue = DEFAULT_DETERMINISTIC,
+        label = "Whether to deterministically populate entries")
+    private boolean deterministic = DEFAULT_DETERMINISTIC;
+
     @Reference(cardinality = MANDATORY_UNARY)
     protected ClusterService clusterService;
 
@@ -95,13 +139,22 @@
     @Reference(cardinality = MANDATORY_UNARY)
     protected PrimitivePerfCollector sampleCollector;
 
-    @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
-    protected ClusterCommunicationService communicationService;
-
     private ExecutorService messageHandlingExecutor;
 
     private ExecutorService workers;
-    private boolean stopped = true;
+
+    // Tracks whether the test has been started in the cluster.
+    private AtomicValue<Boolean> started;
+    private AtomicValueEventListener<Boolean> startedListener = event -> {
+        if (event.newValue()) {
+            startTestRun();
+        } else {
+            stopTestRun();
+        }
+    };
+
+    // Tracks whether local clients are running.
+    private volatile boolean running;
 
     private Timer reportTimer;
 
@@ -118,15 +171,18 @@
         configService.registerProperties(getClass());
 
         nodeId = clusterService.getLocalNode().id();
+        started = storageService.<Boolean>atomicValueBuilder()
+            .withName("perf-test-started")
+            .withSerializer(Serializer.using(KryoNamespaces.BASIC))
+            .build()
+            .asAtomicValue();
+        started.addListener(startedListener);
 
         reportTimer = new Timer("onos-primitive-perf-reporter");
 
         messageHandlingExecutor = Executors.newSingleThreadExecutor(
                 groupedThreads("onos/perf", "command-handler"));
 
-        communicationService.addSubscriber(CONTROL, String::new, new InternalControl(),
-                                           messageHandlingExecutor);
-
         // TODO: investigate why this seems to be necessary for configs to get picked up on initial activation
         modify(context);
     }
@@ -137,7 +193,7 @@
 
         configService.unregisterProperties(getClass(), false);
         messageHandlingExecutor.shutdown();
-        communicationService.removeSubscriber(CONTROL);
+        started.removeListener(startedListener);
 
         if (reportTimer != null) {
             reportTimer.cancel();
@@ -145,6 +201,16 @@
         }
     }
 
+    private int parseInt(Dictionary<?, ?> properties, String name, int currentValue, int defaultValue) {
+        try {
+            String s = get(properties, name);
+            return isNullOrEmpty(s) ? currentValue : Integer.parseInt(s.trim());
+        } catch (NumberFormatException | ClassCastException e) {
+            log.warn("Malformed configuration detected; using defaults", e);
+            return defaultValue;
+        }
+    }
+
     @Modified
     public void modify(ComponentContext context) {
         if (context == null) {
@@ -153,55 +219,106 @@
         }
 
         Dictionary<?, ?> properties = context.getProperties();
-        int newNumClients;
-        try {
-            String s = get(properties, "numClients");
-            newNumClients = isNullOrEmpty(s) ? numClients : Integer.parseInt(s.trim());
-        } catch (NumberFormatException | ClassCastException e) {
-            log.warn("Malformed configuration detected; using defaults", e);
-            newNumClients = DEFAULT_NUM_CLIENTS;
-        }
+        int newNumClients = parseInt(properties, "numClients", numClients, DEFAULT_NUM_CLIENTS);
+        int newWritePercentage = parseInt(properties, "writePercentage", writePercentage, DEFAULT_WRITE_PERCENTAGE);
+        int newNumKeys = parseInt(properties, "numKeys", numKeys, DEFAULT_NUM_KEYS);
+        int newKeyLength = parseInt(properties, "keyLength", keyLength, DEFAULT_KEY_LENGTH);
+        int newNumValues = parseInt(properties, "numValues", numValues, DEFAULT_NUM_UNIQUE_VALUES);
+        int newValueLength = parseInt(properties, "valueLength", valueLength, DEFAULT_VALUE_LENGTH);
 
-        int newWritePercentage;
-        try {
-            String s = get(properties, "writePercentage");
-            newWritePercentage = isNullOrEmpty(s) ? writePercentage : Integer.parseInt(s.trim());
-        } catch (NumberFormatException | ClassCastException e) {
-            log.warn("Malformed configuration detected; using defaults", e);
-            newWritePercentage = DEFAULT_WRITE_PERCENTAGE;
-        }
+        String includeEventsString = get(properties, "includeEvents");
+        boolean newIncludeEvents = isNullOrEmpty(includeEventsString)
+            ? includeEvents
+            : Boolean.parseBoolean(includeEventsString.trim());
 
-        if (newNumClients != numClients || newWritePercentage != writePercentage) {
+        String deterministicString = get(properties, "deterministic");
+        boolean newDeterministic = isNullOrEmpty(deterministicString)
+            ? includeEvents
+            : Boolean.parseBoolean(deterministicString.trim());
+
+        if (newNumClients != numClients
+            || newWritePercentage != writePercentage
+            || newNumKeys != numKeys
+            || newKeyLength != keyLength
+            || newNumValues != numValues
+            || newValueLength != valueLength
+            || newIncludeEvents != includeEvents
+            || newDeterministic != deterministic) {
             numClients = newNumClients;
             writePercentage = newWritePercentage;
+            numKeys = newNumKeys;
+            keyLength = newKeyLength;
+            numValues = newNumValues;
+            valueLength = newValueLength;
+            includeEvents = newIncludeEvents;
+            deterministic = newDeterministic;
             logConfig("Reconfigured");
-            if (!stopped) {
+            Boolean started = this.started.get();
+            if (started != null && started) {
                 stop();
                 start();
             }
+        } else {
+            Boolean started = this.started.get();
+            if (started != null && started) {
+                if (running) {
+                    stopTestRun();
+                }
+                startTestRun();
+            }
         }
     }
 
+    /**
+     * Starts a new test.
+     */
     public void start() {
-        if (stopped) {
-            stopped = false;
-            communicationService.broadcast(START, CONTROL, str -> str.getBytes());
-            startTestRun();
+        // To stop the test, we simply update the "started" value. Events from the change will notify all
+        // nodes to start the test.
+        Boolean started = this.started.get();
+        if (started == null || !started) {
+            this.started.set(true);
         }
     }
 
+    /**
+     * Stops a test.
+     */
     public void stop() {
-        if (!stopped) {
-            communicationService.broadcast(STOP, CONTROL, str -> str.getBytes());
-            stopTestRun();
+        // To stop the test, we simply update the "started" value. Events from the change will notify all
+        // nodes to stop the test.
+        Boolean started = this.started.get();
+        if (started != null && started) {
+            this.started.set(false);
         }
     }
 
     private void logConfig(String prefix) {
-        log.info("{} with numClients = {}; writePercentage = {}", prefix, numClients, writePercentage);
+        log.info("{} with " +
+            "numClients = {}; " +
+            "writePercentage = {}; " +
+            "numKeys = {}; " +
+            "keyLength = {}; " +
+            "numValues = {}; " +
+            "valueLength = {}; " +
+            "includeEvents = {}; " +
+            "deterministic = {}",
+            prefix,
+            numClients,
+            writePercentage,
+            numKeys,
+            keyLength,
+            numValues,
+            valueLength,
+            includeEvents,
+            deterministic);
     }
 
     private void startTestRun() {
+        if (running) {
+            return;
+        }
+
         sampleCollector.clearSamples();
 
         startTime = System.currentTimeMillis();
@@ -210,11 +327,10 @@
         overallCounter = new AtomicLong();
 
         reporterTask = new ReporterTask();
-        reportTimer.scheduleAtFixedRate(reporterTask,
-                                        REPORT_PERIOD - currentTimeMillis() % REPORT_PERIOD,
-                                        REPORT_PERIOD);
+        reportTimer.scheduleAtFixedRate(
+            reporterTask, REPORT_PERIOD - currentTimeMillis() % REPORT_PERIOD, REPORT_PERIOD);
 
-        stopped = false;
+        running = true;
 
         Map<String, ControllerNode> nodes = new TreeMap<>();
         for (ControllerNode node : clusterService.getNodes()) {
@@ -236,15 +352,57 @@
 
         // Create a worker pool and start the workers for this node.
         if (workerCount > 0) {
+            String[] keys = createStrings(keyLength, numKeys);
+            String[] values = createStrings(valueLength, numValues);
+
             workers = Executors.newFixedThreadPool(workerCount, groupedThreads("onos/primitive-perf", "worker-%d"));
             for (int i = 0; i < workerCount; i++) {
-                workers.submit(new Runner(UUID.randomUUID().toString(), UUID.randomUUID().toString()));
+                if (deterministic) {
+                    workers.submit(new DeterministicRunner(keys, values));
+                } else {
+                    workers.submit(new NonDeterministicRunner(keys, values));
+                }
             }
         }
         log.info("Started test run");
     }
 
+    /**
+     * Creates a deterministic array of strings to write to the cluster.
+     *
+     * @param length the string lengths
+     * @param count the string count
+     * @return a deterministic array of strings
+     */
+    private String[] createStrings(int length, int count) {
+        Random random = new Random(length);
+        List<String> stringsList = new ArrayList<>(count);
+        for (int i = 0; i < count; i++) {
+            stringsList.add(randomString(length, random));
+        }
+        return stringsList.toArray(new String[stringsList.size()]);
+    }
+
+    /**
+     * Creates a deterministic string based on the given seed.
+     *
+     * @param length the seed from which to create the string
+     * @param random the random object from which to create the string characters
+     * @return the string
+     */
+    private String randomString(int length, Random random) {
+        char[] buffer = new char[length];
+        for (int i = 0; i < length; i++) {
+            buffer[i] = CHARS[random.nextInt(CHARS.length)];
+        }
+        return new String(buffer);
+    }
+
     private void stopTestRun() {
+        if (!running) {
+            return;
+        }
+
         if (reporterTask != null) {
             reporterTask.cancel();
             reporterTask = null;
@@ -262,26 +420,28 @@
 
         sampleCollector.recordSample(0, 0);
         sampleCollector.recordSample(0, 0);
-        stopped = true;
+
+        running = false;
 
         log.info("Stopped test run");
     }
 
     // Submits primitive operations.
-    final class Runner implements Runnable {
-        private final String key;
-        private final String value;
-        private ConsistentMap<String, String> map;
+    abstract class Runner implements Runnable {
+        final String[] keys;
+        final String[] values;
+        final Random random = new Random();
+        ConsistentMap<String, String> map;
 
-        private Runner(String key, String value) {
-            this.key = key;
-            this.value = value;
+        Runner(String[] keys, String[] values) {
+            this.keys = keys;
+            this.values = values;
         }
 
         @Override
         public void run() {
             setup();
-            while (!stopped) {
+            while (running) {
                 try {
                     submit();
                 } catch (Exception e) {
@@ -291,34 +451,55 @@
             teardown();
         }
 
-        private void setup() {
+        void setup() {
             map = storageService.<String, String>consistentMapBuilder()
                     .withName("perf-test")
                     .withSerializer(Serializer.using(KryoNamespaces.BASIC))
                     .build();
+            if (includeEvents) {
+                map.addListener(event -> {
+                });
+            }
         }
 
-        private void submit() {
-            if (currentCounter.incrementAndGet() % 100 < writePercentage) {
-                map.put(key, value);
+        abstract void submit();
+
+        void teardown() {
+            //map.destroy();
+        }
+    }
+
+    private class NonDeterministicRunner extends Runner {
+        NonDeterministicRunner(String[] keys, String[] values) {
+            super(keys, values);
+        }
+
+        @Override
+        void submit() {
+            currentCounter.incrementAndGet();
+            String key = keys[random.nextInt(keys.length)];
+            if (random.nextInt(100) < writePercentage) {
+                map.put(key, values[random.nextInt(values.length)]);
             } else {
                 map.get(key);
             }
         }
-
-        private void teardown() {
-            map.destroy();
-        }
     }
 
-    private class InternalControl implements Consumer<String> {
+    private class DeterministicRunner extends Runner {
+        private int index;
+
+        DeterministicRunner(String[] keys, String[] values) {
+            super(keys, values);
+        }
+
         @Override
-        public void accept(String cmd) {
-            log.info("Received command {}", cmd);
-            if (cmd.equals(START)) {
-                startTestRun();
+        void submit() {
+            currentCounter.incrementAndGet();
+            if (random.nextInt(100) < writePercentage) {
+                map.put(keys[index++ % keys.length], values[random.nextInt(values.length)]);
             } else {
-                stopTestRun();
+                map.get(keys[random.nextInt(keys.length)]);
             }
         }
     }