Support for a distributed queue primitive.

Change-Id: I13abb93ec1703105ff0137e137738483a5b6a143
diff --git a/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/DatabaseManager.java b/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/DatabaseManager.java
index a222627..c7c98c5 100644
--- a/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/DatabaseManager.java
+++ b/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/DatabaseManager.java
@@ -16,6 +16,7 @@
 
 package org.onosproject.store.consistent.impl;
 
+import com.google.common.base.Charsets;
 import com.google.common.collect.ImmutableSet;
 import com.google.common.collect.Lists;
 import com.google.common.collect.Maps;
@@ -46,6 +47,7 @@
 import static org.onlab.util.Tools.groupedThreads;
 
 import org.onosproject.cluster.ClusterService;
+import org.onosproject.cluster.NodeId;
 import org.onosproject.core.IdGenerator;
 import org.onosproject.store.cluster.impl.ClusterDefinitionManager;
 import org.onosproject.store.cluster.impl.NodeInfo;
@@ -55,6 +57,7 @@
 import org.onosproject.store.service.AtomicCounterBuilder;
 import org.onosproject.store.service.ConsistentMapBuilder;
 import org.onosproject.store.service.ConsistentMapException;
+import org.onosproject.store.service.DistributedQueueBuilder;
 import org.onosproject.store.service.EventuallyConsistentMapBuilder;
 import org.onosproject.store.service.MapEvent;
 import org.onosproject.store.service.MapInfo;
@@ -98,16 +101,21 @@
     private static final int RAFT_ELECTION_TIMEOUT_MILLIS = 3000;
     private static final int DATABASE_OPERATION_TIMEOUT_MILLIS = 5000;
 
+    protected static final MessageSubject QUEUE_UPDATED_TOPIC = new MessageSubject("distributed-queue-updated");
+
     private ClusterCoordinator coordinator;
     protected PartitionedDatabase partitionedDatabase;
     protected Database inMemoryDatabase;
+    protected NodeId localNodeId;
 
     private TransactionManager transactionManager;
     private final IdGenerator transactionIdGenerator = () -> RandomUtils.nextLong();
 
     private ExecutorService eventDispatcher;
+    private ExecutorService queuePollExecutor;
 
-    private final Set<DefaultAsyncConsistentMap> maps = Sets.newCopyOnWriteArraySet();
+    private final Map<String, DefaultAsyncConsistentMap> maps = Maps.newConcurrentMap();
+    private final Map<String, DefaultDistributedQueue> queues = Maps.newConcurrentMap();
 
     @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
     protected ClusterService clusterService;
@@ -121,6 +129,7 @@
 
     @Activate
     public void activate() {
+        localNodeId = clusterService.getLocalNode().id();
         // load database configuration
         File databaseDefFile = new File(PARTITION_DEFINITION_FILE);
         log.info("Loading database definition: {}", databaseDefFile.getAbsolutePath());
@@ -201,6 +210,19 @@
 
         eventDispatcher = Executors.newSingleThreadExecutor(
                 groupedThreads("onos/store/manager", "map-event-dispatcher"));
+
+        queuePollExecutor = Executors.newFixedThreadPool(4,
+                groupedThreads("onos/store/manager", "queue-poll-handler"));
+
+        clusterCommunicator.<String>addSubscriber(QUEUE_UPDATED_TOPIC,
+                data -> new String(data, Charsets.UTF_8),
+                name -> {
+                    DefaultDistributedQueue q = queues.get(name);
+                    if (q != null) {
+                        q.tryPoll();
+                    }
+                },
+                queuePollExecutor);
         log.info("Started");
     }
 
@@ -226,8 +248,10 @@
                     log.info("Successfully closed databases.");
                 }
             });
-        maps.forEach(map -> clusterCommunicator.removeSubscriber(mapUpdatesSubject(map.name())));
+        clusterCommunicator.removeSubscriber(QUEUE_UPDATED_TOPIC);
+        maps.values().forEach(this::unregisterMap);
         eventDispatcher.shutdown();
+        queuePollExecutor.shutdown();
         log.info("Stopped");
     }
 
@@ -318,6 +342,12 @@
         return new DefaultDistributedSetBuilder<>(this);
     }
 
