Work queue improvements
- Fixed logic to ensure only session to which task is currently assigned can complete it
- Support destroy method to reset work queue state
- Removed deprecated DistributedQueue primitive

Change-Id: I4e1d5be4eb142115130acf15ff34035cb9319a1a
diff --git a/core/store/primitives/src/main/java/org/onosproject/store/primitives/resources/impl/AtomixWorkQueue.java b/core/store/primitives/src/main/java/org/onosproject/store/primitives/resources/impl/AtomixWorkQueue.java
index 879cbb3..0085932 100644
--- a/core/store/primitives/src/main/java/org/onosproject/store/primitives/resources/impl/AtomixWorkQueue.java
+++ b/core/store/primitives/src/main/java/org/onosproject/store/primitives/resources/impl/AtomixWorkQueue.java
@@ -18,6 +18,9 @@
 import static java.util.concurrent.Executors.newSingleThreadExecutor;
 import static org.onlab.util.Tools.groupedThreads;
 import static org.slf4j.LoggerFactory.getLogger;
+import io.atomix.copycat.client.CopycatClient;
+import io.atomix.resource.AbstractResource;
+import io.atomix.resource.ResourceTypeInfo;
 
 import java.util.Collection;
 import java.util.List;
@@ -34,22 +37,19 @@
 import org.onlab.util.AbstractAccumulator;
 import org.onlab.util.Accumulator;
 import org.onosproject.store.primitives.resources.impl.AtomixWorkQueueCommands.Add;
+import org.onosproject.store.primitives.resources.impl.AtomixWorkQueueCommands.Clear;
 import org.onosproject.store.primitives.resources.impl.AtomixWorkQueueCommands.Complete;
 import org.onosproject.store.primitives.resources.impl.AtomixWorkQueueCommands.Register;
 import org.onosproject.store.primitives.resources.impl.AtomixWorkQueueCommands.Stats;
 import org.onosproject.store.primitives.resources.impl.AtomixWorkQueueCommands.Take;
 import org.onosproject.store.primitives.resources.impl.AtomixWorkQueueCommands.Unregister;
-import org.onosproject.store.service.WorkQueue;
 import org.onosproject.store.service.Task;
+import org.onosproject.store.service.WorkQueue;
 import org.onosproject.store.service.WorkQueueStats;
 import org.slf4j.Logger;
 
 import com.google.common.collect.ImmutableList;
 
-import io.atomix.copycat.client.CopycatClient;
-import io.atomix.resource.AbstractResource;
-import io.atomix.resource.ResourceTypeInfo;
-
 /**
  * Distributed resource providing the {@link WorkQueue} primitive.
  */
@@ -69,6 +69,18 @@
     }
 
     @Override
+    public String name() {
+        return null;
+    }
+
+    @Override
+    public CompletableFuture<Void> destroy() {
+        executor.shutdown();
+        timer.cancel();
+        return client.submit(new Clear());
+    }
+
+    @Override
     public CompletableFuture<AtomixWorkQueue> open() {
         return super.open().thenApply(result -> {
             client.onStateChange(state -> {
diff --git a/core/store/primitives/src/main/java/org/onosproject/store/primitives/resources/impl/AtomixWorkQueueCommands.java b/core/store/primitives/src/main/java/org/onosproject/store/primitives/resources/impl/AtomixWorkQueueCommands.java
index 3724529..2e77da9 100644
--- a/core/store/primitives/src/main/java/org/onosproject/store/primitives/resources/impl/AtomixWorkQueueCommands.java
+++ b/core/store/primitives/src/main/java/org/onosproject/store/primitives/resources/impl/AtomixWorkQueueCommands.java
@@ -15,6 +15,14 @@
  */
 package org.onosproject.store.primitives.resources.impl;
 
+import io.atomix.catalyst.buffer.BufferInput;
+import io.atomix.catalyst.buffer.BufferOutput;
+import io.atomix.catalyst.serializer.CatalystSerializable;
+import io.atomix.catalyst.serializer.SerializableTypeResolver;
+import io.atomix.catalyst.serializer.Serializer;
+import io.atomix.catalyst.serializer.SerializerRegistry;
+import io.atomix.copycat.Command;
+
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.stream.Collectors;
@@ -25,14 +33,6 @@
 
 import com.google.common.base.MoreObjects;
 
-import io.atomix.catalyst.buffer.BufferInput;
-import io.atomix.catalyst.buffer.BufferOutput;
-import io.atomix.catalyst.serializer.CatalystSerializable;
-import io.atomix.catalyst.serializer.SerializableTypeResolver;
-import io.atomix.catalyst.serializer.Serializer;
-import io.atomix.catalyst.serializer.SerializerRegistry;
-import io.atomix.copycat.Command;
-
 /**
  * {@link AtomixWorkQueue} resource state machine operations.
  */
@@ -207,6 +207,24 @@
         }
     }
 
+    @SuppressWarnings("serial")
+    public static class Clear implements Command<Void>, CatalystSerializable {
+
+        @Override
+        public void writeObject(BufferOutput<?> buffer, Serializer serializer) {
+        }
+
+        @Override
+        public void readObject(BufferInput<?> buffer, Serializer serializer) {
+        }
+
+        @Override
+        public String toString() {
+            return MoreObjects.toStringHelper(getClass())
+                    .toString();
+        }
+    }
+
     /**
      * Work queue command type resolver.
      */
@@ -219,6 +237,7 @@
             registry.register(Add.class, -963);
             registry.register(Complete.class, -964);
             registry.register(Stats.class, -965);
+            registry.register(Clear.class, -966);
         }
     }
 }
