Distributed work queue primitive
Change-Id: Ia8e531e6611ec502399edec376ccc00522e47994
diff --git a/core/api/src/main/java/org/onosproject/store/service/WorkQueue.java b/core/api/src/main/java/org/onosproject/store/service/WorkQueue.java
new file mode 100644
index 0000000..220a646
--- /dev/null
+++ b/core/api/src/main/java/org/onosproject/store/service/WorkQueue.java
@@ -0,0 +1,126 @@
+/*
+ * Copyright 2016-present Open Networking Laboratory
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.onosproject.store.service;
+
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.Executor;
+import java.util.function.Consumer;
+
+import com.google.common.collect.ImmutableList;
+
+/**
+ * Distributed Work Queue primitive.
+ * <p>
+ * Work queue serves as a buffer allowing producers to {@link #add(Collection) add} tasks and consumers
+ * to {@link #take() take} tasks to process.
+ * <p>
+ * In the system each task is tracked via its unique task identifier which is returned when a task is taken.
+ * Work queue guarantees that a task can be taken by only one consumer at a time. Once it finishes processing a
+ * consumer must invoke the {@link #complete(Collection) complete} method to mark the task(s) as completed.
+ * Tasks thus completed are removed from the queue. If a consumer unexpectedly terminates before it can complete
+ * all its tasks are returned back to the queue so that other consumers can pick them up. Since there is a distinct
+ * possibility that tasks could be processed more than once (under failure conditions), care should be taken to ensure
+ * task processing logic is idempotent.
+ *
+ * @param <E> task payload type.
+ */
+public interface WorkQueue<E> {
+
+ /**
+ * Adds a collection of tasks to the work queue.
+ * @param items collection of task items
+ * @return future that is completed when the operation completes
+ */
+ CompletableFuture<Void> addMultiple(Collection<E> items);
+
+ /**
+ * Picks up multiple tasks from the work queue to work on.
+ * <p>
+ * Tasks that are taken remain invisible to other consumers as long as the consumer stays alive.
+ * If a consumer unexpectedly terminates before {@link #complete(String...) completing} the task,
+ * the task becomes visible again to other consumers to process.
+ * @param maxItems maximum number of items to take from the queue. The actual number of tasks returned
+ * can be at the max this number
+ * @return future for the tasks. The future can be completed with an empty collection if there are no
+ * unassigned tasks in the work queue
+ */
+ CompletableFuture<Collection<Task<E>>> take(int maxItems);
+
+ /**
+ * Completes a collection of tasks.
+ * @param taskIds ids of tasks to complete
+ * @return future that is completed when the operation completes
+ */
+ CompletableFuture<Void> complete(Collection<String> taskIds);
+
+ /**
+ * Registers a task processing callback to be automatically invoked when new tasks are
+ * added to the work queue.
+ * @param taskProcessor task processing callback
+ * @param parallelism max tasks that can be processed in parallel
+ * @param executor executor to use for processing the tasks
+ * @return future that is completed when the operation completes
+ */
+ CompletableFuture<Void> registerTaskProcessor(Consumer<E> taskProcessor,
+ int parallelism,
+ Executor executor);
+
+ /**
+ * Stops automatically processing tasks from work queue. This call nullifies the effect of a
+ * previous {@link #registerTaskProcessor registerTaskProcessor} call.
+ * @return future that is completed when the operation completes
+ */
+ CompletableFuture<Void> stopProcessing();
+
+ /**
+ * Returns work queue statistics.
+ * @return future that is completed with work queue stats when the operation completes
+ */
+ CompletableFuture<WorkQueueStats> stats();
+
+ /**
+ * Completes a collection of tasks.
+ * @param taskIds var arg list of task ids
+ * @return future that is completed when the operation completes
+ */
+ default CompletableFuture<Void> complete(String... taskIds) {
+ return complete(Arrays.asList(taskIds));
+ }
+
+ /**
+ * Adds a single task to the work queue.
+ * @param item task item
+ * @return future that is completed when the operation completes
+ */
+ default CompletableFuture<Void> addOne(E item) {
+ return addMultiple(ImmutableList.of(item));
+ }
+
+ /**
+ * Picks up a single task from the work queue to work on.
+ * <p>
+ * Tasks that are taken remain invisible to other consumers as long as the consumer stays alive.
+ * If a consumer unexpectedly terminates before {@link #complete(String...) completing} the task,
+ * the task becomes visible again to other consumers to process.
+ * @return future for the task. The future can be completed with null, if there are no
+ * unassigned tasks in the work queue
+ */
+ default CompletableFuture<Task<E>> take() {
+ return this.take(1).thenApply(tasks -> tasks.isEmpty() ? null : tasks.iterator().next());
+ }
+}
\ No newline at end of file