+
+    @Override
+    public <E> DistributedQueueBuilder<E> queueBuilder() {
+        return new DefaultDistributedQueueBuilder<>(this); // the builder registers each built queue with this manager
+    }
+
     @Override
     public AtomicCounterBuilder atomicCounterBuilder() {
         return new DefaultAtomicCounterBuilder(inMemoryDatabase, partitionedDatabase);
@@ -386,8 +416,8 @@
     }
 
     protected <K, V> void registerMap(DefaultAsyncConsistentMap<K, V> map) {
-        // TODO: Support different local instances of the same map.
-        if (!maps.add(map)) {
+        // TODO: Support multiple local instances of the same map.
+        if (maps.putIfAbsent(map.name(), map) != null) {
             throw new IllegalStateException("Map by name " + map.name() + " already exists");
         }
 
@@ -397,6 +427,19 @@
                 eventDispatcher);
     }
 
+    protected <K, V> void unregisterMap(DefaultAsyncConsistentMap<K, V> map) {
+        if (maps.remove(map.name()) != null) { // skip the subscriber removal if the map was never registered
+            clusterCommunicator.removeSubscriber(mapUpdatesSubject(map.name()));
+        }
+    }
+
+    protected <E> void registerQueue(DefaultDistributedQueue<E> queue) {
+        // TODO: Support multiple local instances of the same queue.
+        if (queues.putIfAbsent(queue.name(), queue) != null) { // atomic check-and-insert: first registration wins
+            throw new IllegalStateException("Queue by name " + queue.name() + " already exists");
+        }
+    }
+
     protected static MessageSubject mapUpdatesSubject(String mapName) {
         return new MessageSubject(mapName + "-map-updates");
     }
diff --git a/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/DatabaseProxy.java b/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/DatabaseProxy.java
index 815b142..518b526 100644
--- a/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/DatabaseProxy.java
+++ b/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/DatabaseProxy.java
@@ -21,6 +21,7 @@
 import java.util.Set;
 import java.util.concurrent.CompletableFuture;
 
+import org.onosproject.cluster.NodeId;
 import org.onosproject.store.service.Transaction;
 import org.onosproject.store.service.Versioned;
 
