Work queue improvements
- Fixed logic to ensure only session to which task is currently assigned can complete it
- Support destroy method to reset work queue state
- Removed deprecated DistributedQueue primitive

Change-Id: I4e1d5be4eb142115130acf15ff34035cb9319a1a
diff --git a/apps/vtn/vtnrsc/src/test/java/org/onosproject/vtnrsc/util/VtnStorageServiceAdapter.java b/apps/vtn/vtnrsc/src/test/java/org/onosproject/vtnrsc/util/VtnStorageServiceAdapter.java
index b8ad5d0..fa64b5d 100644
--- a/apps/vtn/vtnrsc/src/test/java/org/onosproject/vtnrsc/util/VtnStorageServiceAdapter.java
+++ b/apps/vtn/vtnrsc/src/test/java/org/onosproject/vtnrsc/util/VtnStorageServiceAdapter.java
@@ -19,7 +19,6 @@
 import org.onosproject.store.service.EventuallyConsistentMapBuilder;
 import org.onosproject.store.service.ConsistentMapBuilder;
 import org.onosproject.store.service.DistributedSetBuilder;
-import org.onosproject.store.service.DistributedQueueBuilder;
 import org.onosproject.store.service.AtomicCounterBuilder;
 import org.onosproject.store.service.AtomicValueBuilder;
 import org.onosproject.store.service.LeaderElectorBuilder;
@@ -47,11 +46,6 @@
     }
 
     @Override
-    public <E> DistributedQueueBuilder<E> queueBuilder() {
-        return null;
-    }
-
-    @Override
     public AtomicCounterBuilder atomicCounterBuilder() {
         return null;
     }
diff --git a/core/api/src/main/java/org/onosproject/store/primitives/DistributedPrimitiveCreator.java b/core/api/src/main/java/org/onosproject/store/primitives/DistributedPrimitiveCreator.java
index 2dfcd2c..768aa0b 100644
--- a/core/api/src/main/java/org/onosproject/store/primitives/DistributedPrimitiveCreator.java
+++ b/core/api/src/main/java/org/onosproject/store/primitives/DistributedPrimitiveCreator.java
@@ -22,7 +22,6 @@
 import org.onosproject.store.service.AsyncConsistentMap;
 import org.onosproject.store.service.AsyncDistributedSet;
 import org.onosproject.store.service.AsyncLeaderElector;
-import org.onosproject.store.service.DistributedQueue;
 import org.onosproject.store.service.WorkQueue;
 import org.onosproject.store.service.Serializer;
 
@@ -61,16 +60,6 @@
     <V> AsyncAtomicValue<V> newAsyncAtomicValue(String name, Serializer serializer);
 
     /**
-     * Creates a new {@code DistributedQueue}.
-     *
-     * @param name queue name
-     * @param serializer serializer to use for serializing/deserializing queue entries
-     * @param <E> queue entry type
-     * @return queue
-     */
-    <E> DistributedQueue<E> newDistributedQueue(String name, Serializer serializer);
-
-    /**
      * Creates a new {@code AsyncDistributedSet}.
      *
      * @param name set name
diff --git a/core/api/src/main/java/org/onosproject/store/service/DistributedPrimitive.java b/core/api/src/main/java/org/onosproject/store/service/DistributedPrimitive.java
index b3328e5..c8949c8 100644
--- a/core/api/src/main/java/org/onosproject/store/service/DistributedPrimitive.java
+++ b/core/api/src/main/java/org/onosproject/store/service/DistributedPrimitive.java
@@ -62,9 +62,9 @@
         VALUE,
 
         /**
-         * Distributed queue.
+         * Distributed work queue.
          */