diff --git a/core/store/primitives/src/main/java/org/onosproject/store/primitives/resources/impl/AtomixWorkQueueState.java b/core/store/primitives/src/main/java/org/onosproject/store/primitives/resources/impl/AtomixWorkQueueState.java
index d287e19..b226860 100644
--- a/core/store/primitives/src/main/java/org/onosproject/store/primitives/resources/impl/AtomixWorkQueueState.java
+++ b/core/store/primitives/src/main/java/org/onosproject/store/primitives/resources/impl/AtomixWorkQueueState.java
@@ -16,6 +16,14 @@
 package org.onosproject.store.primitives.resources.impl;
 
 import static org.slf4j.LoggerFactory.getLogger;
+import io.atomix.copycat.server.Commit;
+import io.atomix.copycat.server.Snapshottable;
+import io.atomix.copycat.server.StateMachineExecutor;
+import io.atomix.copycat.server.session.ServerSession;
+import io.atomix.copycat.server.session.SessionListener;
+import io.atomix.copycat.server.storage.snapshot.SnapshotReader;
+import io.atomix.copycat.server.storage.snapshot.SnapshotWriter;
+import io.atomix.resource.ResourceStateMachine;
 
 import java.util.ArrayList;
 import java.util.Collection;
@@ -31,6 +39,7 @@
 
 import org.onlab.util.CountDownCompleter;
 import org.onosproject.store.primitives.resources.impl.AtomixWorkQueueCommands.Add;
+import org.onosproject.store.primitives.resources.impl.AtomixWorkQueueCommands.Clear;
 import org.onosproject.store.primitives.resources.impl.AtomixWorkQueueCommands.Complete;
 import org.onosproject.store.primitives.resources.impl.AtomixWorkQueueCommands.Register;
 import org.onosproject.store.primitives.resources.impl.AtomixWorkQueueCommands.Stats;
@@ -47,15 +56,6 @@
 import com.google.common.collect.Queues;
 import com.google.common.util.concurrent.AtomicLongMap;
 
-import io.atomix.copycat.server.Commit;
-import io.atomix.copycat.server.Snapshottable;
-import io.atomix.copycat.server.StateMachineExecutor;
-import io.atomix.copycat.server.session.ServerSession;
-import io.atomix.copycat.server.session.SessionListener;
-import io.atomix.copycat.server.storage.snapshot.SnapshotReader;
-import io.atomix.copycat.server.storage.snapshot.SnapshotWriter;
-import io.atomix.resource.ResourceStateMachine;
-
 /**
  * State machine for {@link AtomixWorkQueue} resource.
  */
@@ -82,6 +82,7 @@
         executor.register(Add.class, (Consumer<Commit<Add>>) this::add);
         executor.register(Take.class, this::take);
         executor.register(Complete.class, (Consumer<Commit<Complete>>) this::complete);
+        executor.register(Clear.class, (Consumer<Commit<Clear>>) this::clear);
     }
 
     protected WorkQueueStats stats(Commit<? extends Stats> commit) {
@@ -96,6 +97,17 @@
         }
     }
 
+    protected void clear(Commit<? extends Clear> commit) {
+        unassignedTasks.forEach(TaskHolder::complete);
+        unassignedTasks.clear();
+        assignments.values().forEach(TaskAssignment::markComplete);
+        assignments.clear();
+        registeredWorkers.values().forEach(Commit::close);
+        registeredWorkers.clear();
+        activeTasksPerSession.clear();
+        totalCompleted.set(0);
+    }
+
     protected void register(Commit<? extends Register> commit) {
         long sessionId = commit.session().id();
         if (registeredWorkers.putIfAbsent(sessionId, commit) != null) {
@@ -172,7 +184,7 @@
         try {
             commit.operation().taskIds().forEach(taskId -> {
                 TaskAssignment assignment = assignments.get(taskId);
-                if (assignment != null) {
+                if (assignment != null && assignment.sessionId() == sessionId) {
                     assignments.remove(taskId).markComplete();
                     // bookkeeping
                     totalCompleted.incrementAndGet();