Distributed work queue primitive

Change-Id: Ia8e531e6611ec502399edec376ccc00522e47994
diff --git a/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/CatalystSerializers.java b/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/CatalystSerializers.java
index 175e253..f07f838 100644
--- a/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/CatalystSerializers.java
+++ b/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/CatalystSerializers.java
@@ -33,6 +33,8 @@
 import org.onosproject.store.primitives.resources.impl.AtomixConsistentMapFactory;
 import org.onosproject.store.primitives.resources.impl.AtomixLeaderElectorCommands;
 import org.onosproject.store.primitives.resources.impl.AtomixLeaderElectorFactory;
+import org.onosproject.store.primitives.resources.impl.AtomixWorkQueueCommands;
+import org.onosproject.store.primitives.resources.impl.AtomixWorkQueueFactory;
 import org.onosproject.store.primitives.resources.impl.CommitResult;
 import org.onosproject.store.primitives.resources.impl.MapEntryUpdateResult;
 import org.onosproject.store.primitives.resources.impl.PrepareResult;
@@ -40,8 +42,11 @@
 import org.onosproject.store.serializers.KryoNamespaces;
 import org.onosproject.store.service.MapEvent;
 import org.onosproject.store.service.MapTransaction;
+import org.onosproject.store.service.Task;
 import org.onosproject.store.service.Versioned;
+import org.onosproject.store.service.WorkQueueStats;
 