-        QUEUE,
+        WORK_QUEUE,
 
         /**
          * Leader elector.
diff --git a/core/api/src/main/java/org/onosproject/store/service/DistributedQueue.java b/core/api/src/main/java/org/onosproject/store/service/DistributedQueue.java
deleted file mode 100644
index 7266f29..0000000
--- a/core/api/src/main/java/org/onosproject/store/service/DistributedQueue.java
+++ /dev/null
@@ -1,62 +0,0 @@
-/*
- * Copyright 2015-present Open Networking Laboratory
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.onosproject.store.service;
-
-import java.util.concurrent.CompletableFuture;
-
-/**
- * A distributed collection designed for holding elements prior to processing.
- * A queue provides insertion, extraction and inspection operations. The extraction operation
- * is designed to be non-blocking.
- *
- * @param <E> queue entry type
- */
-public interface DistributedQueue<E> extends DistributedPrimitive {
-
-    /**
-     * Returns total number of entries in the queue.
-     * @return queue size
-     */
-    long size();
-
-    /**
-     * Returns true if queue has elements in it.
-     * @return true is queue has elements, false otherwise
-     */
-    default boolean isEmpty() {
-        return size() == 0;
-    }
-
-    /**
-     * Inserts an entry into the queue.
-     * @param entry entry to insert
-     */
-    void push(E entry);
-
-    /**
-     * If the queue is non-empty, an entry will be removed from the queue and the returned future
-     * will be immediately completed with it. If queue is empty when this call is made, the returned
-     * future will be eventually completed when an entry is added to the queue.
-     * @return queue entry
-     */
-    CompletableFuture<E> pop();
-
-    /**
-     * Returns an entry from the queue without removing it. If the queue is empty returns null.
-     * @return queue entry or null if queue is empty
-     */
-    E peek();
-}
\ No newline at end of file
diff --git a/core/api/src/main/java/org/onosproject/store/service/DistributedQueueBuilder.java b/core/api/src/main/java/org/onosproject/store/service/DistributedQueueBuilder.java
deleted file mode 100644
index 1f32f5e..0000000
--- a/core/api/src/main/java/org/onosproject/store/service/DistributedQueueBuilder.java
+++ /dev/null
@@ -1,61 +0,0 @@
-/*
- * Copyright 2015-present Open Networking Laboratory
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.onosproject.store.service;
-
-/**
- * Builder for distributed queue.
- *
- * @param <E> type queue elements.
- */
-public interface DistributedQueueBuilder<E> {
-
-    /**
-     * Sets the name of the queue.
-     * <p>
-     * Each queue is identified by a unique name.
-     * </p>
-     * <p>
-     * Note: This is a mandatory parameter.
-     * </p>
-     *
-     * @param name name of the queue
-     * @return this DistributedQueueBuilder for method chaining
-     */
-    DistributedQueueBuilder<E> withName(String name);
-
-    /**
-     * Sets a serializer that can be used to serialize
-     * the elements pushed into the queue. The serializer
-     * builder should be pre-populated with any classes that will be
-     * put into the queue.
-     * <p>
-     * Note: This is a mandatory parameter.
-     * </p>
-     *
-     * @param serializer serializer
-     * @return this DistributedQueueBuilder for method chaining
-     */
-    DistributedQueueBuilder<E> withSerializer(Serializer serializer);
-
-    /**
-     * Builds a queue based on the configuration options
-     * supplied to this builder.
-     *
-     * @return new distributed queue
-     * @throws java.lang.RuntimeException if a mandatory parameter is missing
-     */
-    DistributedQueue<E> build();
-}
diff --git a/core/api/src/main/java/org/onosproject/store/service/StorageService.java b/core/api/src/main/java/org/onosproject/store/service/StorageService.java
index f342bdd..53830c2 100644
--- a/core/api/src/main/java/org/onosproject/store/service/StorageService.java
+++ b/core/api/src/main/java/org/onosproject/store/service/StorageService.java
@@ -52,14 +52,6 @@
     <E> DistributedSetBuilder<E> setBuilder();
 
     /**
-     * Creates a new DistributedQueueBuilder.
-     *
-     * @param <E> queue entry type
-     * @return builder for an distributed queue
-     */
-    <E> DistributedQueueBuilder<E> queueBuilder();
-
-    /**
      * Creates a new AtomicCounterBuilder.
      *
      * @return atomic counter builder
diff --git a/core/api/src/main/java/org/onosproject/store/service/WorkQueue.java b/core/api/src/main/java/org/onosproject/store/service/WorkQueue.java
index f9b0173..99eaa03 100644
--- a/core/api/src/main/java/org/onosproject/store/service/WorkQueue.java
+++ b/core/api/src/main/java/org/onosproject/store/service/WorkQueue.java
@@ -39,7 +39,12 @@
  *
  * @param <E> task payload type.
  */
