Distributed work queue primitive

Change-Id: Ia8e531e6611ec502399edec376ccc00522e47994
diff --git a/apps/vtn/vtnrsc/src/test/java/org/onosproject/vtnrsc/util/VtnStorageServiceAdapter.java b/apps/vtn/vtnrsc/src/test/java/org/onosproject/vtnrsc/util/VtnStorageServiceAdapter.java
index 6189ec8..b8ad5d0 100644
--- a/apps/vtn/vtnrsc/src/test/java/org/onosproject/vtnrsc/util/VtnStorageServiceAdapter.java
+++ b/apps/vtn/vtnrsc/src/test/java/org/onosproject/vtnrsc/util/VtnStorageServiceAdapter.java
@@ -15,6 +15,7 @@
  */
 package org.onosproject.vtnrsc.util;
 
+import org.onosproject.store.service.WorkQueue;
 import org.onosproject.store.service.EventuallyConsistentMapBuilder;
 import org.onosproject.store.service.ConsistentMapBuilder;
 import org.onosproject.store.service.DistributedSetBuilder;
@@ -22,6 +23,7 @@
 import org.onosproject.store.service.AtomicCounterBuilder;
 import org.onosproject.store.service.AtomicValueBuilder;
 import org.onosproject.store.service.LeaderElectorBuilder;
+import org.onosproject.store.service.Serializer;
 import org.onosproject.store.service.TransactionContextBuilder;
 import org.onosproject.store.service.StorageService;
 
@@ -68,4 +70,9 @@
     public LeaderElectorBuilder leaderElectorBuilder() {
         return null;
     }
+
+    @Override
+    public <E> WorkQueue<E> getWorkQueue(String name, Serializer serializer) {
+        return null;
+    }
 }
diff --git a/cli/src/main/java/org/onosproject/cli/net/QueuesListCommand.java b/cli/src/main/java/org/onosproject/cli/net/QueuesListCommand.java
new file mode 100644
index 0000000..a60cc6c
--- /dev/null
+++ b/cli/src/main/java/org/onosproject/cli/net/QueuesListCommand.java
@@ -0,0 +1,56 @@
+/*
+ * Copyright 2015-present Open Networking Laboratory
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.onosproject.cli.net;
+
+import java.util.Map;
+
+import org.apache.karaf.shell.commands.Command;
+import org.onosproject.cli.AbstractShellCommand;
+import org.onosproject.store.service.StorageAdminService;
+import org.onosproject.store.service.WorkQueueStats;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.fasterxml.jackson.databind.node.ObjectNode;
+
+/**
+ * Command to list stats for all work queues in the system.
+ */
+@Command(scope = "onos", name = "queues",
+        description = "Lists information about work queues in the system")
+public class QueuesListCommand extends AbstractShellCommand {
+
+    private static final String FMT = "name=%s pending=%d inProgress=%d, completed=%d";
+
+    @Override
+    protected void execute() {
+        StorageAdminService storageAdminService = get(StorageAdminService.class);
+        Map<String, WorkQueueStats> queueStats = storageAdminService.getQueueStats();
+        if (outputJson()) {
+            ObjectMapper mapper = new ObjectMapper();
+            ObjectNode jsonQueues = mapper.createObjectNode();
+            queueStats.forEach((k, v) -> {
+                ObjectNode jsonStats = jsonQueues.putObject(k);
+                jsonStats.put("pending", v.totalPending());
+                jsonStats.put("inProgress", v.totalInProgress());
+                jsonStats.put("completed", v.totalCompleted());
+            });
+            print("%s", jsonQueues);
+        } else {
+            queueStats.forEach((name, stats) ->
+            print(FMT, name, stats.totalPending(), stats.totalInProgress(), stats.totalCompleted()));
+        }
+    }
+}
diff --git a/cli/src/main/resources/OSGI-INF/blueprint/shell-config.xml b/cli/src/main/resources/OSGI-INF/blueprint/shell-config.xml
index 6ea9077..c767289 100644
--- a/cli/src/main/resources/OSGI-INF/blueprint/shell-config.xml
+++ b/cli/src/main/resources/OSGI-INF/blueprint/shell-config.xml
@@ -400,6 +400,9 @@
             <action class="org.onosproject.cli.net.CountersListCommand"/>
         </command>
         <command>
+            <action class="org.onosproject.cli.net.QueuesListCommand"/>
+        </command>
+        <command>
             <action class="org.onosproject.cli.net.TransactionsCommand"/>
         </command>
         <command>
diff --git a/core/api/src/main/java/org/onosproject/store/primitives/DistributedPrimitiveCreator.java b/core/api/src/main/java/org/onosproject/store/primitives/DistributedPrimitiveCreator.java
index 2316dd1..2dfcd2c 100644
--- a/core/api/src/main/java/org/onosproject/store/primitives/DistributedPrimitiveCreator.java
+++ b/core/api/src/main/java/org/onosproject/store/primitives/DistributedPrimitiveCreator.java
@@ -23,6 +23,7 @@
 import org.onosproject.store.service.AsyncDistributedSet;
 import org.onosproject.store.service.AsyncLeaderElector;
 import org.onosproject.store.service.DistributedQueue;
+import org.onosproject.store.service.WorkQueue;
 import org.onosproject.store.service.Serializer;
 
 /**
@@ -88,6 +89,15 @@
     AsyncLeaderElector newAsyncLeaderElector(String name);
 
     /**
+     * Creates a new {@code WorkQueue}.
+     *
+     * @param name work queue name
+     * @param serializer serializer
+     * @return work queue
+     */
+    <E> WorkQueue<E> newWorkQueue(String name, Serializer serializer);
+
+    /**
      * Returns the names of all created {@code AsyncConsistentMap} instances.
      * @return set of {@code AsyncConsistentMap} names
      */
@@ -98,4 +108,10 @@
      * @return set of {@code AsyncAtomicCounter} names
      */
     Set<String> getAsyncAtomicCounterNames();
+
+    /**
+     * Returns the names of all created {@code WorkQueue} instances.
+     * @return set of {@code WorkQueue} names
+     */
+    Set<String> getWorkQueueNames();
 }
\ No newline at end of file
diff --git a/core/api/src/main/java/org/onosproject/store/service/StorageAdminService.java b/core/api/src/main/java/org/onosproject/store/service/StorageAdminService.java
index 33a11c9..542149c 100644
--- a/core/api/src/main/java/org/onosproject/store/service/StorageAdminService.java
+++ b/core/api/src/main/java/org/onosproject/store/service/StorageAdminService.java
@@ -52,6 +52,13 @@
     Map<String, Long> getCounters();
 
     /**
+     * Returns statistics for all the work queues in the system.
+     *
+     * @return mapping from queue name to that queue's stats
+     */
+    Map<String, WorkQueueStats> getQueueStats();
+
+    /**
      * Returns all pending transactions.
      *
      * @return collection of pending transaction identifiers.
diff --git a/core/api/src/main/java/org/onosproject/store/service/StorageService.java b/core/api/src/main/java/org/onosproject/store/service/StorageService.java
index e10c3d0..f342bdd 100644
--- a/core/api/src/main/java/org/onosproject/store/service/StorageService.java
+++ b/core/api/src/main/java/org/onosproject/store/service/StorageService.java
@@ -107,4 +107,13 @@
     default AtomicCounter getAtomicCounter(String name) {
         return getAsyncAtomicCounter(name).asAtomicCounter();
     }
+
+    /**
+     * Returns an instance of {@code WorkQueue} with specified name.
+     * @param name work queue name
+     * @param serializer serializer
+     *
+     * @return WorkQueue instance
+     */
+    <E> WorkQueue<E> getWorkQueue(String name, Serializer serializer);
 }
