ONOS-2315 Adding new metrics to ConsistentMaps

Change-Id: Iba9a70f5eb268834564be26e42776b9caa4ea547
diff --git a/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/DefaultAsyncConsistentMap.java b/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/DefaultAsyncConsistentMap.java
index 7bf30cc..3fbdd2f 100644
--- a/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/DefaultAsyncConsistentMap.java
+++ b/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/DefaultAsyncConsistentMap.java
@@ -25,6 +25,7 @@
 import java.util.Objects;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.CopyOnWriteArraySet;
+import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicReference;
 import java.util.function.BiFunction;
 import java.util.function.Consumer;
@@ -33,6 +34,11 @@
 import java.util.stream.Collectors;
 import java.util.Set;
 
+import com.codahale.metrics.Timer;
+import org.onlab.metrics.MetricsComponent;
+import org.onlab.metrics.MetricsFeature;
+import org.onlab.metrics.MetricsService;
+import org.onlab.osgi.DefaultServiceDirectory;
 import org.onlab.util.HexString;
 import org.onlab.util.SharedExecutors;
 import org.onlab.util.Tools;
@@ -70,6 +76,34 @@
     private final boolean purgeOnUninstall;
     private final Consumer<MapEvent<K, V>> eventPublisher;
 
+    private final MetricsService metricsService;
+    private final MetricsComponent metricsComponent;
+    private final MetricsFeature metricsFeature;
+    private final Map<String, Timer> perMapOpTimers = Maps.newConcurrentMap();
+    private final Map<String, Timer> perOpTimers = Maps.newConcurrentMap();
+    private final Timer cMapTimer;
+    private final Timer perMapTimer;
+    private final MetricsFeature wildcard;
+
+    private static final String COMPONENT_NAME = "consistentMap";
+    private static final String SIZE = "size";
+    private static final String IS_EMPTY = "isEmpty";
+    private static final String CONTAINS_KEY = "containsKey";
+    private static final String CONTAINS_VALUE = "containsValue";
+    private static final String GET = "get";
+    private static final String COMPUTE_IF = "computeIf";
+    private static final String PUT = "put";
+    private static final String PUT_AND_GET = "putAndGet";
+    private static final String PUT_IF_ABSENT = "putIfAbsent";
+    private static final String REMOVE = "remove";
+    private static final String CLEAR = "clear";
+    private static final String KEY_SET = "keySet";
+    private static final String VALUES = "values";
+    private static final String ENTRY_SET = "entrySet";
+    private static final String REPLACE = "replace";
+    private static final String COMPUTE_IF_ABSENT = "computeIfAbsent";
+
+
     private final Set<MapEventListener<K, V>> listeners = new CopyOnWriteArraySet<>();
 
     private final Logger log = getLogger(getClass());
@@ -116,6 +150,13 @@
                 }
             });
         });