+import com.google.common.collect.ImmutableList;
 import com.google.common.collect.Maps;
 
 /**
@@ -81,15 +86,20 @@
         serializer.register(MapTransaction.class, factory);
         serializer.register(Versioned.class, factory);
         serializer.register(MapEvent.class, factory);
+        serializer.register(Task.class, factory);
+        serializer.register(WorkQueueStats.class, factory);
         serializer.register(Maps.immutableEntry("a", "b").getClass(), factory);
+        serializer.register(ImmutableList.of().getClass(), factory);
 
         serializer.resolve(new LongCommands.TypeResolver());
         serializer.resolve(new AtomixConsistentMapCommands.TypeResolver());
         serializer.resolve(new AtomixLeaderElectorCommands.TypeResolver());
+        serializer.resolve(new AtomixWorkQueueCommands.TypeResolver());
         serializer.resolve(new ResourceManagerTypeResolver());
 
         serializer.registerClassLoader(AtomixConsistentMapFactory.class)
-                  .registerClassLoader(AtomixLeaderElectorFactory.class);
+                  .registerClassLoader(AtomixLeaderElectorFactory.class)
+                  .registerClassLoader(AtomixWorkQueueFactory.class);
 
         return serializer;
     }
diff --git a/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/DefaultCatalystTypeSerializerFactory.java b/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/DefaultCatalystTypeSerializerFactory.java
index 88d65d5..b0d6841 100644
--- a/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/DefaultCatalystTypeSerializerFactory.java
+++ b/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/DefaultCatalystTypeSerializerFactory.java
@@ -54,20 +54,19 @@
         }
 
         @Override
-        public void write(T object, BufferOutput buffer,
-                io.atomix.catalyst.serializer.Serializer serializer) {
+        public void write(T object, BufferOutput buffer, io.atomix.catalyst.serializer.Serializer serializer) {
             try {
                 byte[] payload = this.serializer.encode(object);
                 buffer.writeInt(payload.length);
                 buffer.write(payload);
             } catch (Exception e) {
                 log.warn("Failed to serialize {}", object, e);
+                throw Throwables.propagate(e);
             }
         }
 
         @Override
-        public T read(Class<T> type, BufferInput buffer,
-                io.atomix.catalyst.serializer.Serializer serializer) {
+        public T read(Class<T> type, BufferInput buffer, io.atomix.catalyst.serializer.Serializer serializer) {
             int size = buffer.readInt();
             try {
                 byte[] payload = new byte[size];
@@ -75,8 +74,7 @@
                 return this.serializer.decode(payload);
             } catch (Exception e) {
                 log.warn("Failed to deserialize as type {}. Payload size: {}", type, size, e);
-                Throwables.propagate(e);
-                return null;
+                throw Throwables.propagate(e);
             }
         }
     }
diff --git a/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/DefaultDistributedWorkQueue.java b/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/DefaultDistributedWorkQueue.java
new file mode 100644
index 0000000..549cfc6
--- /dev/null
+++ b/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/DefaultDistributedWorkQueue.java
@@ -0,0 +1,67 @@
+package org.onosproject.store.primitives.impl;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.Executor;
+import java.util.function.Consumer;
+import java.util.stream.Collectors;
+
+import org.onosproject.store.service.WorkQueue;
+import org.onosproject.store.service.Serializer;
+import org.onosproject.store.service.Task;
+import org.onosproject.store.service.WorkQueueStats;
+
+import com.google.common.collect.Collections2;
+
+/**
+ * Default implementation of {@link WorkQueue}.
+ *
+ * @param <E> task payload type.
+ */
+public class DefaultDistributedWorkQueue<E> implements WorkQueue<E> {
+
+    private final WorkQueue<byte[]> backingQueue;
+    private final Serializer serializer;
+
+    public DefaultDistributedWorkQueue(WorkQueue<byte[]> backingQueue, Serializer serializer) {
+        this.backingQueue = backingQueue;
+        this.serializer = serializer;
+    }
+
+    @Override
+    public CompletableFuture<Void> addMultiple(Collection<E> items) {
+        return backingQueue.addMultiple(items.stream()
+                                             .map(serializer::encode)
+                                             .collect(Collectors.toCollection(ArrayList::new)));
+    }
+
+    @Override
+    public CompletableFuture<Collection<Task<E>>> take(int maxTasks) {
+        return backingQueue.take(maxTasks)
+                           .thenApply(tasks -> Collections2.transform(tasks, task -> task.<E>map(serializer::decode)));
+    }
+
+    @Override
+    public CompletableFuture<Void> complete(Collection<String> ids) {
+        return backingQueue.complete(ids);
+    }
+
+    @Override
+    public CompletableFuture<WorkQueueStats> stats() {
+        return backingQueue.stats();
+    }
+
+    @Override
+    public CompletableFuture<Void> registerTaskProcessor(Consumer<E> callback,
+                                                         int parallelism,
+                                                         Executor executor) {
+        Consumer<byte[]> backingQueueCallback = payload -> callback.accept(serializer.decode(payload));
+        return backingQueue.registerTaskProcessor(backingQueueCallback, parallelism, executor);
+    }
+
+    @Override
+    public CompletableFuture<Void> stopProcessing() {
+        return backingQueue.stopProcessing();
+    }
+}
diff --git a/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/FederatedDistributedPrimitiveCreator.java b/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/FederatedDistributedPrimitiveCreator.java
index e4b6f9a..a095de1 100644
--- a/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/FederatedDistributedPrimitiveCreator.java
+++ b/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/FederatedDistributedPrimitiveCreator.java
@@ -30,6 +30,7 @@
 import org.onosproject.store.service.AsyncDistributedSet;
 import org.onosproject.store.service.AsyncLeaderElector;
 import org.onosproject.store.service.DistributedQueue;
+import org.onosproject.store.service.WorkQueue;
 import org.onosproject.store.service.Serializer;
 
 import com.google.common.base.Charsets;
@@ -101,6 +102,11 @@
     }
 
     @Override
+    public <E> WorkQueue<E> newWorkQueue(String name, Serializer serializer) {
+        return getCreator(name).newWorkQueue(name, serializer);
+    }
+
+    @Override
     public Set<String> getAsyncConsistentMapNames() {
         return members.values()
                       .stream()
@@ -118,6 +124,15 @@
                       .orElse(ImmutableSet.of());
     }
 
