Setting up monitoring of cluster communcation service

Change-Id: I771b23db6920b26b592abc5d5156e9d77cde4f00
diff --git a/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/MeteringAgent.java b/core/common/src/main/java/org/onosproject/utils/MeteringAgent.java
similarity index 97%
rename from core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/MeteringAgent.java
rename to core/common/src/main/java/org/onosproject/utils/MeteringAgent.java
index 27a422e..e3877e8 100644
--- a/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/MeteringAgent.java
+++ b/core/common/src/main/java/org/onosproject/utils/MeteringAgent.java
@@ -1,5 +1,5 @@
 /*
- * Copyright 2015 Open Networking Laboratory
+ * Copyright 2016 Open Networking Laboratory
  *
  * Licensed under the Apache License, Version 2.0 (the "License");
  * you may not use this file except in compliance with the License.
@@ -13,7 +13,7 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package org.onosproject.store.primitives.impl;
+package org.onosproject.utils;
 
 import com.codahale.metrics.Counter;
 import com.codahale.metrics.Timer;
diff --git a/core/common/src/main/java/org/onosproject/utils/package-info.java b/core/common/src/main/java/org/onosproject/utils/package-info.java
new file mode 100644
index 0000000..570b384
--- /dev/null
+++ b/core/common/src/main/java/org/onosproject/utils/package-info.java
@@ -0,0 +1,21 @@
+/*
+ * Copyright 2015 Open Networking Laboratory
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/**
+ * Miscellaneous common facilities used for construction of various core and
+ * app subsystems.
+ */
+package org.onosproject.utils;
\ No newline at end of file
diff --git a/core/store/dist/src/main/java/org/onosproject/store/cluster/messaging/impl/ClusterCommunicationManager.java b/core/store/dist/src/main/java/org/onosproject/store/cluster/messaging/impl/ClusterCommunicationManager.java
index df4ac5c..ffde400 100644
--- a/core/store/dist/src/main/java/org/onosproject/store/cluster/messaging/impl/ClusterCommunicationManager.java
+++ b/core/store/dist/src/main/java/org/onosproject/store/cluster/messaging/impl/ClusterCommunicationManager.java
@@ -31,6 +31,7 @@
 import org.onosproject.store.cluster.messaging.Endpoint;
 import org.onosproject.store.cluster.messaging.MessageSubject;
 import org.onosproject.store.cluster.messaging.MessagingService;
+import org.onosproject.utils.MeteringAgent;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -47,6 +48,7 @@
 import java.util.stream.Collectors;
 
 import static com.google.common.base.Preconditions.checkArgument;
+import static com.google.common.base.Preconditions.checkNotNull;
 
 @Component(immediate = true)
 @Service
@@ -55,6 +57,18 @@
 
     private final Logger log = LoggerFactory.getLogger(getClass());
 
+    private final MeteringAgent subjectMeteringAgent = new MeteringAgent(PRIMITIVE_NAME, SUBJECT_PREFIX, true);
+    private final MeteringAgent endpointMeteringAgent = new MeteringAgent(PRIMITIVE_NAME, ENDPOINT_PREFIX, true);
+
+    private static final String PRIMITIVE_NAME = "clusterCommunication";
+    private static final String SUBJECT_PREFIX = "subject";
+    private static final String ENDPOINT_PREFIX = "endpoint";
+
+    private static final String SERIALIZING = "serialization";
+    private static final String DESERIALIZING = "deserialization";
+    private static final String NODE_PREFIX = "node:";
+    private static final String ROUND_TRIP_SUFFIX = ".rtt";
+
     @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
     private ClusterService clusterService;
 
@@ -110,7 +124,8 @@
             byte[] payload = new ClusterMessage(
                     localNodeId,
                     subject,
-                    encoder.apply(message)).getBytes();
+                    timeFunction(encoder, subjectMeteringAgent, SERIALIZING).apply(message)
+                    ).getBytes();
             return doUnicast(subject, payload, toNodeId);
         } catch (Exception e) {
             return Tools.exceptionalFuture(e);
@@ -125,7 +140,8 @@
         byte[] payload = new ClusterMessage(
                 localNodeId,
                 subject,
-                encoder.apply(message)).getBytes();
+                timeFunction(encoder, subjectMeteringAgent, SERIALIZING).apply(message))
+                .getBytes();
         nodes.forEach(nodeId -> doUnicast(subject, payload, nodeId));
     }
 