diff --git a/core/api/src/main/java/org/onosproject/store/service/Task.java b/core/api/src/main/java/org/onosproject/store/service/Task.java
new file mode 100644
index 0000000..a8ff07b
--- /dev/null
+++ b/core/api/src/main/java/org/onosproject/store/service/Task.java
@@ -0,0 +1,78 @@
+/*
+ * Copyright 2016-present Open Networking Laboratory
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.onosproject.store.service;
+
+import java.util.function.Function;
+
+import com.google.common.base.MoreObjects;
+
+/**
+ * {@link WorkQueue} task.
+ *
+ * @param <E> task payload type.
+ */
+public class Task<E> {
+    private final E payload;
+    private final String taskId;
+
+    private Task() {
+        payload = null;
+        taskId = null;
+    }
+
+    /**
+     * Constructs a new task instance.
+     * @param taskId task identifier
+     * @param payload task payload
+     */
+    public Task(String taskId, E payload) {
+        this.taskId = taskId;
+        this.payload = payload;
+    }
+
+    /**
+     * Returns the task identifier.
+     * @return task id
+     */
+    public String taskId() {
+        return taskId;
+    }
+
+    /**
+     * Returns the task payload.
+     * @return task payload
+     */
+    public E payload() {
+        return payload;
+    }
+
+    /**
+     * Maps task from one payload type to another.
+     * @param mapper type mapper.
+     * @return mapped task.
+     */
+    public <F> Task<F> map(Function<E, F> mapper) {
+        return new Task<>(taskId, mapper.apply(payload));
+    }
+
+    @Override
+    public String toString() {
+        return MoreObjects.toStringHelper(getClass())
+                .add("taskId", taskId)
+                .add("payload", payload)
+                .toString();
+    }
+}
diff --git a/core/api/src/main/java/org/onosproject/store/service/WorkQueue.java b/core/api/src/main/java/org/onosproject/store/service/WorkQueue.java
new file mode 100644
index 0000000..220a646
--- /dev/null
+++ b/core/api/src/main/java/org/onosproject/store/service/WorkQueue.java
@@ -0,0 +1,126 @@
+/*
+ * Copyright 2016-present Open Networking Laboratory
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.onosproject.store.service;
+
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.Executor;
+import java.util.function.Consumer;
+
+import com.google.common.collect.ImmutableList;
+
+/**
+ * Distributed Work Queue primitive.
+ * <p>
+ * Work queue serves as a buffer allowing producers to {@link #add(Collection) add} tasks and consumers
+ * to {@link #take() take} tasks to process.
+ * <p>
+ * In the system each task is tracked via its unique task identifier which is returned when a task is taken.
+ * Work queue guarantees that a task can be taken by only one consumer at a time. Once it finishes processing a
+ * consumer must invoke the {@link #complete(Collection) complete} method to mark the task(s) as completed.
+ * Tasks thus completed are removed from the queue. If a consumer unexpectedly terminates before it can complete
+ * all its tasks are returned back to the queue so that other consumers can pick them up. Since there is a distinct
+ * possibility that tasks could be processed more than once (under failure conditions), care should be taken to ensure
+ * task processing logic is idempotent.
+ *
+ * @param <E> task payload type.
+ */
+public interface WorkQueue<E> {
+
+    /**
+     * Adds a collection of tasks to the work queue.
+     * @param items collection of task items
+     * @return future that is completed when the operation completes
+     */
+    CompletableFuture<Void> addMultiple(Collection<E> items);
+
+    /**
+     * Picks up multiple tasks from the work queue to work on.
+     * <p>
+     * Tasks that are taken remain invisible to other consumers as long as the consumer stays alive.
+     * If a consumer unexpectedly terminates before {@link #complete(String...) completing} the task,
+     * the task becomes visible again to other consumers to process.
+     * @param maxItems maximum number of items to take from the queue. The actual number of tasks returned
+     * can be at the max this number
+     * @return future for the tasks. The future can be completed with an empty collection if there are no
+     * unassigned tasks in the work queue
+     */
+    CompletableFuture<Collection<Task<E>>> take(int maxItems);
+
+    /**
+     * Completes a collection of tasks.
+     * @param taskIds ids of tasks to complete
+     * @return future that is completed when the operation completes
+     */
+    CompletableFuture<Void> complete(Collection<String> taskIds);
+
+    /**
+     * Registers a task processing callback to be automatically invoked when new tasks are
+     * added to the work queue.
+     * @param taskProcessor task processing callback
+     * @param parallelism max tasks that can be processed in parallel
+     * @param executor executor to use for processing the tasks
+     * @return future that is completed when the operation completes
+     */
+    CompletableFuture<Void> registerTaskProcessor(Consumer<E> taskProcessor,
+                                                  int parallelism,
+                                                  Executor executor);
+
+    /**
+     * Stops automatically processing tasks from work queue. This call nullifies the effect of a
+     * previous {@link #registerTaskProcessor registerTaskProcessor} call.
+     * @return future that is completed when the operation completes
+     */
+    CompletableFuture<Void> stopProcessing();
+
+    /**
+     * Returns work queue statistics.
+     * @return future that is completed with work queue stats when the operation completes
+     */
+    CompletableFuture<WorkQueueStats> stats();
+
+    /**
+     * Completes a collection of tasks.
+     * @param taskIds var arg list of task ids
+     * @return future that is completed when the operation completes
+     */
+    default CompletableFuture<Void> complete(String... taskIds) {
+        return complete(Arrays.asList(taskIds));
+    }
+
+    /**
+     * Adds a single task to the work queue.
+     * @param item task item
+     * @return future that is completed when the operation completes
+     */
+    default CompletableFuture<Void> addOne(E item) {
+        return addMultiple(ImmutableList.of(item));
+    }
+
+    /**
+     * Picks up a single task from the work queue to work on.
+     * <p>
+     * Tasks that are taken remain invisible to other consumers as long as the consumer stays alive.
+     * If a consumer unexpectedly terminates before {@link #complete(String...) completing} the task,
+     * the task becomes visible again to other consumers to process.
+     * @return future for the task. The future can be completed with null, if there are no
+     * unassigned tasks in the work queue
+     */
+    default CompletableFuture<Task<E>> take() {
+        return this.take(1).thenApply(tasks -> tasks.isEmpty() ? null : tasks.iterator().next());
+    }
+}
\ No newline at end of file
diff --git a/core/api/src/main/java/org/onosproject/store/service/WorkQueueStats.java b/core/api/src/main/java/org/onosproject/store/service/WorkQueueStats.java
new file mode 100644
index 0000000..d2489ad
--- /dev/null
+++ b/core/api/src/main/java/org/onosproject/store/service/WorkQueueStats.java
@@ -0,0 +1,96 @@
+/*
+ * Copyright 2016-present Open Networking Laboratory
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.onosproject.store.service;
+
+import com.google.common.base.MoreObjects;
+
+/**
+ * Statistics for a {@link WorkQueue}.
+ */
+public final class WorkQueueStats {
+
+    private long totalPending;
+    private long totalInProgress;
+    private long totalCompleted;
+
+    /**
+     * Returns a {@code WorkQueueStats} builder.
+     * @return builder
+     */
+    public static Builder builder() {
+        return new Builder();
+    }
+
+    private WorkQueueStats() {
+    }
+
+    public static class Builder {
+
+        WorkQueueStats workQueueStats = new WorkQueueStats();
+
+        public Builder withTotalPending(long value) {
+            workQueueStats.totalPending = value;
+            return this;
+        }
+
+        public Builder withTotalInProgress(long value) {
+            workQueueStats.totalInProgress = value;
+            return this;
+        }
+
+        public Builder withTotalCompleted(long value) {
+            workQueueStats.totalCompleted = value;
+            return this;
+        }
+
+        public WorkQueueStats build() {
+            return workQueueStats;
+        }
+    }
+
+    /**
+     * Returns the total pending tasks. These are the tasks that are added but not yet picked up.
+     * @return total pending tasks.
+     */
+    public long totalPending() {
+        return this.totalPending;
+    }
+
+    /**
+     * Returns the total in progress tasks. These are the tasks that are currently being worked on.
+     * @return total in progress tasks.
+     */
+    public long totalInProgress() {
+        return this.totalInProgress;
+    }
+
+    /**
+     * Returns the total completed tasks.
+     * @return total completed tasks.
+     */
+    public long totalCompleted() {
+        return this.totalCompleted;
+    }
+
+    @Override
+    public String toString() {
+        return MoreObjects.toStringHelper(getClass())
+                .add("totalPending", totalPending)
+                .add("totalInProgress", totalInProgress)
+                .add("totalCompleted", totalCompleted)
+                .toString();
+    }
+}
diff --git a/core/api/src/test/java/org/onosproject/store/service/StorageServiceAdapter.java b/core/api/src/test/java/org/onosproject/store/service/StorageServiceAdapter.java
index 8c12b05..18ea539 100644
--- a/core/api/src/test/java/org/onosproject/store/service/StorageServiceAdapter.java
+++ b/core/api/src/test/java/org/onosproject/store/service/StorageServiceAdapter.java
@@ -58,4 +58,9 @@
     public LeaderElectorBuilder leaderElectorBuilder() {
         return null;
     }
