Distributed work queue primitive
Change-Id: Ia8e531e6611ec502399edec376ccc00522e47994
diff --git a/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/StorageManager.java b/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/StorageManager.java
index 4def1e9..8eb138a 100644
--- a/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/StorageManager.java
+++ b/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/StorageManager.java
@@ -46,6 +46,7 @@
import org.onosproject.store.service.ConsistentMapBuilder;
import org.onosproject.store.service.DistributedQueueBuilder;
import org.onosproject.store.service.DistributedSetBuilder;
+import org.onosproject.store.service.WorkQueue;
import org.onosproject.store.service.EventuallyConsistentMapBuilder;
import org.onosproject.store.service.LeaderElectorBuilder;
import org.onosproject.store.service.MapInfo;
@@ -54,6 +55,7 @@
import org.onosproject.store.service.StorageAdminService;
import org.onosproject.store.service.StorageService;
import org.onosproject.store.service.TransactionContextBuilder;
+import org.onosproject.store.service.WorkQueueStats;
import org.slf4j.Logger;
import com.google.common.collect.Maps;
@@ -171,6 +173,12 @@
}
+ /**
+  * Returns the work queue that the federated primitive creator provides for
+  * the given name and serializer.
+  *
+  * @param name name handed to the primitive creator to identify the queue
+  * @param serializer serializer handed to the primitive creator for the queue
+  * @param <E> type parameter of the returned work queue
+  * @return result of {@code federatedPrimitiveCreator.newWorkQueue(name, serializer)}
+  */
@Override
+ public <E> WorkQueue<E> getWorkQueue(String name, Serializer serializer) {
+ // The STORAGE_WRITE check runs before any queue is handed out.
+ // NOTE(review): presumably checkPermission throws when the caller lacks the
+ // permission - its definition is not part of this hunk, confirm.
+ checkPermission(STORAGE_WRITE);
+ return federatedPrimitiveCreator.newWorkQueue(name, serializer);
+ }
+
+ /**
+  * Returns map information for the federated primitive creator.
+  *
+  * @return result of {@code listMapInfo(federatedPrimitiveCreator)}
+  */
+ @Override
public List<MapInfo> getMapInfo() {
return listMapInfo(federatedPrimitiveCreator);
}
@@ -185,6 +193,18 @@
}
+ /**
+  * Collects statistics for every work queue name reported by the federated
+  * primitive creator.
+  *
+  * @return map keyed by work queue name, holding the stats obtained for each queue
+  */
@Override
+ public Map<String, WorkQueueStats> getQueueStats() {
+ Map<String, WorkQueueStats> workQueueStats = Maps.newConcurrentMap();
+ // For each reported name, obtain a queue through newWorkQueue (using a
+ // serializer over KryoNamespaces.BASIC), call stats() and store the
+ // join() result under that name.
+ // NOTE(review): join() presumably blocks on a future and rethrows failures
+ // unchecked, so a single failing queue would abort the whole call - confirm.
+ // NOTE(review): the BASIC serializer is presumably sufficient because only
+ // stats() is invoked on this queue - confirm.
+ federatedPrimitiveCreator.getWorkQueueNames()
+ .forEach(name -> workQueueStats.put(name,
+ federatedPrimitiveCreator.newWorkQueue(name,
+ Serializer.using(KryoNamespaces.BASIC))
+ .stats()
+ .join()));
+ return workQueueStats;
+ }
+
+ /**
+  * Returns partition information as reported by the partition admin service.
+  *
+  * @return result of {@code partitionAdminService.partitionInfo()}
+  */
+ @Override
public List<PartitionInfo> getPartitionInfo() {
return partitionAdminService.partitionInfo();
}