+        this.metricsService = DefaultServiceDirectory.getService(MetricsService.class);
+        this.metricsComponent = metricsService.registerComponent(COMPONENT_NAME);
+        this.metricsFeature = metricsComponent.registerFeature(name);
+        this.wildcard = metricsComponent.registerFeature("*");
+        this.perMapTimer = metricsService.createTimer(metricsComponent, metricsFeature, "*");
+        this.cMapTimer = metricsService.createTimer(metricsComponent, wildcard, "*");
+
     }
 
     /**
@@ -153,31 +194,41 @@
 
     @Override
     public CompletableFuture<Integer> size() {
-        return database.mapSize(name);
+        final OperationTimer timer = startTimer(SIZE);
+        return database.mapSize(name)
+                .whenComplete((r, e) -> timer.stop());
     }
 
     @Override
     public CompletableFuture<Boolean> isEmpty() {
-        return database.mapIsEmpty(name);
+        final OperationTimer timer = startTimer(IS_EMPTY);
+        return database.mapIsEmpty(name)
+                .whenComplete((r, e) -> timer.stop());
     }
 
     @Override
     public CompletableFuture<Boolean> containsKey(K key) {
         checkNotNull(key, ERROR_NULL_KEY);
-        return database.mapContainsKey(name, keyCache.getUnchecked(key));
+        final OperationTimer timer = startTimer(CONTAINS_KEY);
+        return database.mapContainsKey(name, keyCache.getUnchecked(key))
+                .whenComplete((r, e) -> timer.stop());
     }
 
     @Override
     public CompletableFuture<Boolean> containsValue(V value) {
         checkNotNull(value, ERROR_NULL_VALUE);
-        return database.mapContainsValue(name, serializer.encode(value));
+        final OperationTimer timer = startTimer(CONTAINS_VALUE);
+        return database.mapContainsValue(name, serializer.encode(value))
+                .whenComplete((r, e) -> timer.stop());
     }
 
     @Override
     public CompletableFuture<Versioned<V>> get(K key) {
         checkNotNull(key, ERROR_NULL_KEY);
+        final OperationTimer timer = startTimer(GET);
         return database.mapGet(name, keyCache.getUnchecked(key))
-            .thenApply(v -> v != null ? v.map(serializer::decode) : null);
+                .whenComplete((r, e) -> timer.stop())
+        .thenApply(v -> v != null ? v.map(serializer::decode) : null);
     }
 
     @Override
@@ -185,7 +236,10 @@
             Function<? super K, ? extends V> mappingFunction) {
         checkNotNull(key, ERROR_NULL_KEY);
         checkNotNull(mappingFunction, "Mapping function cannot be null");
-        return updateAndGet(key, Match.ifNull(), Match.any(), mappingFunction.apply(key)).thenApply(v -> v.newValue());
+        final OperationTimer timer = startTimer(COMPUTE_IF_ABSENT);
+        return updateAndGet(key, Match.ifNull(), Match.any(), mappingFunction.apply(key))
+                .whenComplete((r, e) -> timer.stop())
+                .thenApply(v -> v.newValue());
     }
 
     @Override
@@ -207,6 +261,7 @@
         checkNotNull(key, ERROR_NULL_KEY);
         checkNotNull(condition, "predicate function cannot be null");
         checkNotNull(remappingFunction, "Remapping function cannot be null");
+        final OperationTimer timer = startTimer(COMPUTE_IF);
         return get(key).thenCompose(r1 -> {
             V existingValue = r1 == null ? null : r1.value();
             // if the condition evaluates to false, return existing value.
@@ -227,6 +282,7 @@
             Match<V> valueMatcher = r1 == null ? Match.ifNull() : Match.any();
             Match<Long> versionMatcher = r1 == null ? Match.any() : Match.ifValue(r1.version());
             return updateAndGet(key, valueMatcher, versionMatcher, computedValue.get())
+                    .whenComplete((r, e) -> timer.stop())
                     .thenApply(v -> {
                         if (v.updated()) {
                             return v.newValue();
@@ -241,71 +297,96 @@
     public CompletableFuture<Versioned<V>> put(K key, V value) {
         checkNotNull(key, ERROR_NULL_KEY);
         checkNotNull(value, ERROR_NULL_VALUE);
-        return updateAndGet(key, Match.any(), Match.any(), value).thenApply(v -> v.oldValue());
+        final OperationTimer timer = startTimer(PUT);
+        return updateAndGet(key, Match.any(), Match.any(), value).thenApply(v -> v.oldValue())
+                .whenComplete((r, e) -> timer.stop());
     }
 
     @Override
     public CompletableFuture<Versioned<V>> putAndGet(K key, V value) {
         checkNotNull(key, ERROR_NULL_KEY);
         checkNotNull(value, ERROR_NULL_VALUE);
-        return updateAndGet(key, Match.any(), Match.any(), value).thenApply(v -> v.newValue());
+        final OperationTimer timer = startTimer(PUT_AND_GET);
+        return updateAndGet(key, Match.any(), Match.any(), value).thenApply(v -> v.newValue())
+                .whenComplete((r, e) -> timer.stop());
     }
 
     @Override
     public CompletableFuture<Versioned<V>> remove(K key) {
         checkNotNull(key, ERROR_NULL_KEY);
-        return updateAndGet(key, Match.any(), Match.any(), null).thenApply(v -> v.oldValue());
+        final OperationTimer timer = startTimer(REMOVE);
+        return updateAndGet(key, Match.any(), Match.any(), null).thenApply(v -> v.oldValue())
+                .whenComplete((r, e) -> timer.stop());
     }
 
     @Override
     public CompletableFuture<Void> clear() {
         checkIfUnmodifiable();
-        return database.mapClear(name).thenApply(this::unwrapResult);
+        final OperationTimer timer = startTimer(CLEAR);
+        return database.mapClear(name).thenApply(this::unwrapResult)
+                .whenComplete((r, e) -> timer.stop());
     }
 
     @Override
     public CompletableFuture<Set<K>> keySet() {
+        final OperationTimer timer = startTimer(KEY_SET);
         return database.mapKeySet(name)
                 .thenApply(s -> s
-                .stream()
-                .map(this::dK)
-                .collect(Collectors.toSet()));
+                        .stream()
+                        .map(this::dK)
+                        .collect(Collectors.toSet()))
+                .whenComplete((r, e) -> timer.stop());
     }
 
     @Override
     public CompletableFuture<Collection<Versioned<V>>> values() {
-        return database.mapValues(name).thenApply(c -> c
-            .stream()
-            .map(v -> v.<V>map(serializer::decode))
-            .collect(Collectors.toList()));
+        final OperationTimer timer = startTimer(VALUES);
+        return database.mapValues(name)
+                .whenComplete((r, e) -> timer.stop())
+                .thenApply(c -> c
+                        .stream()
+                        .map(v -> v.<V>map(serializer::decode))
+                        .collect(Collectors.toList()));
     }
 
     @Override
     public CompletableFuture<Set<Entry<K, Versioned<V>>>> entrySet() {
-        return database.mapEntrySet(name).thenApply(s -> s
-                .stream()
-                .map(this::mapRawEntry)
-                .collect(Collectors.toSet()));
+        final OperationTimer timer = startTimer(ENTRY_SET);
+        return database.mapEntrySet(name)
+                .whenComplete((r, e) -> timer.stop())
+                .thenApply(s -> s
+                        .stream()
+                        .map(this::mapRawEntry)
+                        .collect(Collectors.toSet()));
     }
 
     @Override
     public CompletableFuture<Versioned<V>> putIfAbsent(K key, V value) {
         checkNotNull(key, ERROR_NULL_KEY);
         checkNotNull(value, ERROR_NULL_VALUE);
-        return updateAndGet(key, Match.ifNull(), Match.any(), value).thenApply(v -> v.oldValue());
+        final OperationTimer timer = startTimer(PUT_IF_ABSENT);
+        return updateAndGet(key, Match.ifNull(), Match.any(), value)
+                .whenComplete((r, e) -> timer.stop())
+                .thenApply(v -> v.oldValue());
     }
 
     @Override
     public CompletableFuture<Boolean> remove(K key, V value) {
         checkNotNull(key, ERROR_NULL_KEY);
         checkNotNull(value, ERROR_NULL_VALUE);
-        return updateAndGet(key, Match.ifValue(value), Match.any(), null).thenApply(v -> v.updated());
+        final OperationTimer timer = startTimer(REMOVE);
+        return updateAndGet(key, Match.ifValue(value), Match.any(), null)
+                .whenComplete((r, e) -> timer.stop())
+                .thenApply(v -> v.updated());
     }
 
     @Override
     public CompletableFuture<Boolean> remove(K key, long version) {
         checkNotNull(key, ERROR_NULL_KEY);
-        return updateAndGet(key, Match.any(), Match.ifValue(version), null).thenApply(v -> v.updated());
+        final OperationTimer timer = startTimer(REMOVE);
+        return updateAndGet(key, Match.any(), Match.ifValue(version), null)
+                .whenComplete((r, e) -> timer.stop())
+                .thenApply(v -> v.updated());
     }
 
     @Override
@@ -313,12 +394,18 @@
         checkNotNull(key, ERROR_NULL_KEY);
         checkNotNull(oldValue, ERROR_NULL_VALUE);
         checkNotNull(newValue, ERROR_NULL_VALUE);
-        return updateAndGet(key, Match.ifValue(oldValue), Match.any(), newValue).thenApply(v -> v.updated());
+        final OperationTimer timer = startTimer(REPLACE);
+        return updateAndGet(key, Match.ifValue(oldValue), Match.any(), newValue)
+                .whenComplete((r, e) -> timer.stop())
+                .thenApply(v -> v.updated());
     }
 
     @Override
     public CompletableFuture<Boolean> replace(K key, long oldVersion, V newValue) {
-        return updateAndGet(key, Match.any(), Match.ifValue(oldVersion), newValue).thenApply(v -> v.updated());
+        final OperationTimer timer = startTimer(REPLACE);
+        return updateAndGet(key, Match.any(), Match.ifValue(oldVersion), newValue)
+                .whenComplete((r, e) -> timer.stop())
+                .thenApply(v -> v.updated());
     }
 
     private Map.Entry<K, Versioned<V>> mapRawEntry(Map.Entry<String, Versioned<byte[]>> e) {
@@ -331,10 +418,10 @@
             V value) {
         checkIfUnmodifiable();
         return database.mapUpdate(name,
-                    keyCache.getUnchecked(key),
-                    oldValueMatch.map(serializer::encode),
-                    oldVersionMatch,
-                    value == null ? null : serializer.encode(value))
+                keyCache.getUnchecked(key),
+                oldValueMatch.map(serializer::encode),
+                oldVersionMatch,
+                value == null ? null : serializer.encode(value))
                 .thenApply(this::unwrapResult)
                 .thenApply(r -> r.<K, V>map(this::dK, serializer::decode))
                 .whenComplete((r, e) -> {
@@ -392,4 +479,34 @@
             eventPublisher.accept(event);
         }
     }
+
+    private OperationTimer startTimer(String op) {
+        //check if timer exist, if it doesn't creates it
+        final Timer currTimer = perMapOpTimers.computeIfAbsent(op, timer ->
+                metricsService.createTimer(metricsComponent, metricsFeature, op));
+        perOpTimers.computeIfAbsent(op, timer -> metricsService.createTimer(metricsComponent, wildcard, op));
+        //starts timer
+        return new OperationTimer(currTimer.time(), op);
+    }
+
+    private class OperationTimer {
+        private final Timer.Context context;
+        private final String operation;
+
+        public OperationTimer(Timer.Context context, String operation) {
+            this.context = context;
+            this.operation = operation;
+        }
+
+        public void stop() {
+            //Stop and updates timer with specific measurements per map, per operation
+            final long time = context.stop();
+            //updates timer with aggregated measurements per map
+            perOpTimers.get(operation).update(time, TimeUnit.NANOSECONDS);
+            //updates timer with aggregated measurements per map
+            perMapTimer.update(time, TimeUnit.NANOSECONDS);
+            //updates timer with aggregated measurements per all Consistent Maps
+            cMapTimer.update(time, TimeUnit.NANOSECONDS);
+        }
+    }
 }
\ No newline at end of file