+
+    @Override
+    public <E> WorkQueue<E> getWorkQueue(String name, Serializer serializer) {
+        return null;
+    }
 }
diff --git a/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/CatalystSerializers.java b/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/CatalystSerializers.java
index 175e253..f07f838 100644
--- a/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/CatalystSerializers.java
+++ b/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/CatalystSerializers.java
@@ -33,6 +33,8 @@
 import org.onosproject.store.primitives.resources.impl.AtomixConsistentMapFactory;
 import org.onosproject.store.primitives.resources.impl.AtomixLeaderElectorCommands;
 import org.onosproject.store.primitives.resources.impl.AtomixLeaderElectorFactory;
+import org.onosproject.store.primitives.resources.impl.AtomixWorkQueueCommands;
+import org.onosproject.store.primitives.resources.impl.AtomixWorkQueueFactory;
 import org.onosproject.store.primitives.resources.impl.CommitResult;
 import org.onosproject.store.primitives.resources.impl.MapEntryUpdateResult;
 import org.onosproject.store.primitives.resources.impl.PrepareResult;
@@ -40,8 +42,11 @@
 import org.onosproject.store.serializers.KryoNamespaces;
 import org.onosproject.store.service.MapEvent;
 import org.onosproject.store.service.MapTransaction;
+import org.onosproject.store.service.Task;
 import org.onosproject.store.service.Versioned;
+import org.onosproject.store.service.WorkQueueStats;
 
+import com.google.common.collect.ImmutableList;
 import com.google.common.collect.Maps;
 
 /**
@@ -81,15 +86,20 @@
         serializer.register(MapTransaction.class, factory);
         serializer.register(Versioned.class, factory);
         serializer.register(MapEvent.class, factory);
+        serializer.register(Task.class, factory);
+        serializer.register(WorkQueueStats.class, factory);
         serializer.register(Maps.immutableEntry("a", "b").getClass(), factory);
+        serializer.register(ImmutableList.of().getClass(), factory);
 
         serializer.resolve(new LongCommands.TypeResolver());
         serializer.resolve(new AtomixConsistentMapCommands.TypeResolver());
         serializer.resolve(new AtomixLeaderElectorCommands.TypeResolver());
+        serializer.resolve(new AtomixWorkQueueCommands.TypeResolver());
         serializer.resolve(new ResourceManagerTypeResolver());
 
         serializer.registerClassLoader(AtomixConsistentMapFactory.class)
-                  .registerClassLoader(AtomixLeaderElectorFactory.class);
+                  .registerClassLoader(AtomixLeaderElectorFactory.class)
+                  .registerClassLoader(AtomixWorkQueueFactory.class);
 
         return serializer;
     }
diff --git a/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/DefaultCatalystTypeSerializerFactory.java b/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/DefaultCatalystTypeSerializerFactory.java
index 88d65d5..b0d6841 100644
--- a/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/DefaultCatalystTypeSerializerFactory.java
+++ b/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/DefaultCatalystTypeSerializerFactory.java
@@ -54,20 +54,19 @@
         }
 
         @Override
-        public void write(T object, BufferOutput buffer,
-                io.atomix.catalyst.serializer.Serializer serializer) {
+        public void write(T object, BufferOutput buffer, io.atomix.catalyst.serializer.Serializer serializer) {
             try {
                 byte[] payload = this.serializer.encode(object);
                 buffer.writeInt(payload.length);
                 buffer.write(payload);
             } catch (Exception e) {
                 log.warn("Failed to serialize {}", object, e);
+                throw Throwables.propagate(e);
             }
         }
 
         @Override
-        public T read(Class<T> type, BufferInput buffer,
-                io.atomix.catalyst.serializer.Serializer serializer) {
+        public T read(Class<T> type, BufferInput buffer, io.atomix.catalyst.serializer.Serializer serializer) {
             int size = buffer.readInt();
             try {
                 byte[] payload = new byte[size];
@@ -75,8 +74,7 @@
                 return this.serializer.decode(payload);
             } catch (Exception e) {
                 log.warn("Failed to deserialize as type {}. Payload size: {}", type, size, e);
-                Throwables.propagate(e);
-                return null;
+                throw Throwables.propagate(e);
             }
         }
     }
diff --git a/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/DefaultDistributedWorkQueue.java b/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/DefaultDistributedWorkQueue.java
new file mode 100644
index 0000000..549cfc6
--- /dev/null
+++ b/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/DefaultDistributedWorkQueue.java
@@ -0,0 +1,67 @@
+package org.onosproject.store.primitives.impl;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.Executor;
+import java.util.function.Consumer;
+import java.util.stream.Collectors;
+
+import org.onosproject.store.service.WorkQueue;
+import org.onosproject.store.service.Serializer;
+import org.onosproject.store.service.Task;
+import org.onosproject.store.service.WorkQueueStats;
+
+import com.google.common.collect.Collections2;
+
+/**
+ * Default implementation of {@link WorkQueue}.
+ *
+ * @param <E> task payload type.
+ */
+public class DefaultDistributedWorkQueue<E> implements WorkQueue<E> {
+
+    private final WorkQueue<byte[]> backingQueue;
+    private final Serializer serializer;
+
+    public DefaultDistributedWorkQueue(WorkQueue<byte[]> backingQueue, Serializer serializer) {
+        this.backingQueue = backingQueue;
+        this.serializer = serializer;
+    }
+
+    @Override
+    public CompletableFuture<Void> addMultiple(Collection<E> items) {
+        return backingQueue.addMultiple(items.stream()
+                                             .map(serializer::encode)
+                                             .collect(Collectors.toCollection(ArrayList::new)));
+    }
+
+    @Override
+    public CompletableFuture<Collection<Task<E>>> take(int maxTasks) {
+        return backingQueue.take(maxTasks)
+                           .thenApply(tasks -> Collections2.transform(tasks, task -> task.<E>map(serializer::decode)));
+    }
+
+    @Override
+    public CompletableFuture<Void> complete(Collection<String> ids) {
+        return backingQueue.complete(ids);
+    }
+
+    @Override
+    public CompletableFuture<WorkQueueStats> stats() {
+        return backingQueue.stats();
+    }
+
+    @Override
+    public CompletableFuture<Void> registerTaskProcessor(Consumer<E> callback,
+                                                         int parallelism,
+                                                         Executor executor) {
+        Consumer<byte[]> backingQueueCallback = payload -> callback.accept(serializer.decode(payload));
+        return backingQueue.registerTaskProcessor(backingQueueCallback, parallelism, executor);
+    }
+
+    @Override
+    public CompletableFuture<Void> stopProcessing() {
+        return backingQueue.stopProcessing();
+    }
+}
diff --git a/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/FederatedDistributedPrimitiveCreator.java b/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/FederatedDistributedPrimitiveCreator.java
index e4b6f9a..a095de1 100644
--- a/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/FederatedDistributedPrimitiveCreator.java
+++ b/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/FederatedDistributedPrimitiveCreator.java
@@ -30,6 +30,7 @@
 import org.onosproject.store.service.AsyncDistributedSet;
 import org.onosproject.store.service.AsyncLeaderElector;
 import org.onosproject.store.service.DistributedQueue;
+import org.onosproject.store.service.WorkQueue;
 import org.onosproject.store.service.Serializer;
 
 import com.google.common.base.Charsets;
@@ -101,6 +102,11 @@
     }
 
     @Override
