Work queue improvements
- Fixed logic to ensure only session to which task is currently assigned can complete it
- Support destroy method to reset work queue state
- Removed deprecated DistributedQueue primitive
Change-Id: I4e1d5be4eb142115130acf15ff34035cb9319a1a
diff --git a/core/store/primitives/src/main/java/org/onosproject/store/primitives/resources/impl/AtomixWorkQueue.java b/core/store/primitives/src/main/java/org/onosproject/store/primitives/resources/impl/AtomixWorkQueue.java
index 879cbb3..0085932 100644
--- a/core/store/primitives/src/main/java/org/onosproject/store/primitives/resources/impl/AtomixWorkQueue.java
+++ b/core/store/primitives/src/main/java/org/onosproject/store/primitives/resources/impl/AtomixWorkQueue.java
@@ -18,6 +18,9 @@
import static java.util.concurrent.Executors.newSingleThreadExecutor;
import static org.onlab.util.Tools.groupedThreads;
import static org.slf4j.LoggerFactory.getLogger;
+import io.atomix.copycat.client.CopycatClient;
+import io.atomix.resource.AbstractResource;
+import io.atomix.resource.ResourceTypeInfo;
import java.util.Collection;
import java.util.List;
@@ -34,22 +37,19 @@
import org.onlab.util.AbstractAccumulator;
import org.onlab.util.Accumulator;
import org.onosproject.store.primitives.resources.impl.AtomixWorkQueueCommands.Add;
+import org.onosproject.store.primitives.resources.impl.AtomixWorkQueueCommands.Clear;
import org.onosproject.store.primitives.resources.impl.AtomixWorkQueueCommands.Complete;
import org.onosproject.store.primitives.resources.impl.AtomixWorkQueueCommands.Register;
import org.onosproject.store.primitives.resources.impl.AtomixWorkQueueCommands.Stats;
import org.onosproject.store.primitives.resources.impl.AtomixWorkQueueCommands.Take;
import org.onosproject.store.primitives.resources.impl.AtomixWorkQueueCommands.Unregister;
-import org.onosproject.store.service.WorkQueue;
import org.onosproject.store.service.Task;
+import org.onosproject.store.service.WorkQueue;
import org.onosproject.store.service.WorkQueueStats;
import org.slf4j.Logger;
import com.google.common.collect.ImmutableList;
-import io.atomix.copycat.client.CopycatClient;
-import io.atomix.resource.AbstractResource;
-import io.atomix.resource.ResourceTypeInfo;
-
/**
* Distributed resource providing the {@link WorkQueue} primitive.
*/
@@ -69,6 +69,18 @@
}
@Override
+ public String name() {
+ return null;
+ }
+
+ @Override
+ public CompletableFuture<Void> destroy() {
+ executor.shutdown();
+ timer.cancel();
+ return client.submit(new Clear());
+ }
+
+ @Override
public CompletableFuture<AtomixWorkQueue> open() {
return super.open().thenApply(result -> {
client.onStateChange(state -> {
diff --git a/core/store/primitives/src/main/java/org/onosproject/store/primitives/resources/impl/AtomixWorkQueueCommands.java b/core/store/primitives/src/main/java/org/onosproject/store/primitives/resources/impl/AtomixWorkQueueCommands.java
index 3724529..2e77da9 100644
--- a/core/store/primitives/src/main/java/org/onosproject/store/primitives/resources/impl/AtomixWorkQueueCommands.java
+++ b/core/store/primitives/src/main/java/org/onosproject/store/primitives/resources/impl/AtomixWorkQueueCommands.java
@@ -15,6 +15,14 @@
*/
package org.onosproject.store.primitives.resources.impl;
+import io.atomix.catalyst.buffer.BufferInput;
+import io.atomix.catalyst.buffer.BufferOutput;
+import io.atomix.catalyst.serializer.CatalystSerializable;
+import io.atomix.catalyst.serializer.SerializableTypeResolver;
+import io.atomix.catalyst.serializer.Serializer;
+import io.atomix.catalyst.serializer.SerializerRegistry;
+import io.atomix.copycat.Command;
+
import java.util.ArrayList;
import java.util.Collection;
import java.util.stream.Collectors;
@@ -25,14 +33,6 @@
import com.google.common.base.MoreObjects;
-import io.atomix.catalyst.buffer.BufferInput;
-import io.atomix.catalyst.buffer.BufferOutput;
-import io.atomix.catalyst.serializer.CatalystSerializable;
-import io.atomix.catalyst.serializer.SerializableTypeResolver;
-import io.atomix.catalyst.serializer.Serializer;
-import io.atomix.catalyst.serializer.SerializerRegistry;
-import io.atomix.copycat.Command;
-
/**
* {@link AtomixWorkQueue} resource state machine operations.
*/
@@ -207,6 +207,24 @@
}
}
+ @SuppressWarnings("serial")
+ public static class Clear implements Command<Void>, CatalystSerializable {
+
+ @Override
+ public void writeObject(BufferOutput<?> buffer, Serializer serializer) {
+ }
+
+ @Override
+ public void readObject(BufferInput<?> buffer, Serializer serializer) {
+ }
+
+ @Override
+ public String toString() {
+ return MoreObjects.toStringHelper(getClass())
+ .toString();
+ }
+ }
+
/**
* Work queue command type resolver.
*/
@@ -219,6 +237,7 @@
registry.register(Add.class, -963);
registry.register(Complete.class, -964);
registry.register(Stats.class, -965);
+ registry.register(Clear.class, -966);
}
}
}
diff --git a/core/store/primitives/src/main/java/org/onosproject/store/primitives/resources/impl/AtomixWorkQueueState.java b/core/store/primitives/src/main/java/org/onosproject/store/primitives/resources/impl/AtomixWorkQueueState.java
index d287e19..b226860 100644
--- a/core/store/primitives/src/main/java/org/onosproject/store/primitives/resources/impl/AtomixWorkQueueState.java
+++ b/core/store/primitives/src/main/java/org/onosproject/store/primitives/resources/impl/AtomixWorkQueueState.java
@@ -16,6 +16,14 @@
package org.onosproject.store.primitives.resources.impl;
import static org.slf4j.LoggerFactory.getLogger;
+import io.atomix.copycat.server.Commit;
+import io.atomix.copycat.server.Snapshottable;
+import io.atomix.copycat.server.StateMachineExecutor;
+import io.atomix.copycat.server.session.ServerSession;
+import io.atomix.copycat.server.session.SessionListener;
+import io.atomix.copycat.server.storage.snapshot.SnapshotReader;
+import io.atomix.copycat.server.storage.snapshot.SnapshotWriter;
+import io.atomix.resource.ResourceStateMachine;
import java.util.ArrayList;
import java.util.Collection;
@@ -31,6 +39,7 @@
import org.onlab.util.CountDownCompleter;
import org.onosproject.store.primitives.resources.impl.AtomixWorkQueueCommands.Add;
+import org.onosproject.store.primitives.resources.impl.AtomixWorkQueueCommands.Clear;
import org.onosproject.store.primitives.resources.impl.AtomixWorkQueueCommands.Complete;
import org.onosproject.store.primitives.resources.impl.AtomixWorkQueueCommands.Register;
import org.onosproject.store.primitives.resources.impl.AtomixWorkQueueCommands.Stats;
@@ -47,15 +56,6 @@
import com.google.common.collect.Queues;
import com.google.common.util.concurrent.AtomicLongMap;
-import io.atomix.copycat.server.Commit;
-import io.atomix.copycat.server.Snapshottable;
-import io.atomix.copycat.server.StateMachineExecutor;
-import io.atomix.copycat.server.session.ServerSession;
-import io.atomix.copycat.server.session.SessionListener;
-import io.atomix.copycat.server.storage.snapshot.SnapshotReader;
-import io.atomix.copycat.server.storage.snapshot.SnapshotWriter;
-import io.atomix.resource.ResourceStateMachine;
-
/**
* State machine for {@link AtomixWorkQueue} resource.
*/
@@ -82,6 +82,7 @@
executor.register(Add.class, (Consumer<Commit<Add>>) this::add);
executor.register(Take.class, this::take);
executor.register(Complete.class, (Consumer<Commit<Complete>>) this::complete);
+ executor.register(Clear.class, (Consumer<Commit<Clear>>) this::clear);
}
protected WorkQueueStats stats(Commit<? extends Stats> commit) {
@@ -96,6 +97,17 @@
}
}
+ protected void clear(Commit<? extends Clear> commit) {
+ unassignedTasks.forEach(TaskHolder::complete);
+ unassignedTasks.clear();
+ assignments.values().forEach(TaskAssignment::markComplete);
+ assignments.clear();
+ registeredWorkers.values().forEach(Commit::close);
+ registeredWorkers.clear();
+ activeTasksPerSession.clear();
+ totalCompleted.set(0);
+ }
+
protected void register(Commit<? extends Register> commit) {
long sessionId = commit.session().id();
if (registeredWorkers.putIfAbsent(sessionId, commit) != null) {
@@ -172,7 +184,7 @@
try {
commit.operation().taskIds().forEach(taskId -> {
TaskAssignment assignment = assignments.get(taskId);
- if (assignment != null) {
+ if (assignment != null && assignment.sessionId() == sessionId) {
assignments.remove(taskId).markComplete();
// bookkeeping
totalCompleted.incrementAndGet();