Work queue improvements
- Fixed logic to ensure that only the session to which a task is currently assigned can complete it
- Added support for a destroy method that resets the work queue state
- Removed deprecated DistributedQueue primitive

Change-Id: I4e1d5be4eb142115130acf15ff34035cb9319a1a
diff --git a/core/store/primitives/src/test/java/org/onosproject/store/primitives/resources/impl/AtomixWorkQueueTest.java b/core/store/primitives/src/test/java/org/onosproject/store/primitives/resources/impl/AtomixWorkQueueTest.java
index 161424d..7f1b36d 100644
--- a/core/store/primitives/src/test/java/org/onosproject/store/primitives/resources/impl/AtomixWorkQueueTest.java
+++ b/core/store/primitives/src/test/java/org/onosproject/store/primitives/resources/impl/AtomixWorkQueueTest.java
@@ -15,6 +15,13 @@
  */
 package org.onosproject.store.primitives.resources.impl;
 
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertTrue;
+import io.atomix.Atomix;
+import io.atomix.AtomixClient;
+import io.atomix.resource.ResourceType;
+
 import java.time.Duration;
 import java.util.Arrays;
 import java.util.UUID;
@@ -23,10 +30,6 @@
 import java.util.concurrent.Executors;
 import java.util.concurrent.TimeUnit;
 
-import io.atomix.Atomix;
-import io.atomix.AtomixClient;
-import io.atomix.resource.ResourceType;
-
 import org.junit.AfterClass;
 import org.junit.BeforeClass;
 import org.junit.Test;
@@ -36,10 +39,6 @@
 
 import com.google.common.util.concurrent.Uninterruptibles;
 
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertNull;
-import static org.junit.Assert.assertTrue;
-
 /**
  * Unit tests for {@link AtomixWorkQueue}.
  */
@@ -186,4 +185,54 @@
 
         Uninterruptibles.awaitUninterruptibly(latch2, 500, TimeUnit.MILLISECONDS);
     }
+
+    @Test
+    public void testDestroy() { // destroy() issued via one client must zero the stats seen by another
+        String queueName = UUID.randomUUID().toString(); // fresh name per test run
+        Atomix atomix1 = createAtomixClient();
+        AtomixWorkQueue queue1 = atomix1.getResource(queueName, AtomixWorkQueue.class).join();
+        byte[] item = DEFAULT_PAYLOAD;
+        queue1.addOne(item).join();
+
+        Atomix atomix2 = createAtomixClient(); // second client, same queue name
+        AtomixWorkQueue queue2 = atomix2.getResource(queueName, AtomixWorkQueue.class).join();
+        byte[] task2 = DEFAULT_PAYLOAD;
+        queue2.addOne(task2).join();
+
+        WorkQueueStats stats = queue1.stats().join(); // one item added through each handle
+        assertEquals(2, stats.totalPending()); // JUnit order: (expected, actual)
+        assertEquals(0, stats.totalInProgress());
+        assertEquals(0, stats.totalCompleted());
+
+        queue2.destroy().join(); // destroy through queue2, then re-read stats through queue1
+
+        stats = queue1.stats().join();
+        assertEquals(0, stats.totalPending());
+        assertEquals(0, stats.totalInProgress());
+        assertEquals(0, stats.totalCompleted());
+    }
+
+    @Test
+    public void testCompleteAttemptWithIncorrectSession() { // complete() from a non-owning client is a no-op
+        String queueName = UUID.randomUUID().toString(); // fresh name per test run
+        Atomix atomix1 = createAtomixClient();
+        AtomixWorkQueue queue1 = atomix1.getResource(queueName, AtomixWorkQueue.class).join();
+        byte[] item = DEFAULT_PAYLOAD;
+        queue1.addOne(item).join();
+
+        Task<byte[]> task = queue1.take().join(); // the task is taken through queue1
+        String taskId = task.taskId();
+
+        // Create another client and get a handle to the same queue.
+        Atomix atomix2 = createAtomixClient();
+        AtomixWorkQueue queue2 = atomix2.getResource(queueName, AtomixWorkQueue.class).join();
+
+        // Attempt to complete the task through the second client; the stats below must show it still in progress.
+        queue2.complete(taskId).join();
+
+        WorkQueueStats stats = queue1.stats().join();
+        assertEquals(0, stats.totalPending()); // JUnit order: (expected, actual)
+        assertEquals(1, stats.totalInProgress());
+        assertEquals(0, stats.totalCompleted());
+    }
 }