Distributed work queue primitive
Change-Id: Ia8e531e6611ec502399edec376ccc00522e47994
diff --git a/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/CatalystSerializers.java b/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/CatalystSerializers.java
index 175e253..f07f838 100644
--- a/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/CatalystSerializers.java
+++ b/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/CatalystSerializers.java
@@ -33,6 +33,8 @@
import org.onosproject.store.primitives.resources.impl.AtomixConsistentMapFactory;
import org.onosproject.store.primitives.resources.impl.AtomixLeaderElectorCommands;
import org.onosproject.store.primitives.resources.impl.AtomixLeaderElectorFactory;
+import org.onosproject.store.primitives.resources.impl.AtomixWorkQueueCommands;
+import org.onosproject.store.primitives.resources.impl.AtomixWorkQueueFactory;
import org.onosproject.store.primitives.resources.impl.CommitResult;
import org.onosproject.store.primitives.resources.impl.MapEntryUpdateResult;
import org.onosproject.store.primitives.resources.impl.PrepareResult;
@@ -40,8 +42,11 @@
import org.onosproject.store.serializers.KryoNamespaces;
import org.onosproject.store.service.MapEvent;
import org.onosproject.store.service.MapTransaction;
+import org.onosproject.store.service.Task;
import org.onosproject.store.service.Versioned;
+import org.onosproject.store.service.WorkQueueStats;
+import com.google.common.collect.ImmutableList;
import com.google.common.collect.Maps;
/**
@@ -81,15 +86,20 @@
serializer.register(MapTransaction.class, factory);
serializer.register(Versioned.class, factory);
serializer.register(MapEvent.class, factory);
+ serializer.register(Task.class, factory);
+ serializer.register(WorkQueueStats.class, factory);
serializer.register(Maps.immutableEntry("a", "b").getClass(), factory);
+ serializer.register(ImmutableList.of().getClass(), factory);
serializer.resolve(new LongCommands.TypeResolver());
serializer.resolve(new AtomixConsistentMapCommands.TypeResolver());
serializer.resolve(new AtomixLeaderElectorCommands.TypeResolver());
+ serializer.resolve(new AtomixWorkQueueCommands.TypeResolver());
serializer.resolve(new ResourceManagerTypeResolver());
serializer.registerClassLoader(AtomixConsistentMapFactory.class)
- .registerClassLoader(AtomixLeaderElectorFactory.class);
+ .registerClassLoader(AtomixLeaderElectorFactory.class)
+ .registerClassLoader(AtomixWorkQueueFactory.class);
return serializer;
}
diff --git a/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/DefaultCatalystTypeSerializerFactory.java b/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/DefaultCatalystTypeSerializerFactory.java
index 88d65d5..b0d6841 100644
--- a/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/DefaultCatalystTypeSerializerFactory.java
+++ b/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/DefaultCatalystTypeSerializerFactory.java
@@ -54,20 +54,19 @@
}
@Override
- public void write(T object, BufferOutput buffer,
- io.atomix.catalyst.serializer.Serializer serializer) {
+ public void write(T object, BufferOutput buffer, io.atomix.catalyst.serializer.Serializer serializer) {
try {
byte[] payload = this.serializer.encode(object);
buffer.writeInt(payload.length);
buffer.write(payload);
} catch (Exception e) {
log.warn("Failed to serialize {}", object, e);
+ throw Throwables.propagate(e); // rethrow after logging so the caller sees the failure
}
}
@Override
- public T read(Class<T> type, BufferInput buffer,
- io.atomix.catalyst.serializer.Serializer serializer) {
+ public T read(Class<T> type, BufferInput buffer, io.atomix.catalyst.serializer.Serializer serializer) {
int size = buffer.readInt();
try {
byte[] payload = new byte[size];
@@ -75,8 +74,7 @@
return this.serializer.decode(payload);
} catch (Exception e) {
log.warn("Failed to deserialize as type {}. Payload size: {}", type, size, e);
- Throwables.propagate(e);
- return null;
+ throw Throwables.propagate(e);
}
}
}
diff --git a/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/DefaultDistributedWorkQueue.java b/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/DefaultDistributedWorkQueue.java
new file mode 100644
index 0000000..549cfc6
--- /dev/null
+++ b/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/DefaultDistributedWorkQueue.java
@@ -0,0 +1,100 @@
+/*
+ * Copyright 2016-present Open Networking Laboratory
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.onosproject.store.primitives.impl;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.Executor;
+import java.util.function.Consumer;
+import java.util.stream.Collectors;
+
+import org.onosproject.store.service.WorkQueue;
+import org.onosproject.store.service.Serializer;
+import org.onosproject.store.service.Task;
+import org.onosproject.store.service.WorkQueueStats;
+
+import com.google.common.collect.Collections2;
+
+/**
+ * Default implementation of {@link WorkQueue}.
+ * <p>
+ * Adapts a {@code WorkQueue<byte[]>} into a typed queue: payloads are encoded with the supplied
+ * {@link Serializer} before they are handed to the backing queue and decoded when they come back.
+ * Completion, statistics and stop requests are delegated unchanged.
+ *
+ * @param <E> task payload type.
+ */
+public class DefaultDistributedWorkQueue<E> implements WorkQueue<E> {
+
+    private final WorkQueue<byte[]> backingQueue;
+    private final Serializer serializer;
+
+    /**
+     * Creates a typed view over a byte-array work queue.
+     *
+     * @param backingQueue queue that stores the encoded task payloads
+     * @param serializer serializer used to encode and decode task payloads
+     */
+    public DefaultDistributedWorkQueue(WorkQueue<byte[]> backingQueue, Serializer serializer) {
+        this.backingQueue = backingQueue;
+        this.serializer = serializer;
+    }
+
+    @Override
+    public CompletableFuture<Void> addMultiple(Collection<E> items) {
+        // Encode every item up front; the backing queue only accepts byte arrays.
+        return backingQueue.addMultiple(items.stream()
+                                             .map(serializer::encode)
+                                             .collect(Collectors.toCollection(ArrayList::new)));
+    }
+
+    @Override
+    public CompletableFuture<Collection<Task<E>>> take(int maxTasks) {
+        return backingQueue.take(maxTasks).thenApply(tasks -> {
+            // Collections2.transform only builds a lazy view, so handing it out directly would
+            // re-run the mapping on every traversal of the result. Copying it runs the mapping
+            // once per task here, and an exception from it fails the returned future.
+            Collection<Task<E>> decoded =
+                    new ArrayList<>(Collections2.transform(tasks, task -> task.<E>map(serializer::decode)));
+            return decoded;
+        });
+    }
+
+    @Override
+    public CompletableFuture<Void> complete(Collection<String> ids) {
+        return backingQueue.complete(ids);
+    }
+
+    @Override
+    public CompletableFuture<WorkQueueStats> stats() {
+        return backingQueue.stats();
+    }
+
+    @Override
+    public CompletableFuture<Void> registerTaskProcessor(Consumer<E> callback,
+                                                          int parallelism,
+                                                          Executor executor) {
+        // The backing queue delivers raw bytes; decode before handing the payload to the caller.
+        Consumer<byte[]> backingQueueCallback = payload -> callback.accept(serializer.decode(payload));
+        return backingQueue.registerTaskProcessor(backingQueueCallback, parallelism, executor);
+    }
+
+    @Override
+    public CompletableFuture<Void> stopProcessing() {
+        return backingQueue.stopProcessing();
+    }
+}
diff --git a/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/FederatedDistributedPrimitiveCreator.java b/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/FederatedDistributedPrimitiveCreator.java
index e4b6f9a..a095de1 100644
--- a/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/FederatedDistributedPrimitiveCreator.java
+++ b/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/FederatedDistributedPrimitiveCreator.java
@@ -30,6 +30,7 @@
import org.onosproject.store.service.AsyncDistributedSet;
import org.onosproject.store.service.AsyncLeaderElector;
import org.onosproject.store.service.DistributedQueue;
+import org.onosproject.store.service.WorkQueue;
import org.onosproject.store.service.Serializer;
import com.google.common.base.Charsets;
@@ -101,6 +102,11 @@
}
@Override
+    public <E> WorkQueue<E> newWorkQueue(String name, Serializer serializer) {
+        // The primitive name selects the member creator that hosts the queue.
+        DistributedPrimitiveCreator hostingCreator = getCreator(name);
+        return hostingCreator.newWorkQueue(name, serializer);
+    }
+
+ @Override
public Set<String> getAsyncConsistentMapNames() {
return members.values()
.stream()
@@ -118,6 +124,15 @@
.orElse(ImmutableSet.of());
}
+    @Override
+    public Set<String> getWorkQueueNames() {
+        // Union of the queue names reported by every member creator; empty when there are no members.
+        return members.values()
+                      .stream()
+                      .map(creator -> creator.getWorkQueueNames())
+                      .reduce((left, right) -> Sets.union(left, right))
+                      .orElse(ImmutableSet.of());
+    }
+
/**
* Returns the {@code DistributedPrimitiveCreator} to use for hosting a primitive.
* @param name primitive name
diff --git a/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/StorageManager.java b/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/StorageManager.java
index 4def1e9..8eb138a 100644
--- a/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/StorageManager.java
+++ b/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/StorageManager.java
@@ -46,6 +46,7 @@
import org.onosproject.store.service.ConsistentMapBuilder;
import org.onosproject.store.service.DistributedQueueBuilder;
import org.onosproject.store.service.DistributedSetBuilder;
+import org.onosproject.store.service.WorkQueue;
import org.onosproject.store.service.EventuallyConsistentMapBuilder;
import org.onosproject.store.service.LeaderElectorBuilder;
import org.onosproject.store.service.MapInfo;
@@ -54,6 +55,7 @@
import org.onosproject.store.service.StorageAdminService;
import org.onosproject.store.service.StorageService;
import org.onosproject.store.service.TransactionContextBuilder;
+import org.onosproject.store.service.WorkQueueStats;
import org.slf4j.Logger;
import com.google.common.collect.Maps;
@@ -171,6 +173,12 @@
}
@Override
+    public <E> WorkQueue<E> getWorkQueue(String name, Serializer serializer) {
+        // Obtaining a work queue is guarded by the storage write permission.
+        checkPermission(STORAGE_WRITE);
+        WorkQueue<E> workQueue = federatedPrimitiveCreator.newWorkQueue(name, serializer);
+        return workQueue;
+    }
+
+ @Override
public List<MapInfo> getMapInfo() {
return listMapInfo(federatedPrimitiveCreator);
}
@@ -185,6 +193,18 @@
}
@Override
+    public Map<String, WorkQueueStats> getQueueStats() {
+        Map<String, WorkQueueStats> statsByQueue = Maps.newConcurrentMap();
+        for (String queueName : federatedPrimitiveCreator.getWorkQueueNames()) {
+            // Each queue is opened with the basic Kryo serializer just to ask for its stats.
+            WorkQueue<Object> queue =
+                    federatedPrimitiveCreator.newWorkQueue(queueName, Serializer.using(KryoNamespaces.BASIC));
+            // join() blocks until this queue has answered before moving on to the next one.
+            statsByQueue.put(queueName, queue.stats().join());
+        }
+        return statsByQueue;
+    }
+
+ @Override
public List<PartitionInfo> getPartitionInfo() {
return partitionAdminService.partitionInfo();
}
diff --git a/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/StoragePartitionClient.java b/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/StoragePartitionClient.java
index 0c1d8a6..d635a79 100644
--- a/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/StoragePartitionClient.java
+++ b/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/StoragePartitionClient.java
@@ -41,6 +41,7 @@
import org.onosproject.store.primitives.resources.impl.AtomixConsistentMap;
import org.onosproject.store.primitives.resources.impl.AtomixCounter;
import org.onosproject.store.primitives.resources.impl.AtomixLeaderElector;
+import org.onosproject.store.primitives.resources.impl.AtomixWorkQueue;
import org.onosproject.store.serializers.KryoNamespaces;
import org.onosproject.store.service.AsyncAtomicCounter;
import org.onosproject.store.service.AsyncAtomicValue;
@@ -49,6 +50,7 @@
import org.onosproject.store.service.AsyncLeaderElector;
import org.onosproject.store.service.DistributedPrimitive.Status;
import org.onosproject.store.service.DistributedQueue;
+import org.onosproject.store.service.WorkQueue;
import org.onosproject.store.service.PartitionClientInfo;
import org.onosproject.store.service.Serializer;
import org.slf4j.Logger;
@@ -159,11 +161,16 @@
@Override
public <E> DistributedQueue<E> newDistributedQueue(String name, Serializer serializer) {
- // TODO: Implement
throw new UnsupportedOperationException();
}
@Override
+    public <E> WorkQueue<E> newWorkQueue(String name, Serializer serializer) {
+        // Blocks until the byte[] queue resource is available, then wraps it with the serializer.
+        return new DefaultDistributedWorkQueue<>(client.getResource(name, AtomixWorkQueue.class).join(),
+                                                 serializer);
+    }
+
+ @Override
public AsyncLeaderElector newAsyncLeaderElector(String name) {
AtomixLeaderElector leaderElector = client.getResource(name, AtomixLeaderElector.class)
.thenCompose(AtomixLeaderElector::setupCache)
@@ -187,6 +194,11 @@
}
@Override
+    public Set<String> getWorkQueueNames() {
+        // Blocks until the client has returned the keys of all AtomixWorkQueue resources.
+        Set<String> queueNames = client.keys(AtomixWorkQueue.class).join();
+        return queueNames;
+    }
+
+ @Override
public boolean isOpen() {
return resourceClient.client().state() != State.CLOSED;
}
diff --git a/core/store/primitives/src/main/java/org/onosproject/store/primitives/resources/impl/AtomixWorkQueue.java b/core/store/primitives/src/main/java/org/onosproject/store/primitives/resources/impl/AtomixWorkQueue.java
new file mode 100644
index 0000000..7b4ad47
--- /dev/null
+++ b/core/store/primitives/src/main/java/org/onosproject/store/primitives/resources/impl/AtomixWorkQueue.java
@@ -0,0 +1,229 @@
+/*
+ * Copyright 2016-present Open Networking Laboratory
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.onosproject.store.primitives.resources.impl;
+
+import static org.slf4j.LoggerFactory.getLogger;
+
+import java.util.Collection;
+import java.util.List;
+import java.util.Properties;
+import java.util.Timer;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.Executor;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicReference;
+import java.util.function.Consumer;
+
+import org.onlab.util.AbstractAccumulator;
+import org.onlab.util.Accumulator;
+import org.onosproject.store.primitives.resources.impl.AtomixWorkQueueCommands.Add;
+import org.onosproject.store.primitives.resources.impl.AtomixWorkQueueCommands.Complete;
+import org.onosproject.store.primitives.resources.impl.AtomixWorkQueueCommands.Register;
+import org.onosproject.store.primitives.resources.impl.AtomixWorkQueueCommands.Stats;
+import org.onosproject.store.primitives.resources.impl.AtomixWorkQueueCommands.Take;
+import org.onosproject.store.primitives.resources.impl.AtomixWorkQueueCommands.Unregister;
+import org.onosproject.store.service.WorkQueue;
+import org.onosproject.store.service.Task;
+import org.onosproject.store.service.WorkQueueStats;
+import org.slf4j.Logger;
+
+import com.google.common.collect.ImmutableList;
+
+import io.atomix.copycat.client.CopycatClient;
+import io.atomix.resource.AbstractResource;
+import io.atomix.resource.ResourceTypeInfo;
+
+/**
+ * Distributed resource providing the {@link WorkQueue} primitive.
+ * <p>
+ * Queue operations are submitted as commands through the Copycat client. Once a task processor
+ * is registered, this resource also takes tasks in the background, runs them on the
+ * caller-supplied executor and reports the ids of finished tasks in batches.
+ */
+@ResourceTypeInfo(id = -154, factory = AtomixWorkQueueFactory.class)
+public class AtomixWorkQueue extends AbstractResource<AtomixWorkQueue>
+    implements WorkQueue<byte[]> {
+
+    private final Logger log = getLogger(getClass());
+    // Name of the client event that triggers another attempt to take tasks.
+    public static final String TASK_AVAILABLE = "task-available";
+    // Thread on which tasks fetched by resumeWork are handed to the task processor.
+    private final ExecutorService executor = Executors.newSingleThreadExecutor();
+    // Processor installed by registerTaskProcessor; null while no processor is active.
+    private final AtomicReference<TaskProcessor> taskProcessor = new AtomicReference<>();
+    private final Timer timer = new Timer("atomix-work-queue-completer");
+    // True once a Register command has succeeded; consulted to register again on reconnect.
+    private final AtomicBoolean isRegistered = new AtomicBoolean(false);
+
+    protected AtomixWorkQueue(CopycatClient client, Properties options) {
+        super(client, options);
+    }
+
+    @Override
+    public CompletableFuture<AtomixWorkQueue> open() {
+        return super.open().thenApply(result -> {
+            client.onStateChange(state -> {
+                // Submit Register again whenever the client enters CONNECTED while registered.
+                if (state == CopycatClient.State.CONNECTED && isRegistered.get()) {
+                    client.submit(new Register());
+                }
+            });
+            client.onEvent(TASK_AVAILABLE, this::resumeWork);
+            return result;
+        });
+    }
+
+    @Override
+    public CompletableFuture<Void> addMultiple(Collection<byte[]> items) {
+        // Nothing to add: complete locally without submitting a command.
+        if (items.isEmpty()) {
+            return CompletableFuture.completedFuture(null);
+        }
+        return client.submit(new Add(items));
+    }
+
+    @Override
+    public CompletableFuture<Collection<Task<byte[]>>> take(int maxTasks) {
+        // A non-positive limit cannot yield any task, so answer locally with an empty list.
+        if (maxTasks <= 0) {
+            return CompletableFuture.completedFuture(ImmutableList.of());
+        }
+        return client.submit(new Take(maxTasks));
+    }
+
+    @Override
+    public CompletableFuture<Void> complete(Collection<String> taskIds) {
+        if (taskIds.isEmpty()) {
+            return CompletableFuture.completedFuture(null);
+        }
+        return client.submit(new Complete(taskIds));
+    }
+
+    @Override
+    public CompletableFuture<Void> registerTaskProcessor(Consumer<byte[]> callback,
+                                                          int parallelism,
+                                                          Executor executor) {
+        // Completed task ids are reported in batches (maxTasksToBatch = 50, maxBatchMillis = 50).
+        Accumulator<String> completedTaskAccumulator =
+                new CompletedTaskAccumulator(timer, 50, 50); // TODO: make configurable
+        taskProcessor.set(new TaskProcessor(callback,
+                                            parallelism,
+                                            executor,
+                                            completedTaskAccumulator));
+        return register().thenCompose(v -> take(parallelism))
+                         .thenAccept(taskProcessor.get()::accept);
+    }
+
+    @Override
+    public CompletableFuture<Void> stopProcessing() {
+        // Drop the processor before unregistering: every finished task calls resumeWork(), and
+        // with the processor still installed those calls would keep taking and running new
+        // tasks after the caller asked to stop.
+        taskProcessor.set(null);
+        return unregister();
+    }
+
+    @Override
+    public CompletableFuture<WorkQueueStats> stats() {
+        return client.submit(new Stats());
+    }
+
+    // Takes as many tasks as the active processor has room for and hands them over to it.
+    private void resumeWork() {
+        TaskProcessor activeProcessor = taskProcessor.get();
+        if (activeProcessor == null) {
+            return;
+        }
+        this.take(activeProcessor.headRoom())
+            .whenCompleteAsync((tasks, e) -> {
+                if (e != null) {
+                    // Nobody else observes this future, so log the failure instead of dropping it.
+                    log.warn("Failed to take tasks from work queue", e);
+                }
+                // tasks is null when the take failed; accept() ignores null.
+                activeProcessor.accept(tasks);
+            }, executor);
+    }
+
+    private CompletableFuture<Void> register() {
+        return client.submit(new Register()).thenRun(() -> isRegistered.set(true));
+    }
+
+    private CompletableFuture<Void> unregister() {
+        return client.submit(new Unregister()).thenRun(() -> isRegistered.set(false));
+    }
+
+    // TaskId accumulator for paced triggering of task completion calls.
+    private class CompletedTaskAccumulator extends AbstractAccumulator<String> {
+        CompletedTaskAccumulator(Timer timer, int maxTasksToBatch, int maxBatchMillis) {
+            super(timer, maxTasksToBatch, maxBatchMillis, Integer.MAX_VALUE);
+        }
+
+        @Override
+        public void processItems(List<String> items) {
+            complete(items);
+        }
+    }
+
+    // Runs taken tasks on the caller-supplied executor while tracking spare capacity.
+    private class TaskProcessor implements Consumer<Collection<Task<byte[]>>> {
+
+        // Number of additional tasks that may be taken; starts at the requested parallelism.
+        private final AtomicInteger headRoom;
+        private final Consumer<byte[]> backingConsumer;
+        private final Executor executor;
+        private final Accumulator<String> taskCompleter;
+
+        public TaskProcessor(Consumer<byte[]> backingConsumer,
+                             int parallelism,
+                             Executor executor,
+                             Accumulator<String> taskCompleter) {
+            this.backingConsumer = backingConsumer;
+            this.headRoom = new AtomicInteger(parallelism);
+            this.executor = executor;
+            this.taskCompleter = taskCompleter;
+        }
+
+        public int headRoom() {
+            return headRoom.get();
+        }
+
+        @Override
+        public void accept(Collection<Task<byte[]>> tasks) {
+            if (tasks == null) {
+                return;
+            }
+            headRoom.addAndGet(-1 * tasks.size());
+            tasks.forEach(task ->
+                executor.execute(() -> {
+                    try {
+                        backingConsumer.accept(task.payload());
+                        // Only reached when the callback returned without throwing.
+                        taskCompleter.add(task.taskId());
+                    } catch (Exception e) {
+                        log.debug("Task execution failed", e);
+                    } finally {
+                        // Free the slot and try to fetch more work, whether the task succeeded or not.
+                        headRoom.incrementAndGet();
+                        resumeWork();
+                    }
+                }));
+        }
+    }
+}
diff --git a/core/store/primitives/src/main/java/org/onosproject/store/primitives/resources/impl/AtomixWorkQueueCommands.java b/core/store/primitives/src/main/java/org/onosproject/store/primitives/resources/impl/AtomixWorkQueueCommands.java
new file mode 100644
index 0000000..3724529
--- /dev/null
+++ b/core/store/primitives/src/main/java/org/onosproject/store/primitives/resources/impl/AtomixWorkQueueCommands.java
@@ -0,0 +1,241 @@
+/*
+ * Copyright 2016-present Open Networking Laboratory
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.onosproject.store.primitives.resources.impl;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.stream.Collectors;
+import java.util.stream.IntStream;
+
+import org.onosproject.store.service.Task;
+import org.onosproject.store.service.WorkQueueStats;
+
+import com.google.common.base.MoreObjects;
+
+import io.atomix.catalyst.buffer.BufferInput;
+import io.atomix.catalyst.buffer.BufferOutput;
+import io.atomix.catalyst.serializer.CatalystSerializable;
+import io.atomix.catalyst.serializer.SerializableTypeResolver;
+import io.atomix.catalyst.serializer.Serializer;
+import io.atomix.catalyst.serializer.SerializerRegistry;
+import io.atomix.copycat.Command;
+
+/**
+ * {@link AtomixWorkQueue} resource state machine operations.
+ */
+public final class AtomixWorkQueueCommands {
+
+    private AtomixWorkQueueCommands() {
+    }
+
+    /**
+     * Command to add a collection of tasks to the queue.
+     */
+    @SuppressWarnings("serial")
+    public static class Add implements Command<Void>, CatalystSerializable {
+
+        private Collection<byte[]> items;
+
+        // Leaves items unset; readObject assigns it.
+        private Add() {
+        }
+
+        public Add(Collection<byte[]> items) {
+            this.items = items;
+        }
+
+        public Collection<byte[]> items() {
+            return items;
+        }
+
+        @Override
+        public void writeObject(BufferOutput<?> buffer, Serializer serializer) {
+            // Layout: item count first, then each payload in iteration order.
+            buffer.writeInt(items.size());
+            for (byte[] payload : items) {
+                serializer.writeObject(payload, buffer);
+            }
+        }
+
+        @Override
+        public void readObject(BufferInput<?> buffer, Serializer serializer) {
+            int count = buffer.readInt();
+            items = IntStream.range(0, count)
+                             .mapToObj(index -> serializer.<byte[]>readObject(buffer))
+                             .collect(Collectors.toCollection(ArrayList::new));
+        }
+
+        @Override
+        public String toString() {
+            return MoreObjects.toStringHelper(getClass()).add("items", items).toString();
+        }
+    }
+
+    /**
+     * Command to take up to a given number of tasks from the queue.
+     */
+    @SuppressWarnings("serial")
+    public static class Take implements Command<Collection<Task<byte[]>>>, CatalystSerializable {
+
+        private int maxTasks;
+
+        // Leaves maxTasks at zero; readObject assigns it.
+        private Take() {
+        }
+
+        public Take(int maxTasks) {
+            this.maxTasks = maxTasks;
+        }
+
+        public int maxTasks() {
+            return maxTasks;
+        }
+
+        @Override
+        public void writeObject(BufferOutput<?> buffer, Serializer serializer) {
+            buffer.writeInt(maxTasks);
+        }
+
+        @Override
+        public void readObject(BufferInput<?> buffer, Serializer serializer) {
+            maxTasks = buffer.readInt();
+        }
+
+        @Override
+        public String toString() {
+            return MoreObjects.toStringHelper(getClass()).add("maxTasks", maxTasks).toString();
+        }
+    }
+
+    /**
+     * Command whose result is the queue's {@link WorkQueueStats}; carries no arguments.
+     */
+    @SuppressWarnings("serial")
+    public static class Stats implements Command<WorkQueueStats>, CatalystSerializable {
+
+        @Override
+        public void writeObject(BufferOutput<?> buffer, Serializer serializer) {
+            // No state to write.
+        }
+
+        @Override
+        public void readObject(BufferInput<?> buffer, Serializer serializer) {
+            // No state to read.
+        }
+
+        @Override
+        public String toString() {
+            return MoreObjects.toStringHelper(getClass()).toString();
+        }
+    }
+
+    /**
+     * Command a client submits to register itself for task processing; carries no arguments.
+     */
+    @SuppressWarnings("serial")
+    public static class Register implements Command<Void>, CatalystSerializable {
+
+        @Override
+        public void writeObject(BufferOutput<?> buffer, Serializer serializer) {
+            // No state to write.
+        }
+
+        @Override
+        public void readObject(BufferInput<?> buffer, Serializer serializer) {
+            // No state to read.
+        }
+
+        @Override
+        public String toString() {
+            return MoreObjects.toStringHelper(getClass()).toString();
+        }
+    }
+
+    /**
+     * Command a client submits to undo a previous {@link Register}; carries no arguments.
+     */
+    @SuppressWarnings("serial")
+    public static class Unregister implements Command<Void>, CatalystSerializable {
+
+        @Override
+        public void writeObject(BufferOutput<?> buffer, Serializer serializer) {
+            // No state to write.
+        }
+
+        @Override
+        public void readObject(BufferInput<?> buffer, Serializer serializer) {
+            // No state to read.
+        }
+
+        @Override
+        public String toString() {
+            return MoreObjects.toStringHelper(getClass()).toString();
+        }
+    }
+
+    /**
+     * Command to mark the tasks with the given ids as completed.
+     */
+    @SuppressWarnings("serial")
+    public static class Complete implements Command<Void>, CatalystSerializable {
+
+        private Collection<String> taskIds;
+
+        // Leaves taskIds unset; readObject assigns it.
+        private Complete() {
+        }
+
+        public Complete(Collection<String> taskIds) {
+            this.taskIds = taskIds;
+        }
+
+        public Collection<String> taskIds() {
+            return taskIds;
+        }
+
+        @Override
+        public void writeObject(BufferOutput<?> buffer, Serializer serializer) {
+            // The id collection is written as a single object.
+            serializer.writeObject(taskIds, buffer);
+        }
+
+        @Override
+        public void readObject(BufferInput<?> buffer, Serializer serializer) {
+            taskIds = serializer.readObject(buffer);
+        }
+
+        @Override
+        public String toString() {
+            return MoreObjects.toStringHelper(getClass()).add("taskIds", taskIds).toString();
+        }
+    }
+
+    /**
+     * Work queue command type resolver.
+     */
+    public static class TypeResolver implements SerializableTypeResolver {
+        @Override
+        public void resolve(SerializerRegistry registry) {
+            // Type ids under which the command classes are registered with the serializer.
+            registry.register(Register.class, -960);
+            registry.register(Unregister.class, -961);
+            registry.register(Take.class, -962);
+            registry.register(Add.class, -963);
+            registry.register(Complete.class, -964);
+            registry.register(Stats.class, -965);
+        }
+    }
+}
diff --git a/core/store/primitives/src/main/java/org/onosproject/store/primitives/resources/impl/AtomixWorkQueueFactory.java b/core/store/primitives/src/main/java/org/onosproject/store/primitives/resources/impl/AtomixWorkQueueFactory.java
new file mode 100644
index 0000000..0c61b2e
--- /dev/null
+++ b/core/store/primitives/src/main/java/org/onosproject/store/primitives/resources/impl/AtomixWorkQueueFactory.java
@@ -0,0 +1,44 @@
+/*
+ * Copyright 2016-present Open Networking Laboratory
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.onosproject.store.primitives.resources.impl;
+
+import io.atomix.catalyst.serializer.SerializableTypeResolver;
+import io.atomix.copycat.client.CopycatClient;
+import io.atomix.resource.ResourceFactory;
+import io.atomix.resource.ResourceStateMachine;
+
+import java.util.Properties;
+
+/**
+ * Factory for the client instance, state machine and type resolver of {@link AtomixWorkQueue}.
+ */
+public class AtomixWorkQueueFactory implements ResourceFactory<AtomixWorkQueue> {
+
+    @Override
+    public AtomixWorkQueue createInstance(CopycatClient client, Properties properties) {
+        return new AtomixWorkQueue(client, properties);
+    }
+
+    @Override
+    public ResourceStateMachine createStateMachine(Properties config) {
+        return new AtomixWorkQueueState(config);
+    }
+
+    @Override
+    public SerializableTypeResolver createSerializableTypeResolver() {
+        return new AtomixWorkQueueCommands.TypeResolver();
+    }
+}
\ No newline at end of file
diff --git a/core/store/primitives/src/main/java/org/onosproject/store/primitives/resources/impl/AtomixWorkQueueState.java b/core/store/primitives/src/main/java/org/onosproject/store/primitives/resources/impl/AtomixWorkQueueState.java
new file mode 100644
index 0000000..d287e19
--- /dev/null
+++ b/core/store/primitives/src/main/java/org/onosproject/store/primitives/resources/impl/AtomixWorkQueueState.java
@@ -0,0 +1,289 @@
+/*
+ * Copyright 2016-present Open Networking Laboratory
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.onosproject.store.primitives.resources.impl;
+
+import static org.slf4j.LoggerFactory.getLogger;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Iterator;
+import java.util.Map;
+import java.util.Properties;
+import java.util.Queue;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.function.Consumer;
+import java.util.stream.Collectors;
+import java.util.stream.IntStream;
+
+import org.onlab.util.CountDownCompleter;
+import org.onosproject.store.primitives.resources.impl.AtomixWorkQueueCommands.Add;
+import org.onosproject.store.primitives.resources.impl.AtomixWorkQueueCommands.Complete;
+import org.onosproject.store.primitives.resources.impl.AtomixWorkQueueCommands.Register;
+import org.onosproject.store.primitives.resources.impl.AtomixWorkQueueCommands.Stats;
+import org.onosproject.store.primitives.resources.impl.AtomixWorkQueueCommands.Take;
+import org.onosproject.store.primitives.resources.impl.AtomixWorkQueueCommands.Unregister;
+import org.onosproject.store.service.Task;
+import org.onosproject.store.service.WorkQueueStats;
+import org.slf4j.Logger;
+
+import com.google.common.base.MoreObjects;
+import com.google.common.base.Throwables;
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.Maps;
+import com.google.common.collect.Queues;
+import com.google.common.util.concurrent.AtomicLongMap;
+
+import io.atomix.copycat.server.Commit;
+import io.atomix.copycat.server.Snapshottable;
+import io.atomix.copycat.server.StateMachineExecutor;
+import io.atomix.copycat.server.session.ServerSession;
+import io.atomix.copycat.server.session.SessionListener;
+import io.atomix.copycat.server.storage.snapshot.SnapshotReader;
+import io.atomix.copycat.server.storage.snapshot.SnapshotWriter;
+import io.atomix.resource.ResourceStateMachine;
+
+/**
+ * State machine for {@link AtomixWorkQueue} resource.
+ * <p>
+ * Added tasks wait in {@code unassignedTasks} until a session takes them; from then on they
+ * are tracked in {@code assignments} until the owning session completes them or is evicted,
+ * in which case they are put back into {@code unassignedTasks}. The {@link Add} commit that
+ * introduced a batch of tasks is kept open until every task of that batch has been completed.
+ */
+public class AtomixWorkQueueState extends ResourceStateMachine implements SessionListener, Snapshottable {
+
+    private final Logger log = getLogger(getClass());
+
+    // Number of tasks completed so far; the only value written to snapshots.
+    private final AtomicLong totalCompleted = new AtomicLong(0);
+
+    // Tasks that have been added but not yet taken, in arrival order.
+    private final Queue<TaskHolder> unassignedTasks = Queues.newArrayDeque();
+    // Task id -> assignment, for tasks that were taken but not yet completed.
+    private final Map<String, TaskAssignment> assignments = Maps.newHashMap();
+    // Session id -> Register commit retained (left open) while the session is registered.
+    private final Map<Long, Commit<? extends Register>> registeredWorkers = Maps.newHashMap();
+    // Session id -> number of tasks currently assigned to that session.
+    private final AtomicLongMap<Long> activeTasksPerSession = AtomicLongMap.create();
+
+    /**
+     * Creates the state machine.
+     *
+     * @param config configuration handed to the parent {@link ResourceStateMachine}
+     */
+    protected AtomixWorkQueueState(Properties config) {
+        super(config);
+    }
+
+    /**
+     * Binds every work queue operation to its handler.
+     * <p>
+     * The explicit {@code Consumer} casts select the {@code Commit} overloads of
+     * {@code register}/{@code unregister} and mark the void handlers as consumers.
+     */
+    @Override
+    protected void configure(StateMachineExecutor executor) {
+        executor.register(Stats.class, this::stats);
+        executor.register(Register.class, (Consumer<Commit<Register>>) this::register);
+        executor.register(Unregister.class, (Consumer<Commit<Unregister>>) this::unregister);
+        executor.register(Add.class, (Consumer<Commit<Add>>) this::add);
+        executor.register(Take.class, this::take);
+        executor.register(Complete.class, (Consumer<Commit<Complete>>) this::complete);
+    }
+
+    /**
+     * Reports the current queue counters.
+     *
+     * @param commit stats commit; always closed before returning
+     * @return completed, pending (unassigned) and in-progress (assigned) task counts
+     */
+    protected WorkQueueStats stats(Commit<? extends Stats> commit) {
+        try {
+            return WorkQueueStats.builder()
+                    .withTotalCompleted(totalCompleted.get())
+                    .withTotalPending(unassignedTasks.size())
+                    .withTotalInProgress(assignments.size())
+                    .build();
+        } finally {
+            commit.close();
+        }
+    }
+
+    /**
+     * Records the committing session as a worker that wants task notifications.
+     * <p>
+     * The first register commit of a session is retained until the session unregisters
+     * or is evicted; a repeated registration is closed right away.
+     *
+     * @param commit register commit
+     */
+    protected void register(Commit<? extends Register> commit) {
+        long sessionId = commit.session().id();
+        if (registeredWorkers.putIfAbsent(sessionId, commit) != null) {
+            commit.close();
+        }
+    }
+
+    /**
+     * Removes the committing session from the registered workers and closes the register
+     * commit that was retained for it, if any.
+     *
+     * @param commit unregister commit; always closed before returning
+     */
+    protected void unregister(Commit<? extends Unregister> commit) {
+        try {
+            Commit<? extends Register> registerCommit = registeredWorkers.remove(commit.session().id());
+            if (registerCommit != null) {
+                registerCommit.close();
+            }
+        } finally {
+            commit.close();
+        }
+    }
+
+    /**
+     * Appends the submitted items to the queue of unassigned tasks and notifies the
+     * registered workers.
+     * <p>
+     * Task ids have the form {@code sessionId:commitIndex:itemIndex}. The commit stays open
+     * until every task created from it has been completed.
+     *
+     * @param commit add commit
+     */
+    protected void add(Commit<? extends Add> commit) {
+        Collection<byte[]> items = commit.operation().items();
+
+        if (items.isEmpty()) {
+            // Without tasks nothing would ever count the tracker below down, so the commit
+            // would never be closed. Release it immediately instead.
+            commit.close();
+            return;
+        }
+
+        // Create a CountDownCompleter that will close the commit when all tasks
+        // submitted as part of it are completed.
+        CountDownCompleter<Commit<? extends Add>> referenceTracker =
+                new CountDownCompleter<>(commit, items.size(), Commit::close);
+
+        AtomicInteger itemIndex = new AtomicInteger(0);
+        items.forEach(item -> {
+            String taskId = String.format("%d:%d:%d", commit.session().id(),
+                                          commit.index(),
+                                          itemIndex.getAndIncrement());
+            unassignedTasks.add(new TaskHolder(new Task<>(taskId, item), referenceTracker));
+        });
+
+        // Send an event to all sessions that have expressed interest in task processing.
+        // Workers that are already busy are notified as well.
+        registeredWorkers.values()
+                .stream()
+                .map(Commit::session)
+                .forEach(session -> session.publish(AtomixWorkQueue.TASK_AVAILABLE));
+        // FIXME: This generates a lot of event traffic.
+    }
+
+    /**
+     * Assigns up to {@code maxTasks} unassigned tasks to the committing session.
+     *
+     * @param commit take commit; always closed before returning
+     * @return the tasks handed to the session, empty if none are pending
+     */
+    protected Collection<Task<byte[]>> take(Commit<? extends Take> commit) {
+        try {
+            if (unassignedTasks.isEmpty()) {
+                return ImmutableList.of();
+            }
+            long sessionId = commit.session().id();
+            int maxTasks = commit.operation().maxTasks();
+            // The upper bound is computed once, before any task is polled.
+            return IntStream.range(0, Math.min(maxTasks, unassignedTasks.size()))
+                    .mapToObj(i -> {
+                        TaskHolder holder = unassignedTasks.poll();
+                        String taskId = holder.task().taskId();
+                        TaskAssignment assignment = new TaskAssignment(sessionId, holder);
+
+                        // bookkeeping
+                        assignments.put(taskId, assignment);
+                        activeTasksPerSession.incrementAndGet(sessionId);
+
+                        return holder.task();
+                    })
+                    .collect(Collectors.toCollection(ArrayList::new));
+        } catch (Exception e) {
+            log.warn("State machine update failed", e);
+            throw Throwables.propagate(e);
+        } finally {
+            commit.close();
+        }
+    }
+
+    /**
+     * Marks the listed tasks as completed.
+     * <p>
+     * Only tasks currently assigned to the committing session are completed. Unknown task
+     * ids and tasks assigned to another session are ignored, which keeps the per-session
+     * counters consistent and prevents one session from completing another session's work.
+     *
+     * @param commit complete commit; always closed before returning
+     */
+    protected void complete(Commit<? extends Complete> commit) {
+        long sessionId = commit.session().id();
+        try {
+            commit.operation().taskIds().forEach(taskId -> {
+                TaskAssignment assignment = assignments.get(taskId);
+                if (assignment != null && assignment.sessionId() == sessionId) {
+                    assignments.remove(taskId);
+                    assignment.markComplete();
+                    // bookkeeping
+                    totalCompleted.incrementAndGet();
+                    activeTasksPerSession.decrementAndGet(sessionId);
+                }
+            });
+        } catch (Exception e) {
+            log.warn("State machine update failed", e);
+            throw Throwables.propagate(e);
+        } finally {
+            commit.close();
+        }
+    }
+
+    @Override
+    public void register(ServerSession session) {
+        // Nothing to do: a session only becomes a worker through a Register command.
+    }
+
+    @Override
+    public void unregister(ServerSession session) {
+        evictWorker(session.id());
+    }
+
+    @Override
+    public void expire(ServerSession session) {
+        evictWorker(session.id());
+    }
+
+    @Override
+    public void close(ServerSession session) {
+        evictWorker(session.id());
+    }
+
+    /**
+     * Writes the completed-task counter; no other state is written to the snapshot.
+     */
+    @Override
+    public void snapshot(SnapshotWriter writer) {
+        writer.writeLong(totalCompleted.get());
+    }
+
+    /**
+     * Restores the completed-task counter written by {@link #snapshot(SnapshotWriter)}.
+     */
+    @Override
+    public void install(SnapshotReader reader) {
+        totalCompleted.set(reader.readLong());
+    }
+
+    /**
+     * Forgets a session: closes its retained register commit and returns every task that
+     * was assigned to it to the queue of unassigned tasks.
+     *
+     * @param sessionId id of the session that went away
+     */
+    private void evictWorker(long sessionId) {
+        Commit<? extends Register> commit = registeredWorkers.remove(sessionId);
+        if (commit != null) {
+            commit.close();
+        }
+
+        // TODO: Maintain an index of tasks by session for efficient access.
+        Iterator<Map.Entry<String, TaskAssignment>> iter = assignments.entrySet().iterator();
+        while (iter.hasNext()) {
+            Map.Entry<String, TaskAssignment> entry = iter.next();
+            TaskAssignment assignment = entry.getValue();
+            if (assignment.sessionId() == sessionId) {
+                unassignedTasks.add(assignment.taskHolder());
+                iter.remove();
+            }
+        }
+
+        // Bookkeeping
+        activeTasksPerSession.remove(sessionId);
+        activeTasksPerSession.removeAllZeros();
+    }
+
+    /**
+     * Pairs a task with the tracker of the {@link Add} commit that created it.
+     */
+    private static final class TaskHolder {
+
+        private final Task<byte[]> task;
+        private final CountDownCompleter<Commit<? extends Add>> referenceTracker;
+
+        public TaskHolder(Task<byte[]> delegate, CountDownCompleter<Commit<? extends Add>> referenceTracker) {
+            this.task = delegate;
+            this.referenceTracker = referenceTracker;
+        }
+
+        public Task<byte[]> task() {
+            return task;
+        }
+
+        /**
+         * Counts the originating commit's tracker down by one.
+         */
+        public void complete() {
+            referenceTracker.countDown();
+        }
+    }
+
+    /**
+     * Records which session a task has been handed to.
+     */
+    private static final class TaskAssignment {
+        private final long sessionId;
+        private final TaskHolder taskHolder;
+
+        public TaskAssignment(long sessionId, TaskHolder taskHolder) {
+            this.sessionId = sessionId;
+            this.taskHolder = taskHolder;
+        }
+
+        public long sessionId() {
+            return sessionId;
+        }
+
+        public TaskHolder taskHolder() {
+            return taskHolder;
+        }
+
+        public void markComplete() {
+            taskHolder.complete();
+        }
+
+        @Override
+        public String toString() {
+            return MoreObjects.toStringHelper(getClass())
+                    .add("sessionId", sessionId)
+                    .add("taskHolder", taskHolder)
+                    .toString();
+        }
+    }
+}
diff --git a/core/store/primitives/src/test/java/org/onosproject/store/primitives/resources/impl/AtomixTestBase.java b/core/store/primitives/src/test/java/org/onosproject/store/primitives/resources/impl/AtomixTestBase.java
index 7dfe82f..f7a5007 100644
--- a/core/store/primitives/src/test/java/org/onosproject/store/primitives/resources/impl/AtomixTestBase.java
+++ b/core/store/primitives/src/test/java/org/onosproject/store/primitives/resources/impl/AtomixTestBase.java
@@ -16,6 +16,7 @@
package org.onosproject.store.primitives.resources.impl;
import com.google.common.util.concurrent.Uninterruptibles;
+
import io.atomix.AtomixClient;
import io.atomix.catalyst.serializer.Serializer;
import io.atomix.catalyst.transport.Address;
@@ -114,18 +115,18 @@
CompletableFuture<Void> closeClients =
CompletableFuture.allOf(atomixClients.stream()
- .map(AtomixClient::close)
- .toArray(CompletableFuture[]::new));
+ .map(AtomixClient::close)
+ .toArray(CompletableFuture[]::new));
+ closeClients.join();
- closeClients
- .thenCompose(v -> CompletableFuture
- .allOf(copycatServers.stream()
- .map(CopycatServer::shutdown)
- .toArray(CompletableFuture[]::new))).join();
+ CompletableFuture<Void> closeServers =
+ CompletableFuture.allOf(copycatServers.stream()
+ .map(CopycatServer::shutdown)
+ .toArray(CompletableFuture[]::new));
+ closeServers.join();
- atomixClients = new ArrayList<>();
-
- copycatServers = new ArrayList<>();
+ atomixClients.clear();
+ copycatServers.clear();
}
diff --git a/core/store/primitives/src/test/java/org/onosproject/store/primitives/resources/impl/AtomixWorkQueueTest.java b/core/store/primitives/src/test/java/org/onosproject/store/primitives/resources/impl/AtomixWorkQueueTest.java
new file mode 100644
index 0000000..161424d
--- /dev/null
+++ b/core/store/primitives/src/test/java/org/onosproject/store/primitives/resources/impl/AtomixWorkQueueTest.java
@@ -0,0 +1,189 @@
+/*
+ * Copyright 2016-present Open Networking Laboratory
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.onosproject.store.primitives.resources.impl;
+
+import java.time.Duration;
+import java.util.Arrays;
+import java.util.UUID;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.Executor;
+import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
+
+import io.atomix.Atomix;
+import io.atomix.AtomixClient;
+import io.atomix.resource.ResourceType;
+
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import org.onlab.util.Tools;
+import org.onosproject.store.service.Task;
+import org.onosproject.store.service.WorkQueueStats;
+
+import com.google.common.util.concurrent.Uninterruptibles;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertTrue;
+
+/**
+ * Unit tests for {@link AtomixWorkQueue}.
+ * <p>
+ * Every test works on a queue with a random name so that tests sharing the single
+ * copycat server do not see each other's tasks.
+ */
+public class AtomixWorkQueueTest extends AtomixTestBase {
+
+    private static final Duration DEFAULT_PROCESSING_TIME = Duration.ofMillis(100);
+    private static final byte[] DEFAULT_PAYLOAD = "hello world".getBytes();
+
+    @BeforeClass
+    public static void preTestSetup() throws Throwable {
+        createCopycatServers(1);
+    }
+
+    @AfterClass
+    public static void postTestCleanup() throws Exception {
+        clearTests();
+    }
+
+    @Override
+    protected ResourceType resourceType() {
+        return new ResourceType(AtomixWorkQueue.class);
+    }
+
+    /**
+     * Items added one at a time through two different clients all show up as pending.
+     */
+    @Test
+    public void testAdd() throws Throwable {
+        String queueName = UUID.randomUUID().toString();
+        Atomix atomix1 = createAtomixClient();
+        AtomixWorkQueue queue1 = atomix1.getResource(queueName, AtomixWorkQueue.class).join();
+        byte[] item = DEFAULT_PAYLOAD;
+        queue1.addOne(item).join();
+
+        Atomix atomix2 = createAtomixClient();
+        AtomixWorkQueue queue2 = atomix2.getResource(queueName, AtomixWorkQueue.class).join();
+        byte[] task2 = DEFAULT_PAYLOAD;
+        queue2.addOne(task2).join();
+
+        WorkQueueStats stats = queue1.stats().join();
+        assertEquals(2, stats.totalPending());
+        assertEquals(0, stats.totalInProgress());
+        assertEquals(0, stats.totalCompleted());
+    }
+
+    /**
+     * Items added as one batch all show up as pending.
+     */
+    @Test
+    public void testAddMultiple() throws Throwable {
+        String queueName = UUID.randomUUID().toString();
+        Atomix atomix1 = createAtomixClient();
+        AtomixWorkQueue queue1 = atomix1.getResource(queueName, AtomixWorkQueue.class).join();
+        byte[] item1 = DEFAULT_PAYLOAD;
+        byte[] item2 = DEFAULT_PAYLOAD;
+        queue1.addMultiple(Arrays.asList(item1, item2)).join();
+
+        WorkQueueStats stats = queue1.stats().join();
+        assertEquals(2, stats.totalPending());
+        assertEquals(0, stats.totalInProgress());
+        assertEquals(0, stats.totalCompleted());
+    }
+
+    /**
+     * A task moves from pending to in-progress on take and to completed on complete.
+     */
+    @Test
+    public void testTakeAndComplete() throws Throwable {
+        String queueName = UUID.randomUUID().toString();
+        Atomix atomix1 = createAtomixClient();
+        AtomixWorkQueue queue1 = atomix1.getResource(queueName, AtomixWorkQueue.class).join();
+        byte[] item1 = DEFAULT_PAYLOAD;
+        queue1.addOne(item1).join();
+
+        Atomix atomix2 = createAtomixClient();
+        AtomixWorkQueue queue2 = atomix2.getResource(queueName, AtomixWorkQueue.class).join();
+        Task<byte[]> removedTask = queue2.take().join();
+
+        WorkQueueStats stats = queue2.stats().join();
+        assertEquals(0, stats.totalPending());
+        assertEquals(1, stats.totalInProgress());
+        assertEquals(0, stats.totalCompleted());
+
+        assertTrue(Arrays.equals(removedTask.payload(), item1));
+        queue2.complete(Arrays.asList(removedTask.taskId())).join();
+
+        stats = queue1.stats().join();
+        assertEquals(0, stats.totalPending());
+        assertEquals(0, stats.totalInProgress());
+        assertEquals(1, stats.totalCompleted());
+
+        // Another take should return null
+        assertNull(queue2.take().join());
+    }
+
+    /**
+     * A task taken by a client that closes without completing it becomes pending again.
+     */
+    @Test
+    public void testUnexpectedClientClose() throws Throwable {
+        String queueName = UUID.randomUUID().toString();
+        Atomix atomix1 = createAtomixClient();
+        AtomixWorkQueue queue1 = atomix1.getResource(queueName, AtomixWorkQueue.class).join();
+        byte[] item1 = DEFAULT_PAYLOAD;
+        queue1.addOne(item1).join();
+
+        AtomixClient atomix2 = createAtomixClient();
+        AtomixWorkQueue queue2 = atomix2.getResource(queueName, AtomixWorkQueue.class).join();
+        queue2.take().join();
+
+        WorkQueueStats stats = queue1.stats().join();
+        assertEquals(0, stats.totalPending());
+        assertEquals(1, stats.totalInProgress());
+        assertEquals(0, stats.totalCompleted());
+
+        atomix2.close().join();
+
+        stats = queue1.stats().join();
+        assertEquals(1, stats.totalPending());
+        assertEquals(0, stats.totalInProgress());
+        assertEquals(0, stats.totalCompleted());
+    }
+
+    /**
+     * A registered task processor is invoked for added tasks, stops receiving them after
+     * {@code stopProcessing()} and picks up the backlog once it is registered again.
+     */
+    @Test
+    public void testAutomaticTaskProcessing() throws Throwable {
+        String queueName = UUID.randomUUID().toString();
+        Atomix atomix1 = createAtomixClient();
+        AtomixWorkQueue queue1 = atomix1.getResource(queueName, AtomixWorkQueue.class).join();
+        Executor executor = Executors.newSingleThreadExecutor();
+
+        CountDownLatch latch1 = new CountDownLatch(1);
+        queue1.registerTaskProcessor(s -> latch1.countDown(), 2, executor);
+
+        AtomixClient atomix2 = createAtomixClient();
+        AtomixWorkQueue queue2 = atomix2.getResource(queueName, AtomixWorkQueue.class).join();
+        byte[] item1 = DEFAULT_PAYLOAD;
+        queue2.addOne(item1).join();
+
+        // The timed wait reports a timeout through its return value; fail on it explicitly.
+        assertTrue("first task was not processed in time",
+                   Uninterruptibles.awaitUninterruptibly(latch1, 500, TimeUnit.MILLISECONDS));
+        queue1.stopProcessing();
+
+        byte[] item2 = DEFAULT_PAYLOAD;
+        byte[] item3 = DEFAULT_PAYLOAD;
+
+        Tools.delay((int) DEFAULT_PROCESSING_TIME.toMillis());
+
+        queue2.addMultiple(Arrays.asList(item2, item3)).join();
+
+        WorkQueueStats stats = queue1.stats().join();
+        assertEquals(2, stats.totalPending());
+        assertEquals(0, stats.totalInProgress());
+        assertEquals(1, stats.totalCompleted());
+
+        CountDownLatch latch2 = new CountDownLatch(2);
+        queue1.registerTaskProcessor(s -> latch2.countDown(), 2, executor);
+
+        assertTrue("backlog was not processed in time",
+                   Uninterruptibles.awaitUninterruptibly(latch2, 500, TimeUnit.MILLISECONDS));
+    }
+}