@@ -249,6 +250,36 @@
     CompletableFuture<Long> counterGet(String counterName);
 
     /**
+     * Returns the number of entries currently in the queue.
+     * @param queueName queue name
+     * @return future completed with the queue size
+     */
+    CompletableFuture<Long> queueSize(String queueName);
+
+    /**
+     * Adds an entry to the queue.
+     * @param queueName queue name
+     * @param entry serialized queue entry
+     * @return future completed with the set of nodes to notify about the queue update
+     */
+    CompletableFuture<Set<NodeId>> queuePush(String queueName, byte[] entry);
+
+    /**
+     * Removes and returns an entry from the queue if the queue is non-empty.
+     * @param queueName queue name
+     * @param nodeId node to notify when an entry becomes available, should the queue be empty
+     * @return future completed with the removed entry, or with null if the queue is empty
+     */
+    CompletableFuture<byte[]> queuePop(String queueName, NodeId nodeId);
+
+    /**
+     * Returns, without removing it, the entry a pop would return next.
+     * @param queueName queue name
+     * @return future completed with the entry, or with null if the queue is empty
+     */
+    CompletableFuture<byte[]> queuePeek(String queueName);
+
+    /**
      * Prepare and commit the specified transaction.
      *
      * @param transaction transaction to commit (after preparation)
diff --git a/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/DatabaseSerializer.java b/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/DatabaseSerializer.java
index 9b6a322..db28826 100644
--- a/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/DatabaseSerializer.java
+++ b/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/DatabaseSerializer.java
@@ -21,6 +21,7 @@
 import org.apache.commons.lang3.tuple.ImmutablePair;
 import org.apache.commons.lang3.tuple.Pair;
 import org.onlab.util.KryoNamespace;
+import org.onosproject.cluster.NodeId;
 import org.onosproject.store.serializers.KryoNamespaces;
 import org.onosproject.store.serializers.KryoSerializer;
 import org.onosproject.store.service.DatabaseUpdate;
@@ -78,6 +79,7 @@
             .register(Result.Status.class)
             .register(DefaultTransaction.class)
             .register(Transaction.State.class)
+            .register(NodeId.class)
             .build();
 
     private static final KryoSerializer SERIALIZER = new KryoSerializer() {
diff --git a/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/DatabaseState.java b/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/DatabaseState.java
index 73eacdd..0af7f42 100644
--- a/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/DatabaseState.java
+++ b/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/DatabaseState.java
@@ -21,6 +21,7 @@
 import java.util.Map.Entry;
 import java.util.Set;
 
+import org.onosproject.cluster.NodeId;
 import org.onosproject.store.service.Transaction;
 import org.onosproject.store.service.Versioned;
 
@@ -113,6 +114,18 @@
   Long counterGetAndAdd(String counterName, long delta);
 
   @Query
+  Long queueSize(String queueName); // number of entries in the named queue
+
+  @Query
+  byte[] queuePeek(String queueName); // head entry without removing it; null when empty
+
+  @Command
+  byte[] queuePop(String queueName, NodeId requestor); // on an empty queue, records requestor for notification
+
+  @Command
+  Set<NodeId> queuePush(String queueName, byte[] entry); // returns the nodes that asked to be notified
+
+  @Query
   Long counterGet(String counterName);
 
   @Command
diff --git a/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/DefaultDatabase.java b/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/DefaultDatabase.java
index 52a1b20..5d0ca83 100644
--- a/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/DefaultDatabase.java
+++ b/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/DefaultDatabase.java
@@ -28,6 +28,7 @@
 import java.util.concurrent.CompletableFuture;
 import java.util.function.Supplier;
 
+import org.onosproject.cluster.NodeId;
 import org.onosproject.store.service.Transaction;
 import org.onosproject.store.service.Versioned;
 
@@ -187,6 +188,26 @@
     }
 
     @Override
+    public CompletableFuture<Long> queueSize(String queueName) {
+        return checkOpen(() -> proxy.queueSize(queueName)); // plain delegation to proxy, wrapped by checkOpen
+    }
+
+    @Override
+    public CompletableFuture<Set<NodeId>> queuePush(String queueName, byte[] entry) {
+        return checkOpen(() -> proxy.queuePush(queueName, entry)); // plain delegation to proxy via checkOpen
+    }
+
+    @Override
+    public CompletableFuture<byte[]> queuePop(String queueName, NodeId nodeId) {
+        return checkOpen(() -> proxy.queuePop(queueName, nodeId)); // plain delegation to proxy via checkOpen
+    }
+
+    @Override
+    public CompletableFuture<byte[]> queuePeek(String queueName) {
+        return checkOpen(() -> proxy.queuePeek(queueName)); // plain delegation to proxy, wrapped by checkOpen
+    }
+
+    @Override
     public CompletableFuture<Boolean> prepareAndCommit(Transaction transaction) {
         return checkOpen(() -> proxy.prepareAndCommit(transaction));
     }
diff --git a/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/DefaultDatabaseState.java b/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/DefaultDatabaseState.java
index 7edeb44..f1bba25 100644
--- a/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/DefaultDatabaseState.java
+++ b/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/DefaultDatabaseState.java
@@ -19,13 +19,16 @@
 import java.util.Arrays;
 import java.util.Collection;
 import java.util.HashSet;
+import java.util.LinkedList;
 import java.util.Map;
 import java.util.Map.Entry;
+import java.util.Queue;
 import java.util.concurrent.atomic.AtomicLong;
 import java.util.stream.Collectors;
 import java.util.Set;
 
 import org.apache.commons.lang3.tuple.Pair;
+import org.onosproject.cluster.NodeId;
 import org.onosproject.store.service.DatabaseUpdate;
 import org.onosproject.store.service.Transaction;
 import org.onosproject.store.service.Versioned;
@@ -46,6 +49,8 @@
     private Long nextVersion;
     private Map<String, AtomicLong> counters;
     private Map<String, Map<String, Versioned<byte[]>>> tables;
+    private Map<String, Queue<byte[]>> queues;
+    private Map<String, Set<NodeId>> queueUpdateNotificationTargets;
 
     /**
      * This locks map has a structure similar to the "tables" map above and
@@ -77,6 +82,16 @@
             locks = Maps.newConcurrentMap();
             context.put("locks", locks);
         }
+        queues = context.get("queues");
+        if (queues == null) {
+            queues = Maps.newConcurrentMap();
+            context.put("queues", queues);
+        }
+        queueUpdateNotificationTargets = context.get("queueUpdateNotificationTargets");
+        if (queueUpdateNotificationTargets == null) {
+            queueUpdateNotificationTargets = Maps.newConcurrentMap();
+            context.put("queueUpdateNotificationTargets", queueUpdateNotificationTargets);
+        }
         nextVersion = context.get("nextVersion");
         if (nextVersion == null) {
             nextVersion = new Long(0);
@@ -288,6 +303,36 @@
     }
 
     @Override
+    public Long queueSize(String queueName) {
+        return (long) getQueue(queueName).size();
+    }
+
+    @Override
+    public byte[] queuePeek(String queueName) {
+        // Head of the queue, or null when it is empty; nothing is removed.
+        return getQueue(queueName).peek();
+    }
+
+    @Override
+    public byte[] queuePop(String queueName, NodeId requestor) {
+        // poll() yields null on an empty queue, whereas remove() throws NoSuchElementException.
+        byte[] entry = getQueue(queueName).poll();
+        if (entry == null && requestor != null) {
+            // Nothing to hand out: remember the requestor so the next push reports it for notification.
+            getQueueUpdateNotificationTargets(queueName).add(requestor);
+        }
+        return entry;
+    }
+
+    @Override
+    public Set<NodeId> queuePush(String queueName, byte[] entry) {
+        getQueue(queueName).add(entry);
+        Set<NodeId> notifyList = ImmutableSet.copyOf(getQueueUpdateNotificationTargets(queueName)); // snapshot first
+        getQueueUpdateNotificationTargets(queueName).clear(); // waiters must re-register through an empty queuePop
+        return notifyList;
+    }
+
+    @Override
     public boolean prepareAndCommit(Transaction transaction) {
         if (prepare(transaction)) {
             return commit(transaction);
@@ -335,6 +380,14 @@
         return counters.computeIfAbsent(counterName, name -> new AtomicLong(0));
     }
 
+    private Queue<byte[]> getQueue(String queueName) {
+        return queues.computeIfAbsent(queueName, name -> new LinkedList<>()); // created lazily on first access
+    }
+
+    private Set<NodeId> getQueueUpdateNotificationTargets(String queueName) {
+        return queueUpdateNotificationTargets.computeIfAbsent(queueName, name -> new HashSet<>()); // lazily created
+    }
+
     private boolean isUpdatePossible(DatabaseUpdate update) {
         Versioned<byte[]> existingEntry = get(update.tableName(), update.key());
         switch (update.type()) {
diff --git a/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/DefaultDistributedQueue.java b/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/DefaultDistributedQueue.java
new file mode 100644
index 0000000..0bcbdc4
--- /dev/null
+++ b/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/DefaultDistributedQueue.java
@@ -0,0 +1,135 @@
+/*
+ * Copyright 2015 Open Networking Laboratory
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.onosproject.store.consistent.impl;
+
+import static com.google.common.base.Preconditions.checkNotNull;
+
+import java.util.Queue;
+import java.util.Set;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ConcurrentLinkedQueue;
+import java.util.function.Consumer;
+
+import org.onosproject.cluster.NodeId;
+import org.onosproject.store.service.DistributedQueue;
+import org.onosproject.store.service.Serializer;
+
+import com.google.common.util.concurrent.Futures;
+
+/**
+ * DistributedQueue implementation that provides FIFO ordering semantics.
+ * <p>
+ * Entries are stored in the backing database. A pop against an empty queue is parked
+ * locally as a pending future and completed later by {@link #tryPoll()}.
+ *
+ * @param <E> queue entry type
+ */
+public class DefaultDistributedQueue<E> implements DistributedQueue<E> {
+
+    private final String name;
+    private final Database database;
+    private final Serializer serializer;
+    private final NodeId localNodeId;
+    // Parked pop() futures, oldest first. Added to by whichever thread completes the
+    // database future and drained by tryPoll(), hence a concurrent FIFO collection.
+    private final Queue<CompletableFuture<E>> pendingFutures = new ConcurrentLinkedQueue<>();
+    private final Consumer<Set<NodeId>> notifyConsumers;
+
+    private static final String ERROR_NULL_ENTRY = "Null entries are not allowed";
+
+    /**
+     * Creates a queue backed by the specified database.
+     *
+     * @param name queue name
+     * @param database database holding the queue entries
+     * @param serializer serializer used to encode and decode entries
+     * @param localNodeId local node identifier handed to the database on every pop
+     * @param notifyConsumers callback given the node set returned by each push
+     */
+    public DefaultDistributedQueue(String name,
+            Database database,
+            Serializer serializer,
+            NodeId localNodeId,
+            Consumer<Set<NodeId>> notifyConsumers) {
+        this.name = checkNotNull(name, "queue name cannot be null");
+        this.database = checkNotNull(database, "database cannot be null");
+        this.serializer = checkNotNull(serializer, "serializer cannot be null");
+        this.localNodeId = localNodeId;
+        this.notifyConsumers = checkNotNull(notifyConsumers, "notifyConsumers cannot be null");
+    }
+
+    @Override
+    public long size() {
+        return Futures.getUnchecked(database.queueSize(name));
+    }
+
+    @Override
+    public void push(E entry) {
+        checkNotNull(entry, ERROR_NULL_ENTRY);
+        // Block until the entry is stored and the waiting nodes have been handed to the callback.
+        Futures.getUnchecked(database.queuePush(name, serializer.encode(entry))
+                                     .thenAccept(notifyConsumers));
+    }
+
+    @Override
+    public CompletableFuture<E> pop() {
+        return database.queuePop(name, localNodeId)
+                       .thenCompose(v -> {
+                           if (v != null) {
+                               return CompletableFuture.completedFuture(serializer.decode(v));
+                           }
+                           // Queue was empty: park the caller until tryPoll() obtains an entry.
+                           // NOTE(review): a tryPoll() that runs before this add sees no waiter,
+                           // so this future then waits for a later notification - confirm acceptable.
+                           CompletableFuture<E> newPendingFuture = new CompletableFuture<>();
+                           pendingFutures.add(newPendingFuture);
+                           return newPendingFuture;
+                       });
+    }
+
+    @Override
+    public E peek() {
+        return Futures.getUnchecked(database.queuePeek(name)
+                                            .thenApply(v -> v != null ? serializer.decode(v) : null));
+    }
+
+    /**
+     * Returns the queue name.
+     *
+     * @return queue name
+     */
+    public String name() {
+        return name;
+    }
+
+    /**
+     * Completes parked {@link #pop()} futures, oldest first, with entries popped from the database.
+     * <p>
+     * Synchronized because it may be invoked from several threads at once: overlapping runs could
+     * otherwise pop two entries on behalf of the same future and silently drop one of them.
+     */
+    protected synchronized void tryPoll() {
+        while (!pendingFutures.isEmpty()) {
+            byte[] bytes = Futures.getUnchecked(database.queuePop(name, localNodeId));
+            if (bytes == null) {
+                // Queue is drained; the remaining futures stay parked.
+                return;
+            }
+            // Only this method removes futures, so the head observed above is still there.
+            pendingFutures.remove().complete(serializer.decode(bytes));
+        }
+    }
+}
\ No newline at end of file
diff --git a/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/DefaultDistributedQueueBuilder.java b/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/DefaultDistributedQueueBuilder.java
new file mode 100644
index 0000000..6095fab
--- /dev/null
+++ b/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/DefaultDistributedQueueBuilder.java
@@ -0,0 +1,96 @@
+/*
+ * Copyright 2015 Open Networking Laboratory
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.onosproject.store.consistent.impl;
+
+import static com.google.common.base.Preconditions.checkArgument;
+import static com.google.common.base.Preconditions.checkState;
+
+import java.util.Set;
+import java.util.function.Consumer;
+
+import org.onosproject.cluster.NodeId;
+import org.onosproject.store.service.DistributedQueue;
+import org.onosproject.store.service.DistributedQueueBuilder;
+import org.onosproject.store.service.Serializer;
+
+import com.google.common.base.Charsets;
+
+/**
+ * Default implementation of a {@code DistributedQueueBuilder}.
+ *
+ * @param <E> queue entry type
+ */
+public class DefaultDistributedQueueBuilder<E> implements DistributedQueueBuilder<E> {
+
+    private Serializer serializer;
+    private String name;
+    private boolean persistenceEnabled = true;
+    private final DatabaseManager databaseManager;
+
+    /**
+     * Creates a builder that registers the queues it builds with the given manager.
+     *
+     * @param databaseManager database manager supplying the databases and cluster communicator
+     */
+    public DefaultDistributedQueueBuilder(
+            DatabaseManager databaseManager) {
+        this.databaseManager = databaseManager;
+    }
+
+    @Override
+    public DistributedQueueBuilder<E> withName(String name) {
+        checkArgument(name != null && !name.isEmpty());
+        this.name = name;
+        return this;
+    }
+
+    @Override
+    public DistributedQueueBuilder<E> withSerializer(Serializer serializer) {
+        checkArgument(serializer != null);
+        this.serializer = serializer;
+        return this;
+    }
+
+    @Override
+    public DistributedQueueBuilder<E> withPersistenceDisabled() {
+        persistenceEnabled = false;
+        return this;
+    }
+
+    private boolean validInputs() {
+        return name != null && serializer != null;
+    }
+
+    @Override
+    public DistributedQueue<E> build() {
+        checkState(validInputs());
+        // Snapshot the name: the callback outlives build(), and reading the mutable field
+        // later would announce the wrong queue if this builder is reconfigured and reused.
+        final String queueName = name;
+        Consumer<Set<NodeId>> notifyOthers = nodes -> databaseManager.clusterCommunicator.multicast(queueName,
+                        DatabaseManager.QUEUE_UPDATED_TOPIC,
+                        s -> s.getBytes(Charsets.UTF_8),
+                        nodes);
+        DefaultDistributedQueue<E> queue = new DefaultDistributedQueue<>(
+                queueName,
+                persistenceEnabled ? databaseManager.partitionedDatabase : databaseManager.inMemoryDatabase,
+                serializer,
+                databaseManager.localNodeId,
+                notifyOthers);
+        databaseManager.registerQueue(queue);
+        return queue;
+    }
+}
diff --git a/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/PartitionedDatabase.java b/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/PartitionedDatabase.java
index 7b279dc..083ca86 100644
--- a/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/PartitionedDatabase.java
+++ b/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/PartitionedDatabase.java
@@ -27,6 +27,7 @@
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.stream.Collectors;
 
