Distributed work queue primitive

Change-Id: Ia8e531e6611ec502399edec376ccc00522e47994
diff --git a/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/StorageManager.java b/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/StorageManager.java
index 4def1e9..8eb138a 100644
--- a/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/StorageManager.java
+++ b/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/StorageManager.java
@@ -46,6 +46,7 @@
 import org.onosproject.store.service.ConsistentMapBuilder;
 import org.onosproject.store.service.DistributedQueueBuilder;
 import org.onosproject.store.service.DistributedSetBuilder;
+import org.onosproject.store.service.WorkQueue;
 import org.onosproject.store.service.EventuallyConsistentMapBuilder;
 import org.onosproject.store.service.LeaderElectorBuilder;
 import org.onosproject.store.service.MapInfo;
@@ -54,6 +55,7 @@
 import org.onosproject.store.service.StorageAdminService;
 import org.onosproject.store.service.StorageService;
 import org.onosproject.store.service.TransactionContextBuilder;
+import org.onosproject.store.service.WorkQueueStats;
 import org.slf4j.Logger;
 
 import com.google.common.collect.Maps;
@@ -171,6 +173,12 @@
     }
 
     @Override
+    public <E> WorkQueue<E> getWorkQueue(String name, Serializer serializer) {
+        checkPermission(STORAGE_WRITE); // NOTE(review): presumably rejects callers lacking STORAGE_WRITE - confirm
+        return federatedPrimitiveCreator.newWorkQueue(name, serializer);
+    }
+
+    @Override
     public List<MapInfo> getMapInfo() {
         return listMapInfo(federatedPrimitiveCreator);
     }
@@ -185,6 +193,18 @@
     }
 
     @Override
+    public Map<String, WorkQueueStats> getQueueStats() {
+        Map<String, WorkQueueStats> statsByName = Maps.newConcurrentMap(); // queue name -> its stats
+        federatedPrimitiveCreator.getWorkQueueNames().forEach(queueName -> {
+            WorkQueueStats queueStats = federatedPrimitiveCreator
+                    .newWorkQueue(queueName, Serializer.using(KryoNamespaces.BASIC))
+                    .stats().join(); // NOTE(review): join() presumably blocks on an async result - confirm
+            statsByName.put(queueName, queueStats);
+        });
+        return statsByName;
+    }
+
+    @Override
     public List<PartitionInfo> getPartitionInfo() {
         return partitionAdminService.partitionInfo();
     }