Support for a distributed queue primitive.

Change-Id: I13abb93ec1703105ff0137e137738483a5b6a143
diff --git a/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/DefaultDatabaseState.java b/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/DefaultDatabaseState.java
index 7edeb44..f1bba25 100644
--- a/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/DefaultDatabaseState.java
+++ b/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/DefaultDatabaseState.java
@@ -19,13 +19,16 @@
 import java.util.Arrays;
 import java.util.Collection;
 import java.util.HashSet;
+import java.util.LinkedList;
 import java.util.Map;
 import java.util.Map.Entry;
+import java.util.Queue;
 import java.util.concurrent.atomic.AtomicLong;
 import java.util.stream.Collectors;
 import java.util.Set;
 
 import org.apache.commons.lang3.tuple.Pair;
+import org.onosproject.cluster.NodeId;
 import org.onosproject.store.service.DatabaseUpdate;
 import org.onosproject.store.service.Transaction;
 import org.onosproject.store.service.Versioned;
@@ -46,6 +49,8 @@
     private Long nextVersion;
     private Map<String, AtomicLong> counters;
     private Map<String, Map<String, Versioned<byte[]>>> tables;
+    private Map<String, Queue<byte[]>> queues;
+    private Map<String, Set<NodeId>> queueUpdateNotificationTargets;
 
     /**
      * This locks map has a structure similar to the "tables" map above and
@@ -77,6 +82,16 @@
             locks = Maps.newConcurrentMap();
             context.put("locks", locks);
         }
+        queues = context.get("queues");
+        if (queues == null) {
+            queues = Maps.newConcurrentMap();
+            context.put("queues", queues);
+        }
+        queueUpdateNotificationTargets = context.get("queueUpdateNotificationTargets");
+        if (queueUpdateNotificationTargets == null) {
+            queueUpdateNotificationTargets = Maps.newConcurrentMap();
+            context.put("queueUpdateNotificationTargets", queueUpdateNotificationTargets);
+        }
         nextVersion = context.get("nextVersion");
         if (nextVersion == null) {
             nextVersion = new Long(0);
@@ -288,6 +303,36 @@
     }
 
     @Override
+    public Long queueSize(String queueName) {
+        return Long.valueOf(getQueue(queueName).size());
+    }
+
+    @Override
+    public byte[] queuePeek(String queueName) {
+        Queue<byte[]> queue = getQueue(queueName);
+        return queue.peek();
+    }
+
+    @Override
+    public byte[] queuePop(String queueName, NodeId requestor) {
+        Queue<byte[]> queue = getQueue(queueName);
+        if (queue.size() == 0 && requestor != null) {
+            getQueueUpdateNotificationTargets(queueName).add(requestor);
+            return null;
+        } else {
+            return queue.remove();
+        }
+    }
+
+    @Override
+    public Set<NodeId> queuePush(String queueName, byte[] entry) {
+        getQueue(queueName).add(entry);
+        Set<NodeId> notifyList = ImmutableSet.copyOf(getQueueUpdateNotificationTargets(queueName));
+        getQueueUpdateNotificationTargets(queueName).clear();
+        return notifyList;
+    }
+
+    @Override
     public boolean prepareAndCommit(Transaction transaction) {
         if (prepare(transaction)) {
             return commit(transaction);
@@ -335,6 +380,14 @@
         return counters.computeIfAbsent(counterName, name -> new AtomicLong(0));
     }
 
+    private Queue<byte[]> getQueue(String queueName) {
+        return queues.computeIfAbsent(queueName, name -> new LinkedList<>());
+    }
+
+    private Set<NodeId> getQueueUpdateNotificationTargets(String queueName) {
+        return queueUpdateNotificationTargets.computeIfAbsent(queueName, name -> new HashSet<>());
+    }
+
     private boolean isUpdatePossible(DatabaseUpdate update) {
         Versioned<byte[]> existingEntry = get(update.tableName(), update.key());
         switch (update.type()) {