ONOS-2315 Adding new metrics to ConsistentMaps
Change-Id: Iba9a70f5eb268834564be26e42776b9caa4ea547
diff --git a/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/DefaultAsyncConsistentMap.java b/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/DefaultAsyncConsistentMap.java
index 7bf30cc..3fbdd2f 100644
--- a/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/DefaultAsyncConsistentMap.java
+++ b/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/DefaultAsyncConsistentMap.java
@@ -25,6 +25,7 @@
import java.util.Objects;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CopyOnWriteArraySet;
+import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.BiFunction;
import java.util.function.Consumer;
@@ -33,6 +34,11 @@
import java.util.stream.Collectors;
import java.util.Set;
+import com.codahale.metrics.Timer;
+import org.onlab.metrics.MetricsComponent;
+import org.onlab.metrics.MetricsFeature;
+import org.onlab.metrics.MetricsService;
+import org.onlab.osgi.DefaultServiceDirectory;
import org.onlab.util.HexString;
import org.onlab.util.SharedExecutors;
import org.onlab.util.Tools;
@@ -70,6 +76,34 @@
private final boolean purgeOnUninstall;
private final Consumer<MapEvent<K, V>> eventPublisher;
+ private final MetricsService metricsService;
+ private final MetricsComponent metricsComponent;
+ private final MetricsFeature metricsFeature;
+ private final Map<String, Timer> perMapOpTimers = Maps.newConcurrentMap();
+ private final Map<String, Timer> perOpTimers = Maps.newConcurrentMap();
+ private final Timer cMapTimer;
+ private final Timer perMapTimer;
+ private final MetricsFeature wildcard;
+
+ private static final String COMPONENT_NAME = "consistentMap";
+ private static final String SIZE = "size";
+ private static final String IS_EMPTY = "isEmpty";
+ private static final String CONTAINS_KEY = "containsKey";
+ private static final String CONTAINS_VALUE = "containsValue";
+ private static final String GET = "get";
+ private static final String COMPUTE_IF = "computeIf";
+ private static final String PUT = "put";
+ private static final String PUT_AND_GET = "putAndGet";
+ private static final String PUT_IF_ABSENT = "putIfAbsent";
+ private static final String REMOVE = "remove";
+ private static final String CLEAR = "clear";
+ private static final String KEY_SET = "keySet";
+ private static final String VALUES = "values";
+ private static final String ENTRY_SET = "entrySet";
+ private static final String REPLACE = "replace";
+ private static final String COMPUTE_IF_ABSENT = "computeIfAbsent";
+
+
private final Set<MapEventListener<K, V>> listeners = new CopyOnWriteArraySet<>();
private final Logger log = getLogger(getClass());
@@ -116,6 +150,13 @@
}
});
});
+ this.metricsService = DefaultServiceDirectory.getService(MetricsService.class);
+ this.metricsComponent = metricsService.registerComponent(COMPONENT_NAME);
+ this.metricsFeature = metricsComponent.registerFeature(name);
+ this.wildcard = metricsComponent.registerFeature("*");
+ this.perMapTimer = metricsService.createTimer(metricsComponent, metricsFeature, "*");
+ this.cMapTimer = metricsService.createTimer(metricsComponent, wildcard, "*");
+
}
/**
@@ -153,31 +194,41 @@
@Override
public CompletableFuture<Integer> size() {
- return database.mapSize(name);
+ final OperationTimer timer = startTimer(SIZE);
+ return database.mapSize(name)
+ .whenComplete((r, e) -> timer.stop());
}
@Override
public CompletableFuture<Boolean> isEmpty() {
- return database.mapIsEmpty(name);
+ final OperationTimer timer = startTimer(IS_EMPTY);
+ return database.mapIsEmpty(name)
+ .whenComplete((r, e) -> timer.stop());
}
@Override
public CompletableFuture<Boolean> containsKey(K key) {
checkNotNull(key, ERROR_NULL_KEY);
- return database.mapContainsKey(name, keyCache.getUnchecked(key));
+ final OperationTimer timer = startTimer(CONTAINS_KEY);
+ return database.mapContainsKey(name, keyCache.getUnchecked(key))
+ .whenComplete((r, e) -> timer.stop());
}
@Override
public CompletableFuture<Boolean> containsValue(V value) {
checkNotNull(value, ERROR_NULL_VALUE);
- return database.mapContainsValue(name, serializer.encode(value));
+ final OperationTimer timer = startTimer(CONTAINS_VALUE);
+ return database.mapContainsValue(name, serializer.encode(value))
+ .whenComplete((r, e) -> timer.stop());
}
@Override
public CompletableFuture<Versioned<V>> get(K key) {
checkNotNull(key, ERROR_NULL_KEY);
+ final OperationTimer timer = startTimer(GET);
return database.mapGet(name, keyCache.getUnchecked(key))
- .thenApply(v -> v != null ? v.map(serializer::decode) : null);
+ .whenComplete((r, e) -> timer.stop())
+ .thenApply(v -> v != null ? v.map(serializer::decode) : null);
}
@Override
@@ -185,7 +236,10 @@
Function<? super K, ? extends V> mappingFunction) {
checkNotNull(key, ERROR_NULL_KEY);
checkNotNull(mappingFunction, "Mapping function cannot be null");
- return updateAndGet(key, Match.ifNull(), Match.any(), mappingFunction.apply(key)).thenApply(v -> v.newValue());
+ final OperationTimer timer = startTimer(COMPUTE_IF_ABSENT);
+ return updateAndGet(key, Match.ifNull(), Match.any(), mappingFunction.apply(key))
+ .whenComplete((r, e) -> timer.stop())
+ .thenApply(v -> v.newValue());
}
@Override
@@ -207,6 +261,7 @@
checkNotNull(key, ERROR_NULL_KEY);
checkNotNull(condition, "predicate function cannot be null");
checkNotNull(remappingFunction, "Remapping function cannot be null");
+ final OperationTimer timer = startTimer(COMPUTE_IF);
return get(key).thenCompose(r1 -> {
V existingValue = r1 == null ? null : r1.value();
// if the condition evaluates to false, return existing value.
@@ -227,6 +282,7 @@
Match<V> valueMatcher = r1 == null ? Match.ifNull() : Match.any();
Match<Long> versionMatcher = r1 == null ? Match.any() : Match.ifValue(r1.version());
return updateAndGet(key, valueMatcher, versionMatcher, computedValue.get())
+ .whenComplete((r, e) -> timer.stop())
.thenApply(v -> {
if (v.updated()) {
return v.newValue();
@@ -241,71 +297,96 @@
public CompletableFuture<Versioned<V>> put(K key, V value) {
checkNotNull(key, ERROR_NULL_KEY);
checkNotNull(value, ERROR_NULL_VALUE);
- return updateAndGet(key, Match.any(), Match.any(), value).thenApply(v -> v.oldValue());
+ final OperationTimer timer = startTimer(PUT);
+ return updateAndGet(key, Match.any(), Match.any(), value).thenApply(v -> v.oldValue())
+ .whenComplete((r, e) -> timer.stop());
}
@Override
public CompletableFuture<Versioned<V>> putAndGet(K key, V value) {
checkNotNull(key, ERROR_NULL_KEY);
checkNotNull(value, ERROR_NULL_VALUE);
- return updateAndGet(key, Match.any(), Match.any(), value).thenApply(v -> v.newValue());
+ final OperationTimer timer = startTimer(PUT_AND_GET);
+ return updateAndGet(key, Match.any(), Match.any(), value).thenApply(v -> v.newValue())
+ .whenComplete((r, e) -> timer.stop());
}
@Override
public CompletableFuture<Versioned<V>> remove(K key) {
checkNotNull(key, ERROR_NULL_KEY);
- return updateAndGet(key, Match.any(), Match.any(), null).thenApply(v -> v.oldValue());
+ final OperationTimer timer = startTimer(REMOVE);
+ return updateAndGet(key, Match.any(), Match.any(), null).thenApply(v -> v.oldValue())
+ .whenComplete((r, e) -> timer.stop());
}
@Override
public CompletableFuture<Void> clear() {
checkIfUnmodifiable();
- return database.mapClear(name).thenApply(this::unwrapResult);
+ final OperationTimer timer = startTimer(CLEAR);
+ return database.mapClear(name).thenApply(this::unwrapResult)
+ .whenComplete((r, e) -> timer.stop());
}
@Override
public CompletableFuture<Set<K>> keySet() {
+ final OperationTimer timer = startTimer(KEY_SET);
return database.mapKeySet(name)
.thenApply(s -> s
- .stream()
- .map(this::dK)
- .collect(Collectors.toSet()));
+ .stream()
+ .map(this::dK)
+ .collect(Collectors.toSet()))
+ .whenComplete((r, e) -> timer.stop());
}
@Override
public CompletableFuture<Collection<Versioned<V>>> values() {
- return database.mapValues(name).thenApply(c -> c
- .stream()
- .map(v -> v.<V>map(serializer::decode))
- .collect(Collectors.toList()));
+ final OperationTimer timer = startTimer(VALUES);
+ return database.mapValues(name)
+ .whenComplete((r, e) -> timer.stop())
+ .thenApply(c -> c
+ .stream()
+ .map(v -> v.<V>map(serializer::decode))
+ .collect(Collectors.toList()));
}
@Override
public CompletableFuture<Set<Entry<K, Versioned<V>>>> entrySet() {
- return database.mapEntrySet(name).thenApply(s -> s
- .stream()
- .map(this::mapRawEntry)
- .collect(Collectors.toSet()));
+ final OperationTimer timer = startTimer(ENTRY_SET);
+ return database.mapEntrySet(name)
+ .whenComplete((r, e) -> timer.stop())
+ .thenApply(s -> s
+ .stream()
+ .map(this::mapRawEntry)
+ .collect(Collectors.toSet()));
}
@Override
public CompletableFuture<Versioned<V>> putIfAbsent(K key, V value) {
checkNotNull(key, ERROR_NULL_KEY);
checkNotNull(value, ERROR_NULL_VALUE);
- return updateAndGet(key, Match.ifNull(), Match.any(), value).thenApply(v -> v.oldValue());
+ final OperationTimer timer = startTimer(PUT_IF_ABSENT);
+ return updateAndGet(key, Match.ifNull(), Match.any(), value)
+ .whenComplete((r, e) -> timer.stop())
+ .thenApply(v -> v.oldValue());
}
@Override
public CompletableFuture<Boolean> remove(K key, V value) {
checkNotNull(key, ERROR_NULL_KEY);
checkNotNull(value, ERROR_NULL_VALUE);
- return updateAndGet(key, Match.ifValue(value), Match.any(), null).thenApply(v -> v.updated());
+ final OperationTimer timer = startTimer(REMOVE);
+ return updateAndGet(key, Match.ifValue(value), Match.any(), null)
+ .whenComplete((r, e) -> timer.stop())
+ .thenApply(v -> v.updated());
}
@Override
public CompletableFuture<Boolean> remove(K key, long version) {
checkNotNull(key, ERROR_NULL_KEY);
- return updateAndGet(key, Match.any(), Match.ifValue(version), null).thenApply(v -> v.updated());
+ final OperationTimer timer = startTimer(REMOVE);
+ return updateAndGet(key, Match.any(), Match.ifValue(version), null)
+ .whenComplete((r, e) -> timer.stop())
+ .thenApply(v -> v.updated());
}
@Override
@@ -313,12 +394,18 @@
checkNotNull(key, ERROR_NULL_KEY);
checkNotNull(oldValue, ERROR_NULL_VALUE);
checkNotNull(newValue, ERROR_NULL_VALUE);
- return updateAndGet(key, Match.ifValue(oldValue), Match.any(), newValue).thenApply(v -> v.updated());
+ final OperationTimer timer = startTimer(REPLACE);
+ return updateAndGet(key, Match.ifValue(oldValue), Match.any(), newValue)
+ .whenComplete((r, e) -> timer.stop())
+ .thenApply(v -> v.updated());
}
@Override
public CompletableFuture<Boolean> replace(K key, long oldVersion, V newValue) {
- return updateAndGet(key, Match.any(), Match.ifValue(oldVersion), newValue).thenApply(v -> v.updated());
+ final OperationTimer timer = startTimer(REPLACE);
+ return updateAndGet(key, Match.any(), Match.ifValue(oldVersion), newValue)
+ .whenComplete((r, e) -> timer.stop())
+ .thenApply(v -> v.updated());
}
private Map.Entry<K, Versioned<V>> mapRawEntry(Map.Entry<String, Versioned<byte[]>> e) {
@@ -331,10 +418,10 @@
V value) {
checkIfUnmodifiable();
return database.mapUpdate(name,
- keyCache.getUnchecked(key),
- oldValueMatch.map(serializer::encode),
- oldVersionMatch,
- value == null ? null : serializer.encode(value))
+ keyCache.getUnchecked(key),
+ oldValueMatch.map(serializer::encode),
+ oldVersionMatch,
+ value == null ? null : serializer.encode(value))
.thenApply(this::unwrapResult)
.thenApply(r -> r.<K, V>map(this::dK, serializer::decode))
.whenComplete((r, e) -> {
@@ -392,4 +479,34 @@
eventPublisher.accept(event);
}
}
+
+ private OperationTimer startTimer(String op) {
+ //check if timer exist, if it doesn't creates it
+ final Timer currTimer = perMapOpTimers.computeIfAbsent(op, timer ->
+ metricsService.createTimer(metricsComponent, metricsFeature, op));
+ perOpTimers.computeIfAbsent(op, timer -> metricsService.createTimer(metricsComponent, wildcard, op));
+ //starts timer
+ return new OperationTimer(currTimer.time(), op);
+ }
+
+ private class OperationTimer {
+ private final Timer.Context context;
+ private final String operation;
+
+ public OperationTimer(Timer.Context context, String operation) {
+ this.context = context;
+ this.operation = operation;
+ }
+
+ public void stop() {
+ //Stop and updates timer with specific measurements per map, per operation
+ final long time = context.stop();
+ //updates timer with aggregated measurements per map
+ perOpTimers.get(operation).update(time, TimeUnit.NANOSECONDS);
+ //updates timer with aggregated measurements per map
+ perMapTimer.update(time, TimeUnit.NANOSECONDS);
+ //updates timer with aggregated measurements per all Consistent Maps
+ cMapTimer.update(time, TimeUnit.NANOSECONDS);
+ }
+ }
}
\ No newline at end of file