@@ -139,8 +155,10 @@
             ClusterMessage envelope = new ClusterMessage(
                     clusterService.getLocalNode().id(),
                     subject,
-                    encoder.apply(message));
-            return sendAndReceive(subject, envelope.getBytes(), toNodeId).thenApply(decoder);
+                    timeFunction(encoder, subjectMeteringAgent, SERIALIZING).
+                            apply(message));
+            return sendAndReceive(subject, envelope.getBytes(), toNodeId).
+                    thenApply(bytes -> timeFunction(decoder, subjectMeteringAgent, DESERIALIZING).apply(bytes));
         } catch (Exception e) {
             return Tools.exceptionalFuture(e);
         }
@@ -157,7 +175,15 @@
         ControllerNode node = clusterService.getNode(toNodeId);
         checkArgument(node != null, "Unknown nodeId: %s", toNodeId);
         Endpoint nodeEp = new Endpoint(node.ip(), node.tcpPort());
-        return messagingService.sendAndReceive(nodeEp, subject.value(), payload);
+        final MeteringAgent.Context epContext = endpointMeteringAgent.
+                startTimer(NODE_PREFIX + toNodeId.toString() + ROUND_TRIP_SUFFIX);
+        final MeteringAgent.Context subjectContext = subjectMeteringAgent.
+                startTimer(subject.toString() + ROUND_TRIP_SUFFIX);
+        return messagingService.sendAndReceive(nodeEp, subject.value(), payload).
+                whenComplete((bytes, throwable) -> {
+                    subjectContext.stop(throwable);
+                    epContext.stop(throwable);
+                });
     }
 
     @Override
@@ -213,6 +239,40 @@
                 executor);
     }
 
+    /**
+     * Performs the timed function, returning the value it would while timing the operation.
+     *
+     * @param timedFunction the function to be timed
+     * @param meter the metering agent to be used to time the function
+     * @param opName the opname to be used when starting the meter
+     * @param <A> The param type of the function
+     * @param <B> The return type of the function
+     * @return the value returned by the timed function
+     */
+    private <A, B> Function<A, B> timeFunction(Function<A, B> timedFunction,
+                                               MeteringAgent meter, String opName) {
+        checkNotNull(timedFunction);
+        checkNotNull(meter);
+        checkNotNull(opName);
+        return new Function<A, B>() {
+            @Override
+            public B apply(A a) {
+                final MeteringAgent.Context context = meter.startTimer(opName);
+                B result = null;
+                try {
+                    result = timedFunction.apply(a);
+                } catch (Exception e) {
+                    context.stop(e);
+                    throw new RuntimeException(e);
+                } finally {
+                    context.stop(null);
+                    return result;
+                }
+            }
+        };
+    }
+
+
     private class InternalClusterMessageHandler implements BiFunction<Endpoint, byte[], byte[]> {
         private ClusterMessageHandler handler;
 
@@ -243,7 +303,9 @@
 
         @Override
         public CompletableFuture<byte[]> apply(Endpoint sender, byte[] bytes) {
-            return handler.apply(decoder.apply(ClusterMessage.fromBytes(bytes).payload())).thenApply(encoder);
+            return handler.apply(timeFunction(decoder, subjectMeteringAgent, DESERIALIZING).
+                    apply(ClusterMessage.fromBytes(bytes).payload())).
+                    thenApply(m -> timeFunction(encoder, subjectMeteringAgent, SERIALIZING).apply(m));
         }
     }
 
@@ -258,7 +320,8 @@
 
         @Override
         public void accept(Endpoint sender, byte[] bytes) {
-            consumer.accept(decoder.apply(ClusterMessage.fromBytes(bytes).payload()));
+            consumer.accept(timeFunction(decoder, subjectMeteringAgent, DESERIALIZING).
+                    apply(ClusterMessage.fromBytes(bytes).payload()));
         }
     }
 }
