ONOS-2440: Simplify DistributedQueue implementation by leveraging state change notification support

Change-Id: Id0a48f07535d8b7e1d0f964bd1c0623ca81d4605
diff --git a/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/DefaultDatabaseState.java b/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/DefaultDatabaseState.java
index 219b847..9d3505b 100644
--- a/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/DefaultDatabaseState.java
+++ b/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/DefaultDatabaseState.java
@@ -18,7 +18,6 @@
 
 import java.util.Arrays;
 import java.util.Collection;
-import java.util.HashSet;
 import java.util.LinkedList;
 import java.util.Map;
 import java.util.Map.Entry;
@@ -27,7 +26,6 @@
 import java.util.stream.Collectors;
 import java.util.Set;
 
-import org.onosproject.cluster.NodeId;
 import org.onosproject.store.service.DatabaseUpdate;
 import org.onosproject.store.service.Transaction;
 import org.onosproject.store.service.Versioned;
@@ -48,7 +46,6 @@
     private Map<String, AtomicLong> counters;
     private Map<String, Map<String, Versioned<byte[]>>> maps;
     private Map<String, Queue<byte[]>> queues;
-    private Map<String, Set<NodeId>> queueUpdateNotificationTargets;
 
     /**
      * This locks map has a structure similar to the "tables" map above and
@@ -85,11 +82,6 @@
             queues = Maps.newConcurrentMap();
             context.put("queues", queues);
         }
-        queueUpdateNotificationTargets = context.get("queueUpdateNotificationTargets");
-        if (queueUpdateNotificationTargets == null) {
-            queueUpdateNotificationTargets = Maps.newConcurrentMap();
-            context.put("queueUpdateNotificationTargets", queueUpdateNotificationTargets);
-        }
         nextVersion = context.get("nextVersion");
         if (nextVersion == null) {
             nextVersion = new Long(0);
@@ -214,27 +206,17 @@
 
     @Override
     public byte[] queuePeek(String queueName) {
-        Queue<byte[]> queue = getQueue(queueName);
-        return queue.peek();
+        return getQueue(queueName).peek();
     }
 
     @Override
-    public byte[] queuePop(String queueName, NodeId requestor) {
-        Queue<byte[]> queue = getQueue(queueName);
-        if (queue.size() == 0 && requestor != null) {
-            getQueueUpdateNotificationTargets(queueName).add(requestor);
-            return null;
-        } else {
-            return queue.remove();
-        }
+    public byte[] queuePop(String queueName) {
+        return getQueue(queueName).poll();
     }
 
     @Override
-    public Set<NodeId> queuePush(String queueName, byte[] entry) {
-        getQueue(queueName).add(entry);
-        Set<NodeId> notifyList = ImmutableSet.copyOf(getQueueUpdateNotificationTargets(queueName));
-        getQueueUpdateNotificationTargets(queueName).clear();
-        return notifyList;
+    public void queuePush(String queueName, byte[] entry) {
+        getQueue(queueName).offer(entry);
     }
 
     @Override
@@ -289,10 +271,6 @@
         return queues.computeIfAbsent(queueName, name -> new LinkedList<>());
     }
 
-    private Set<NodeId> getQueueUpdateNotificationTargets(String queueName) {
-        return queueUpdateNotificationTargets.computeIfAbsent(queueName, name -> new HashSet<>());
-    }
-
     private boolean isUpdatePossible(DatabaseUpdate update) {
         Versioned<byte[]> existingEntry = mapGet(update.mapName(), update.key());
         switch (update.type()) {