stc work queue test improvements
Change-Id: I8b9335b0bbfdc8a447c5955bf3621962ff112cb2
diff --git a/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/DefaultAsyncAtomicValue.java b/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/DefaultAsyncAtomicValue.java
index 73380de..cd27a5b 100644
--- a/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/DefaultAsyncAtomicValue.java
+++ b/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/DefaultAsyncAtomicValue.java
@@ -29,10 +29,10 @@
import org.onosproject.store.service.MapEventListener;
import org.onosproject.store.service.Serializer;
import org.onosproject.store.service.Versioned;
+import org.onosproject.utils.MeteringAgent;
import com.google.common.base.Throwables;
import com.google.common.collect.Maps;
-import org.onosproject.utils.MeteringAgent;
/**
* Default implementation of a {@code AsyncAtomicValue}.
@@ -56,6 +56,7 @@
private static final String ADD_LISTENER = "addListener";
private static final String REMOVE_LISTENER = "removeListener";
private static final String NOTIFY_LISTENER = "notifyListener";
+ private static final String DESTROY = "destroy";
public DefaultAsyncAtomicValue(String name, Serializer serializer, AsyncConsistentMap<String, byte[]> backingMap) {
this.name = checkNotNull(name, "name must not be null");
@@ -70,6 +71,14 @@
}
@Override
+ public CompletableFuture<Void> destroy() {
+ final MeteringAgent.Context newTimer = monitor.startTimer(DESTROY);
+ return backingMap.remove(name)
+ .whenComplete((r, e) -> newTimer.stop(e))
+ .thenApply(v -> null);
+ }
+
+ @Override
public CompletableFuture<Boolean> compareAndSet(V expect, V update) {
final MeteringAgent.Context newTimer = monitor.startTimer(COMPARE_AND_SET);
return backingMap.replace(name, serializer.encode(expect), serializer.encode(update))
diff --git a/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/DefaultDistributedWorkQueue.java b/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/DefaultDistributedWorkQueue.java
index bdc9d31..d16c1b8 100644
--- a/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/DefaultDistributedWorkQueue.java
+++ b/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/DefaultDistributedWorkQueue.java
@@ -7,9 +7,9 @@
import java.util.function.Consumer;
import java.util.stream.Collectors;
-import org.onosproject.store.service.WorkQueue;
import org.onosproject.store.service.Serializer;
import org.onosproject.store.service.Task;
+import org.onosproject.store.service.WorkQueue;
import org.onosproject.store.service.WorkQueueStats;
import com.google.common.collect.Collections2;
@@ -73,4 +73,9 @@
public CompletableFuture<Void> stopProcessing() {
return backingQueue.stopProcessing();
}
+
+ @Override
+ public CompletableFuture<Void> destroy() {
+ return backingQueue.destroy();
+ }
}
diff --git a/core/store/primitives/src/main/java/org/onosproject/store/primitives/resources/impl/AtomixWorkQueueState.java b/core/store/primitives/src/main/java/org/onosproject/store/primitives/resources/impl/AtomixWorkQueueState.java
index b226860..82f28e8 100644
--- a/core/store/primitives/src/main/java/org/onosproject/store/primitives/resources/impl/AtomixWorkQueueState.java
+++ b/core/store/primitives/src/main/java/org/onosproject/store/primitives/resources/impl/AtomixWorkQueueState.java
@@ -98,14 +98,18 @@
}
protected void clear(Commit<? extends Clear> commit) {
- unassignedTasks.forEach(TaskHolder::complete);
- unassignedTasks.clear();
- assignments.values().forEach(TaskAssignment::markComplete);
- assignments.clear();
- registeredWorkers.values().forEach(Commit::close);
- registeredWorkers.clear();
- activeTasksPerSession.clear();
- totalCompleted.set(0);
+ try {
+ unassignedTasks.forEach(TaskHolder::complete);
+ unassignedTasks.clear();
+ assignments.values().forEach(TaskAssignment::markComplete);
+ assignments.clear();
+ registeredWorkers.values().forEach(Commit::close);
+ registeredWorkers.clear();
+ activeTasksPerSession.clear();
+ totalCompleted.set(0);
+ } finally {
+ commit.close();
+ }
}
protected void register(Commit<? extends Register> commit) {