diff --git a/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/DefaultAsyncAtomicCounter.java b/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/DefaultAsyncAtomicCounter.java
index 1a38727..6a79292 100644
--- a/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/DefaultAsyncAtomicCounter.java
+++ b/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/DefaultAsyncAtomicCounter.java
@@ -16,6 +16,7 @@
 package org.onosproject.store.primitives.impl;
 
 import org.onosproject.store.service.AsyncAtomicCounter;
+import org.onosproject.utils.MeteringAgent;
 
 import java.util.concurrent.CompletableFuture;
 
diff --git a/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/DefaultAsyncAtomicValue.java b/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/DefaultAsyncAtomicValue.java
index 7bbd9d3..1d42a97 100644
--- a/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/DefaultAsyncAtomicValue.java
+++ b/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/DefaultAsyncAtomicValue.java
@@ -32,6 +32,7 @@
 
 import com.google.common.base.Throwables;
 import com.google.common.collect.Maps;
+import org.onosproject.utils.MeteringAgent;
 
 
 public class DefaultAsyncAtomicValue<V> implements AsyncAtomicValue<V> {
diff --git a/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/DefaultAsyncConsistentMap.java b/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/DefaultAsyncConsistentMap.java
index 281fd16..958734c 100644
--- a/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/DefaultAsyncConsistentMap.java
+++ b/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/DefaultAsyncConsistentMap.java
@@ -47,6 +47,7 @@
 import org.onosproject.store.service.MapEventListener;
 import org.onosproject.store.service.Serializer;
 import org.onosproject.store.service.Versioned;
+import org.onosproject.utils.MeteringAgent;
 import org.slf4j.Logger;
 
 import com.google.common.cache.CacheBuilder;
diff --git a/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/DefaultAsyncDistributedSet.java b/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/DefaultAsyncDistributedSet.java
index aaa44aa..fe680aa 100644
--- a/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/DefaultAsyncDistributedSet.java
+++ b/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/DefaultAsyncDistributedSet.java
@@ -33,6 +33,7 @@
 import com.google.common.collect.ImmutableSet;
 import com.google.common.collect.Maps;
 import com.google.common.collect.Sets;
+import org.onosproject.utils.MeteringAgent;
 
 /**
  * Implementation of {@link AsyncDistributedSet}.
diff --git a/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/DefaultDistributedQueue.java b/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/DefaultDistributedQueue.java
index 8ea1d72..e4cf65f 100644
--- a/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/DefaultDistributedQueue.java
+++ b/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/DefaultDistributedQueue.java
@@ -22,6 +22,7 @@
 import org.onosproject.store.service.DistributedPrimitive;
 import org.onosproject.store.service.DistributedQueue;
 import org.onosproject.store.service.Serializer;
+import org.onosproject.utils.MeteringAgent;
 
 import java.util.List;
 import java.util.Set;
diff --git a/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/MeteredAsyncConsistentMap.java b/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/MeteredAsyncConsistentMap.java
index 540d0ad..c397f30 100644
--- a/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/MeteredAsyncConsistentMap.java
+++ b/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/MeteredAsyncConsistentMap.java
@@ -32,6 +32,7 @@
 
 import com.google.common.base.Throwables;
 import com.google.common.collect.Maps;
+import org.onosproject.utils.MeteringAgent;
 
 /**
  * {@link AsyncConsistentMap} that meters all its operations.
diff --git a/tools/test/cells/virtual b/tools/test/cells/virtual
new file mode 100644
index 0000000..0cac447
--- /dev/null
+++ b/tools/test/cells/virtual
@@ -0,0 +1,14 @@
+# Local VirtualBox-based ONOS instances 1,2 & ONOS mininet box
+
+export ONOS_NIC=192.168.56.*
+export OC1="192.168.56.101"
+export OC2="192.168.56.102"
+export OC3="192.168.56.103"
+export OCN="192.168.56.100"
+
+export ONOS_USE_SSH=true
+export ONOS_APPS="drivers,openflow,fwd,proxyarp,mobility"
+
+export ONOS_USER=sdn
+export ONOS_WEB_PASS=sdnrocks
+export ONOS_WEB_USER=onos
\ No newline at end of file