ONOS-2440: Simplify DistributedQueue implementation by leveraging state change notification support
Change-Id: Id0a48f07535d8b7e1d0f964bd1c0623ca81d4605
diff --git a/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/DefaultDatabaseState.java b/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/DefaultDatabaseState.java
index 219b847..9d3505b 100644
--- a/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/DefaultDatabaseState.java
+++ b/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/DefaultDatabaseState.java
@@ -18,7 +18,6 @@
import java.util.Arrays;
import java.util.Collection;
-import java.util.HashSet;
import java.util.LinkedList;
import java.util.Map;
import java.util.Map.Entry;
@@ -27,7 +26,6 @@
import java.util.stream.Collectors;
import java.util.Set;
-import org.onosproject.cluster.NodeId;
import org.onosproject.store.service.DatabaseUpdate;
import org.onosproject.store.service.Transaction;
import org.onosproject.store.service.Versioned;
@@ -48,7 +46,6 @@
private Map<String, AtomicLong> counters;
private Map<String, Map<String, Versioned<byte[]>>> maps;
private Map<String, Queue<byte[]>> queues;
- private Map<String, Set<NodeId>> queueUpdateNotificationTargets;
/**
* This locks map has a structure similar to the "tables" map above and
@@ -85,11 +82,6 @@
queues = Maps.newConcurrentMap();
context.put("queues", queues);
}
- queueUpdateNotificationTargets = context.get("queueUpdateNotificationTargets");
- if (queueUpdateNotificationTargets == null) {
- queueUpdateNotificationTargets = Maps.newConcurrentMap();
- context.put("queueUpdateNotificationTargets", queueUpdateNotificationTargets);
- }
nextVersion = context.get("nextVersion");
if (nextVersion == null) {
nextVersion = new Long(0);
@@ -214,27 +206,17 @@
@Override
public byte[] queuePeek(String queueName) {
- Queue<byte[]> queue = getQueue(queueName);
- return queue.peek();
+ return getQueue(queueName).peek();
}
@Override
- public byte[] queuePop(String queueName, NodeId requestor) {
- Queue<byte[]> queue = getQueue(queueName);
- if (queue.size() == 0 && requestor != null) {
- getQueueUpdateNotificationTargets(queueName).add(requestor);
- return null;
- } else {
- return queue.remove();
- }
+ public byte[] queuePop(String queueName) {
+ return getQueue(queueName).poll();
}
@Override
- public Set<NodeId> queuePush(String queueName, byte[] entry) {
- getQueue(queueName).add(entry);
- Set<NodeId> notifyList = ImmutableSet.copyOf(getQueueUpdateNotificationTargets(queueName));
- getQueueUpdateNotificationTargets(queueName).clear();
- return notifyList;
+ public void queuePush(String queueName, byte[] entry) {
+ getQueue(queueName).offer(entry);
}
@Override
@@ -289,10 +271,6 @@
return queues.computeIfAbsent(queueName, name -> new LinkedList<>());
}
- private Set<NodeId> getQueueUpdateNotificationTargets(String queueName) {
- return queueUpdateNotificationTargets.computeIfAbsent(queueName, name -> new HashSet<>());
- }
-
private boolean isUpdatePossible(DatabaseUpdate update) {
Versioned<byte[]> existingEntry = mapGet(update.mapName(), update.key());
switch (update.type()) {