Work queue improvements
- Fixed logic to ensure that only the session to which a task is currently assigned can complete it
- Added support for a destroy method that resets the work queue state
- Removed deprecated DistributedQueue primitive
Change-Id: I4e1d5be4eb142115130acf15ff34035cb9319a1a
diff --git a/core/store/primitives/src/test/java/org/onosproject/store/primitives/resources/impl/AtomixWorkQueueTest.java b/core/store/primitives/src/test/java/org/onosproject/store/primitives/resources/impl/AtomixWorkQueueTest.java
index 161424d..7f1b36d 100644
--- a/core/store/primitives/src/test/java/org/onosproject/store/primitives/resources/impl/AtomixWorkQueueTest.java
+++ b/core/store/primitives/src/test/java/org/onosproject/store/primitives/resources/impl/AtomixWorkQueueTest.java
@@ -15,6 +15,13 @@
*/
package org.onosproject.store.primitives.resources.impl;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertTrue;
+import io.atomix.Atomix;
+import io.atomix.AtomixClient;
+import io.atomix.resource.ResourceType;
+
import java.time.Duration;
import java.util.Arrays;
import java.util.UUID;
@@ -23,10 +30,6 @@
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
-import io.atomix.Atomix;
-import io.atomix.AtomixClient;
-import io.atomix.resource.ResourceType;
-
import org.junit.AfterClass;
import org.junit.BeforeClass;
import org.junit.Test;
@@ -36,10 +39,6 @@
import com.google.common.util.concurrent.Uninterruptibles;
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertNull;
-import static org.junit.Assert.assertTrue;
-
/**
* Unit tests for {@link AtomixWorkQueue}.
*/
@@ -186,4 +185,54 @@
Uninterruptibles.awaitUninterruptibly(latch2, 500, TimeUnit.MILLISECONDS);
}
+
+    /**
+     * Verifies that destroying the queue through one client resets the stats seen by another client.
+     */
+    @Test
+    public void testDestroy() {
+        String queueName = UUID.randomUUID().toString();
+        Atomix atomix1 = createAtomixClient();
+        AtomixWorkQueue queue1 = atomix1.getResource(queueName, AtomixWorkQueue.class).join();
+        queue1.addOne(DEFAULT_PAYLOAD).join();
+        Atomix atomix2 = createAtomixClient();
+        AtomixWorkQueue queue2 = atomix2.getResource(queueName, AtomixWorkQueue.class).join();
+        queue2.addOne(DEFAULT_PAYLOAD).join();
+
+        WorkQueueStats stats = queue1.stats().join();
+        assertEquals(2, stats.totalPending());
+        assertEquals(0, stats.totalInProgress());
+        assertEquals(0, stats.totalCompleted());
+
+        // Destroy through the second client, then re-read the stats through the first one.
+        queue2.destroy().join();
+        stats = queue1.stats().join();
+        assertEquals(0, stats.totalPending());
+        assertEquals(0, stats.totalInProgress());
+        assertEquals(0, stats.totalCompleted());
+    }
+
+    /**
+     * Verifies that a task taken through one client is not completed when another client tries to complete it.
+     */
+    @Test
+    public void testCompleteAttemptWithIncorrectSession() {
+        String queueName = UUID.randomUUID().toString();
+        Atomix atomix1 = createAtomixClient();
+        AtomixWorkQueue queue1 = atomix1.getResource(queueName, AtomixWorkQueue.class).join();
+        queue1.addOne(DEFAULT_PAYLOAD).join();
+        Task<byte[]> task = queue1.take().join();
+        String taskId = task.taskId();
+
+        // Create another client and get a handle to the same queue.
+        Atomix atomix2 = createAtomixClient();
+        AtomixWorkQueue queue2 = atomix2.getResource(queueName, AtomixWorkQueue.class).join();
+        // Attempt completing the task with the new client and verify the task is not completed.
+        queue2.complete(taskId).join();
+
+        WorkQueueStats stats = queue1.stats().join();
+        assertEquals(0, stats.totalPending());
+        assertEquals(1, stats.totalInProgress());
+        assertEquals(0, stats.totalCompleted());
+    }
}