[ONOS-6841] Sustained primitive throughput tests

Change-Id: Ibdd05bd868a5d481b8967e57797d6106026ba1ac
diff --git a/apps/test/primitive-perf/src/main/java/org/onosproject/primitiveperf/PrimitivePerfApp.java b/apps/test/primitive-perf/src/main/java/org/onosproject/primitiveperf/PrimitivePerfApp.java
new file mode 100644
index 0000000..7af34af
--- /dev/null
+++ b/apps/test/primitive-perf/src/main/java/org/onosproject/primitiveperf/PrimitivePerfApp.java
@@ -0,0 +1,334 @@
+/*
+ * Copyright 2017-present Open Networking Laboratory
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.onosproject.primitiveperf;
+
+import java.util.Dictionary;
+import java.util.List;
+import java.util.Map;
+import java.util.Timer;
+import java.util.TimerTask;
+import java.util.TreeMap;
+import java.util.UUID;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.function.Consumer;
+
+import com.google.common.collect.Lists;
+import org.apache.felix.scr.annotations.Activate;
+import org.apache.felix.scr.annotations.Component;
+import org.apache.felix.scr.annotations.Deactivate;
+import org.apache.felix.scr.annotations.Modified;
+import org.apache.felix.scr.annotations.Property;
+import org.apache.felix.scr.annotations.Reference;
+import org.apache.felix.scr.annotations.ReferenceCardinality;
+import org.apache.felix.scr.annotations.Service;
+import org.onosproject.cfg.ComponentConfigService;
+import org.onosproject.cluster.ClusterService;
+import org.onosproject.cluster.ControllerNode;
+import org.onosproject.cluster.NodeId;
+import org.onosproject.store.cluster.messaging.ClusterCommunicationService;
+import org.onosproject.store.cluster.messaging.MessageSubject;
+import org.onosproject.store.serializers.KryoNamespaces;
+import org.onosproject.store.service.ConsistentMap;
+import org.onosproject.store.service.Serializer;
+import org.onosproject.store.service.StorageService;
+import org.osgi.service.component.ComponentContext;
+import org.slf4j.Logger;
+
+import static com.google.common.base.Strings.isNullOrEmpty;
+import static java.lang.System.currentTimeMillis;
+import static org.apache.felix.scr.annotations.ReferenceCardinality.MANDATORY_UNARY;
+import static org.onlab.util.Tools.get;
+import static org.onlab.util.Tools.groupedThreads;
+import static org.slf4j.LoggerFactory.getLogger;
+
+/**
+ * Application to test sustained primitive throughput.
+ */
+@Component(immediate = true)
+@Service(value = PrimitivePerfApp.class)
+public class PrimitivePerfApp {
+
+    private final Logger log = getLogger(getClass());
+
+    private static final int DEFAULT_NUM_CLIENTS = 64;
+    private static final int DEFAULT_WRITE_PERCENTAGE = 100;
+
+    private static final int REPORT_PERIOD = 1_000; //ms
+
+    private static final String START = "start";
+    private static final String STOP = "stop";
+    private static final MessageSubject CONTROL = new MessageSubject("primitive-perf-ctl");
+
+    @Property(name = "numClients", intValue = DEFAULT_NUM_CLIENTS,
+            label = "Number of clients to use to submit writes")
+    private int numClients = DEFAULT_NUM_CLIENTS;
+
+    @Property(name = "writePercentage", intValue = DEFAULT_WRITE_PERCENTAGE,
+            label = "Percentage of operations to perform as writes")
+    private int writePercentage = DEFAULT_WRITE_PERCENTAGE;
+
+    @Reference(cardinality = MANDATORY_UNARY)
+    protected ClusterService clusterService;
+
+    @Reference(cardinality = MANDATORY_UNARY)
+    protected StorageService storageService;
+
+    @Reference(cardinality = MANDATORY_UNARY)
+    protected ComponentConfigService configService;
+
+    @Reference(cardinality = MANDATORY_UNARY)
+    protected PrimitivePerfCollector sampleCollector;
+
+    @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
+    protected ClusterCommunicationService communicationService;
+
+    private ExecutorService messageHandlingExecutor;
+
+    private ExecutorService workers;
+    private boolean stopped = true;
+
+    private Timer reportTimer;
+
+    private NodeId nodeId;
+    private TimerTask reporterTask;
+
+    private long startTime;
+    private long currentStartTime;
+    private AtomicLong overallCounter;
+    private AtomicLong currentCounter;
+
+    @Activate
+    public void activate(ComponentContext context) {
+        configService.registerProperties(getClass());
+
+        nodeId = clusterService.getLocalNode().id();
+
+        reportTimer = new Timer("onos-primitive-perf-reporter");
+
+        messageHandlingExecutor = Executors.newSingleThreadExecutor(
+                groupedThreads("onos/perf", "command-handler"));
+
+        communicationService.addSubscriber(CONTROL, String::new, new InternalControl(),
+                                           messageHandlingExecutor);
+
+        // TODO: investigate why this seems to be necessary for configs to get picked up on initial activation
+        modify(context);
+    }
+
+    @Deactivate
+    public void deactivate() {
+        stopTestRun();
+
+        configService.unregisterProperties(getClass(), false);
+        messageHandlingExecutor.shutdown();
+        communicationService.removeSubscriber(CONTROL);
+
+        if (reportTimer != null) {
+            reportTimer.cancel();
+            reportTimer = null;
+        }
+    }
+
+    @Modified
+    public void modify(ComponentContext context) {
+        if (context == null) {
+            logConfig("Reconfigured");
+            return;
+        }
+
+        Dictionary<?, ?> properties = context.getProperties();
+        int newNumClients;
+        try {
+            String s = get(properties, "numClients");
+            newNumClients = isNullOrEmpty(s) ? numClients : Integer.parseInt(s.trim());
+        } catch (NumberFormatException | ClassCastException e) {
+            log.warn("Malformed configuration detected; using defaults", e);
+            newNumClients = DEFAULT_NUM_CLIENTS;
+        }
+
+        int newWritePercentage;
+        try {
+            String s = get(properties, "writePercentage");
+            newWritePercentage = isNullOrEmpty(s) ? writePercentage : Integer.parseInt(s.trim());
+        } catch (NumberFormatException | ClassCastException e) {
+            log.warn("Malformed configuration detected; using defaults", e);
+            newWritePercentage = DEFAULT_WRITE_PERCENTAGE;
+        }
+
+        if (newNumClients != numClients || newWritePercentage != writePercentage) {
+            numClients = newNumClients;
+            writePercentage = newWritePercentage;
+            logConfig("Reconfigured");
+            if (!stopped) {
+                stop();
+                start();
+            }
+        }
+    }
+
+    public void start() {
+        if (stopped) {
+            stopped = false;
+            communicationService.broadcast(START, CONTROL, str -> str.getBytes());
+            startTestRun();
+        }
+    }
+
+    public void stop() {
+        if (!stopped) {
+            communicationService.broadcast(STOP, CONTROL, str -> str.getBytes());
+            stopTestRun();
+        }
+    }
+
+    private void logConfig(String prefix) {
+        log.info("{} with numClients = {}; writePercentage = {}", prefix, numClients, writePercentage);
+    }
+
+    private void startTestRun() {
+        sampleCollector.clearSamples();
+
+        startTime = System.currentTimeMillis();
+        currentStartTime = startTime;
+        currentCounter = new AtomicLong();
+        overallCounter = new AtomicLong();
+
+        reporterTask = new ReporterTask();
+        reportTimer.scheduleAtFixedRate(reporterTask,
+                                        REPORT_PERIOD - currentTimeMillis() % REPORT_PERIOD,
+                                        REPORT_PERIOD);
+
+        stopped = false;
+
+        Map<String, ControllerNode> nodes = new TreeMap<>();
+        for (ControllerNode node : clusterService.getNodes()) {
+            nodes.put(node.id().id(), node);
+        }
+
+        // Compute the index of the local node in a sorted list of nodes.
+        List<String> sortedNodes = Lists.newArrayList(nodes.keySet());
+        int nodeCount = nodes.size();
+        int index = sortedNodes.indexOf(nodeId.id());
+
+        // Count the number of workers assigned to this node.
+        int workerCount = 0;
+        for (int i = 1; i <= numClients; i++) {
+            if (i % nodeCount == index) {
+                workerCount++;
+            }
+        }
+
+        // Create a worker pool and start the workers for this node.
+        workers = Executors.newFixedThreadPool(workerCount, groupedThreads("onos/primitive-perf", "worker-%d"));
+        for (int i = 0; i < workerCount; i++) {
+            workers.submit(new Runner(UUID.randomUUID().toString(), UUID.randomUUID().toString()));
+        }
+        log.info("Started test run");
+    }
+
+    private void stopTestRun() {
+        if (reporterTask != null) {
+            reporterTask.cancel();
+            reporterTask = null;
+        }
+
+        try {
+            workers.awaitTermination(10, TimeUnit.MILLISECONDS);
+        } catch (InterruptedException e) {
+            log.warn("Failed to stop worker", e);
+        }
+
+        sampleCollector.recordSample(0, 0);
+        sampleCollector.recordSample(0, 0);
+        stopped = true;
+
+        log.info("Stopped test run");
+    }
+
+    // Submits primitive operations.
+    final class Runner implements Runnable {
+        private final String key;
+        private final String value;
+        private ConsistentMap<String, String> map;
+
+        private Runner(String key, String value) {
+            this.key = key;
+            this.value = value;
+        }
+
+        @Override
+        public void run() {
+            setup();
+            while (!stopped) {
+                try {
+                    submit();
+                } catch (Exception e) {
+                    log.warn("Exception during cycle", e);
+                }
+            }
+            teardown();
+        }
+
+        private void setup() {
+            map = storageService.<String, String>consistentMapBuilder()
+                    .withName("perf-test")
+                    .withSerializer(Serializer.using(KryoNamespaces.BASIC))
+                    .build();
+        }
+
+        private void submit() {
+            if (currentCounter.incrementAndGet() % 100 < writePercentage) {
+                map.put(key, value);
+            } else {
+                map.get(key);
+            }
+        }
+
+        private void teardown() {
+            map.destroy();
+        }
+    }
+
+    private class InternalControl implements Consumer<String> {
+        @Override
+        public void accept(String cmd) {
+            log.info("Received command {}", cmd);
+            if (cmd.equals(START)) {
+                startTestRun();
+            } else {
+                stopTestRun();
+            }
+        }
+    }
+
+    private class ReporterTask extends TimerTask {
+        @Override
+        public void run() {
+            long endTime = System.currentTimeMillis();
+            long overallTime = endTime - startTime;
+            long currentTime = endTime - currentStartTime;
+            long currentCount = currentCounter.getAndSet(0);
+            long overallCount = overallCounter.addAndGet(currentCount);
+            sampleCollector.recordSample(overallTime > 0 ? overallCount / (overallTime / 1000d) : 0,
+                    currentTime > 0 ? currentCount / (currentTime / 1000d) : 0);
+            currentStartTime = System.currentTimeMillis();
+        }
+    }
+
+}