Test application to measure partitioned raft database performance.

Change-Id: I6a1e773d71feb4867b959ec33841b8026dda32f5
diff --git a/apps/database-perf/main/java/org/onosproject/databaseperf/DatabasePerfInstaller.java b/apps/database-perf/main/java/org/onosproject/databaseperf/DatabasePerfInstaller.java
new file mode 100644
index 0000000..430211f
--- /dev/null
+++ b/apps/database-perf/main/java/org/onosproject/databaseperf/DatabasePerfInstaller.java
@@ -0,0 +1,174 @@
+/*
+ * Copyright 2015 Open Networking Laboratory
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.onosproject.databaseperf;
+
+import static org.apache.felix.scr.annotations.ReferenceCardinality.MANDATORY_UNARY;
+import static org.onlab.util.Tools.delay;
+import static org.onlab.util.Tools.groupedThreads;
+import static org.slf4j.LoggerFactory.getLogger;
+
+import java.util.Timer;
+import java.util.TimerTask;
+import java.util.UUID;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.stream.IntStream;
+
+import org.apache.commons.lang.math.RandomUtils;
+import org.apache.felix.scr.annotations.Activate;
+import org.apache.felix.scr.annotations.Component;
+import org.apache.felix.scr.annotations.Deactivate;
+import org.apache.felix.scr.annotations.Reference;
+import org.onlab.util.KryoNamespace;
+import org.onosproject.cluster.ClusterService;
+import org.onosproject.cluster.ControllerNode;
+import org.onosproject.core.ApplicationId;
+import org.onosproject.core.CoreService;
+import org.onosproject.store.service.ConsistentMap;
+import org.onosproject.store.service.Serializer;
+import org.onosproject.store.service.StorageService;
+import org.slf4j.Logger;
+
+/**
+ * Application to measure partitioned database performance.
+ */
+@Component(immediate = true)
+public class DatabasePerfInstaller {
+
+    private final Logger log = getLogger(getClass());
+
+    @Reference(cardinality = MANDATORY_UNARY)
+    protected CoreService coreService;
+
+    @Reference(cardinality = MANDATORY_UNARY)
+    protected ClusterService clusterService;
+
+    @Reference(cardinality = MANDATORY_UNARY)
+    protected StorageService storageService;
+
+    private boolean stopped;
+
+    private ApplicationId appId;
+
+    private static final long REPORT_PERIOD = 5000L; //ms
+    private Timer reportTimer;
+
+    private AtomicInteger successCount = new AtomicInteger(0);
+    private AtomicInteger failureCount = new AtomicInteger(0);
+
+    private ConsistentMap<String, String> cmap;
+
+    private ControllerNode localNode;
+
+    private long reportStartTime = System.currentTimeMillis();
+
+    private static final int NUM_TASK_THREADS = 2;
+    private ExecutorService taskExecutor;
+
+    private static final Serializer SERIALIZER = new Serializer() {
+
+        KryoNamespace kryo = new KryoNamespace.Builder().build();
+
+        @Override
+        public <T> byte[] encode(T object) {
+            return kryo.serialize(object);
+        }
+
+        @Override
+        public <T> T decode(byte[] bytes) {
+            return kryo.deserialize(bytes);
+        }
+
+    };
+
+    @Activate
+    public void activate() {
+        localNode = clusterService.getLocalNode();
+        String nodeId = localNode.ip().toString();
+        appId = coreService.registerApplication("org.onosproject.nettyperf."
+                                                        + nodeId);
+
+        cmap = storageService.createConsistentMap("onos-app-database-perf-test-map", SERIALIZER);
+        taskExecutor = Executors.newFixedThreadPool(NUM_TASK_THREADS, groupedThreads("onos/database-perf", "worker"));
+        log.info("Started with Application ID {}", appId.id());
+        start();
+    }
+
+    @Deactivate
+    public void deactivate() {
+        stop();
+        log.info("Stopped");
+    }
+
+    public void start() {
+        long delay = System.currentTimeMillis() % REPORT_PERIOD;
+        reportTimer = new Timer("onos-netty-perf-reporter");
+        reportTimer.scheduleAtFixedRate(new TimerTask() {
+            @Override
+            public void run() {
+                report();
+            }
+        }, delay, REPORT_PERIOD);
+
+        stopped = false;
+        IntStream.range(0, NUM_TASK_THREADS).forEach(i -> {
+            taskExecutor.submit(() -> {
+                delay(2000); // take a breath to start
+                while (!stopped) {
+                    performDBOperation();
+                    delay(2); // take a breather
+                }
+            });
+        });
+    }
+
+    private void performDBOperation() {
+        String key = String.format("test%d", RandomUtils.nextInt(1000));
+        try {
+            if (RandomUtils.nextBoolean()) {
+                cmap.put(key, UUID.randomUUID().toString());
+            } else {
+                cmap.get(key);
+            }
+            successCount.incrementAndGet();
+        } catch (Exception e) {
+            failureCount.incrementAndGet();
+        }
+    }
+
+    private void report() {
+        long delta = System.currentTimeMillis() - reportStartTime;
+        if (delta > 0) {
+            int rate = (int) Math.round(((successCount.get() * 1000.0) / delta));
+            log.info("Passed: {}, Failed: {}, Rate: {}",
+                    successCount.getAndSet(0), failureCount.getAndSet(0), rate);
+            reportStartTime = System.currentTimeMillis();
+        }
+    }
+
+    public void stop() {
+        reportTimer.cancel();
+        reportTimer = null;
+        stopped = true;
+        try {
+            taskExecutor.awaitTermination(5, TimeUnit.SECONDS);
+        } catch (InterruptedException e) {
+            log.warn("Failed to stop worker.");
+        }
+    }
+}