ONOS-1621: Test app for measuring messaging layer performance

Change-Id: Idc2f2e70833bf30c05310f8f09c7daeb68149f98
diff --git a/apps/test/messaging-perf/pom.xml b/apps/test/messaging-perf/pom.xml
new file mode 100644
index 0000000..c68711d
--- /dev/null
+++ b/apps/test/messaging-perf/pom.xml
@@ -0,0 +1,60 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+  ~ Copyright 2015 Open Networking Laboratory
+  ~
+  ~ Licensed under the Apache License, Version 2.0 (the "License");
+  ~ you may not use this file except in compliance with the License.
+  ~ You may obtain a copy of the License at
+  ~
+  ~     http://www.apache.org/licenses/LICENSE-2.0
+  ~
+  ~ Unless required by applicable law or agreed to in writing, software
+  ~ distributed under the License is distributed on an "AS IS" BASIS,
+  ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  ~ See the License for the specific language governing permissions and
+  ~ limitations under the License.
+  -->
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
+    <modelVersion>4.0.0</modelVersion>
+
+    <parent>
+        <groupId>org.onosproject</groupId>
+        <artifactId>onos-apps-test</artifactId>
+        <version>1.2.0-SNAPSHOT</version>
+        <relativePath>../pom.xml</relativePath>
+    </parent>
+
+    <artifactId>onos-app-messaging-perf</artifactId>
+    <packaging>bundle</packaging>
+
+    <description>Messaging performance test application</description>
+
+    <properties>
+        <onos.app.name>org.onosproject.messagingperf</onos.app.name>
+    </properties>
+
+    <dependencies>
+        <dependency>
+            <groupId>org.onosproject</groupId>
+            <artifactId>onos-api</artifactId>
+            <version>${project.version}</version>
+        </dependency>
+        <dependency>
+            <groupId>org.onosproject</groupId>
+            <artifactId>onos-core-serializers</artifactId>
+            <version>${project.version}</version>
+        </dependency>
+        <dependency>
+            <groupId>org.osgi</groupId>
+            <artifactId>org.osgi.compendium</artifactId>
+        </dependency>
+        <!-- Required for javadoc generation -->
+        <dependency>
+           <groupId>org.osgi</groupId>
+           <artifactId>org.osgi.core</artifactId>
+        </dependency>
+    </dependencies>
+
+</project>
diff --git a/apps/test/messaging-perf/src/main/java/org/onosproject/messagingperf/MessagingPerfApp.java b/apps/test/messaging-perf/src/main/java/org/onosproject/messagingperf/MessagingPerfApp.java
new file mode 100644
index 0000000..2469793
--- /dev/null
+++ b/apps/test/messaging-perf/src/main/java/org/onosproject/messagingperf/MessagingPerfApp.java
@@ -0,0 +1,378 @@
+package org.onosproject.messagingperf;
+
+import static com.google.common.base.Strings.isNullOrEmpty;
+import static org.apache.felix.scr.annotations.ReferenceCardinality.MANDATORY_UNARY;
+import static org.onlab.util.Tools.get;
+import static org.onlab.util.Tools.groupedThreads;
+import static org.slf4j.LoggerFactory.getLogger;
+
+import java.util.Dictionary;
+import java.util.List;
+import java.util.Objects;
+import java.util.Set;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.Executor;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.function.Function;
+import java.util.stream.IntStream;
+
+import org.apache.felix.scr.annotations.Activate;
+import org.apache.felix.scr.annotations.Component;
+import org.apache.felix.scr.annotations.Deactivate;
+import org.apache.felix.scr.annotations.Modified;
+import org.apache.felix.scr.annotations.Property;
+import org.apache.felix.scr.annotations.Reference;
+import org.apache.felix.scr.annotations.ReferenceCardinality;
+import org.apache.felix.scr.annotations.Service;
+import org.onlab.util.BoundedThreadPool;
+import org.onlab.util.KryoNamespace;
+import org.onosproject.cfg.ComponentConfigService;
+import org.onosproject.cluster.ClusterService;
+import org.onosproject.cluster.NodeId;
+import org.onosproject.core.CoreService;
+import org.onosproject.store.cluster.messaging.ClusterCommunicationService;
+import org.onosproject.store.cluster.messaging.MessageSubject;
+import org.onosproject.store.serializers.KryoNamespaces;
+import org.onosproject.store.serializers.KryoSerializer;
+import org.osgi.service.component.ComponentContext;
+import org.slf4j.Logger;
+
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableSet;
+import com.google.common.collect.Lists;
+import com.google.common.collect.Sets;
+import com.google.common.util.concurrent.MoreExecutors;
+
+/**
+ * Application for measuring cluster messaging performance.
+ */
+@Component(immediate = true, enabled = true)
+@Service(value = MessagingPerfApp.class)
+public class MessagingPerfApp {
+    private final Logger log = getLogger(getClass());
+
+    @Reference(cardinality = MANDATORY_UNARY)
+    protected ClusterService clusterService;
+
+    @Reference(cardinality = MANDATORY_UNARY)
+    protected ClusterCommunicationService communicationService;
+
+    @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
+    protected CoreService coreService;
+
+    @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
+    protected ComponentConfigService configService;
+
+    private static final MessageSubject TEST_UNICAST_MESSAGE_TOPIC =
+            new MessageSubject("net-perf-unicast-message");
+
+    private static final MessageSubject TEST_REQUEST_REPLY_TOPIC =
+            new MessageSubject("net-perf-rr-message");
+
+    private static final int DEFAULT_SENDER_THREAD_POOL_SIZE = 2;
+    private static final int DEFAULT_RECEIVER_THREAD_POOL_SIZE = 2;
+
+    @Property(name = "totalSenderThreads", intValue = DEFAULT_SENDER_THREAD_POOL_SIZE,
+            label = "Number of sender threads")
+    protected int totalSenderThreads = DEFAULT_SENDER_THREAD_POOL_SIZE;
+
+    @Property(name = "totalReceiverThreads", intValue = DEFAULT_RECEIVER_THREAD_POOL_SIZE,
+            label = "Number of receiver threads")
+    protected int totalReceiverThreads = DEFAULT_RECEIVER_THREAD_POOL_SIZE;
+
+    @Property(name = "serializationOn", boolValue = true,
+            label = "Turn serialization on/off")
+    private boolean serializationOn = true;
+
+    @Property(name = "receiveOnIOLoopThread", boolValue = false,
+            label = "Set this to true to handle message on IO thread")
+    private boolean receiveOnIOLoopThread = false;
+
+    protected int reportIntervalSeconds = 1;
+
+    private Executor messageReceivingExecutor;
+
+    private ExecutorService messageSendingExecutor =
+            BoundedThreadPool.newFixedThreadPool(totalSenderThreads,
+                    groupedThreads("onos/messaging-perf-test", "sender-%d"));
+
+    private final ScheduledExecutorService reporter =
+            Executors.newSingleThreadScheduledExecutor(
+                    groupedThreads("onos/net-perf-test", "reporter"));
+
+    private AtomicInteger received = new AtomicInteger(0);
+    private AtomicInteger sent = new AtomicInteger(0);
+    private AtomicInteger attempted = new AtomicInteger(0);
+    private AtomicInteger completed = new AtomicInteger(0);
+
+    protected static final KryoSerializer SERIALIZER = new KryoSerializer() {
+        @Override
+        protected void setupKryoPool() {
+            serializerPool = KryoNamespace.newBuilder()
+                    .register(KryoNamespaces.BASIC)
+                    .register(KryoNamespaces.MISC)
+                    .register(byte[].class)
+                    .register(Data.class)
+                    .build();
+        }
+    };
+
+    private final Data data = new Data().withStringField("test")
+                                .withListField(Lists.newArrayList("1", "2", "3"))
+                                .withSetField(Sets.newHashSet("1", "2", "3"));
+    private final byte[] dataBytes = SERIALIZER.encode(new Data().withStringField("test")
+            .withListField(Lists.newArrayList("1", "2", "3"))
+            .withSetField(Sets.newHashSet("1", "2", "3")));
+
+    private Function<Data, byte[]> encoder;
+    private Function<byte[], Data> decoder;
+
+    @Activate
+    public void activate(ComponentContext context) {
+        configService.registerProperties(getClass());
+        setupCodecs();
+        messageReceivingExecutor = receiveOnIOLoopThread
+                ? MoreExecutors.directExecutor()
+                : Executors.newFixedThreadPool(
+                        totalReceiverThreads,
+                        groupedThreads("onos/net-perf-test", "receiver-%d"));
+        registerMessageHandlers();
+        startTest();
+        reporter.scheduleWithFixedDelay(this::reportPerformance,
+                reportIntervalSeconds,
+                reportIntervalSeconds,
+                TimeUnit.SECONDS);
+        logConfig("Started");
+    }
+
+    @Deactivate
+    public void deactivate(ComponentContext context) {
+        configService.unregisterProperties(getClass(), false);
+        stopTest();
+        reporter.shutdown();
+        unregisterMessageHandlers();
+        log.info("Stopped.");
+    }
+
+    @Modified
+    public void modified(ComponentContext context) {
+        if (context == null) {
+            totalSenderThreads = DEFAULT_SENDER_THREAD_POOL_SIZE;
+            totalReceiverThreads = DEFAULT_RECEIVER_THREAD_POOL_SIZE;
+            serializationOn = true;
+            receiveOnIOLoopThread = false;
+            return;
+        }
+
+        Dictionary properties = context.getProperties();
+
+        int newTotalSenderThreads = totalSenderThreads;
+        int newTotalReceiverThreads = totalReceiverThreads;
+        boolean newSerializationOn = serializationOn;
+        boolean newReceiveOnIOLoopThread = receiveOnIOLoopThread;
+        try {
+            String s = get(properties, "totalSenderThreads");
+            newTotalSenderThreads = isNullOrEmpty(s)
+                    ? totalSenderThreads : Integer.parseInt(s.trim());
+
+            s = get(properties, "totalReceiverThreads");
+            newTotalReceiverThreads = isNullOrEmpty(s)
+                    ? totalReceiverThreads : Integer.parseInt(s.trim());
+
+            s = get(properties, "serializationOn");
+            newSerializationOn = isNullOrEmpty(s)
+                    ? serializationOn : Boolean.parseBoolean(s.trim());
+
+            s = get(properties, "receiveOnIOLoopThread");
+            newReceiveOnIOLoopThread = isNullOrEmpty(s)
+                    ? receiveOnIOLoopThread : Boolean.parseBoolean(s.trim());
+
+        } catch (NumberFormatException | ClassCastException e) {
+            return;
+        }
+
+        boolean modified = newTotalSenderThreads != totalSenderThreads ||
+                newTotalReceiverThreads != totalReceiverThreads ||
+                newSerializationOn != serializationOn ||
+                newReceiveOnIOLoopThread != receiveOnIOLoopThread;
+
+        // If nothing has changed, simply return.
+        if (!modified) {
+            return;
+        }
+
+        totalSenderThreads = newTotalSenderThreads;
+        totalReceiverThreads = newTotalReceiverThreads;
+        serializationOn = newSerializationOn;
+        if (!receiveOnIOLoopThread && newReceiveOnIOLoopThread != receiveOnIOLoopThread) {
+            ((ExecutorService) messageReceivingExecutor).shutdown();
+        }
+        receiveOnIOLoopThread = newReceiveOnIOLoopThread;
+
+        // restart test.
+
+        stopTest();
+        unregisterMessageHandlers();
+        setupCodecs();
+        messageSendingExecutor =
+                BoundedThreadPool.newFixedThreadPool(
+                        totalSenderThreads,
+                        groupedThreads("onos/net-perf-test", "sender-%d"));
+        messageReceivingExecutor = receiveOnIOLoopThread
+                    ? MoreExecutors.directExecutor()
+                    : Executors.newFixedThreadPool(
+                            totalReceiverThreads,
+                            groupedThreads("onos/net-perf-test", "receiver-%d"));
+
+        registerMessageHandlers();
+        startTest();
+
+        logConfig("Reconfigured");
+    }
+
+
+    private void logConfig(String prefix) {
+        log.info("{} with senderThreadPoolSize = {}; receivingThreadPoolSize = {}"
+                + " serializationOn = {}, receiveOnIOLoopThread = {}",
+                 prefix,
+                 totalSenderThreads,
+                 totalReceiverThreads,
+                 serializationOn,
+                 receiveOnIOLoopThread);
+    }
+
+    private void setupCodecs() {
+        encoder = serializationOn ? SERIALIZER::encode : d -> dataBytes;
+        decoder = serializationOn ? SERIALIZER::decode : b -> data;
+    }
+
+    private void registerMessageHandlers() {
+        communicationService.<Data>addSubscriber(
+                TEST_UNICAST_MESSAGE_TOPIC,
+                decoder,
+                d -> { received.incrementAndGet(); },
+                messageReceivingExecutor);
+
+        communicationService.<Data, Data>addSubscriber(
+                TEST_REQUEST_REPLY_TOPIC,
+                decoder,
+                Function.identity(),
+                encoder,
+                messageReceivingExecutor);
+    }
+
+    private void unregisterMessageHandlers() {
+        communicationService.removeSubscriber(TEST_UNICAST_MESSAGE_TOPIC);
+        communicationService.removeSubscriber(TEST_REQUEST_REPLY_TOPIC);
+    }
+
+    private void startTest() {
+        IntStream.range(0, totalSenderThreads).forEach(i -> requestReply());
+    }
+
+    private void stopTest() {
+        messageSendingExecutor.shutdown();
+    }
+
+    private void requestReply() {
+        try {
+            attempted.incrementAndGet();
+            CompletableFuture<Data> response =
+                    communicationService.<Data, Data>sendAndReceive(
+                            data,
+                            TEST_REQUEST_REPLY_TOPIC,
+                            encoder,
+                            decoder,
+                            randomPeer());
+            response.whenComplete((result, error) -> {
+                if (Objects.equals(data, result)) {
+                    completed.incrementAndGet();
+                }
+                messageSendingExecutor.submit(this::requestReply);
+            });
+        } catch (Exception e) {
+            e.printStackTrace();
+        }
+    }
+
+    private void unicast() {
+        try {
+            sent.incrementAndGet();
+            communicationService.<Data>unicast(
+                    data,
+                    TEST_UNICAST_MESSAGE_TOPIC,
+                    encoder,
+                    randomPeer());
+        } catch (Exception e) {
+            e.printStackTrace();
+        }
+        messageSendingExecutor.submit(this::unicast);
+    }
+
+    private void broadcast() {
+        try {
+            sent.incrementAndGet();
+            communicationService.<Data>broadcast(
+                    data,
+                    TEST_UNICAST_MESSAGE_TOPIC,
+                    encoder);
+        } catch (Exception e) {
+            e.printStackTrace();
+        }
+        messageSendingExecutor.submit(this::broadcast);
+    }
+
+    private NodeId randomPeer() {
+        return clusterService.getNodes()
+                    .stream()
+                    .filter(node -> clusterService.getLocalNode().equals(node))
+                    .findAny()
+                    .get()
+                    .id();
+    }
+
+    private void reportPerformance() {
+        log.info("Attempted: {} Completed: {}", attempted.getAndSet(0), completed.getAndSet(0));
+    }
+
+    private static class Data {
+        private String stringField;
+        private List<String> listField;
+        private Set<String> setField;
+
+        public Data withStringField(String value) {
+            stringField = value;
+            return this;
+        }
+
+        public Data withListField(List<String> value) {
+            listField = ImmutableList.copyOf(value);
+            return this;
+        }
+
+        public Data withSetField(Set<String> value) {
+            setField = ImmutableSet.copyOf(value);
+            return this;
+        }
+
+        @Override
+        public int hashCode() {
+            return Objects.hash(stringField, listField, setField);
+        }
+
+        @Override
+        public boolean equals(Object other) {
+            if (other instanceof Data) {
+                Data that = (Data) other;
+                return Objects.equals(this.stringField, that.stringField) &&
+                Objects.equals(this.listField, that.listField) &&
+                Objects.equals(this.setField, that.setField);
+            }
+            return false;
+        }
+    }
+}
diff --git a/apps/test/messaging-perf/src/main/java/org/onosproject/messagingperf/package-info.java b/apps/test/messaging-perf/src/main/java/org/onosproject/messagingperf/package-info.java
new file mode 100644
index 0000000..98d682a
--- /dev/null
+++ b/apps/test/messaging-perf/src/main/java/org/onosproject/messagingperf/package-info.java
@@ -0,0 +1,20 @@
+/*
+ * Copyright 2015 Open Networking Laboratory
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/**
+ * Performance test application for the cluster messaging subsystem.
+ */
+package org.onosproject.messagingperf;
diff --git a/apps/test/pom.xml b/apps/test/pom.xml
index daab99b..bda05d0 100644
--- a/apps/test/pom.xml
+++ b/apps/test/pom.xml
@@ -34,6 +34,7 @@
     <modules>
         <module>election</module>
         <module>intent-perf</module>
+        <module>messaging-perf</module>
         <module>demo</module>
     </modules>