Support for a distributed queue primitive.
Change-Id: I13abb93ec1703105ff0137e137738483a5b6a143
diff --git a/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/DefaultDatabaseState.java b/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/DefaultDatabaseState.java
index 7edeb44..f1bba25 100644
--- a/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/DefaultDatabaseState.java
+++ b/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/DefaultDatabaseState.java
@@ -19,13 +19,16 @@
import java.util.Arrays;
import java.util.Collection;
import java.util.HashSet;
+import java.util.LinkedList;
import java.util.Map;
import java.util.Map.Entry;
+import java.util.Queue;
import java.util.concurrent.atomic.AtomicLong;
import java.util.stream.Collectors;
import java.util.Set;
import org.apache.commons.lang3.tuple.Pair;
+import org.onosproject.cluster.NodeId;
import org.onosproject.store.service.DatabaseUpdate;
import org.onosproject.store.service.Transaction;
import org.onosproject.store.service.Versioned;
@@ -46,6 +49,8 @@
private Long nextVersion;
private Map<String, AtomicLong> counters;
private Map<String, Map<String, Versioned<byte[]>>> tables;
+ private Map<String, Queue<byte[]>> queues; // queue name -> entries waiting in that queue
+ private Map<String, Set<NodeId>> queueUpdateNotificationTargets; // queue name -> nodes that popped it while empty
/**
* This locks map has a structure similar to the "tables" map above and
@@ -77,6 +82,16 @@
locks = Maps.newConcurrentMap();
context.put("locks", locks);
}
+ queues = context.get("queues");
+ if (queues == null) {
+ queues = Maps.newConcurrentMap();
+ context.put("queues", queues);
+ }
+ queueUpdateNotificationTargets = context.get("queueUpdateNotificationTargets");
+ if (queueUpdateNotificationTargets == null) {
+ queueUpdateNotificationTargets = Maps.newConcurrentMap();
+ context.put("queueUpdateNotificationTargets", queueUpdateNotificationTargets);
+ }
nextVersion = context.get("nextVersion");
if (nextVersion == null) {
nextVersion = new Long(0);
@@ -288,6 +303,36 @@
}
@Override
+    public Long queueSize(String queueName) { // entry count; an unknown queue is created empty and reports 0
+        return (long) getQueue(queueName).size();
+    }
+
+ @Override
+    public byte[] queuePeek(String queueName) {
+        // Reads the head entry without removing it; peek() yields null when the queue is empty.
+        return getQueue(queueName).peek();
+    }
+
+ @Override
+    public byte[] queuePop(String queueName, NodeId requestor) {
+        Queue<byte[]> queue = getQueue(queueName);
+        if (queue.isEmpty() && requestor != null) {
+            // Nothing to hand out: remember the requestor so the next push reports it.
+            getQueueUpdateNotificationTargets(queueName).add(requestor);
+        }
+        // poll() yields null on an empty queue; remove() threw when requestor was null.
+        return queue.poll();
+    }
+
+ @Override
+    public Set<NodeId> queuePush(String queueName, byte[] entry) {
+        getQueue(queueName).add(entry);
+        Set<NodeId> pendingTargets = ImmutableSet.copyOf(getQueueUpdateNotificationTargets(queueName));
+        getQueueUpdateNotificationTargets(queueName).clear(); // the snapshot above is what the caller receives
+        return pendingTargets;
+    }
+
+ @Override
public boolean prepareAndCommit(Transaction transaction) {
if (prepare(transaction)) {
return commit(transaction);
@@ -335,6 +380,14 @@
return counters.computeIfAbsent(counterName, name -> new AtomicLong(0));
}
+    private Queue<byte[]> getQueue(String queueName) { // lazily registers an empty queue on first access
+        return queues.computeIfAbsent(queueName, key -> new LinkedList<byte[]>());
+    }
+
+    private Set<NodeId> getQueueUpdateNotificationTargets(String queueName) { // lazily registers an empty set
+        return queueUpdateNotificationTargets.computeIfAbsent(queueName, key -> new HashSet<NodeId>());
+    }
+
private boolean isUpdatePossible(DatabaseUpdate update) {
Versioned<byte[]> existingEntry = get(update.tableName(), update.key());
switch (update.type()) {