+import org.onosproject.cluster.NodeId;
 import org.onosproject.store.service.DatabaseUpdate;
 import org.onosproject.store.service.Transaction;
 import org.onosproject.store.service.Versioned;
@@ -276,6 +277,31 @@
         return partitioner.getPartition(counterName, counterName).counterGetAndAdd(counterName, delta);
     }
 
+
+    @Override
+    public CompletableFuture<Long> queueSize(String queueName) {
+        checkState(isOpen.get(), DB_NOT_OPEN); // fail fast when closed; the queue name alone picks the partition
+        return partitioner.getPartition(queueName, queueName).queueSize(queueName);
+    }
+
+    @Override
+    public CompletableFuture<Set<NodeId>> queuePush(String queueName, byte[] entry) {
+        checkState(isOpen.get(), DB_NOT_OPEN); // fail fast when closed; the queue name alone picks the partition
+        return partitioner.getPartition(queueName, queueName).queuePush(queueName, entry);
+    }
+
+    @Override
+    public CompletableFuture<byte[]> queuePop(String queueName, NodeId nodeId) {
+        checkState(isOpen.get(), DB_NOT_OPEN); // fail fast when closed; the queue name alone picks the partition
+        return partitioner.getPartition(queueName, queueName).queuePop(queueName, nodeId);
+    }
+
+    @Override
+    public CompletableFuture<byte[]> queuePeek(String queueName) {
+        checkState(isOpen.get(), DB_NOT_OPEN); // fail fast when closed; the queue name alone picks the partition
+        return partitioner.getPartition(queueName, queueName).queuePeek(queueName);
+    }
+
     @Override
     public CompletableFuture<Boolean> prepareAndCommit(Transaction transaction) {
         Map<Database, Transaction> subTransactions = createSubTransactions(transaction);