-public interface WorkQueue<E> {
+public interface WorkQueue<E> extends DistributedPrimitive {
+
+    @Override
+    default DistributedPrimitive.Type primitiveType() {
+        return DistributedPrimitive.Type.WORK_QUEUE;
+    }
 
     /**
      * Adds a collection of tasks to the work queue.
diff --git a/core/api/src/test/java/org/onosproject/store/service/StorageServiceAdapter.java b/core/api/src/test/java/org/onosproject/store/service/StorageServiceAdapter.java
index 18ea539..3978614 100644
--- a/core/api/src/test/java/org/onosproject/store/service/StorageServiceAdapter.java
+++ b/core/api/src/test/java/org/onosproject/store/service/StorageServiceAdapter.java
@@ -35,11 +35,6 @@
     }
 
     @Override
-    public <E> DistributedQueueBuilder<E> queueBuilder() {
-        return null;
-    }
-
-    @Override
     public AtomicCounterBuilder atomicCounterBuilder() {
         return null;
     }
diff --git a/core/api/src/test/java/org/onosproject/store/service/TestStorageService.java b/core/api/src/test/java/org/onosproject/store/service/TestStorageService.java
index 95e775b..d8dff29 100644
--- a/core/api/src/test/java/org/onosproject/store/service/TestStorageService.java
+++ b/core/api/src/test/java/org/onosproject/store/service/TestStorageService.java
@@ -34,11 +34,6 @@
     }
 
     @Override
-    public <E> DistributedQueueBuilder<E> queueBuilder() {
-        throw new UnsupportedOperationException("queueBuilder");
-    }
-
-    @Override
     public AtomicCounterBuilder atomicCounterBuilder() {
         return TestAtomicCounter.builder();
     }
diff --git a/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/DefaultDistributedQueueBuilder.java b/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/DefaultDistributedQueueBuilder.java
deleted file mode 100644
index 7e05367..0000000
--- a/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/DefaultDistributedQueueBuilder.java
+++ /dev/null
@@ -1,64 +0,0 @@
-/*
- * Copyright 2016-present Open Networking Laboratory
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.onosproject.store.primitives.impl;
-
-import org.onosproject.store.primitives.DistributedPrimitiveCreator;
-import org.onosproject.store.service.DistributedQueue;
-import org.onosproject.store.service.DistributedQueueBuilder;
-import org.onosproject.store.service.Serializer;
-
-import static com.google.common.base.Preconditions.checkArgument;
-import static com.google.common.base.Preconditions.checkState;
-
-/**
- * Default implementation of a {@code DistributedQueueBuilder}.
- *
- * @param <E> queue entry type
- */
-public class DefaultDistributedQueueBuilder<E> implements DistributedQueueBuilder<E> {
-
-    private final DistributedPrimitiveCreator primitiveCreator;
-    private String name;
-    private Serializer serializer;
-
-    public DefaultDistributedQueueBuilder(DistributedPrimitiveCreator primitiveCreator) {
-        this.primitiveCreator = primitiveCreator;
-    }
-
-    @Override
-    public DistributedQueueBuilder<E> withName(String name) {
-        checkArgument(name != null && !name.isEmpty());
-        this.name = name;
-        return this;
-    }
-
-    @Override
-    public DistributedQueueBuilder<E> withSerializer(Serializer serializer) {
-        checkArgument(serializer != null);
-        this.serializer = serializer;
-        return this;
-    }
-
-    private boolean validInputs() {
-        return name != null && serializer != null;
-    }
-
-    @Override
-    public DistributedQueue<E> build() {
-        checkState(validInputs());
-        return primitiveCreator.newDistributedQueue(name, serializer);
-    }
-}
diff --git a/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/DefaultDistributedWorkQueue.java b/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/DefaultDistributedWorkQueue.java
index 2f604d3..9ce11bf 100644
--- a/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/DefaultDistributedWorkQueue.java
+++ b/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/DefaultDistributedWorkQueue.java
@@ -30,6 +30,11 @@
     }
 
     @Override
+    public String name() {
+        return backingQueue.name();
+    }
+
+    @Override
     public CompletableFuture<Void> addMultiple(Collection<E> items) {
         return backingQueue.addMultiple(items.stream()
                                              .map(serializer::encode)
diff --git a/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/FederatedDistributedPrimitiveCreator.java b/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/FederatedDistributedPrimitiveCreator.java
index a095de1..4dc2bc4 100644
--- a/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/FederatedDistributedPrimitiveCreator.java
+++ b/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/FederatedDistributedPrimitiveCreator.java
@@ -29,9 +29,8 @@
 import org.onosproject.store.service.AsyncConsistentMap;
 import org.onosproject.store.service.AsyncDistributedSet;
 import org.onosproject.store.service.AsyncLeaderElector;
-import org.onosproject.store.service.DistributedQueue;
-import org.onosproject.store.service.WorkQueue;
 import org.onosproject.store.service.Serializer;
+import org.onosproject.store.service.WorkQueue;
 
 import com.google.common.base.Charsets;
 import com.google.common.collect.ImmutableSet;
@@ -84,11 +83,6 @@
     }
 
     @Override
-    public <E> DistributedQueue<E> newDistributedQueue(String name, Serializer serializer) {
-        return getCreator(name).newDistributedQueue(name, serializer);
-    }
-
-    @Override
     public AsyncLeaderElector newAsyncLeaderElector(String name) {
         checkNotNull(name);
         Map<PartitionId, AsyncLeaderElector> leaderElectors =
diff --git a/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/StorageManager.java b/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/StorageManager.java
index 8eb138a..f56b6e8 100644
--- a/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/StorageManager.java
+++ b/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/StorageManager.java
@@ -15,6 +15,8 @@
  */
 package org.onosproject.store.primitives.impl;
 
+import static org.onosproject.security.AppGuard.checkPermission;
+import static org.onosproject.security.AppPermission.Type.STORAGE_WRITE;
 import static org.slf4j.LoggerFactory.getLogger;
 
 import java.util.Collection;
@@ -44,9 +46,7 @@
 import org.onosproject.store.service.AtomicValueBuilder;
 import org.onosproject.store.service.ConsistentMap;
 import org.onosproject.store.service.ConsistentMapBuilder;
-import org.onosproject.store.service.DistributedQueueBuilder;
 import org.onosproject.store.service.DistributedSetBuilder;
-import org.onosproject.store.service.WorkQueue;
 import org.onosproject.store.service.EventuallyConsistentMapBuilder;
 import org.onosproject.store.service.LeaderElectorBuilder;
 import org.onosproject.store.service.MapInfo;
@@ -55,15 +55,13 @@
 import org.onosproject.store.service.StorageAdminService;
 import org.onosproject.store.service.StorageService;
 import org.onosproject.store.service.TransactionContextBuilder;
+import org.onosproject.store.service.WorkQueue;
 import org.onosproject.store.service.WorkQueueStats;
 import org.slf4j.Logger;
 
 import com.google.common.collect.Maps;
 import com.google.common.util.concurrent.Futures;
 
-import static org.onosproject.security.AppGuard.checkPermission;
-import static org.onosproject.security.AppPermission.Type.*;
-
 /**
  * Implementation for {@code StorageService} and {@code StorageAdminService}.
  */
@@ -137,12 +135,6 @@
     }
 
     @Override
-    public <E> DistributedQueueBuilder<E> queueBuilder() {
-        checkPermission(STORAGE_WRITE);
-        return new DefaultDistributedQueueBuilder<>(federatedPrimitiveCreator);
-    }
-
-    @Override
     public AtomicCounterBuilder atomicCounterBuilder() {
         checkPermission(STORAGE_WRITE);
         return new DefaultAtomicCounterBuilder(federatedPrimitiveCreator);
diff --git a/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/StoragePartitionClient.java b/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/StoragePartitionClient.java
index d635a79..205656e 100644
--- a/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/StoragePartitionClient.java
+++ b/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/StoragePartitionClient.java
@@ -49,10 +49,9 @@
 import org.onosproject.store.service.AsyncDistributedSet;
 import org.onosproject.store.service.AsyncLeaderElector;
 import org.onosproject.store.service.DistributedPrimitive.Status;
-import org.onosproject.store.service.DistributedQueue;
-import org.onosproject.store.service.WorkQueue;
 import org.onosproject.store.service.PartitionClientInfo;
 import org.onosproject.store.service.Serializer;
+import org.onosproject.store.service.WorkQueue;
 import org.slf4j.Logger;
 
 import com.google.common.base.Supplier;
@@ -160,11 +159,6 @@
     }
 
     @Override
-    public <E> DistributedQueue<E> newDistributedQueue(String name, Serializer serializer) {
-        throw new UnsupportedOperationException();
-    }
-
-    @Override
     public <E> WorkQueue<E> newWorkQueue(String name, Serializer serializer) {
         AtomixWorkQueue workQueue = client.getResource(name, AtomixWorkQueue.class).join();
         return new DefaultDistributedWorkQueue<>(workQueue, serializer);
diff --git a/core/store/primitives/src/main/java/org/onosproject/store/primitives/resources/impl/AtomixWorkQueue.java b/core/store/primitives/src/main/java/org/onosproject/store/primitives/resources/impl/AtomixWorkQueue.java
index 879cbb3..0085932 100644
--- a/core/store/primitives/src/main/java/org/onosproject/store/primitives/resources/impl/AtomixWorkQueue.java
+++ b/core/store/primitives/src/main/java/org/onosproject/store/primitives/resources/impl/AtomixWorkQueue.java
@@ -18,6 +18,9 @@
 import static java.util.concurrent.Executors.newSingleThreadExecutor;
 import static org.onlab.util.Tools.groupedThreads;
 import static org.slf4j.LoggerFactory.getLogger;
+import io.atomix.copycat.client.CopycatClient;
+import io.atomix.resource.AbstractResource;
+import io.atomix.resource.ResourceTypeInfo;
 
 import java.util.Collection;
 import java.util.List;
@@ -34,22 +37,19 @@
 import org.onlab.util.AbstractAccumulator;
 import org.onlab.util.Accumulator;
 import org.onosproject.store.primitives.resources.impl.AtomixWorkQueueCommands.Add;
+import org.onosproject.store.primitives.resources.impl.AtomixWorkQueueCommands.Clear;
 import org.onosproject.store.primitives.resources.impl.AtomixWorkQueueCommands.Complete;
 import org.onosproject.store.primitives.resources.impl.AtomixWorkQueueCommands.Register;
 import org.onosproject.store.primitives.resources.impl.AtomixWorkQueueCommands.Stats;
 import org.onosproject.store.primitives.resources.impl.AtomixWorkQueueCommands.Take;
 import org.onosproject.store.primitives.resources.impl.AtomixWorkQueueCommands.Unregister;
-import org.onosproject.store.service.WorkQueue;
 import org.onosproject.store.service.Task;
+import org.onosproject.store.service.WorkQueue;
 import org.onosproject.store.service.WorkQueueStats;
 import org.slf4j.Logger;
 
 import com.google.common.collect.ImmutableList;
 
-import io.atomix.copycat.client.CopycatClient;
-import io.atomix.resource.AbstractResource;
-import io.atomix.resource.ResourceTypeInfo;
-
 /**
  * Distributed resource providing the {@link WorkQueue} primitive.
  */
@@ -69,6 +69,18 @@
     }
 
     @Override
+    public String name() {
+        return null;
+    }
+
+    @Override
+    public CompletableFuture<Void> destroy() {
+        executor.shutdown();
+        timer.cancel();
+        return client.submit(new Clear());
+    }
+
+    @Override
     public CompletableFuture<AtomixWorkQueue> open() {
         return super.open().thenApply(result -> {
             client.onStateChange(state -> {
diff --git a/core/store/primitives/src/main/java/org/onosproject/store/primitives/resources/impl/AtomixWorkQueueCommands.java b/core/store/primitives/src/main/java/org/onosproject/store/primitives/resources/impl/AtomixWorkQueueCommands.java
index 3724529..2e77da9 100644
--- a/core/store/primitives/src/main/java/org/onosproject/store/primitives/resources/impl/AtomixWorkQueueCommands.java
+++ b/core/store/primitives/src/main/java/org/onosproject/store/primitives/resources/impl/AtomixWorkQueueCommands.java
@@ -15,6 +15,14 @@
  */
 package org.onosproject.store.primitives.resources.impl;
 
+import io.atomix.catalyst.buffer.BufferInput;
+import io.atomix.catalyst.buffer.BufferOutput;
+import io.atomix.catalyst.serializer.CatalystSerializable;
+import io.atomix.catalyst.serializer.SerializableTypeResolver;
+import io.atomix.catalyst.serializer.Serializer;
+import io.atomix.catalyst.serializer.SerializerRegistry;
+import io.atomix.copycat.Command;
+
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.stream.Collectors;
@@ -25,14 +33,6 @@
 
 import com.google.common.base.MoreObjects;
 
-import io.atomix.catalyst.buffer.BufferInput;
-import io.atomix.catalyst.buffer.BufferOutput;
-import io.atomix.catalyst.serializer.CatalystSerializable;
-import io.atomix.catalyst.serializer.SerializableTypeResolver;
-import io.atomix.catalyst.serializer.Serializer;
-import io.atomix.catalyst.serializer.SerializerRegistry;
-import io.atomix.copycat.Command;
-
 /**
  * {@link AtomixWorkQueue} resource state machine operations.
  */
@@ -207,6 +207,24 @@
         }
     }
 
+    @SuppressWarnings("serial")
+    public static class Clear implements Command<Void>, CatalystSerializable {
+
+        @Override
+        public void writeObject(BufferOutput<?> buffer, Serializer serializer) {
+        }
+
+        @Override
+        public void readObject(BufferInput<?> buffer, Serializer serializer) {
+        }
+
+        @Override
+        public String toString() {
+            return MoreObjects.toStringHelper(getClass())
+                    .toString();
+        }
+    }
+
     /**
      * Work queue command type resolver.
      */
@@ -219,6 +237,7 @@
             registry.register(Add.class, -963);
             registry.register(Complete.class, -964);
             registry.register(Stats.class, -965);
+            registry.register(Clear.class, -966);
         }
     }
 }
diff --git a/core/store/primitives/src/main/java/org/onosproject/store/primitives/resources/impl/AtomixWorkQueueState.java b/core/store/primitives/src/main/java/org/onosproject/store/primitives/resources/impl/AtomixWorkQueueState.java
index d287e19..b226860 100644
--- a/core/store/primitives/src/main/java/org/onosproject/store/primitives/resources/impl/AtomixWorkQueueState.java
+++ b/core/store/primitives/src/main/java/org/onosproject/store/primitives/resources/impl/AtomixWorkQueueState.java
@@ -16,6 +16,14 @@
 package org.onosproject.store.primitives.resources.impl;
 
 import static org.slf4j.LoggerFactory.getLogger;
+import io.atomix.copycat.server.Commit;
+import io.atomix.copycat.server.Snapshottable;
+import io.atomix.copycat.server.StateMachineExecutor;
+import io.atomix.copycat.server.session.ServerSession;
+import io.atomix.copycat.server.session.SessionListener;
+import io.atomix.copycat.server.storage.snapshot.SnapshotReader;
+import io.atomix.copycat.server.storage.snapshot.SnapshotWriter;
+import io.atomix.resource.ResourceStateMachine;
 
 import java.util.ArrayList;
 import java.util.Collection;
@@ -31,6 +39,7 @@
 
 import org.onlab.util.CountDownCompleter;
 import org.onosproject.store.primitives.resources.impl.AtomixWorkQueueCommands.Add;
+import org.onosproject.store.primitives.resources.impl.AtomixWorkQueueCommands.Clear;
 import org.onosproject.store.primitives.resources.impl.AtomixWorkQueueCommands.Complete;
 import org.onosproject.store.primitives.resources.impl.AtomixWorkQueueCommands.Register;
 import org.onosproject.store.primitives.resources.impl.AtomixWorkQueueCommands.Stats;
@@ -47,15 +56,6 @@
 import com.google.common.collect.Queues;
 import com.google.common.util.concurrent.AtomicLongMap;
 
-import io.atomix.copycat.server.Commit;
-import io.atomix.copycat.server.Snapshottable;
-import io.atomix.copycat.server.StateMachineExecutor;
-import io.atomix.copycat.server.session.ServerSession;
-import io.atomix.copycat.server.session.SessionListener;
-import io.atomix.copycat.server.storage.snapshot.SnapshotReader;
-import io.atomix.copycat.server.storage.snapshot.SnapshotWriter;
-import io.atomix.resource.ResourceStateMachine;
-
 /**
  * State machine for {@link AtomixWorkQueue} resource.
  */
@@ -82,6 +82,7 @@
         executor.register(Add.class, (Consumer<Commit<Add>>) this::add);
         executor.register(Take.class, this::take);
         executor.register(Complete.class, (Consumer<Commit<Complete>>) this::complete);
+        executor.register(Clear.class, (Consumer<Commit<Clear>>) this::clear);
     }
 
     protected WorkQueueStats stats(Commit<? extends Stats> commit) {
@@ -96,6 +97,17 @@
         }
     }
 
