Work queue improvements
- Fixed logic to ensure only session to which task is currently assigned can complete it
- Support destroy method to reset work queue state
- Removed deprecated DistributedQueue primitive
Change-Id: I4e1d5be4eb142115130acf15ff34035cb9319a1a
diff --git a/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/DefaultDistributedQueueBuilder.java b/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/DefaultDistributedQueueBuilder.java
deleted file mode 100644
index 7e05367..0000000
--- a/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/DefaultDistributedQueueBuilder.java
+++ /dev/null
@@ -1,64 +0,0 @@
-/*
- * Copyright 2016-present Open Networking Laboratory
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.onosproject.store.primitives.impl;
-
-import org.onosproject.store.primitives.DistributedPrimitiveCreator;
-import org.onosproject.store.service.DistributedQueue;
-import org.onosproject.store.service.DistributedQueueBuilder;
-import org.onosproject.store.service.Serializer;
-
-import static com.google.common.base.Preconditions.checkArgument;
-import static com.google.common.base.Preconditions.checkState;
-
-/**
- * Default implementation of a {@code DistributedQueueBuilder}.
- *
- * @param <E> queue entry type
- */
-public class DefaultDistributedQueueBuilder<E> implements DistributedQueueBuilder<E> {
-
- private final DistributedPrimitiveCreator primitiveCreator;
- private String name;
- private Serializer serializer;
-
- public DefaultDistributedQueueBuilder(DistributedPrimitiveCreator primitiveCreator) {
- this.primitiveCreator = primitiveCreator;
- }
-
- @Override
- public DistributedQueueBuilder<E> withName(String name) {
- checkArgument(name != null && !name.isEmpty());
- this.name = name;
- return this;
- }
-
- @Override
- public DistributedQueueBuilder<E> withSerializer(Serializer serializer) {
- checkArgument(serializer != null);
- this.serializer = serializer;
- return this;
- }
-
- private boolean validInputs() {
- return name != null && serializer != null;
- }
-
- @Override
- public DistributedQueue<E> build() {
- checkState(validInputs());
- return primitiveCreator.newDistributedQueue(name, serializer);
- }
-}
diff --git a/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/DefaultDistributedWorkQueue.java b/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/DefaultDistributedWorkQueue.java
index 2f604d3..9ce11bf 100644
--- a/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/DefaultDistributedWorkQueue.java
+++ b/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/DefaultDistributedWorkQueue.java
@@ -30,6 +30,11 @@
}
@Override
+ public String name() {
+ return backingQueue.name();
+ }
+
+ @Override
public CompletableFuture<Void> addMultiple(Collection<E> items) {
return backingQueue.addMultiple(items.stream()
.map(serializer::encode)
diff --git a/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/FederatedDistributedPrimitiveCreator.java b/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/FederatedDistributedPrimitiveCreator.java
index a095de1..4dc2bc4 100644
--- a/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/FederatedDistributedPrimitiveCreator.java
+++ b/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/FederatedDistributedPrimitiveCreator.java
@@ -29,9 +29,8 @@
import org.onosproject.store.service.AsyncConsistentMap;
import org.onosproject.store.service.AsyncDistributedSet;
import org.onosproject.store.service.AsyncLeaderElector;
-import org.onosproject.store.service.DistributedQueue;
-import org.onosproject.store.service.WorkQueue;
import org.onosproject.store.service.Serializer;
+import org.onosproject.store.service.WorkQueue;
import com.google.common.base.Charsets;
import com.google.common.collect.ImmutableSet;
@@ -84,11 +83,6 @@
}
@Override
- public <E> DistributedQueue<E> newDistributedQueue(String name, Serializer serializer) {
- return getCreator(name).newDistributedQueue(name, serializer);
- }
-
- @Override
public AsyncLeaderElector newAsyncLeaderElector(String name) {
checkNotNull(name);
Map<PartitionId, AsyncLeaderElector> leaderElectors =
diff --git a/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/StorageManager.java b/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/StorageManager.java
index 8eb138a..f56b6e8 100644
--- a/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/StorageManager.java
+++ b/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/StorageManager.java
@@ -15,6 +15,8 @@
*/
package org.onosproject.store.primitives.impl;
+import static org.onosproject.security.AppGuard.checkPermission;
+import static org.onosproject.security.AppPermission.Type.STORAGE_WRITE;
import static org.slf4j.LoggerFactory.getLogger;
import java.util.Collection;
@@ -44,9 +46,7 @@
import org.onosproject.store.service.AtomicValueBuilder;
import org.onosproject.store.service.ConsistentMap;
import org.onosproject.store.service.ConsistentMapBuilder;
-import org.onosproject.store.service.DistributedQueueBuilder;
import org.onosproject.store.service.DistributedSetBuilder;
-import org.onosproject.store.service.WorkQueue;
import org.onosproject.store.service.EventuallyConsistentMapBuilder;
import org.onosproject.store.service.LeaderElectorBuilder;
import org.onosproject.store.service.MapInfo;
@@ -55,15 +55,13 @@
import org.onosproject.store.service.StorageAdminService;
import org.onosproject.store.service.StorageService;
import org.onosproject.store.service.TransactionContextBuilder;
+import org.onosproject.store.service.WorkQueue;
import org.onosproject.store.service.WorkQueueStats;
import org.slf4j.Logger;
import com.google.common.collect.Maps;
import com.google.common.util.concurrent.Futures;
-import static org.onosproject.security.AppGuard.checkPermission;
-import static org.onosproject.security.AppPermission.Type.*;
-
/**
* Implementation for {@code StorageService} and {@code StorageAdminService}.
*/
@@ -137,12 +135,6 @@
}
@Override
- public <E> DistributedQueueBuilder<E> queueBuilder() {
- checkPermission(STORAGE_WRITE);
- return new DefaultDistributedQueueBuilder<>(federatedPrimitiveCreator);
- }
-
- @Override
public AtomicCounterBuilder atomicCounterBuilder() {
checkPermission(STORAGE_WRITE);
return new DefaultAtomicCounterBuilder(federatedPrimitiveCreator);
diff --git a/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/StoragePartitionClient.java b/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/StoragePartitionClient.java
index d635a79..205656e 100644
--- a/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/StoragePartitionClient.java
+++ b/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/StoragePartitionClient.java
@@ -49,10 +49,9 @@
import org.onosproject.store.service.AsyncDistributedSet;
import org.onosproject.store.service.AsyncLeaderElector;
import org.onosproject.store.service.DistributedPrimitive.Status;
-import org.onosproject.store.service.DistributedQueue;
-import org.onosproject.store.service.WorkQueue;
import org.onosproject.store.service.PartitionClientInfo;
import org.onosproject.store.service.Serializer;
+import org.onosproject.store.service.WorkQueue;
import org.slf4j.Logger;
import com.google.common.base.Supplier;
@@ -160,11 +159,6 @@
}
@Override
- public <E> DistributedQueue<E> newDistributedQueue(String name, Serializer serializer) {
- throw new UnsupportedOperationException();
- }
-
- @Override
public <E> WorkQueue<E> newWorkQueue(String name, Serializer serializer) {
AtomixWorkQueue workQueue = client.getResource(name, AtomixWorkQueue.class).join();
return new DefaultDistributedWorkQueue<>(workQueue, serializer);
diff --git a/core/store/primitives/src/main/java/org/onosproject/store/primitives/resources/impl/AtomixWorkQueue.java b/core/store/primitives/src/main/java/org/onosproject/store/primitives/resources/impl/AtomixWorkQueue.java
index 879cbb3..0085932 100644
--- a/core/store/primitives/src/main/java/org/onosproject/store/primitives/resources/impl/AtomixWorkQueue.java
+++ b/core/store/primitives/src/main/java/org/onosproject/store/primitives/resources/impl/AtomixWorkQueue.java
@@ -18,6 +18,9 @@
import static java.util.concurrent.Executors.newSingleThreadExecutor;
import static org.onlab.util.Tools.groupedThreads;
import static org.slf4j.LoggerFactory.getLogger;
+import io.atomix.copycat.client.CopycatClient;
+import io.atomix.resource.AbstractResource;
+import io.atomix.resource.ResourceTypeInfo;
import java.util.Collection;
import java.util.List;
@@ -34,22 +37,19 @@
import org.onlab.util.AbstractAccumulator;
import org.onlab.util.Accumulator;
import org.onosproject.store.primitives.resources.impl.AtomixWorkQueueCommands.Add;
+import org.onosproject.store.primitives.resources.impl.AtomixWorkQueueCommands.Clear;
import org.onosproject.store.primitives.resources.impl.AtomixWorkQueueCommands.Complete;
import org.onosproject.store.primitives.resources.impl.AtomixWorkQueueCommands.Register;
import org.onosproject.store.primitives.resources.impl.AtomixWorkQueueCommands.Stats;
import org.onosproject.store.primitives.resources.impl.AtomixWorkQueueCommands.Take;
import org.onosproject.store.primitives.resources.impl.AtomixWorkQueueCommands.Unregister;
-import org.onosproject.store.service.WorkQueue;
import org.onosproject.store.service.Task;
+import org.onosproject.store.service.WorkQueue;
import org.onosproject.store.service.WorkQueueStats;
import org.slf4j.Logger;
import com.google.common.collect.ImmutableList;
-import io.atomix.copycat.client.CopycatClient;
-import io.atomix.resource.AbstractResource;
-import io.atomix.resource.ResourceTypeInfo;
-
/**
* Distributed resource providing the {@link WorkQueue} primitive.
*/
@@ -69,6 +69,18 @@
}
@Override
+ public String name() {
+ return null;
+ }
+
+ @Override
+ public CompletableFuture<Void> destroy() {
+ executor.shutdown();
+ timer.cancel();
+ return client.submit(new Clear());
+ }
+
+ @Override
public CompletableFuture<AtomixWorkQueue> open() {
return super.open().thenApply(result -> {
client.onStateChange(state -> {
diff --git a/core/store/primitives/src/main/java/org/onosproject/store/primitives/resources/impl/AtomixWorkQueueCommands.java b/core/store/primitives/src/main/java/org/onosproject/store/primitives/resources/impl/AtomixWorkQueueCommands.java
index 3724529..2e77da9 100644
--- a/core/store/primitives/src/main/java/org/onosproject/store/primitives/resources/impl/AtomixWorkQueueCommands.java
+++ b/core/store/primitives/src/main/java/org/onosproject/store/primitives/resources/impl/AtomixWorkQueueCommands.java
@@ -15,6 +15,14 @@
*/
package org.onosproject.store.primitives.resources.impl;
+import io.atomix.catalyst.buffer.BufferInput;
+import io.atomix.catalyst.buffer.BufferOutput;
+import io.atomix.catalyst.serializer.CatalystSerializable;
+import io.atomix.catalyst.serializer.SerializableTypeResolver;
+import io.atomix.catalyst.serializer.Serializer;
+import io.atomix.catalyst.serializer.SerializerRegistry;
+import io.atomix.copycat.Command;
+
import java.util.ArrayList;
import java.util.Collection;
import java.util.stream.Collectors;
@@ -25,14 +33,6 @@
import com.google.common.base.MoreObjects;
-import io.atomix.catalyst.buffer.BufferInput;
-import io.atomix.catalyst.buffer.BufferOutput;
-import io.atomix.catalyst.serializer.CatalystSerializable;
-import io.atomix.catalyst.serializer.SerializableTypeResolver;
-import io.atomix.catalyst.serializer.Serializer;
-import io.atomix.catalyst.serializer.SerializerRegistry;
-import io.atomix.copycat.Command;
-
/**
* {@link AtomixWorkQueue} resource state machine operations.
*/
@@ -207,6 +207,24 @@
}
}
+ @SuppressWarnings("serial")
+ public static class Clear implements Command<Void>, CatalystSerializable {
+
+ @Override
+ public void writeObject(BufferOutput<?> buffer, Serializer serializer) {
+ }
+
+ @Override
+ public void readObject(BufferInput<?> buffer, Serializer serializer) {
+ }
+
+ @Override
+ public String toString() {
+ return MoreObjects.toStringHelper(getClass())
+ .toString();
+ }
+ }
+
/**
* Work queue command type resolver.
*/
@@ -219,6 +237,7 @@
registry.register(Add.class, -963);
registry.register(Complete.class, -964);
registry.register(Stats.class, -965);
+ registry.register(Clear.class, -966);
}
}
}
diff --git a/core/store/primitives/src/main/java/org/onosproject/store/primitives/resources/impl/AtomixWorkQueueState.java b/core/store/primitives/src/main/java/org/onosproject/store/primitives/resources/impl/AtomixWorkQueueState.java
index d287e19..b226860 100644
--- a/core/store/primitives/src/main/java/org/onosproject/store/primitives/resources/impl/AtomixWorkQueueState.java
+++ b/core/store/primitives/src/main/java/org/onosproject/store/primitives/resources/impl/AtomixWorkQueueState.java
@@ -16,6 +16,14 @@
package org.onosproject.store.primitives.resources.impl;
import static org.slf4j.LoggerFactory.getLogger;
+import io.atomix.copycat.server.Commit;
+import io.atomix.copycat.server.Snapshottable;
+import io.atomix.copycat.server.StateMachineExecutor;
+import io.atomix.copycat.server.session.ServerSession;
+import io.atomix.copycat.server.session.SessionListener;
+import io.atomix.copycat.server.storage.snapshot.SnapshotReader;
+import io.atomix.copycat.server.storage.snapshot.SnapshotWriter;
+import io.atomix.resource.ResourceStateMachine;
import java.util.ArrayList;
import java.util.Collection;
@@ -31,6 +39,7 @@
import org.onlab.util.CountDownCompleter;
import org.onosproject.store.primitives.resources.impl.AtomixWorkQueueCommands.Add;
+import org.onosproject.store.primitives.resources.impl.AtomixWorkQueueCommands.Clear;
import org.onosproject.store.primitives.resources.impl.AtomixWorkQueueCommands.Complete;
import org.onosproject.store.primitives.resources.impl.AtomixWorkQueueCommands.Register;
import org.onosproject.store.primitives.resources.impl.AtomixWorkQueueCommands.Stats;
@@ -47,15 +56,6 @@
import com.google.common.collect.Queues;
import com.google.common.util.concurrent.AtomicLongMap;
-import io.atomix.copycat.server.Commit;
-import io.atomix.copycat.server.Snapshottable;
-import io.atomix.copycat.server.StateMachineExecutor;
-import io.atomix.copycat.server.session.ServerSession;
-import io.atomix.copycat.server.session.SessionListener;
-import io.atomix.copycat.server.storage.snapshot.SnapshotReader;
-import io.atomix.copycat.server.storage.snapshot.SnapshotWriter;
-import io.atomix.resource.ResourceStateMachine;
-
/**
* State machine for {@link AtomixWorkQueue} resource.
*/
@@ -82,6 +82,7 @@
executor.register(Add.class, (Consumer<Commit<Add>>) this::add);
executor.register(Take.class, this::take);
executor.register(Complete.class, (Consumer<Commit<Complete>>) this::complete);
+ executor.register(Clear.class, (Consumer<Commit<Clear>>) this::clear);
}
protected WorkQueueStats stats(Commit<? extends Stats> commit) {
@@ -96,6 +97,17 @@
}
}
+ protected void clear(Commit<? extends Clear> commit) {
+ unassignedTasks.forEach(TaskHolder::complete);
+ unassignedTasks.clear();
+ assignments.values().forEach(TaskAssignment::markComplete);
+ assignments.clear();
+ registeredWorkers.values().forEach(Commit::close);
+ registeredWorkers.clear();
+ activeTasksPerSession.clear();
+ totalCompleted.set(0);
+ }
+
protected void register(Commit<? extends Register> commit) {
long sessionId = commit.session().id();
if (registeredWorkers.putIfAbsent(sessionId, commit) != null) {
@@ -172,7 +184,7 @@
try {
commit.operation().taskIds().forEach(taskId -> {
TaskAssignment assignment = assignments.get(taskId);
- if (assignment != null) {
+ if (assignment != null && assignment.sessionId() == sessionId) {
assignments.remove(taskId).markComplete();
// bookkeeping
totalCompleted.incrementAndGet();
diff --git a/core/store/primitives/src/test/java/org/onosproject/store/primitives/resources/impl/AtomixWorkQueueTest.java b/core/store/primitives/src/test/java/org/onosproject/store/primitives/resources/impl/AtomixWorkQueueTest.java
index 161424d..7f1b36d 100644
--- a/core/store/primitives/src/test/java/org/onosproject/store/primitives/resources/impl/AtomixWorkQueueTest.java
+++ b/core/store/primitives/src/test/java/org/onosproject/store/primitives/resources/impl/AtomixWorkQueueTest.java
@@ -15,6 +15,13 @@
*/
package org.onosproject.store.primitives.resources.impl;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertTrue;
+import io.atomix.Atomix;
+import io.atomix.AtomixClient;
+import io.atomix.resource.ResourceType;
+
import java.time.Duration;
import java.util.Arrays;
import java.util.UUID;
@@ -23,10 +30,6 @@
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
-import io.atomix.Atomix;
-import io.atomix.AtomixClient;
-import io.atomix.resource.ResourceType;
-
import org.junit.AfterClass;
import org.junit.BeforeClass;
import org.junit.Test;
@@ -36,10 +39,6 @@
import com.google.common.util.concurrent.Uninterruptibles;
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertNull;
-import static org.junit.Assert.assertTrue;
-
/**
* Unit tests for {@link AtomixWorkQueue}.
*/
@@ -186,4 +185,54 @@
Uninterruptibles.awaitUninterruptibly(latch2, 500, TimeUnit.MILLISECONDS);
}
+
+ @Test
+ public void testDestroy() {
+ String queueName = UUID.randomUUID().toString();
+ Atomix atomix1 = createAtomixClient();
+ AtomixWorkQueue queue1 = atomix1.getResource(queueName, AtomixWorkQueue.class).join();
+ byte[] item = DEFAULT_PAYLOAD;
+ queue1.addOne(item).join();
+
+ Atomix atomix2 = createAtomixClient();
+ AtomixWorkQueue queue2 = atomix2.getResource(queueName, AtomixWorkQueue.class).join();
+ byte[] task2 = DEFAULT_PAYLOAD;
+ queue2.addOne(task2).join();
+
+ WorkQueueStats stats = queue1.stats().join();
+ assertEquals(stats.totalPending(), 2);
+ assertEquals(stats.totalInProgress(), 0);
+ assertEquals(stats.totalCompleted(), 0);
+
+ queue2.destroy().join();
+
+ stats = queue1.stats().join();
+ assertEquals(stats.totalPending(), 0);
+ assertEquals(stats.totalInProgress(), 0);
+ assertEquals(stats.totalCompleted(), 0);
+ }
+
+ @Test
+ public void testCompleteAttemptWithIncorrectSession() {
+ String queueName = UUID.randomUUID().toString();
+ Atomix atomix1 = createAtomixClient();
+ AtomixWorkQueue queue1 = atomix1.getResource(queueName, AtomixWorkQueue.class).join();
+ byte[] item = DEFAULT_PAYLOAD;
+ queue1.addOne(item).join();
+
+ Task<byte[]> task = queue1.take().join();
+ String taskId = task.taskId();
+
+ // Create another client and get a handle to the same queue.
+ Atomix atomix2 = createAtomixClient();
+ AtomixWorkQueue queue2 = atomix2.getResource(queueName, AtomixWorkQueue.class).join();
+
+ // Attempt completing the task with new client and verify task is not completed
+ queue2.complete(taskId).join();
+
+ WorkQueueStats stats = queue1.stats().join();
+ assertEquals(stats.totalPending(), 0);
+ assertEquals(stats.totalInProgress(), 1);
+ assertEquals(stats.totalCompleted(), 0);
+ }
}