+    public <E> WorkQueue<E> newWorkQueue(String name, Serializer serializer) {
+        return getCreator(name).newWorkQueue(name, serializer);
+    }
+
+    @Override
     public Set<String> getAsyncConsistentMapNames() {
         return members.values()
                       .stream()
@@ -118,6 +124,15 @@
                       .orElse(ImmutableSet.of());
     }
 
+    @Override
+    public Set<String> getWorkQueueNames() {
+        return members.values()
+                      .stream()
+                      .map(DistributedPrimitiveCreator::getWorkQueueNames)
+                      .reduce(Sets::union)
+                      .orElse(ImmutableSet.of());
+    }
+
     /**
      * Returns the {@code DistributedPrimitiveCreator} to use for hosting a primitive.
      * @param name primitive name
diff --git a/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/StorageManager.java b/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/StorageManager.java
index 4def1e9..8eb138a 100644
--- a/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/StorageManager.java
+++ b/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/StorageManager.java
@@ -46,6 +46,7 @@
 import org.onosproject.store.service.ConsistentMapBuilder;
 import org.onosproject.store.service.DistributedQueueBuilder;
 import org.onosproject.store.service.DistributedSetBuilder;
+import org.onosproject.store.service.WorkQueue;
 import org.onosproject.store.service.EventuallyConsistentMapBuilder;
 import org.onosproject.store.service.LeaderElectorBuilder;
 import org.onosproject.store.service.MapInfo;
@@ -54,6 +55,7 @@
 import org.onosproject.store.service.StorageAdminService;
 import org.onosproject.store.service.StorageService;
 import org.onosproject.store.service.TransactionContextBuilder;
+import org.onosproject.store.service.WorkQueueStats;
 import org.slf4j.Logger;
 
 import com.google.common.collect.Maps;
@@ -171,6 +173,12 @@
     }
 
     @Override
+    public <E> WorkQueue<E> getWorkQueue(String name, Serializer serializer) {
+        checkPermission(STORAGE_WRITE);
+        return federatedPrimitiveCreator.newWorkQueue(name, serializer);
+    }
+
+    @Override
     public List<MapInfo> getMapInfo() {
         return listMapInfo(federatedPrimitiveCreator);
     }
@@ -185,6 +193,18 @@
     }
 
     @Override
+    public Map<String, WorkQueueStats> getQueueStats() {
+        Map<String, WorkQueueStats> workQueueStats = Maps.newConcurrentMap();
+        federatedPrimitiveCreator.getWorkQueueNames()
+               .forEach(name -> workQueueStats.put(name,
+                       federatedPrimitiveCreator.newWorkQueue(name,
+                                                              Serializer.using(KryoNamespaces.BASIC))
+                                                .stats()
+                                                .join()));
+        return workQueueStats;
+    }
+
+    @Override
     public List<PartitionInfo> getPartitionInfo() {
         return partitionAdminService.partitionInfo();
     }
diff --git a/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/StoragePartitionClient.java b/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/StoragePartitionClient.java
index 0c1d8a6..d635a79 100644
--- a/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/StoragePartitionClient.java
+++ b/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/StoragePartitionClient.java
@@ -41,6 +41,7 @@
 import org.onosproject.store.primitives.resources.impl.AtomixConsistentMap;
 import org.onosproject.store.primitives.resources.impl.AtomixCounter;
 import org.onosproject.store.primitives.resources.impl.AtomixLeaderElector;
+import org.onosproject.store.primitives.resources.impl.AtomixWorkQueue;
 import org.onosproject.store.serializers.KryoNamespaces;
 import org.onosproject.store.service.AsyncAtomicCounter;
 import org.onosproject.store.service.AsyncAtomicValue;
@@ -49,6 +50,7 @@
 import org.onosproject.store.service.AsyncLeaderElector;
 import org.onosproject.store.service.DistributedPrimitive.Status;
 import org.onosproject.store.service.DistributedQueue;
+import org.onosproject.store.service.WorkQueue;
 import org.onosproject.store.service.PartitionClientInfo;
 import org.onosproject.store.service.Serializer;
 import org.slf4j.Logger;
@@ -159,11 +161,16 @@
 
     @Override
     public <E> DistributedQueue<E> newDistributedQueue(String name, Serializer serializer) {
-        // TODO: Implement
         throw new UnsupportedOperationException();
     }
 
     @Override
+    public <E> WorkQueue<E> newWorkQueue(String name, Serializer serializer) {
+        AtomixWorkQueue workQueue = client.getResource(name, AtomixWorkQueue.class).join();
+        return new DefaultDistributedWorkQueue<>(workQueue, serializer);
+    }
+
+    @Override
     public AsyncLeaderElector newAsyncLeaderElector(String name) {
         AtomixLeaderElector leaderElector = client.getResource(name, AtomixLeaderElector.class)
                                                   .thenCompose(AtomixLeaderElector::setupCache)
@@ -187,6 +194,11 @@
     }
 
     @Override
+    public Set<String> getWorkQueueNames() {
+        return client.keys(AtomixWorkQueue.class).join();
+    }
+
+    @Override
     public boolean isOpen() {
         return resourceClient.client().state() != State.CLOSED;
     }
diff --git a/core/store/primitives/src/main/java/org/onosproject/store/primitives/resources/impl/AtomixWorkQueue.java b/core/store/primitives/src/main/java/org/onosproject/store/primitives/resources/impl/AtomixWorkQueue.java
new file mode 100644
index 0000000..7b4ad47
--- /dev/null
+++ b/core/store/primitives/src/main/java/org/onosproject/store/primitives/resources/impl/AtomixWorkQueue.java
@@ -0,0 +1,201 @@
+/*
+ * Copyright 2016-present Open Networking Laboratory
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.onosproject.store.primitives.resources.impl;
+
+import static org.slf4j.LoggerFactory.getLogger;
+
+import java.util.Collection;
+import java.util.List;
+import java.util.Properties;
+import java.util.Timer;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.Executor;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicReference;
+import java.util.function.Consumer;
+
+import org.onlab.util.AbstractAccumulator;
+import org.onlab.util.Accumulator;
+import org.onosproject.store.primitives.resources.impl.AtomixWorkQueueCommands.Add;
+import org.onosproject.store.primitives.resources.impl.AtomixWorkQueueCommands.Complete;
+import org.onosproject.store.primitives.resources.impl.AtomixWorkQueueCommands.Register;
+import org.onosproject.store.primitives.resources.impl.AtomixWorkQueueCommands.Stats;
+import org.onosproject.store.primitives.resources.impl.AtomixWorkQueueCommands.Take;
+import org.onosproject.store.primitives.resources.impl.AtomixWorkQueueCommands.Unregister;
+import org.onosproject.store.service.WorkQueue;
+import org.onosproject.store.service.Task;
+import org.onosproject.store.service.WorkQueueStats;
+import org.slf4j.Logger;
+
+import com.google.common.collect.ImmutableList;
+
+import io.atomix.copycat.client.CopycatClient;
+import io.atomix.resource.AbstractResource;
+import io.atomix.resource.ResourceTypeInfo;
+
+/**
+ * Distributed resource providing the {@link WorkQueue} primitive.
+ */
+@ResourceTypeInfo(id = -154, factory = AtomixWorkQueueFactory.class)
+public class AtomixWorkQueue extends AbstractResource<AtomixWorkQueue>
+    implements WorkQueue<byte[]> {
+
+    private final Logger log = getLogger(getClass());
+    public static final String TASK_AVAILABLE = "task-available";
+    private final ExecutorService executor = Executors.newSingleThreadExecutor();
+    private final AtomicReference<TaskProcessor> taskProcessor = new AtomicReference<>();
+    private final Timer timer = new Timer("atomix-work-queue-completer");
+    private final AtomicBoolean isRegistered = new AtomicBoolean(false);
+
+    protected AtomixWorkQueue(CopycatClient client, Properties options) {
+        super(client, options);
+    }
+
+    @Override
+    public CompletableFuture<AtomixWorkQueue> open() {
+        return super.open().thenApply(result -> {
+            client.onStateChange(state -> {
+                if (state == CopycatClient.State.CONNECTED && isRegistered.get()) {
+                    client.submit(new Register());
+                }
+            });
+            client.onEvent(TASK_AVAILABLE, this::resumeWork);
+            return result;
+        });
+    }
+
+    @Override
+    public CompletableFuture<Void> addMultiple(Collection<byte[]> items) {
+        if (items.isEmpty()) {
+            return CompletableFuture.completedFuture(null);
+        }
+        return client.submit(new Add(items));
+    }
+
+    @Override
+    public CompletableFuture<Collection<Task<byte[]>>> take(int maxTasks) {
+        if (maxTasks <= 0) {
+            return CompletableFuture.completedFuture(ImmutableList.of());
+        }
+        return client.submit(new Take(maxTasks));
+    }
+
+    @Override
+    public CompletableFuture<Void> complete(Collection<String> taskIds) {
+        if (taskIds.isEmpty()) {
+            return CompletableFuture.completedFuture(null);
+        }
+        return client.submit(new Complete(taskIds));
+    }
+
+    @Override
+    public CompletableFuture<Void> registerTaskProcessor(Consumer<byte[]> callback,
+                                          int parallelism,
+                                          Executor executor) {
+        Accumulator<String> completedTaskAccumulator =
+                new CompletedTaskAccumulator(timer, 50, 50); // TODO: make configurable
+        taskProcessor.set(new TaskProcessor(callback,
+                                            parallelism,
+                                            executor,
+                                            completedTaskAccumulator));
+        return register().thenCompose(v -> take(parallelism))
+                         .thenAccept(taskProcessor.get()::accept);
+    }
+
+    @Override
+    public CompletableFuture<Void> stopProcessing() {
+        return unregister();
+    }
+
+    @Override
+    public CompletableFuture<WorkQueueStats> stats() {
+        return client.submit(new Stats());
+    }
+
+    private void resumeWork() {
+        TaskProcessor activeProcessor = taskProcessor.get();
+        if (activeProcessor == null) {
+            return;
+        }
+        this.take(activeProcessor.headRoom())
+            .whenCompleteAsync((tasks, e) -> activeProcessor.accept(tasks), executor);
+    }
+
+    private CompletableFuture<Void> register() {
+        return client.submit(new Register()).thenRun(() -> isRegistered.set(true));
+    }
+
+    private CompletableFuture<Void> unregister() {
+        return client.submit(new Unregister()).thenRun(() -> isRegistered.set(false));
+    }
+
+    // TaskId accumulator for paced triggering of task completion calls.
+    private class CompletedTaskAccumulator extends AbstractAccumulator<String> {
+        CompletedTaskAccumulator(Timer timer, int maxTasksToBatch, int maxBatchMillis) {
+            super(timer, maxTasksToBatch, maxBatchMillis, Integer.MAX_VALUE);
+        }
+
+        @Override
+        public void processItems(List<String> items) {
+            complete(items);
+        }
+    }
+
+    private class TaskProcessor implements Consumer<Collection<Task<byte[]>>> {
+
+        private final AtomicInteger headRoom;
+        private final Consumer<byte[]> backingConsumer;
+        private final Executor executor;
+        private final Accumulator<String> taskCompleter;
+
+        public TaskProcessor(Consumer<byte[]> backingConsumer,
+                             int parallelism,
+                             Executor executor,
+                             Accumulator<String> taskCompleter) {
+            this.backingConsumer = backingConsumer;
+            this.headRoom = new AtomicInteger(parallelism);
+            this.executor = executor;
+            this.taskCompleter = taskCompleter;
+        }
+
+        public int headRoom() {
+            return headRoom.get();
+        }
+
+        @Override
+        public void accept(Collection<Task<byte[]>> tasks) {
+            if (tasks == null) {
+                return;
+            }
+            headRoom.addAndGet(-1 * tasks.size());
+            tasks.forEach(task ->
+                executor.execute(() -> {
+                    try {
+                        backingConsumer.accept(task.payload());
+                        taskCompleter.add(task.taskId());
+                    } catch (Exception e) {
+                        log.debug("Task execution failed", e);
+                    } finally {
+                        headRoom.incrementAndGet();
+                        resumeWork();
+                    }
+                }));
+        }
+    }
+}
diff --git a/core/store/primitives/src/main/java/org/onosproject/store/primitives/resources/impl/AtomixWorkQueueCommands.java b/core/store/primitives/src/main/java/org/onosproject/store/primitives/resources/impl/AtomixWorkQueueCommands.java
new file mode 100644
index 0000000..3724529
--- /dev/null
+++ b/core/store/primitives/src/main/java/org/onosproject/store/primitives/resources/impl/AtomixWorkQueueCommands.java
@@ -0,0 +1,224 @@
+/*
+ * Copyright 2016-present Open Networking Laboratory
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.onosproject.store.primitives.resources.impl;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.stream.Collectors;
+import java.util.stream.IntStream;
+
+import org.onosproject.store.service.Task;
+import org.onosproject.store.service.WorkQueueStats;
+
+import com.google.common.base.MoreObjects;
+
+import io.atomix.catalyst.buffer.BufferInput;
+import io.atomix.catalyst.buffer.BufferOutput;
+import io.atomix.catalyst.serializer.CatalystSerializable;
+import io.atomix.catalyst.serializer.SerializableTypeResolver;
+import io.atomix.catalyst.serializer.Serializer;
+import io.atomix.catalyst.serializer.SerializerRegistry;
+import io.atomix.copycat.Command;
+
+/**
+ * {@link AtomixWorkQueue} resource state machine operations.
+ */
+public final class AtomixWorkQueueCommands {
+
+    private AtomixWorkQueueCommands() {
+    }
+
+    /**
+     * Command to add a collection of tasks to the queue.
+     */
+    @SuppressWarnings("serial")
+    public static class Add implements Command<Void>, CatalystSerializable {
+
+        private Collection<byte[]> items;
+
+        private Add() {
+        }
+
+        public Add(Collection<byte[]> items) {
+            this.items = items;
+        }
+
+        @Override
+        public void writeObject(BufferOutput<?> buffer, Serializer serializer) {
+            buffer.writeInt(items.size());
+            items.forEach(task -> serializer.writeObject(task, buffer));
+        }
+
+        @Override
+        public void readObject(BufferInput<?> buffer, Serializer serializer) {
+            items = IntStream.range(0, buffer.readInt())
+                             .mapToObj(i -> serializer.<byte[]>readObject(buffer))
+                             .collect(Collectors.toCollection(ArrayList::new));
+        }
+
+        public Collection<byte[]> items() {
+            return items;
+        }
+
+        @Override
+        public String toString() {
+            return MoreObjects.toStringHelper(getClass())
+                    .add("items", items)
+                    .toString();
+        }
+    }
+
+    /**
+     * Command to take a task from the queue.
+     */
+    @SuppressWarnings("serial")
+    public static class Take implements Command<Collection<Task<byte[]>>>, CatalystSerializable {
+
+        private int maxTasks;
+
+        private Take() {
+        }
+
+        public Take(int maxTasks) {
+            this.maxTasks = maxTasks;
+        }
+
+        @Override
+        public void writeObject(BufferOutput<?> buffer, Serializer serializer) {
+            buffer.writeInt(maxTasks);
+        }
+
+        @Override
+        public void readObject(BufferInput<?> buffer, Serializer serializer) {
+            maxTasks = buffer.readInt();
+        }
+
+        public int maxTasks() {
+            return maxTasks;
+        }
+
+        @Override
+        public String toString() {
+            return MoreObjects.toStringHelper(getClass())
+                    .add("maxTasks", maxTasks)
+                    .toString();
+        }
+    }
+
+    @SuppressWarnings("serial")
+    public static class Stats implements Command<WorkQueueStats>, CatalystSerializable {
+
+        @Override
+        public void writeObject(BufferOutput<?> buffer, Serializer serializer) {
+        }
+
+        @Override
+        public void readObject(BufferInput<?> buffer, Serializer serializer) {
+        }
+
+        @Override
+        public String toString() {
+            return MoreObjects.toStringHelper(getClass())
+                    .toString();
+        }
+    }
+
+
+
+    @SuppressWarnings("serial")
+    public static class Register implements Command<Void>, CatalystSerializable {
+
+        @Override
+        public void writeObject(BufferOutput<?> buffer, Serializer serializer) {
+        }
+
+        @Override
+        public void readObject(BufferInput<?> buffer, Serializer serializer) {
+        }
+
+        @Override
+        public String toString() {
+            return MoreObjects.toStringHelper(getClass())
+                    .toString();
+        }
+    }
+
+    @SuppressWarnings("serial")
+    public static class Unregister implements Command<Void>, CatalystSerializable {
+
+        @Override
+        public void writeObject(BufferOutput<?> buffer, Serializer serializer) {
+        }
+
+        @Override
+        public void readObject(BufferInput<?> buffer, Serializer serializer) {
+        }
+
+        @Override
+        public String toString() {
+            return MoreObjects.toStringHelper(getClass())
+                    .toString();
+        }
+    }
+
+    @SuppressWarnings("serial")
+    public static class Complete implements Command<Void>, CatalystSerializable {
+        private Collection<String> taskIds;
+
+        private Complete() {
+        }
+
+        public Complete(Collection<String> taskIds) {
+            this.taskIds = taskIds;
+        }
+
+        @Override
+        public void writeObject(BufferOutput<?> buffer, Serializer serializer) {
+            serializer.writeObject(taskIds, buffer);
+        }
+
+        @Override
+        public void readObject(BufferInput<?> buffer, Serializer serializer) {
+            taskIds = serializer.readObject(buffer);
+        }
+
+        public Collection<String> taskIds() {
+            return taskIds;
+        }
+
+        @Override
+        public String toString() {
+            return MoreObjects.toStringHelper(getClass())
+                    .add("taskIds", taskIds)
+                    .toString();
+        }
+    }
+
+    /**
+     * Work queue command type resolver.
+     */
+    public static class TypeResolver implements SerializableTypeResolver {
+        @Override
+        public void resolve(SerializerRegistry registry) {
+            registry.register(Register.class, -960);
+            registry.register(Unregister.class, -961);
+            registry.register(Take.class, -962);
+            registry.register(Add.class, -963);
+            registry.register(Complete.class, -964);
+            registry.register(Stats.class, -965);
+        }
+    }
+}
diff --git a/core/store/primitives/src/main/java/org/onosproject/store/primitives/resources/impl/AtomixWorkQueueFactory.java b/core/store/primitives/src/main/java/org/onosproject/store/primitives/resources/impl/AtomixWorkQueueFactory.java
new file mode 100644
index 0000000..0c61b2e
--- /dev/null
+++ b/core/store/primitives/src/main/java/org/onosproject/store/primitives/resources/impl/AtomixWorkQueueFactory.java
@@ -0,0 +1,44 @@
+/*
+ * Copyright 2016-present Open Networking Laboratory
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.onosproject.store.primitives.resources.impl;
+
+import io.atomix.catalyst.serializer.SerializableTypeResolver;
+import io.atomix.copycat.client.CopycatClient;
+import io.atomix.resource.ResourceFactory;
+import io.atomix.resource.ResourceStateMachine;
+
+import java.util.Properties;
+
+/**
+ * {@link AtomixWorkQueue} resource factory.
+ */
+public class AtomixWorkQueueFactory implements ResourceFactory<AtomixWorkQueue> {
+
+    @Override
+    public SerializableTypeResolver createSerializableTypeResolver() {
+        return new AtomixWorkQueueCommands.TypeResolver();
+    }
+
+    @Override
+    public ResourceStateMachine createStateMachine(Properties config) {
+        return new AtomixWorkQueueState(config);
+    }
+
+    @Override
+    public AtomixWorkQueue createInstance(CopycatClient client, Properties properties) {
+        return new AtomixWorkQueue(client, properties);
+    }
+}
\ No newline at end of file
diff --git a/core/store/primitives/src/main/java/org/onosproject/store/primitives/resources/impl/AtomixWorkQueueState.java b/core/store/primitives/src/main/java/org/onosproject/store/primitives/resources/impl/AtomixWorkQueueState.java
new file mode 100644
index 0000000..d287e19
--- /dev/null
+++ b/core/store/primitives/src/main/java/org/onosproject/store/primitives/resources/impl/AtomixWorkQueueState.java
@@ -0,0 +1,289 @@
+/*
+ * Copyright 2016-present Open Networking Laboratory
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.onosproject.store.primitives.resources.impl;
+
+import static org.slf4j.LoggerFactory.getLogger;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Iterator;
+import java.util.Map;
+import java.util.Properties;
+import java.util.Queue;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.function.Consumer;
+import java.util.stream.Collectors;
+import java.util.stream.IntStream;
+
+import org.onlab.util.CountDownCompleter;
+import org.onosproject.store.primitives.resources.impl.AtomixWorkQueueCommands.Add;
+import org.onosproject.store.primitives.resources.impl.AtomixWorkQueueCommands.Complete;
+import org.onosproject.store.primitives.resources.impl.AtomixWorkQueueCommands.Register;
+import org.onosproject.store.primitives.resources.impl.AtomixWorkQueueCommands.Stats;
+import org.onosproject.store.primitives.resources.impl.AtomixWorkQueueCommands.Take;
+import org.onosproject.store.primitives.resources.impl.AtomixWorkQueueCommands.Unregister;
+import org.onosproject.store.service.Task;
+import org.onosproject.store.service.WorkQueueStats;
+import org.slf4j.Logger;
+
+import com.google.common.base.MoreObjects;
+import com.google.common.base.Throwables;
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.Maps;
+import com.google.common.collect.Queues;
+import com.google.common.util.concurrent.AtomicLongMap;
+
+import io.atomix.copycat.server.Commit;
+import io.atomix.copycat.server.Snapshottable;
+import io.atomix.copycat.server.StateMachineExecutor;
+import io.atomix.copycat.server.session.ServerSession;
+import io.atomix.copycat.server.session.SessionListener;
+import io.atomix.copycat.server.storage.snapshot.SnapshotReader;
+import io.atomix.copycat.server.storage.snapshot.SnapshotWriter;
+import io.atomix.resource.ResourceStateMachine;
+
+/**
+ * State machine for {@link AtomixWorkQueue} resource.
+ */
+public class AtomixWorkQueueState  extends ResourceStateMachine implements SessionListener, Snapshottable {
+
+    private final Logger log = getLogger(getClass());
+
+    private final AtomicLong totalCompleted = new AtomicLong(0);
+
+    private final Queue<TaskHolder> unassignedTasks = Queues.newArrayDeque();
+    private final Map<String, TaskAssignment> assignments = Maps.newHashMap();
+    private final Map<Long, Commit<? extends Register>> registeredWorkers = Maps.newHashMap();
+    private final AtomicLongMap<Long> activeTasksPerSession = AtomicLongMap.create();
+
+    protected AtomixWorkQueueState(Properties config) {
+        super(config);
+    }
+
+    @Override
+    protected void configure(StateMachineExecutor executor) {
+        executor.register(Stats.class, this::stats);
+        executor.register(Register.class, (Consumer<Commit<Register>>) this::register);
+        executor.register(Unregister.class, (Consumer<Commit<Unregister>>) this::unregister);
+        executor.register(Add.class, (Consumer<Commit<Add>>) this::add);
+        executor.register(Take.class, this::take);
+        executor.register(Complete.class, (Consumer<Commit<Complete>>) this::complete);
+    }
+
+    protected WorkQueueStats stats(Commit<? extends Stats> commit) {
+        try {
+            return WorkQueueStats.builder()
+                    .withTotalCompleted(totalCompleted.get())
+                    .withTotalPending(unassignedTasks.size())
+                    .withTotalInProgress(assignments.size())
+                    .build();
+        } finally {
+            commit.close();
+        }
+    }
+
+    protected void register(Commit<? extends Register> commit) {
+        long sessionId = commit.session().id();
+        if (registeredWorkers.putIfAbsent(sessionId, commit) != null) {
+            commit.close();
+        }
+    }
+
+    protected void unregister(Commit<? extends Unregister> commit) {
+        try {
+            Commit<? extends Register> registerCommit = registeredWorkers.remove(commit.session().id());
+            if (registerCommit != null) {
+                registerCommit.close();
+            }
+        } finally {
+            commit.close();
+        }
+    }
+
+    protected void add(Commit<? extends Add> commit) {
+        Collection<byte[]> items = commit.operation().items();
+
+        // Create a CountDownCompleter that will close the commit when all tasks
+        // submitted as part of it are completed.
+        CountDownCompleter<Commit<? extends Add>> referenceTracker =
+                new CountDownCompleter<>(commit, items.size(), Commit::close);
+
+        AtomicInteger itemIndex = new AtomicInteger(0);
+        items.forEach(item -> {
+            String taskId = String.format("%d:%d:%d", commit.session().id(),
+                                                      commit.index(),
+                                                      itemIndex.getAndIncrement());
+            unassignedTasks.add(new TaskHolder(new Task<>(taskId, item), referenceTracker));
+        });
+
+        // Send an event to all sessions that have expressed interest in task processing
+        // and are not actively processing a task.
+        registeredWorkers.values()
+                         .stream()
+                         .map(Commit::session)
+                         .forEach(session -> session.publish(AtomixWorkQueue.TASK_AVAILABLE));
+        // FIXME: This generates a lot of event traffic.
+    }
+
+    protected Collection<Task<byte[]>> take(Commit<? extends Take> commit) {
+        try {
+            if (unassignedTasks.isEmpty()) {
+                return ImmutableList.of();
+            }
+            long sessionId = commit.session().id();
+            int maxTasks = commit.operation().maxTasks();
+            return IntStream.range(0, Math.min(maxTasks, unassignedTasks.size()))
+                            .mapToObj(i -> {
+                                TaskHolder holder = unassignedTasks.poll();
+                                String taskId = holder.task().taskId();
+                                TaskAssignment assignment = new TaskAssignment(sessionId, holder);
+
+                                // bookkeeping
+                                assignments.put(taskId, assignment);
+                                activeTasksPerSession.incrementAndGet(sessionId);
+
+                                return holder.task();
+                            })
+                            .collect(Collectors.toCollection(ArrayList::new));
+        } catch (Exception e) {
+            log.warn("State machine update failed", e);
+            throw Throwables.propagate(e);
+        } finally {
+            commit.close();
+        }
+    }
+
+    protected void complete(Commit<? extends Complete> commit) {
+        long sessionId = commit.session().id();
+        try {
+            commit.operation().taskIds().forEach(taskId -> {
+                TaskAssignment assignment = assignments.get(taskId);
+                if (assignment != null) {
+                    assignments.remove(taskId).markComplete();
+                    // bookkeeping
+                    totalCompleted.incrementAndGet();
+                    activeTasksPerSession.decrementAndGet(sessionId);
+                }
+            });
+        } catch (Exception e) {
+            log.warn("State machine update failed", e);
+            throw Throwables.propagate(e);
+        } finally {
+            commit.close();
+        }
+    }
+
+    @Override
+    public void register(ServerSession session) {
+    }
+
+    @Override
+    public void unregister(ServerSession session) {
+        evictWorker(session.id());
+    }
+
+    @Override
+    public void expire(ServerSession session) {
+        evictWorker(session.id());
+    }
+
+    @Override
+    public void close(ServerSession session) {
+        evictWorker(session.id());
+    }
+
+    @Override
+    public void snapshot(SnapshotWriter writer) {
+        writer.writeLong(totalCompleted.get());
+    }
+
+    @Override
+    public void install(SnapshotReader reader) {
+        totalCompleted.set(reader.readLong());
+    }
+
+    private void evictWorker(long sessionId) {
+        Commit<? extends Register> commit = registeredWorkers.remove(sessionId);
+        if (commit != null) {
+            commit.close();
+        }
+
+        // TODO: Maintain an index of tasks by session for efficient access.
+        Iterator<Map.Entry<String, TaskAssignment>> iter = assignments.entrySet().iterator();
+        while (iter.hasNext()) {
+            Map.Entry<String, TaskAssignment> entry = iter.next();
+            TaskAssignment assignment = entry.getValue();
+            if (assignment.sessionId() == sessionId) {
+                unassignedTasks.add(assignment.taskHolder());
+                iter.remove();
+            }
+        }
+
+        // Bookkeeping
+        activeTasksPerSession.remove(sessionId);
+        activeTasksPerSession.removeAllZeros();
+    }
+
+    private class TaskHolder {
+
+        private final Task<byte[]> task;
+        private final CountDownCompleter<Commit<? extends Add>> referenceTracker;
+
+        public TaskHolder(Task<byte[]> delegate, CountDownCompleter<Commit<? extends Add>> referenceTracker) {
+            this.task = delegate;
+            this.referenceTracker = referenceTracker;
+        }
+
+        public Task<byte[]> task() {
+            return task;
+        }
+
+        public void complete() {
+            referenceTracker.countDown();
+        }
+    }
+
+    private class TaskAssignment {
+        private final long sessionId;
+        private final TaskHolder taskHolder;
+
+        public TaskAssignment(long sessionId, TaskHolder taskHolder) {
+            this.sessionId = sessionId;
+            this.taskHolder = taskHolder;
+        }
+
+        public long sessionId() {
+            return sessionId;
+        }
+
+        public TaskHolder taskHolder() {
+            return taskHolder;
+        }
+
+        public void markComplete() {
+            taskHolder.complete();
+        }
+
+        @Override
+        public String toString() {
+            return MoreObjects.toStringHelper(getClass())
+                              .add("sessionId", sessionId)
+                              .add("taskHolder", taskHolder)
+                              .toString();
+        }
+    }
+}
diff --git a/core/store/primitives/src/test/java/org/onosproject/store/primitives/resources/impl/AtomixTestBase.java b/core/store/primitives/src/test/java/org/onosproject/store/primitives/resources/impl/AtomixTestBase.java
index 7dfe82f..f7a5007 100644
--- a/core/store/primitives/src/test/java/org/onosproject/store/primitives/resources/impl/AtomixTestBase.java
+++ b/core/store/primitives/src/test/java/org/onosproject/store/primitives/resources/impl/AtomixTestBase.java
@@ -16,6 +16,7 @@
 package org.onosproject.store.primitives.resources.impl;
 
 import com.google.common.util.concurrent.Uninterruptibles;
