Distributed work queue primitive

Change-Id: Ia8e531e6611ec502399edec376ccc00522e47994
diff --git a/core/api/src/main/java/org/onosproject/store/primitives/DistributedPrimitiveCreator.java b/core/api/src/main/java/org/onosproject/store/primitives/DistributedPrimitiveCreator.java
index 2316dd1..2dfcd2c 100644
--- a/core/api/src/main/java/org/onosproject/store/primitives/DistributedPrimitiveCreator.java
+++ b/core/api/src/main/java/org/onosproject/store/primitives/DistributedPrimitiveCreator.java
@@ -23,6 +23,7 @@
 import org.onosproject.store.service.AsyncDistributedSet;
 import org.onosproject.store.service.AsyncLeaderElector;
 import org.onosproject.store.service.DistributedQueue;
+import org.onosproject.store.service.WorkQueue;
 import org.onosproject.store.service.Serializer;
 
 /**
@@ -88,6 +89,15 @@
     AsyncLeaderElector newAsyncLeaderElector(String name);
 
     /**
+     * Creates a new {@code WorkQueue}.
+     * @param <E> task payload type
+     * @param name work queue name
+     * @param serializer serializer
+     * @return work queue
+     */
+    <E> WorkQueue<E> newWorkQueue(String name, Serializer serializer);
+
+    /**
      * Returns the names of all created {@code AsyncConsistentMap} instances.
      * @return set of {@code AsyncConsistentMap} names
      */
@@ -98,4 +108,10 @@
      * @return set of {@code AsyncAtomicCounter} names
      */
     Set<String> getAsyncAtomicCounterNames();
+
+    /**
+     * Returns the names of all created {@code WorkQueue} instances.
+     * @return set containing the name of every created {@code WorkQueue}
+     */
+    Set<String> getWorkQueueNames();
 }
\ No newline at end of file
diff --git a/core/api/src/main/java/org/onosproject/store/service/StorageAdminService.java b/core/api/src/main/java/org/onosproject/store/service/StorageAdminService.java
index 33a11c9..542149c 100644
--- a/core/api/src/main/java/org/onosproject/store/service/StorageAdminService.java
+++ b/core/api/src/main/java/org/onosproject/store/service/StorageAdminService.java
@@ -52,6 +52,13 @@
     Map<String, Long> getCounters();
 
     /**
+     * Returns statistics for all the {@link WorkQueue work queues} in the system.
+     *
+     * @return mapping from work queue name to the {@link WorkQueueStats} of that queue
+     */
+    Map<String, WorkQueueStats> getQueueStats();
+
+    /**
      * Returns all pending transactions.
      *
      * @return collection of pending transaction identifiers.
diff --git a/core/api/src/main/java/org/onosproject/store/service/StorageService.java b/core/api/src/main/java/org/onosproject/store/service/StorageService.java
index e10c3d0..f342bdd 100644
--- a/core/api/src/main/java/org/onosproject/store/service/StorageService.java
+++ b/core/api/src/main/java/org/onosproject/store/service/StorageService.java
@@ -107,4 +107,13 @@
     default AtomicCounter getAtomicCounter(String name) {
         return getAsyncAtomicCounter(name).asAtomicCounter();
     }
+
+    /**
+     * Returns an instance of {@code WorkQueue} with the specified name.
+     * @param <E> task payload type
+     * @param name work queue name
+     * @param serializer serializer
+     * @return WorkQueue instance
+     */
+    <E> WorkQueue<E> getWorkQueue(String name, Serializer serializer);
 }