+    @Override
+    public Set<String> getWorkQueueNames() {
+        return members.values()
+                      .stream()
+                      .map(DistributedPrimitiveCreator::getWorkQueueNames)
+                      .reduce(Sets::union)
+                      .orElse(ImmutableSet.of());
+    }
+
     /**
      * Returns the {@code DistributedPrimitiveCreator} to use for hosting a primitive.
      * @param name primitive name
diff --git a/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/StorageManager.java b/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/StorageManager.java
index 4def1e9..8eb138a 100644
--- a/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/StorageManager.java
+++ b/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/StorageManager.java
@@ -46,6 +46,7 @@
 import org.onosproject.store.service.ConsistentMapBuilder;
 import org.onosproject.store.service.DistributedQueueBuilder;
 import org.onosproject.store.service.DistributedSetBuilder;
+import org.onosproject.store.service.WorkQueue;
 import org.onosproject.store.service.EventuallyConsistentMapBuilder;
 import org.onosproject.store.service.LeaderElectorBuilder;
 import org.onosproject.store.service.MapInfo;
@@ -54,6 +55,7 @@
 import org.onosproject.store.service.StorageAdminService;
 import org.onosproject.store.service.StorageService;
 import org.onosproject.store.service.TransactionContextBuilder;
+import org.onosproject.store.service.WorkQueueStats;
 import org.slf4j.Logger;
 
 import com.google.common.collect.Maps;
@@ -171,6 +173,12 @@
     }
 
     @Override
+    public <E> WorkQueue<E> getWorkQueue(String name, Serializer serializer) {
+        checkPermission(STORAGE_WRITE);
+        return federatedPrimitiveCreator.newWorkQueue(name, serializer);
+    }
+
+    @Override
     public List<MapInfo> getMapInfo() {
         return listMapInfo(federatedPrimitiveCreator);
     }
@@ -185,6 +193,18 @@
     }
 
     @Override
+    public Map<String, WorkQueueStats> getQueueStats() {
+        Map<String, WorkQueueStats> workQueueStats = Maps.newConcurrentMap();
+        federatedPrimitiveCreator.getWorkQueueNames()
+               .forEach(name -> workQueueStats.put(name,
+                       federatedPrimitiveCreator.newWorkQueue(name,
+                                                              Serializer.using(KryoNamespaces.BASIC))
+                                                .stats()
+                                                .join()));
+        return workQueueStats;
+    }
+
+    @Override
     public List<PartitionInfo> getPartitionInfo() {
         return partitionAdminService.partitionInfo();
     }
diff --git a/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/StoragePartitionClient.java b/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/StoragePartitionClient.java
index 0c1d8a6..d635a79 100644
--- a/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/StoragePartitionClient.java
+++ b/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/StoragePartitionClient.java
@@ -41,6 +41,7 @@
 import org.onosproject.store.primitives.resources.impl.AtomixConsistentMap;
 import org.onosproject.store.primitives.resources.impl.AtomixCounter;
 import org.onosproject.store.primitives.resources.impl.AtomixLeaderElector;
+import org.onosproject.store.primitives.resources.impl.AtomixWorkQueue;
 import org.onosproject.store.serializers.KryoNamespaces;
 import org.onosproject.store.service.AsyncAtomicCounter;
 import org.onosproject.store.service.AsyncAtomicValue;
@@ -49,6 +50,7 @@
 import org.onosproject.store.service.AsyncLeaderElector;
 import org.onosproject.store.service.DistributedPrimitive.Status;
 import org.onosproject.store.service.DistributedQueue;
+import org.onosproject.store.service.WorkQueue;
 import org.onosproject.store.service.PartitionClientInfo;
 import org.onosproject.store.service.Serializer;
 import org.slf4j.Logger;
@@ -159,11 +161,16 @@
 
     @Override
     public <E> DistributedQueue<E> newDistributedQueue(String name, Serializer serializer) {
-        // TODO: Implement
         throw new UnsupportedOperationException();
     }
 
     @Override
+    public <E> WorkQueue<E> newWorkQueue(String name, Serializer serializer) {
+        AtomixWorkQueue workQueue = client.getResource(name, AtomixWorkQueue.class).join();
+        return new DefaultDistributedWorkQueue<>(workQueue, serializer);
+    }
+
+    @Override
     public AsyncLeaderElector newAsyncLeaderElector(String name) {
         AtomixLeaderElector leaderElector = client.getResource(name, AtomixLeaderElector.class)
                                                   .thenCompose(AtomixLeaderElector::setupCache)
@@ -187,6 +194,11 @@
     }
 
     @Override
+    public Set<String> getWorkQueueNames() {
+        return client.keys(AtomixWorkQueue.class).join();
+    }
+
+    @Override
     public boolean isOpen() {
         return resourceClient.client().state() != State.CLOSED;
     }