+
 import io.atomix.AtomixClient;
 import io.atomix.catalyst.serializer.Serializer;
 import io.atomix.catalyst.transport.Address;
@@ -114,18 +115,18 @@
 
         CompletableFuture<Void> closeClients =
                 CompletableFuture.allOf(atomixClients.stream()
-                                         .map(AtomixClient::close)
-                                         .toArray(CompletableFuture[]::new));
+                                                     .map(AtomixClient::close)
+                                                     .toArray(CompletableFuture[]::new));
+        closeClients.join();
 
-        closeClients
-                .thenCompose(v -> CompletableFuture
-                        .allOf(copycatServers.stream()
-                .map(CopycatServer::shutdown)
-                .toArray(CompletableFuture[]::new))).join();
+        CompletableFuture<Void> closeServers =
+                CompletableFuture.allOf(copycatServers.stream()
+                                                      .map(CopycatServer::shutdown)
+                                                      .toArray(CompletableFuture[]::new));
+        closeServers.join();
 
-        atomixClients = new ArrayList<>();
-
-        copycatServers = new ArrayList<>();
+        atomixClients.clear();
+        copycatServers.clear();
     }
 
 
diff --git a/core/store/primitives/src/test/java/org/onosproject/store/primitives/resources/impl/AtomixWorkQueueTest.java b/core/store/primitives/src/test/java/org/onosproject/store/primitives/resources/impl/AtomixWorkQueueTest.java
new file mode 100644
index 0000000..161424d
--- /dev/null
+++ b/core/store/primitives/src/test/java/org/onosproject/store/primitives/resources/impl/AtomixWorkQueueTest.java
@@ -0,0 +1,189 @@
+/*
+ * Copyright 2016-present Open Networking Laboratory
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.onosproject.store.primitives.resources.impl;
+
+import java.time.Duration;
+import java.util.Arrays;
+import java.util.UUID;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.Executor;
+import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
+
+import io.atomix.Atomix;
+import io.atomix.AtomixClient;
+import io.atomix.resource.ResourceType;
+
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import org.onlab.util.Tools;
+import org.onosproject.store.service.Task;
+import org.onosproject.store.service.WorkQueueStats;
+
+import com.google.common.util.concurrent.Uninterruptibles;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertTrue;
+
+/**
+ * Unit tests for {@link AtomixWorkQueue}.
+ */
+public class AtomixWorkQueueTest extends AtomixTestBase {
+
+    private static final Duration DEFAULT_PROCESSING_TIME = Duration.ofMillis(100);
+    private static final byte[] DEFAULT_PAYLOAD = "hello world".getBytes();
+
+    @BeforeClass
+    public static void preTestSetup() throws Throwable {
+        createCopycatServers(1);
+    }
+
+    @AfterClass
+    public static void postTestCleanup() throws Exception {
+        clearTests();
+    }
+
+    @Override
+    protected ResourceType resourceType() {
+        return new ResourceType(AtomixWorkQueue.class);
+    }
+
+    @Test
+    public void testAdd() throws Throwable {
+        String queueName = UUID.randomUUID().toString();
+        Atomix atomix1 = createAtomixClient();
+        AtomixWorkQueue queue1 = atomix1.getResource(queueName, AtomixWorkQueue.class).join();
+        byte[] item = DEFAULT_PAYLOAD;
+        queue1.addOne(item).join();
+
+        Atomix atomix2 = createAtomixClient();
+        AtomixWorkQueue queue2 = atomix2.getResource(queueName, AtomixWorkQueue.class).join();
+        byte[] task2 = DEFAULT_PAYLOAD;
+        queue2.addOne(task2).join();
+
+        WorkQueueStats stats = queue1.stats().join();
+        assertEquals(stats.totalPending(), 2);
+        assertEquals(stats.totalInProgress(), 0);
+        assertEquals(stats.totalCompleted(), 0);
+    }
+
+    @Test
+    public void testAddMultiple() throws Throwable {
+        String queueName = UUID.randomUUID().toString();
+        Atomix atomix1 = createAtomixClient();
+        AtomixWorkQueue queue1 = atomix1.getResource(queueName, AtomixWorkQueue.class).join();
+        byte[] item1 = DEFAULT_PAYLOAD;
+        byte[] item2 = DEFAULT_PAYLOAD;
+        queue1.addMultiple(Arrays.asList(item1, item2)).join();
+
+        WorkQueueStats stats = queue1.stats().join();
+        assertEquals(stats.totalPending(), 2);
+        assertEquals(stats.totalInProgress(), 0);
+        assertEquals(stats.totalCompleted(), 0);
+    }
+
+    @Test
+    public void testTakeAndComplete() throws Throwable {
+        String queueName = UUID.randomUUID().toString();
+        Atomix atomix1 = createAtomixClient();
+        AtomixWorkQueue queue1 = atomix1.getResource(queueName, AtomixWorkQueue.class).join();
+        byte[] item1 = DEFAULT_PAYLOAD;
+        queue1.addOne(item1).join();
+
+        Atomix atomix2 = createAtomixClient();
+        AtomixWorkQueue queue2 = atomix2.getResource(queueName, AtomixWorkQueue.class).join();
+        Task<byte[]> removedTask = queue2.take().join();
+
+        WorkQueueStats stats = queue2.stats().join();
+        assertEquals(stats.totalPending(), 0);
+        assertEquals(stats.totalInProgress(), 1);
+        assertEquals(stats.totalCompleted(), 0);
+
+        assertTrue(Arrays.equals(removedTask.payload(), item1));
+        queue2.complete(Arrays.asList(removedTask.taskId())).join();
+
+        stats = queue1.stats().join();
+        assertEquals(stats.totalPending(), 0);
+        assertEquals(stats.totalInProgress(), 0);
+        assertEquals(stats.totalCompleted(), 1);
+
+        // Another take should return null
+        assertNull(queue2.take().join());
+    }
+
+    @Test
+    public void testUnexpectedClientClose() throws Throwable {
+        String queueName = UUID.randomUUID().toString();
+        Atomix atomix1 = createAtomixClient();
+        AtomixWorkQueue queue1 = atomix1.getResource(queueName, AtomixWorkQueue.class).join();
+        byte[] item1 = DEFAULT_PAYLOAD;
+        queue1.addOne(item1).join();
+
+        AtomixClient atomix2 = createAtomixClient();
+        AtomixWorkQueue queue2 = atomix2.getResource(queueName, AtomixWorkQueue.class).join();
+        queue2.take().join();
+
+        WorkQueueStats stats = queue1.stats().join();
+        assertEquals(0, stats.totalPending());
+        assertEquals(1, stats.totalInProgress());
+        assertEquals(0, stats.totalCompleted());
+
+        atomix2.close().join();
+
+        stats = queue1.stats().join();
+        assertEquals(1, stats.totalPending());
+        assertEquals(0, stats.totalInProgress());
+        assertEquals(0, stats.totalCompleted());
+    }
+
+    @Test
+    public void testAutomaticTaskProcessing() throws Throwable {
+        String queueName = UUID.randomUUID().toString();
+        Atomix atomix1 = createAtomixClient();
+        AtomixWorkQueue queue1 = atomix1.getResource(queueName, AtomixWorkQueue.class).join();
+        Executor executor = Executors.newSingleThreadExecutor();
+
+        CountDownLatch latch1 = new CountDownLatch(1);
+        queue1.registerTaskProcessor(s -> latch1.countDown(), 2, executor);
+
+        AtomixClient atomix2 = createAtomixClient();
+        AtomixWorkQueue queue2 = atomix2.getResource(queueName, AtomixWorkQueue.class).join();
+        byte[] item1 = DEFAULT_PAYLOAD;
+        queue2.addOne(item1).join();
+
+        Uninterruptibles.awaitUninterruptibly(latch1, 500, TimeUnit.MILLISECONDS);
+        queue1.stopProcessing();
+
+        byte[] item2 = DEFAULT_PAYLOAD;
+        byte[] item3 = DEFAULT_PAYLOAD;
+
+        Tools.delay((int) DEFAULT_PROCESSING_TIME.toMillis());
+
+        queue2.addMultiple(Arrays.asList(item2, item3)).join();
+
+        WorkQueueStats stats = queue1.stats().join();
+        assertEquals(2, stats.totalPending());
+        assertEquals(0, stats.totalInProgress());
+        assertEquals(1, stats.totalCompleted());
+
+        CountDownLatch latch2 = new CountDownLatch(2);
+        queue1.registerTaskProcessor(s -> latch2.countDown(), 2, executor);
+
+        Uninterruptibles.awaitUninterruptibly(latch2, 500, TimeUnit.MILLISECONDS);
+    }
+}
diff --git a/core/store/serializers/src/main/java/org/onosproject/store/serializers/KryoNamespaces.java b/core/store/serializers/src/main/java/org/onosproject/store/serializers/KryoNamespaces.java
index e51ee78..10cd72f 100644
--- a/core/store/serializers/src/main/java/org/onosproject/store/serializers/KryoNamespaces.java
+++ b/core/store/serializers/src/main/java/org/onosproject/store/serializers/KryoNamespaces.java
@@ -19,6 +19,7 @@
 import com.google.common.collect.ImmutableMap;
 import com.google.common.collect.ImmutableSet;
 import com.google.common.collect.Maps;
+
 import org.onlab.packet.ChassisId;
 import org.onlab.packet.EthType;
 import org.onlab.packet.Ip4Address;
@@ -208,7 +209,9 @@
 import org.onosproject.store.service.MapEvent;
 import org.onosproject.store.service.MapTransaction;
 import org.onosproject.store.service.SetEvent;
+import org.onosproject.store.service.Task;
 import org.onosproject.store.service.Versioned;
+import org.onosproject.store.service.WorkQueueStats;
 
 import java.net.URI;
 import java.time.Duration;
@@ -338,6 +341,8 @@
                     Leadership.class,
                     LeadershipEvent.class,
                     LeadershipEvent.Type.class,
+                    Task.class,
+                    WorkQueueStats.class,
                     HostId.class,
                     HostDescription.class,
                     DefaultHostDescription.class,