+    protected void clear(Commit<? extends Clear> commit) {
+        unassignedTasks.forEach(TaskHolder::complete);
+        unassignedTasks.clear();
+        assignments.values().forEach(TaskAssignment::markComplete);
+        assignments.clear();
+        registeredWorkers.values().forEach(Commit::close);
+        registeredWorkers.clear();
+        activeTasksPerSession.clear();
+        totalCompleted.set(0);
+    }
+
     protected void register(Commit<? extends Register> commit) {
         long sessionId = commit.session().id();
         if (registeredWorkers.putIfAbsent(sessionId, commit) != null) {
@@ -172,7 +184,7 @@
         try {
             commit.operation().taskIds().forEach(taskId -> {
                 TaskAssignment assignment = assignments.get(taskId);
-                if (assignment != null) {
+                if (assignment != null && assignment.sessionId() == sessionId) {
                     assignments.remove(taskId).markComplete();
                     // bookkeeping
                     totalCompleted.incrementAndGet();
diff --git a/core/store/primitives/src/test/java/org/onosproject/store/primitives/resources/impl/AtomixWorkQueueTest.java b/core/store/primitives/src/test/java/org/onosproject/store/primitives/resources/impl/AtomixWorkQueueTest.java
index 161424d..7f1b36d 100644
--- a/core/store/primitives/src/test/java/org/onosproject/store/primitives/resources/impl/AtomixWorkQueueTest.java
+++ b/core/store/primitives/src/test/java/org/onosproject/store/primitives/resources/impl/AtomixWorkQueueTest.java
@@ -15,6 +15,13 @@
  */
 package org.onosproject.store.primitives.resources.impl;
 
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertTrue;
+import io.atomix.Atomix;
+import io.atomix.AtomixClient;
+import io.atomix.resource.ResourceType;
+
 import java.time.Duration;
 import java.util.Arrays;
 import java.util.UUID;
@@ -23,10 +30,6 @@
 import java.util.concurrent.Executors;
 import java.util.concurrent.TimeUnit;
 
-import io.atomix.Atomix;
-import io.atomix.AtomixClient;
-import io.atomix.resource.ResourceType;
-
 import org.junit.AfterClass;
 import org.junit.BeforeClass;
 import org.junit.Test;
@@ -36,10 +39,6 @@
 
 import com.google.common.util.concurrent.Uninterruptibles;
 
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertNull;
-import static org.junit.Assert.assertTrue;
-
 /**
  * Unit tests for {@link AtomixWorkQueue}.
  */
@@ -186,4 +185,54 @@
 
         Uninterruptibles.awaitUninterruptibly(latch2, 500, TimeUnit.MILLISECONDS);
     }
+
+    @Test
+    public void testDestroy() {
+        String queueName = UUID.randomUUID().toString();
+        Atomix atomix1 = createAtomixClient();
+        AtomixWorkQueue queue1 = atomix1.getResource(queueName, AtomixWorkQueue.class).join();
+        byte[] item = DEFAULT_PAYLOAD;
+        queue1.addOne(item).join();
+
+        Atomix atomix2 = createAtomixClient();
+        AtomixWorkQueue queue2 = atomix2.getResource(queueName, AtomixWorkQueue.class).join();
+        byte[] task2 = DEFAULT_PAYLOAD;
+        queue2.addOne(task2).join();
+
+        WorkQueueStats stats = queue1.stats().join();
+        assertEquals(stats.totalPending(), 2);
+        assertEquals(stats.totalInProgress(), 0);
+        assertEquals(stats.totalCompleted(), 0);
+
+        queue2.destroy().join();
+
+        stats = queue1.stats().join();
+        assertEquals(stats.totalPending(), 0);
+        assertEquals(stats.totalInProgress(), 0);
+        assertEquals(stats.totalCompleted(), 0);
+    }
+
+    @Test
+    public void testCompleteAttemptWithIncorrectSession() {
+        String queueName = UUID.randomUUID().toString();
+        Atomix atomix1 = createAtomixClient();
+        AtomixWorkQueue queue1 = atomix1.getResource(queueName, AtomixWorkQueue.class).join();
+        byte[] item = DEFAULT_PAYLOAD;
+        queue1.addOne(item).join();
+
+        Task<byte[]> task = queue1.take().join();
+        String taskId = task.taskId();
+
+        // Create another client and get a handle to the same queue.
+        Atomix atomix2 = createAtomixClient();
+        AtomixWorkQueue queue2 = atomix2.getResource(queueName, AtomixWorkQueue.class).join();
+
+        // Attempt completing the task with new client and verify task is not completed
+        queue2.complete(taskId).join();
+
+        WorkQueueStats stats = queue1.stats().join();
+        assertEquals(stats.totalPending(), 0);
+        assertEquals(stats.totalInProgress(), 1);
+        assertEquals(stats.totalCompleted(), 0);
+    }
 }