ONOS-2456 Added usage metrics to Atomic Counter and Distributed Queue plus refactored the code a bit
Refactored code plus instrumented AtomicValue and DistributedSet
Change-Id: I9c5f7c9f23d530131f15d3c98250ea33238dd2ec
diff --git a/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/DefaultAsyncAtomicCounter.java b/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/DefaultAsyncAtomicCounter.java
index cdaf792..7ff6b89 100644
--- a/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/DefaultAsyncAtomicCounter.java
+++ b/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/DefaultAsyncAtomicCounter.java
@@ -15,20 +15,15 @@
*/
package org.onosproject.store.consistent.impl;
+import org.onosproject.store.service.AsyncAtomicCounter;
+import org.slf4j.Logger;
+
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.function.BiFunction;
-import org.onosproject.store.service.AsyncAtomicCounter;
-
-
-
-
-
-import org.slf4j.Logger;
-
-import static com.google.common.base.Preconditions.*;
+import static com.google.common.base.Preconditions.checkNotNull;
import static org.slf4j.LoggerFactory.getLogger;
/**
@@ -46,66 +41,90 @@
// TODO: configure delay via builder
private static final int DELAY_BETWEEN_RETRY_SEC = 1;
private final Logger log = getLogger(getClass());
+ private final MeteringAgent monitor;
+
+ private static final String PRIMITIVE_NAME = "atomicCounter";
+ private static final String INCREMENT_AND_GET = "incrementAndGet";
+ private static final String GET_AND_INCREMENT = "getAndIncrement";
+ private static final String GET_AND_ADD = "getAndAdd";
+ private static final String ADD_AND_GET = "addAndGet";
+ private static final String GET = "get";
public DefaultAsyncAtomicCounter(String name,
- Database database,
- boolean retryOnException,
- ScheduledExecutorService retryExecutor) {
+ Database database,
+ boolean retryOnException,
+ boolean meteringEnabled,
+ ScheduledExecutorService retryExecutor) {
this.name = checkNotNull(name);
this.database = checkNotNull(database);
this.retryOnFailure = retryOnException;
this.retryExecutor = retryExecutor;
+ this.monitor = new MeteringAgent(PRIMITIVE_NAME, name, meteringEnabled);
}
@Override
public CompletableFuture<Long> incrementAndGet() {
- return addAndGet(1L);
+ final MeteringAgent.Context timer = monitor.startTimer(INCREMENT_AND_GET);
+ return addAndGet(1L)
+ .whenComplete((r, e) -> timer.stop());
}
@Override
public CompletableFuture<Long> get() {
- return database.counterGet(name);
+ final MeteringAgent.Context timer = monitor.startTimer(GET);
+ return database.counterGet(name)
+ .whenComplete((r, e) -> timer.stop());
}
@Override
public CompletableFuture<Long> getAndIncrement() {
- return getAndAdd(1L);
+ final MeteringAgent.Context timer = monitor.startTimer(GET_AND_INCREMENT);
+ return getAndAdd(1L)
+ .whenComplete((r, e) -> timer.stop());
}
@Override
public CompletableFuture<Long> getAndAdd(long delta) {
+ final MeteringAgent.Context timer = monitor.startTimer(GET_AND_ADD);
CompletableFuture<Long> result = database.counterGetAndAdd(name, delta);
if (!retryOnFailure) {
- return result;
+ return result
+ .whenComplete((r, e) -> timer.stop());
}
CompletableFuture<Long> future = new CompletableFuture<>();
return result.whenComplete((r, e) -> {
+ timer.stop();
+ // TODO : Account for retries
if (e != null) {
log.warn("getAndAdd failed due to {}. Will retry", e.getMessage());
retryExecutor.schedule(new RetryTask(database::counterGetAndAdd, delta, future),
- DELAY_BETWEEN_RETRY_SEC,
- TimeUnit.SECONDS);
+ DELAY_BETWEEN_RETRY_SEC,
+ TimeUnit.SECONDS);
} else {
future.complete(r);
}
- }).thenCompose(v -> future);
+ }).thenCompose(v -> future);
}
@Override
public CompletableFuture<Long> addAndGet(long delta) {
+ final MeteringAgent.Context timer = monitor.startTimer(ADD_AND_GET);
CompletableFuture<Long> result = database.counterAddAndGet(name, delta);
if (!retryOnFailure) {
- return result;
+ return result
+ .whenComplete((r, e) -> timer.stop());
}
CompletableFuture<Long> future = new CompletableFuture<>();
return result.whenComplete((r, e) -> {
+ timer.stop();
+ // TODO : Account for retries
if (e != null) {
log.warn("addAndGet failed due to {}. Will retry", e.getMessage());
retryExecutor.schedule(new RetryTask(database::counterAddAndGet, delta, future),
- DELAY_BETWEEN_RETRY_SEC,
- TimeUnit.SECONDS);
+ DELAY_BETWEEN_RETRY_SEC,
+ TimeUnit.SECONDS);
} else {
future.complete(r);
}