Distributed work queue primitive
Change-Id: Ia8e531e6611ec502399edec376ccc00522e47994
diff --git a/core/store/primitives/src/main/java/org/onosproject/store/primitives/resources/impl/AtomixWorkQueue.java b/core/store/primitives/src/main/java/org/onosproject/store/primitives/resources/impl/AtomixWorkQueue.java
new file mode 100644
index 0000000..7b4ad47
--- /dev/null
+++ b/core/store/primitives/src/main/java/org/onosproject/store/primitives/resources/impl/AtomixWorkQueue.java
@@ -0,0 +1,201 @@
+/*
+ * Copyright 2016-present Open Networking Laboratory
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.onosproject.store.primitives.resources.impl;
+
+import static org.slf4j.LoggerFactory.getLogger;
+
+import java.util.Collection;
+import java.util.List;
+import java.util.Properties;
+import java.util.Timer;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.Executor;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicReference;
+import java.util.function.Consumer;
+
+import org.onlab.util.AbstractAccumulator;
+import org.onlab.util.Accumulator;
+import org.onosproject.store.primitives.resources.impl.AtomixWorkQueueCommands.Add;
+import org.onosproject.store.primitives.resources.impl.AtomixWorkQueueCommands.Complete;
+import org.onosproject.store.primitives.resources.impl.AtomixWorkQueueCommands.Register;
+import org.onosproject.store.primitives.resources.impl.AtomixWorkQueueCommands.Stats;
+import org.onosproject.store.primitives.resources.impl.AtomixWorkQueueCommands.Take;
+import org.onosproject.store.primitives.resources.impl.AtomixWorkQueueCommands.Unregister;
+import org.onosproject.store.service.WorkQueue;
+import org.onosproject.store.service.Task;
+import org.onosproject.store.service.WorkQueueStats;
+import org.slf4j.Logger;
+
+import com.google.common.collect.ImmutableList;
+
+import io.atomix.copycat.client.CopycatClient;
+import io.atomix.resource.AbstractResource;
+import io.atomix.resource.ResourceTypeInfo;
+
+/**
+ * Distributed resource providing the {@link WorkQueue} primitive.
+ */
+@ResourceTypeInfo(id = -154, factory = AtomixWorkQueueFactory.class)
+public class AtomixWorkQueue extends AbstractResource<AtomixWorkQueue>
+ implements WorkQueue<byte[]> {
+
+ private final Logger log = getLogger(getClass());
+ public static final String TASK_AVAILABLE = "task-available";
+ private final ExecutorService executor = Executors.newSingleThreadExecutor();
+ private final AtomicReference<TaskProcessor> taskProcessor = new AtomicReference<>();
+ private final Timer timer = new Timer("atomix-work-queue-completer");
+ private final AtomicBoolean isRegistered = new AtomicBoolean(false);
+
+ protected AtomixWorkQueue(CopycatClient client, Properties options) {
+ super(client, options);
+ }
+
+ @Override
+ public CompletableFuture<AtomixWorkQueue> open() {
+ return super.open().thenApply(result -> {
+ client.onStateChange(state -> {
+ if (state == CopycatClient.State.CONNECTED && isRegistered.get()) {
+ client.submit(new Register());
+ }
+ });
+ client.onEvent(TASK_AVAILABLE, this::resumeWork);
+ return result;
+ });
+ }
+
+ @Override
+ public CompletableFuture<Void> addMultiple(Collection<byte[]> items) {
+ if (items.isEmpty()) {
+ return CompletableFuture.completedFuture(null);
+ }
+ return client.submit(new Add(items));
+ }
+
+ @Override
+ public CompletableFuture<Collection<Task<byte[]>>> take(int maxTasks) {
+ if (maxTasks <= 0) {
+ return CompletableFuture.completedFuture(ImmutableList.of());
+ }
+ return client.submit(new Take(maxTasks));
+ }
+
+ @Override
+ public CompletableFuture<Void> complete(Collection<String> taskIds) {
+ if (taskIds.isEmpty()) {
+ return CompletableFuture.completedFuture(null);
+ }
+ return client.submit(new Complete(taskIds));
+ }
+
+ @Override
+ public CompletableFuture<Void> registerTaskProcessor(Consumer<byte[]> callback,
+ int parallelism,
+ Executor executor) {
+ Accumulator<String> completedTaskAccumulator =
+ new CompletedTaskAccumulator(timer, 50, 50); // TODO: make configurable
+ taskProcessor.set(new TaskProcessor(callback,
+ parallelism,
+ executor,
+ completedTaskAccumulator));
+ return register().thenCompose(v -> take(parallelism))
+ .thenAccept(taskProcessor.get()::accept);
+ }
+
+ @Override
+ public CompletableFuture<Void> stopProcessing() {
+ return unregister();
+ }
+
+ @Override
+ public CompletableFuture<WorkQueueStats> stats() {
+ return client.submit(new Stats());
+ }
+
+ private void resumeWork() {
+ TaskProcessor activeProcessor = taskProcessor.get();
+ if (activeProcessor == null) {
+ return;
+ }
+ this.take(activeProcessor.headRoom())
+ .whenCompleteAsync((tasks, e) -> activeProcessor.accept(tasks), executor);
+ }
+
+ private CompletableFuture<Void> register() {
+ return client.submit(new Register()).thenRun(() -> isRegistered.set(true));
+ }
+
+ private CompletableFuture<Void> unregister() {
+ return client.submit(new Unregister()).thenRun(() -> isRegistered.set(false));
+ }
+
+ // TaskId accumulator for paced triggering of task completion calls.
+ private class CompletedTaskAccumulator extends AbstractAccumulator<String> {
+ CompletedTaskAccumulator(Timer timer, int maxTasksToBatch, int maxBatchMillis) {
+ super(timer, maxTasksToBatch, maxBatchMillis, Integer.MAX_VALUE);
+ }
+
+ @Override
+ public void processItems(List<String> items) {
+ complete(items);
+ }
+ }
+
+ private class TaskProcessor implements Consumer<Collection<Task<byte[]>>> {
+
+ private final AtomicInteger headRoom;
+ private final Consumer<byte[]> backingConsumer;
+ private final Executor executor;
+ private final Accumulator<String> taskCompleter;
+
+ public TaskProcessor(Consumer<byte[]> backingConsumer,
+ int parallelism,
+ Executor executor,
+ Accumulator<String> taskCompleter) {
+ this.backingConsumer = backingConsumer;
+ this.headRoom = new AtomicInteger(parallelism);
+ this.executor = executor;
+ this.taskCompleter = taskCompleter;
+ }
+
+ public int headRoom() {
+ return headRoom.get();
+ }
+
+ @Override
+ public void accept(Collection<Task<byte[]>> tasks) {
+ if (tasks == null) {
+ return;
+ }
+ headRoom.addAndGet(-1 * tasks.size());
+ tasks.forEach(task ->
+ executor.execute(() -> {
+ try {
+ backingConsumer.accept(task.payload());
+ taskCompleter.add(task.taskId());
+ } catch (Exception e) {
+ log.debug("Task execution failed", e);
+ } finally {
+ headRoom.incrementAndGet();
+ resumeWork();
+ }
+ }));
+ }
+ }
+}