diff --git a/core/api/src/main/java/org/onosproject/store/service/Task.java b/core/api/src/main/java/org/onosproject/store/service/Task.java
new file mode 100644
index 0000000..a8ff07b
--- /dev/null
+++ b/core/api/src/main/java/org/onosproject/store/service/Task.java
@@ -0,0 +1,78 @@
+/*
+ * Copyright 2016-present Open Networking Laboratory
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.onosproject.store.service;
+
+import java.util.function.Function;
+
+import com.google.common.base.MoreObjects;
+
+/**
+ * {@link WorkQueue} task.
+ *
+ * @param <E> task payload type.
+ */
+public class Task<E> {
+    private final E payload;
+    private final String taskId;
+
+    private Task() {
+        this(null, null); // no-arg form, presumably kept for serialization frameworks (TODO confirm)
+    }
+
+    /**
+     * Constructs a new task instance.
+     * @param taskId task identifier
+     * @param payload task payload
+     */
+    public Task(String taskId, E payload) {
+        this.taskId = taskId;
+        this.payload = payload;
+    }
+
+    /**
+     * Returns the task identifier.
+     * @return task id
+     */
+    public String taskId() {
+        return taskId;
+    }
+
+    /**
+     * Returns the task payload.
+     * @return task payload
+     */
+    public E payload() {
+        return payload;
+    }
+
+    /**
+     * Maps this task to a new task that keeps the same id and carries the converted payload.
+     * @param mapper payload converter; any function accepting {@code E} (or a supertype) is allowed
+     * @param <F> payload type of the mapped task
+     * @return mapped task
+     */
+    public <F> Task<F> map(Function<? super E, ? extends F> mapper) {
+        return new Task<>(taskId, mapper.apply(payload));
+    }
+
+    @Override
+    public String toString() {
+        return MoreObjects.toStringHelper(getClass())
+                .add("taskId", taskId)
+                .add("payload", payload)
+                .toString();
+    }
+}
diff --git a/core/api/src/main/java/org/onosproject/store/service/WorkQueue.java b/core/api/src/main/java/org/onosproject/store/service/WorkQueue.java
new file mode 100644
index 0000000..220a646
--- /dev/null
+++ b/core/api/src/main/java/org/onosproject/store/service/WorkQueue.java
@@ -0,0 +1,126 @@
+/*
+ * Copyright 2016-present Open Networking Laboratory
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.onosproject.store.service;
+
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.Executor;
+import java.util.function.Consumer;
+
+import com.google.common.collect.ImmutableList;
+
+/**
+ * Distributed Work Queue primitive.
+ * <p>
+ * Work queue serves as a buffer allowing producers to {@link #addMultiple(Collection) add} tasks and consumers
+ * to {@link #take() take} tasks to process.
+ * <p>
+ * In the system each task is tracked via its unique task identifier which is returned when a task is taken.
+ * Work queue guarantees that a task can be taken by only one consumer at a time. Once it finishes processing, a
+ * consumer must invoke the {@link #complete(Collection) complete} method to mark the task(s) as completed.
+ * Tasks thus completed are removed from the queue. If a consumer unexpectedly terminates before it can complete
+ * all its tasks, they are returned to the queue so that other consumers can pick them up. Since there is a distinct
+ * possibility that tasks could be processed more than once (under failure conditions), care should be taken to ensure
+ * task processing logic is idempotent.
+ *
+ * @param <E> task payload type.
+ */
+public interface WorkQueue<E> {
+
+    /**
+     * Enqueues every item of the given collection as a task of this work queue.
+     * @param items collection of task items
+     * @return future that is completed when the operation completes
+     */
+    CompletableFuture<Void> addMultiple(Collection<E> items);
+
+    /**
+     * Takes up to {@code maxItems} tasks from the work queue to work on.
+     * <p>
+     * A taken task stays invisible to other consumers while the taking consumer is alive. Should that
+     * consumer terminate unexpectedly before {@link #complete(String...) completing} the task, the task
+     * becomes visible to other consumers again.
+     * @param maxItems upper bound on the number of tasks to take; fewer tasks may be returned
+     * @return future for the tasks; it may be completed with an empty collection when the work queue
+     * has no unassigned tasks
+     */
+    CompletableFuture<Collection<Task<E>>> take(int maxItems);
+
+    /**
+     * Marks the tasks with the given identifiers as completed.
+     * @param taskIds ids of tasks to complete
+     * @return future that is completed when the operation completes
+     */
+    CompletableFuture<Void> complete(Collection<String> taskIds);
+
+    /**
+     * Registers a callback that is automatically invoked when new tasks are added to the work queue.
+     * @param taskProcessor task processing callback
+     * @param parallelism max tasks that can be processed in parallel
+     * @param executor executor to use for processing the tasks
+     * @return future that is completed when the operation completes
+     */
+    CompletableFuture<Void> registerTaskProcessor(Consumer<E> taskProcessor, int parallelism, Executor executor);
+
+    /**
+     * Stops the automatic processing of tasks from the work queue, undoing the effect of an earlier
+     * {@link #registerTaskProcessor registerTaskProcessor} call.
+     * @return future that is completed when the operation completes
+     */
+    CompletableFuture<Void> stopProcessing();
+
+    /**
+     * Retrieves the statistics of this work queue.
+     * @return future that is completed with work queue stats when the operation completes
+     */
+    CompletableFuture<WorkQueueStats> stats();
+
+    /**
+     * Completes the tasks whose ids are passed as a var arg list.
+     * @param taskIds var arg list of task ids
+     * @return future that is completed when the operation completes
+     */
+    default CompletableFuture<Void> complete(String... taskIds) {
+        Collection<String> ids = Arrays.asList(taskIds);
+        return complete(ids);
+    }
+
+    /**
+     * Adds a single task to the work queue.
+     * @param item task item
+     * @return future that is completed when the operation completes
+     */
+    default CompletableFuture<Void> addOne(E item) {
+        Collection<E> singleton = ImmutableList.of(item);
+        return addMultiple(singleton);
+    }
+
+    /**
+     * Takes a single task from the work queue to work on.
+     * <p>
+     * This is {@link #take(int) take(1)} unwrapped: the same visibility rules apply to the taken task.
+     * @return future for the task; it may be completed with null when the work queue has no unassigned tasks
+     */
+    default CompletableFuture<Task<E>> take() {
+        return take(1).thenApply(tasks -> {
+            if (tasks.isEmpty()) {
+                return null;
+            }
+            return tasks.iterator().next();
+        });
+    }
+}
\ No newline at end of file
diff --git a/core/api/src/main/java/org/onosproject/store/service/WorkQueueStats.java b/core/api/src/main/java/org/onosproject/store/service/WorkQueueStats.java
new file mode 100644
index 0000000..d2489ad
--- /dev/null
+++ b/core/api/src/main/java/org/onosproject/store/service/WorkQueueStats.java
@@ -0,0 +1,126 @@
+/*
+ * Copyright 2016-present Open Networking Laboratory
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.onosproject.store.service;
+
+import com.google.common.base.MoreObjects;
+
+/**
+ * Statistics for a {@link WorkQueue}.
+ */
+public final class WorkQueueStats {
+
+    private long totalPending;
+    private long totalInProgress;
+    private long totalCompleted;
+
+    /**
+     * Returns a {@code WorkQueueStats} builder.
+     * @return builder
+     */
+    public static Builder builder() {
+        return new Builder();
+    }
+
+    private WorkQueueStats() {
+    }
+
+    /**
+     * Builder of {@code WorkQueueStats} instances.
+     * <p>
+     * Every {@link #build()} call returns a new instance, so values set on the builder afterwards
+     * do not alter stats that were already built.
+     */
+    public static class Builder {
+
+        private final WorkQueueStats workQueueStats = new WorkQueueStats();
+
+        /**
+         * Sets the total number of pending tasks.
+         * @param value total pending tasks
+         * @return this builder
+         */
+        public Builder withTotalPending(long value) {
+            workQueueStats.totalPending = value;
+            return this;
+        }
+
+        /**
+         * Sets the total number of in progress tasks.
+         * @param value total in progress tasks
+         * @return this builder
+         */
+        public Builder withTotalInProgress(long value) {
+            workQueueStats.totalInProgress = value;
+            return this;
+        }
+
+        /**
+         * Sets the total number of completed tasks.
+         * @param value total completed tasks
+         * @return this builder
+         */
+        public Builder withTotalCompleted(long value) {
+            workQueueStats.totalCompleted = value;
+            return this;
+        }
+
+        /**
+         * Builds a {@code WorkQueueStats} holding the values currently set on this builder.
+         * @return newly created work queue stats
+         */
+        public WorkQueueStats build() {
+            // Copy instead of handing out the working instance: the builder keeps mutating it.
+            WorkQueueStats built = new WorkQueueStats();
+            built.totalPending = workQueueStats.totalPending;
+            built.totalInProgress = workQueueStats.totalInProgress;
+            built.totalCompleted = workQueueStats.totalCompleted;
+            return built;
+        }
+    }
+
+    /**
+     * Returns the total pending tasks. These are the tasks that are added but not yet picked up.
+     * @return total pending tasks.
+     */
+    public long totalPending() {
+        return this.totalPending;
+    }
+
+    /**
+     * Returns the total in progress tasks. These are the tasks that are currently being worked on.
+     * @return total in progress tasks.
+     */
+    public long totalInProgress() {
+        return this.totalInProgress;
+    }
+
+    /**
+     * Returns the total completed tasks.
+     * @return total completed tasks.
+     */
+    public long totalCompleted() {
+        return this.totalCompleted;
+    }
+
+    @Override
+    public String toString() {
+        return MoreObjects.toStringHelper(getClass())
+                .add("totalPending", totalPending)
+                .add("totalInProgress", totalInProgress)
+                .add("totalCompleted", totalCompleted)
+                .toString();
+    }
+}