Work queue improvements
- Fixed logic to ensure only session to which task is currently assigned can complete it
- Support destroy method to reset work queue state
- Removed deprecated DistributedQueue primitive
Change-Id: I4e1d5be4eb142115130acf15ff34035cb9319a1a
diff --git a/apps/vtn/vtnrsc/src/test/java/org/onosproject/vtnrsc/util/VtnStorageServiceAdapter.java b/apps/vtn/vtnrsc/src/test/java/org/onosproject/vtnrsc/util/VtnStorageServiceAdapter.java
index b8ad5d0..fa64b5d 100644
--- a/apps/vtn/vtnrsc/src/test/java/org/onosproject/vtnrsc/util/VtnStorageServiceAdapter.java
+++ b/apps/vtn/vtnrsc/src/test/java/org/onosproject/vtnrsc/util/VtnStorageServiceAdapter.java
@@ -19,7 +19,6 @@
import org.onosproject.store.service.EventuallyConsistentMapBuilder;
import org.onosproject.store.service.ConsistentMapBuilder;
import org.onosproject.store.service.DistributedSetBuilder;
-import org.onosproject.store.service.DistributedQueueBuilder;
import org.onosproject.store.service.AtomicCounterBuilder;
import org.onosproject.store.service.AtomicValueBuilder;
import org.onosproject.store.service.LeaderElectorBuilder;
@@ -47,11 +46,6 @@
}
@Override
- public <E> DistributedQueueBuilder<E> queueBuilder() {
- return null;
- }
-
- @Override
public AtomicCounterBuilder atomicCounterBuilder() {
return null;
}
diff --git a/core/api/src/main/java/org/onosproject/store/primitives/DistributedPrimitiveCreator.java b/core/api/src/main/java/org/onosproject/store/primitives/DistributedPrimitiveCreator.java
index 2dfcd2c..768aa0b 100644
--- a/core/api/src/main/java/org/onosproject/store/primitives/DistributedPrimitiveCreator.java
+++ b/core/api/src/main/java/org/onosproject/store/primitives/DistributedPrimitiveCreator.java
@@ -22,7 +22,6 @@
import org.onosproject.store.service.AsyncConsistentMap;
import org.onosproject.store.service.AsyncDistributedSet;
import org.onosproject.store.service.AsyncLeaderElector;
-import org.onosproject.store.service.DistributedQueue;
import org.onosproject.store.service.WorkQueue;
import org.onosproject.store.service.Serializer;
@@ -61,16 +60,6 @@
<V> AsyncAtomicValue<V> newAsyncAtomicValue(String name, Serializer serializer);
/**
- * Creates a new {@code DistributedQueue}.
- *
- * @param name queue name
- * @param serializer serializer to use for serializing/deserializing queue entries
- * @param <E> queue entry type
- * @return queue
- */
- <E> DistributedQueue<E> newDistributedQueue(String name, Serializer serializer);
-
- /**
* Creates a new {@code AsyncDistributedSet}.
*
* @param name set name
diff --git a/core/api/src/main/java/org/onosproject/store/service/DistributedPrimitive.java b/core/api/src/main/java/org/onosproject/store/service/DistributedPrimitive.java
index b3328e5..c8949c8 100644
--- a/core/api/src/main/java/org/onosproject/store/service/DistributedPrimitive.java
+++ b/core/api/src/main/java/org/onosproject/store/service/DistributedPrimitive.java
@@ -62,9 +62,9 @@
VALUE,
/**
- * Distributed queue.
+ * Distributed work queue.
*/
- QUEUE,
+ WORK_QUEUE,
/**
* Leader elector.
diff --git a/core/api/src/main/java/org/onosproject/store/service/DistributedQueue.java b/core/api/src/main/java/org/onosproject/store/service/DistributedQueue.java
deleted file mode 100644
index 7266f29..0000000
--- a/core/api/src/main/java/org/onosproject/store/service/DistributedQueue.java
+++ /dev/null
@@ -1,62 +0,0 @@
-/*
- * Copyright 2015-present Open Networking Laboratory
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.onosproject.store.service;
-
-import java.util.concurrent.CompletableFuture;
-
-/**
- * A distributed collection designed for holding elements prior to processing.
- * A queue provides insertion, extraction and inspection operations. The extraction operation
- * is designed to be non-blocking.
- *
- * @param <E> queue entry type
- */
-public interface DistributedQueue<E> extends DistributedPrimitive {
-
- /**
- * Returns total number of entries in the queue.
- * @return queue size
- */
- long size();
-
- /**
- * Returns true if queue has elements in it.
- * @return true is queue has elements, false otherwise
- */
- default boolean isEmpty() {
- return size() == 0;
- }
-
- /**
- * Inserts an entry into the queue.
- * @param entry entry to insert
- */
- void push(E entry);
-
- /**
- * If the queue is non-empty, an entry will be removed from the queue and the returned future
- * will be immediately completed with it. If queue is empty when this call is made, the returned
- * future will be eventually completed when an entry is added to the queue.
- * @return queue entry
- */
- CompletableFuture<E> pop();
-
- /**
- * Returns an entry from the queue without removing it. If the queue is empty returns null.
- * @return queue entry or null if queue is empty
- */
- E peek();
-}
\ No newline at end of file
diff --git a/core/api/src/main/java/org/onosproject/store/service/DistributedQueueBuilder.java b/core/api/src/main/java/org/onosproject/store/service/DistributedQueueBuilder.java
deleted file mode 100644
index 1f32f5e..0000000
--- a/core/api/src/main/java/org/onosproject/store/service/DistributedQueueBuilder.java
+++ /dev/null
@@ -1,61 +0,0 @@
-/*
- * Copyright 2015-present Open Networking Laboratory
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.onosproject.store.service;
-
-/**
- * Builder for distributed queue.
- *
- * @param <E> type queue elements.
- */
-public interface DistributedQueueBuilder<E> {
-
- /**
- * Sets the name of the queue.
- * <p>
- * Each queue is identified by a unique name.
- * </p>
- * <p>
- * Note: This is a mandatory parameter.
- * </p>
- *
- * @param name name of the queue
- * @return this DistributedQueueBuilder for method chaining
- */
- DistributedQueueBuilder<E> withName(String name);
-
- /**
- * Sets a serializer that can be used to serialize
- * the elements pushed into the queue. The serializer
- * builder should be pre-populated with any classes that will be
- * put into the queue.
- * <p>
- * Note: This is a mandatory parameter.
- * </p>
- *
- * @param serializer serializer
- * @return this DistributedQueueBuilder for method chaining
- */
- DistributedQueueBuilder<E> withSerializer(Serializer serializer);
-
- /**
- * Builds a queue based on the configuration options
- * supplied to this builder.
- *
- * @return new distributed queue
- * @throws java.lang.RuntimeException if a mandatory parameter is missing
- */
- DistributedQueue<E> build();
-}
diff --git a/core/api/src/main/java/org/onosproject/store/service/StorageService.java b/core/api/src/main/java/org/onosproject/store/service/StorageService.java
index f342bdd..53830c2 100644
--- a/core/api/src/main/java/org/onosproject/store/service/StorageService.java
+++ b/core/api/src/main/java/org/onosproject/store/service/StorageService.java
@@ -52,14 +52,6 @@
<E> DistributedSetBuilder<E> setBuilder();
/**
- * Creates a new DistributedQueueBuilder.
- *
- * @param <E> queue entry type
- * @return builder for an distributed queue
- */
- <E> DistributedQueueBuilder<E> queueBuilder();
-
- /**
* Creates a new AtomicCounterBuilder.
*
* @return atomic counter builder
diff --git a/core/api/src/main/java/org/onosproject/store/service/WorkQueue.java b/core/api/src/main/java/org/onosproject/store/service/WorkQueue.java
index f9b0173..99eaa03 100644
--- a/core/api/src/main/java/org/onosproject/store/service/WorkQueue.java
+++ b/core/api/src/main/java/org/onosproject/store/service/WorkQueue.java
@@ -39,7 +39,12 @@
*
* @param <E> task payload type.
*/
-public interface WorkQueue<E> {
+public interface WorkQueue<E> extends DistributedPrimitive {
+
+ @Override
+ default DistributedPrimitive.Type primitiveType() {
+ return DistributedPrimitive.Type.WORK_QUEUE;
+ }
/**
* Adds a collection of tasks to the work queue.
diff --git a/core/api/src/test/java/org/onosproject/store/service/StorageServiceAdapter.java b/core/api/src/test/java/org/onosproject/store/service/StorageServiceAdapter.java
index 18ea539..3978614 100644
--- a/core/api/src/test/java/org/onosproject/store/service/StorageServiceAdapter.java
+++ b/core/api/src/test/java/org/onosproject/store/service/StorageServiceAdapter.java
@@ -35,11 +35,6 @@
}
@Override
- public <E> DistributedQueueBuilder<E> queueBuilder() {
- return null;
- }
-
- @Override
public AtomicCounterBuilder atomicCounterBuilder() {
return null;
}
diff --git a/core/api/src/test/java/org/onosproject/store/service/TestStorageService.java b/core/api/src/test/java/org/onosproject/store/service/TestStorageService.java
index 95e775b..d8dff29 100644
--- a/core/api/src/test/java/org/onosproject/store/service/TestStorageService.java
+++ b/core/api/src/test/java/org/onosproject/store/service/TestStorageService.java
@@ -34,11 +34,6 @@
}
@Override
- public <E> DistributedQueueBuilder<E> queueBuilder() {
- throw new UnsupportedOperationException("queueBuilder");
- }
-
- @Override
public AtomicCounterBuilder atomicCounterBuilder() {
return TestAtomicCounter.builder();
}
diff --git a/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/DefaultDistributedQueueBuilder.java b/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/DefaultDistributedQueueBuilder.java
deleted file mode 100644
index 7e05367..0000000
--- a/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/DefaultDistributedQueueBuilder.java
+++ /dev/null
@@ -1,64 +0,0 @@
-/*
- * Copyright 2016-present Open Networking Laboratory
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.onosproject.store.primitives.impl;
-
-import org.onosproject.store.primitives.DistributedPrimitiveCreator;
-import org.onosproject.store.service.DistributedQueue;
-import org.onosproject.store.service.DistributedQueueBuilder;
-import org.onosproject.store.service.Serializer;
-
-import static com.google.common.base.Preconditions.checkArgument;
-import static com.google.common.base.Preconditions.checkState;
-
-/**
- * Default implementation of a {@code DistributedQueueBuilder}.
- *
- * @param <E> queue entry type
- */
-public class DefaultDistributedQueueBuilder<E> implements DistributedQueueBuilder<E> {
-
- private final DistributedPrimitiveCreator primitiveCreator;
- private String name;
- private Serializer serializer;
-
- public DefaultDistributedQueueBuilder(DistributedPrimitiveCreator primitiveCreator) {
- this.primitiveCreator = primitiveCreator;
- }
-
- @Override
- public DistributedQueueBuilder<E> withName(String name) {
- checkArgument(name != null && !name.isEmpty());
- this.name = name;
- return this;
- }
-
- @Override
- public DistributedQueueBuilder<E> withSerializer(Serializer serializer) {
- checkArgument(serializer != null);
- this.serializer = serializer;
- return this;
- }
-
- private boolean validInputs() {
- return name != null && serializer != null;
- }
-
- @Override
- public DistributedQueue<E> build() {
- checkState(validInputs());
- return primitiveCreator.newDistributedQueue(name, serializer);
- }
-}
diff --git a/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/DefaultDistributedWorkQueue.java b/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/DefaultDistributedWorkQueue.java
index 2f604d3..9ce11bf 100644
--- a/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/DefaultDistributedWorkQueue.java
+++ b/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/DefaultDistributedWorkQueue.java
@@ -30,6 +30,11 @@
}
@Override
+ public String name() {
+ return backingQueue.name();
+ }
+
+ @Override
public CompletableFuture<Void> addMultiple(Collection<E> items) {
return backingQueue.addMultiple(items.stream()
.map(serializer::encode)
diff --git a/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/FederatedDistributedPrimitiveCreator.java b/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/FederatedDistributedPrimitiveCreator.java
index a095de1..4dc2bc4 100644
--- a/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/FederatedDistributedPrimitiveCreator.java
+++ b/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/FederatedDistributedPrimitiveCreator.java
@@ -29,9 +29,8 @@
import org.onosproject.store.service.AsyncConsistentMap;
import org.onosproject.store.service.AsyncDistributedSet;
import org.onosproject.store.service.AsyncLeaderElector;
-import org.onosproject.store.service.DistributedQueue;
-import org.onosproject.store.service.WorkQueue;
import org.onosproject.store.service.Serializer;
+import org.onosproject.store.service.WorkQueue;
import com.google.common.base.Charsets;
import com.google.common.collect.ImmutableSet;
@@ -84,11 +83,6 @@
}
@Override
- public <E> DistributedQueue<E> newDistributedQueue(String name, Serializer serializer) {
- return getCreator(name).newDistributedQueue(name, serializer);
- }
-
- @Override
public AsyncLeaderElector newAsyncLeaderElector(String name) {
checkNotNull(name);
Map<PartitionId, AsyncLeaderElector> leaderElectors =
diff --git a/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/StorageManager.java b/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/StorageManager.java
index 8eb138a..f56b6e8 100644
--- a/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/StorageManager.java
+++ b/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/StorageManager.java
@@ -15,6 +15,8 @@
*/
package org.onosproject.store.primitives.impl;
+import static org.onosproject.security.AppGuard.checkPermission;
+import static org.onosproject.security.AppPermission.Type.STORAGE_WRITE;
import static org.slf4j.LoggerFactory.getLogger;
import java.util.Collection;
@@ -44,9 +46,7 @@
import org.onosproject.store.service.AtomicValueBuilder;
import org.onosproject.store.service.ConsistentMap;
import org.onosproject.store.service.ConsistentMapBuilder;
-import org.onosproject.store.service.DistributedQueueBuilder;
import org.onosproject.store.service.DistributedSetBuilder;
-import org.onosproject.store.service.WorkQueue;
import org.onosproject.store.service.EventuallyConsistentMapBuilder;
import org.onosproject.store.service.LeaderElectorBuilder;
import org.onosproject.store.service.MapInfo;
@@ -55,15 +55,13 @@
import org.onosproject.store.service.StorageAdminService;
import org.onosproject.store.service.StorageService;
import org.onosproject.store.service.TransactionContextBuilder;
+import org.onosproject.store.service.WorkQueue;
import org.onosproject.store.service.WorkQueueStats;
import org.slf4j.Logger;
import com.google.common.collect.Maps;
import com.google.common.util.concurrent.Futures;
-import static org.onosproject.security.AppGuard.checkPermission;
-import static org.onosproject.security.AppPermission.Type.*;
-
/**
* Implementation for {@code StorageService} and {@code StorageAdminService}.
*/
@@ -137,12 +135,6 @@
}
@Override
- public <E> DistributedQueueBuilder<E> queueBuilder() {
- checkPermission(STORAGE_WRITE);
- return new DefaultDistributedQueueBuilder<>(federatedPrimitiveCreator);
- }
-
- @Override
public AtomicCounterBuilder atomicCounterBuilder() {
checkPermission(STORAGE_WRITE);
return new DefaultAtomicCounterBuilder(federatedPrimitiveCreator);
diff --git a/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/StoragePartitionClient.java b/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/StoragePartitionClient.java
index d635a79..205656e 100644
--- a/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/StoragePartitionClient.java
+++ b/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/StoragePartitionClient.java
@@ -49,10 +49,9 @@
import org.onosproject.store.service.AsyncDistributedSet;
import org.onosproject.store.service.AsyncLeaderElector;
import org.onosproject.store.service.DistributedPrimitive.Status;
-import org.onosproject.store.service.DistributedQueue;
-import org.onosproject.store.service.WorkQueue;
import org.onosproject.store.service.PartitionClientInfo;
import org.onosproject.store.service.Serializer;
+import org.onosproject.store.service.WorkQueue;
import org.slf4j.Logger;
import com.google.common.base.Supplier;
@@ -160,11 +159,6 @@
}
@Override
- public <E> DistributedQueue<E> newDistributedQueue(String name, Serializer serializer) {
- throw new UnsupportedOperationException();
- }
-
- @Override
public <E> WorkQueue<E> newWorkQueue(String name, Serializer serializer) {
AtomixWorkQueue workQueue = client.getResource(name, AtomixWorkQueue.class).join();
return new DefaultDistributedWorkQueue<>(workQueue, serializer);
diff --git a/core/store/primitives/src/main/java/org/onosproject/store/primitives/resources/impl/AtomixWorkQueue.java b/core/store/primitives/src/main/java/org/onosproject/store/primitives/resources/impl/AtomixWorkQueue.java
index 879cbb3..0085932 100644
--- a/core/store/primitives/src/main/java/org/onosproject/store/primitives/resources/impl/AtomixWorkQueue.java
+++ b/core/store/primitives/src/main/java/org/onosproject/store/primitives/resources/impl/AtomixWorkQueue.java
@@ -18,6 +18,9 @@
import static java.util.concurrent.Executors.newSingleThreadExecutor;
import static org.onlab.util.Tools.groupedThreads;
import static org.slf4j.LoggerFactory.getLogger;
+import io.atomix.copycat.client.CopycatClient;
+import io.atomix.resource.AbstractResource;
+import io.atomix.resource.ResourceTypeInfo;
import java.util.Collection;
import java.util.List;
@@ -34,22 +37,19 @@
import org.onlab.util.AbstractAccumulator;
import org.onlab.util.Accumulator;
import org.onosproject.store.primitives.resources.impl.AtomixWorkQueueCommands.Add;
+import org.onosproject.store.primitives.resources.impl.AtomixWorkQueueCommands.Clear;
import org.onosproject.store.primitives.resources.impl.AtomixWorkQueueCommands.Complete;
import org.onosproject.store.primitives.resources.impl.AtomixWorkQueueCommands.Register;
import org.onosproject.store.primitives.resources.impl.AtomixWorkQueueCommands.Stats;
import org.onosproject.store.primitives.resources.impl.AtomixWorkQueueCommands.Take;
import org.onosproject.store.primitives.resources.impl.AtomixWorkQueueCommands.Unregister;
-import org.onosproject.store.service.WorkQueue;
import org.onosproject.store.service.Task;
+import org.onosproject.store.service.WorkQueue;
import org.onosproject.store.service.WorkQueueStats;
import org.slf4j.Logger;
import com.google.common.collect.ImmutableList;
-import io.atomix.copycat.client.CopycatClient;
-import io.atomix.resource.AbstractResource;
-import io.atomix.resource.ResourceTypeInfo;
-
/**
* Distributed resource providing the {@link WorkQueue} primitive.
*/
@@ -69,6 +69,18 @@
}
@Override
+ public String name() {
+ return null;
+ }
+
+ @Override
+ public CompletableFuture<Void> destroy() {
+ executor.shutdown();
+ timer.cancel();
+ return client.submit(new Clear());
+ }
+
+ @Override
public CompletableFuture<AtomixWorkQueue> open() {
return super.open().thenApply(result -> {
client.onStateChange(state -> {
diff --git a/core/store/primitives/src/main/java/org/onosproject/store/primitives/resources/impl/AtomixWorkQueueCommands.java b/core/store/primitives/src/main/java/org/onosproject/store/primitives/resources/impl/AtomixWorkQueueCommands.java
index 3724529..2e77da9 100644
--- a/core/store/primitives/src/main/java/org/onosproject/store/primitives/resources/impl/AtomixWorkQueueCommands.java
+++ b/core/store/primitives/src/main/java/org/onosproject/store/primitives/resources/impl/AtomixWorkQueueCommands.java
@@ -15,6 +15,14 @@
*/
package org.onosproject.store.primitives.resources.impl;
+import io.atomix.catalyst.buffer.BufferInput;
+import io.atomix.catalyst.buffer.BufferOutput;
+import io.atomix.catalyst.serializer.CatalystSerializable;
+import io.atomix.catalyst.serializer.SerializableTypeResolver;
+import io.atomix.catalyst.serializer.Serializer;
+import io.atomix.catalyst.serializer.SerializerRegistry;
+import io.atomix.copycat.Command;
+
import java.util.ArrayList;
import java.util.Collection;
import java.util.stream.Collectors;
@@ -25,14 +33,6 @@
import com.google.common.base.MoreObjects;
-import io.atomix.catalyst.buffer.BufferInput;
-import io.atomix.catalyst.buffer.BufferOutput;
-import io.atomix.catalyst.serializer.CatalystSerializable;
-import io.atomix.catalyst.serializer.SerializableTypeResolver;
-import io.atomix.catalyst.serializer.Serializer;
-import io.atomix.catalyst.serializer.SerializerRegistry;
-import io.atomix.copycat.Command;
-
/**
* {@link AtomixWorkQueue} resource state machine operations.
*/
@@ -207,6 +207,24 @@
}
}
+ @SuppressWarnings("serial")
+ public static class Clear implements Command<Void>, CatalystSerializable {
+
+ @Override
+ public void writeObject(BufferOutput<?> buffer, Serializer serializer) {
+ }
+
+ @Override
+ public void readObject(BufferInput<?> buffer, Serializer serializer) {
+ }
+
+ @Override
+ public String toString() {
+ return MoreObjects.toStringHelper(getClass())
+ .toString();
+ }
+ }
+
/**
* Work queue command type resolver.
*/
@@ -219,6 +237,7 @@
registry.register(Add.class, -963);
registry.register(Complete.class, -964);
registry.register(Stats.class, -965);
+ registry.register(Clear.class, -966);
}
}
}
diff --git a/core/store/primitives/src/main/java/org/onosproject/store/primitives/resources/impl/AtomixWorkQueueState.java b/core/store/primitives/src/main/java/org/onosproject/store/primitives/resources/impl/AtomixWorkQueueState.java
index d287e19..b226860 100644
--- a/core/store/primitives/src/main/java/org/onosproject/store/primitives/resources/impl/AtomixWorkQueueState.java
+++ b/core/store/primitives/src/main/java/org/onosproject/store/primitives/resources/impl/AtomixWorkQueueState.java
@@ -16,6 +16,14 @@
package org.onosproject.store.primitives.resources.impl;
import static org.slf4j.LoggerFactory.getLogger;
+import io.atomix.copycat.server.Commit;
+import io.atomix.copycat.server.Snapshottable;
+import io.atomix.copycat.server.StateMachineExecutor;
+import io.atomix.copycat.server.session.ServerSession;
+import io.atomix.copycat.server.session.SessionListener;
+import io.atomix.copycat.server.storage.snapshot.SnapshotReader;
+import io.atomix.copycat.server.storage.snapshot.SnapshotWriter;
+import io.atomix.resource.ResourceStateMachine;
import java.util.ArrayList;
import java.util.Collection;
@@ -31,6 +39,7 @@
import org.onlab.util.CountDownCompleter;
import org.onosproject.store.primitives.resources.impl.AtomixWorkQueueCommands.Add;
+import org.onosproject.store.primitives.resources.impl.AtomixWorkQueueCommands.Clear;
import org.onosproject.store.primitives.resources.impl.AtomixWorkQueueCommands.Complete;
import org.onosproject.store.primitives.resources.impl.AtomixWorkQueueCommands.Register;
import org.onosproject.store.primitives.resources.impl.AtomixWorkQueueCommands.Stats;
@@ -47,15 +56,6 @@
import com.google.common.collect.Queues;
import com.google.common.util.concurrent.AtomicLongMap;
-import io.atomix.copycat.server.Commit;
-import io.atomix.copycat.server.Snapshottable;
-import io.atomix.copycat.server.StateMachineExecutor;
-import io.atomix.copycat.server.session.ServerSession;
-import io.atomix.copycat.server.session.SessionListener;
-import io.atomix.copycat.server.storage.snapshot.SnapshotReader;
-import io.atomix.copycat.server.storage.snapshot.SnapshotWriter;
-import io.atomix.resource.ResourceStateMachine;
-
/**
* State machine for {@link AtomixWorkQueue} resource.
*/
@@ -82,6 +82,7 @@
executor.register(Add.class, (Consumer<Commit<Add>>) this::add);
executor.register(Take.class, this::take);
executor.register(Complete.class, (Consumer<Commit<Complete>>) this::complete);
+ executor.register(Clear.class, (Consumer<Commit<Clear>>) this::clear);
}
protected WorkQueueStats stats(Commit<? extends Stats> commit) {
@@ -96,6 +97,17 @@
}
}
+ protected void clear(Commit<? extends Clear> commit) {
+ unassignedTasks.forEach(TaskHolder::complete);
+ unassignedTasks.clear();
+ assignments.values().forEach(TaskAssignment::markComplete);
+ assignments.clear();
+ registeredWorkers.values().forEach(Commit::close);
+ registeredWorkers.clear();
+ activeTasksPerSession.clear();
+ totalCompleted.set(0);
+ }
+
protected void register(Commit<? extends Register> commit) {
long sessionId = commit.session().id();
if (registeredWorkers.putIfAbsent(sessionId, commit) != null) {
@@ -172,7 +184,7 @@
try {
commit.operation().taskIds().forEach(taskId -> {
TaskAssignment assignment = assignments.get(taskId);
- if (assignment != null) {
+ if (assignment != null && assignment.sessionId() == sessionId) {
assignments.remove(taskId).markComplete();
// bookkeeping
totalCompleted.incrementAndGet();
diff --git a/core/store/primitives/src/test/java/org/onosproject/store/primitives/resources/impl/AtomixWorkQueueTest.java b/core/store/primitives/src/test/java/org/onosproject/store/primitives/resources/impl/AtomixWorkQueueTest.java
index 161424d..7f1b36d 100644
--- a/core/store/primitives/src/test/java/org/onosproject/store/primitives/resources/impl/AtomixWorkQueueTest.java
+++ b/core/store/primitives/src/test/java/org/onosproject/store/primitives/resources/impl/AtomixWorkQueueTest.java
@@ -15,6 +15,13 @@
*/
package org.onosproject.store.primitives.resources.impl;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertTrue;
+import io.atomix.Atomix;
+import io.atomix.AtomixClient;
+import io.atomix.resource.ResourceType;
+
import java.time.Duration;
import java.util.Arrays;
import java.util.UUID;
@@ -23,10 +30,6 @@
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
-import io.atomix.Atomix;
-import io.atomix.AtomixClient;
-import io.atomix.resource.ResourceType;
-
import org.junit.AfterClass;
import org.junit.BeforeClass;
import org.junit.Test;
@@ -36,10 +39,6 @@
import com.google.common.util.concurrent.Uninterruptibles;
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertNull;
-import static org.junit.Assert.assertTrue;
-
/**
* Unit tests for {@link AtomixWorkQueue}.
*/
@@ -186,4 +185,54 @@
Uninterruptibles.awaitUninterruptibly(latch2, 500, TimeUnit.MILLISECONDS);
}
+
+ @Test
+ public void testDestroy() {
+ String queueName = UUID.randomUUID().toString();
+ Atomix atomix1 = createAtomixClient();
+ AtomixWorkQueue queue1 = atomix1.getResource(queueName, AtomixWorkQueue.class).join();
+ byte[] item = DEFAULT_PAYLOAD;
+ queue1.addOne(item).join();
+
+ Atomix atomix2 = createAtomixClient();
+ AtomixWorkQueue queue2 = atomix2.getResource(queueName, AtomixWorkQueue.class).join();
+ byte[] task2 = DEFAULT_PAYLOAD;
+ queue2.addOne(task2).join();
+
+ WorkQueueStats stats = queue1.stats().join();
+ assertEquals(stats.totalPending(), 2);
+ assertEquals(stats.totalInProgress(), 0);
+ assertEquals(stats.totalCompleted(), 0);
+
+ queue2.destroy().join();
+
+ stats = queue1.stats().join();
+ assertEquals(stats.totalPending(), 0);
+ assertEquals(stats.totalInProgress(), 0);
+ assertEquals(stats.totalCompleted(), 0);
+ }
+
+ @Test
+ public void testCompleteAttemptWithIncorrectSession() {
+ String queueName = UUID.randomUUID().toString();
+ Atomix atomix1 = createAtomixClient();
+ AtomixWorkQueue queue1 = atomix1.getResource(queueName, AtomixWorkQueue.class).join();
+ byte[] item = DEFAULT_PAYLOAD;
+ queue1.addOne(item).join();
+
+ Task<byte[]> task = queue1.take().join();
+ String taskId = task.taskId();
+
+ // Create another client and get a handle to the same queue.
+ Atomix atomix2 = createAtomixClient();
+ AtomixWorkQueue queue2 = atomix2.getResource(queueName, AtomixWorkQueue.class).join();
+
+ // Attempt completing the task with new client and verify task is not completed
+ queue2.complete(taskId).join();
+
+ WorkQueueStats stats = queue1.stats().join();
+ assertEquals(stats.totalPending(), 0);
+ assertEquals(stats.totalInProgress(), 1);
+ assertEquals(stats.totalCompleted(), 0);
+ }
}