ONOS-2440: Simplify DistributedQueue implementation by leveraging state change notification support

Change-Id: Id0a48f07535d8b7e1d0f964bd1c0623ca81d4605
diff --git a/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/DatabaseManager.java b/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/DatabaseManager.java
index 1bccf2e..b7c3794 100644
--- a/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/DatabaseManager.java
+++ b/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/DatabaseManager.java
@@ -16,7 +16,6 @@
package org.onosproject.store.consistent.impl;
-import com.google.common.base.Charsets;
import com.google.common.collect.ArrayListMultimap;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableSet;
@@ -49,8 +48,6 @@
import org.apache.felix.scr.annotations.ReferencePolicy;
import org.apache.felix.scr.annotations.Service;
-import static org.onlab.util.Tools.groupedThreads;
-
import org.onosproject.app.ApplicationEvent;
import org.onosproject.app.ApplicationListener;
import org.onosproject.app.ApplicationService;
@@ -61,7 +58,6 @@
import org.onosproject.store.cluster.impl.ClusterDefinitionManager;
import org.onosproject.store.cluster.impl.NodeInfo;
import org.onosproject.store.cluster.messaging.ClusterCommunicationService;
-import org.onosproject.store.cluster.messaging.MessageSubject;
import org.onosproject.store.ecmap.EventuallyConsistentMapBuilderImpl;
import org.onosproject.store.service.AtomicCounterBuilder;
import org.onosproject.store.service.AtomicValueBuilder;
@@ -86,7 +82,6 @@
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
-import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
@@ -112,8 +107,6 @@
private static final int RAFT_ELECTION_TIMEOUT_MILLIS = 3000;
private static final int DATABASE_OPERATION_TIMEOUT_MILLIS = 5000;
- protected static final MessageSubject QUEUE_UPDATED_TOPIC = new MessageSubject("distributed-queue-updated");
-
private ClusterCoordinator coordinator;
protected PartitionedDatabase partitionedDatabase;
protected Database inMemoryDatabase;
@@ -122,15 +115,12 @@
private TransactionManager transactionManager;
private final IdGenerator transactionIdGenerator = () -> RandomUtils.nextLong();
- private ExecutorService eventDispatcher;
- private ExecutorService queuePollExecutor;
private ApplicationListener appListener = new InternalApplicationListener();
private final Multimap<String, DefaultAsyncConsistentMap> maps =
Multimaps.synchronizedMultimap(ArrayListMultimap.create());
private final Multimap<ApplicationId, DefaultAsyncConsistentMap> mapsByApplication =
Multimaps.synchronizedMultimap(ArrayListMultimap.create());
- private final Map<String, DefaultDistributedQueue> queues = Maps.newConcurrentMap();
@Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
protected ClusterService clusterService;
@@ -237,21 +227,6 @@
transactionManager = new TransactionManager(partitionedDatabase, consistentMapBuilder());
partitionedDatabase.setTransactionManager(transactionManager);
- eventDispatcher = Executors.newSingleThreadExecutor(
- groupedThreads("onos/store/manager", "map-event-dispatcher"));
-
- queuePollExecutor = Executors.newFixedThreadPool(4,
- groupedThreads("onos/store/manager", "queue-poll-handler"));
-
- clusterCommunicator.<String>addSubscriber(QUEUE_UPDATED_TOPIC,
- data -> new String(data, Charsets.UTF_8),
- name -> {
- DefaultDistributedQueue q = queues.get(name);
- if (q != null) {
- q.tryPoll();
- }
- },
- queuePollExecutor);
log.info("Started");
}
@@ -277,13 +252,10 @@
log.info("Successfully closed databases.");
}
});
- clusterCommunicator.removeSubscriber(QUEUE_UPDATED_TOPIC);
maps.values().forEach(this::unregisterMap);
if (applicationService != null) {
applicationService.removeListener(appListener);
}
- eventDispatcher.shutdown();
- queuePollExecutor.shutdown();
log.info("Stopped");
}
@@ -467,13 +439,6 @@
}
}
- protected <E> void registerQueue(DefaultDistributedQueue<E> queue) {
- // TODO: Support multiple local instances of the same queue.
- if (queues.putIfAbsent(queue.name(), queue) != null) {
- throw new IllegalStateException("Queue by name " + queue.name() + " already exists");
- }
- }
-
private class InternalApplicationListener implements ApplicationListener {
@Override
public void event(ApplicationEvent event) {
diff --git a/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/DatabaseProxy.java b/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/DatabaseProxy.java
index 08317b5..95f9e39 100644
--- a/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/DatabaseProxy.java
+++ b/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/DatabaseProxy.java
@@ -21,7 +21,6 @@
import java.util.Set;
import java.util.concurrent.CompletableFuture;
-import org.onosproject.cluster.NodeId;
import org.onosproject.store.service.Transaction;
import org.onosproject.store.service.Versioned;
@@ -168,17 +167,16 @@
* Inserts an entry into the queue.
* @param queueName queue name
* @param entry queue entry
- * @return set of nodes to notify about the queue update
+ * @return future that is completed once the entry has been inserted into the queue
*/
- CompletableFuture<Set<NodeId>> queuePush(String queueName, byte[] entry);
+ CompletableFuture<Void> queuePush(String queueName, byte[] entry);
/**
* Removes an entry from the queue if the queue is non-empty.
* @param queueName queue name
- * @param nodeId If the queue is empty the identifier of node to notify when an entry becomes available
- * @return entry. Can be null if queue is empty
+ * @return future for the removed entry; it is completed with null if the queue is empty
*/
- CompletableFuture<byte[]> queuePop(String queueName, NodeId nodeId);
+ CompletableFuture<byte[]> queuePop(String queueName);
/**
* Returns but does not remove an entry from the queue.
diff --git a/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/DatabaseState.java b/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/DatabaseState.java
index 8b6db1e..b3dd1c4 100644
--- a/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/DatabaseState.java
+++ b/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/DatabaseState.java
@@ -21,7 +21,6 @@
import java.util.Map.Entry;
import java.util.Set;
-import org.onosproject.cluster.NodeId;
import org.onosproject.store.service.Transaction;
import org.onosproject.store.service.Versioned;
@@ -93,10 +92,10 @@
byte[] queuePeek(String queueName);
@Command
- byte[] queuePop(String queueName, NodeId requestor);
+ byte[] queuePop(String queueName);
@Command
- Set<NodeId> queuePush(String queueName, byte[] entry);
+ void queuePush(String queueName, byte[] entry);
@Query
Long counterGet(String counterName);
diff --git a/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/DefaultAsyncConsistentMap.java b/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/DefaultAsyncConsistentMap.java
index c9311c9..9082ba6 100644
--- a/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/DefaultAsyncConsistentMap.java
+++ b/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/DefaultAsyncConsistentMap.java
@@ -47,7 +47,7 @@
import java.util.stream.Collectors;
import static com.google.common.base.Preconditions.checkNotNull;
-import static org.onosproject.store.consistent.impl.StateMachineUpdate.Target.MAP;
+import static org.onosproject.store.consistent.impl.StateMachineUpdate.Target.MAP_UPDATE;
import static org.onosproject.store.consistent.impl.StateMachineUpdate.Target.TX_COMMIT;
import static org.slf4j.LoggerFactory.getLogger;
@@ -122,7 +122,7 @@
this.purgeOnUninstall = purgeOnUninstall;
this.database.registerConsumer(update -> {
SharedExecutors.getSingleThreadExecutor().execute(() -> {
- if (update.target() == MAP) {
+ if (update.target() == MAP_UPDATE) {
Result<UpdateResult<String, byte[]>> result = update.output();
if (result.success() && result.value().mapName().equals(name)) {
MapEvent<K, V> mapEvent = result.value().<K, V>map(this::dK, serializer::decode).toMapEvent();
diff --git a/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/DefaultDatabase.java b/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/DefaultDatabase.java
index ba0b1be..4d9776e 100644
--- a/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/DefaultDatabase.java
+++ b/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/DefaultDatabase.java
@@ -30,7 +30,6 @@
import java.util.function.Consumer;
import java.util.function.Supplier;
-import org.onosproject.cluster.NodeId;
import org.onosproject.store.service.Transaction;
import org.onosproject.store.service.Versioned;
@@ -159,13 +158,13 @@
}
@Override
- public CompletableFuture<Set<NodeId>> queuePush(String queueName, byte[] entry) {
+ public CompletableFuture<Void> queuePush(String queueName, byte[] entry) {
return checkOpen(() -> proxy.queuePush(queueName, entry));
}
@Override
- public CompletableFuture<byte[]> queuePop(String queueName, NodeId nodeId) {
- return checkOpen(() -> proxy.queuePop(queueName, nodeId));
+ public CompletableFuture<byte[]> queuePop(String queueName) {
+ return checkOpen(() -> proxy.queuePop(queueName));
}
@Override
diff --git a/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/DefaultDatabaseState.java b/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/DefaultDatabaseState.java
index 219b847..9d3505b 100644
--- a/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/DefaultDatabaseState.java
+++ b/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/DefaultDatabaseState.java
@@ -18,7 +18,6 @@
import java.util.Arrays;
import java.util.Collection;
-import java.util.HashSet;
import java.util.LinkedList;
import java.util.Map;
import java.util.Map.Entry;
@@ -27,7 +26,6 @@
import java.util.stream.Collectors;
import java.util.Set;
-import org.onosproject.cluster.NodeId;
import org.onosproject.store.service.DatabaseUpdate;
import org.onosproject.store.service.Transaction;
import org.onosproject.store.service.Versioned;
@@ -48,7 +46,6 @@
private Map<String, AtomicLong> counters;
private Map<String, Map<String, Versioned<byte[]>>> maps;
private Map<String, Queue<byte[]>> queues;
- private Map<String, Set<NodeId>> queueUpdateNotificationTargets;
/**
* This locks map has a structure similar to the "tables" map above and
@@ -85,11 +82,6 @@
queues = Maps.newConcurrentMap();
context.put("queues", queues);
}
- queueUpdateNotificationTargets = context.get("queueUpdateNotificationTargets");
- if (queueUpdateNotificationTargets == null) {
- queueUpdateNotificationTargets = Maps.newConcurrentMap();
- context.put("queueUpdateNotificationTargets", queueUpdateNotificationTargets);
- }
nextVersion = context.get("nextVersion");
if (nextVersion == null) {
nextVersion = new Long(0);
@@ -214,27 +206,17 @@
@Override
public byte[] queuePeek(String queueName) {
- Queue<byte[]> queue = getQueue(queueName);
- return queue.peek();
+ return getQueue(queueName).peek();
}
@Override
- public byte[] queuePop(String queueName, NodeId requestor) {
- Queue<byte[]> queue = getQueue(queueName);
- if (queue.size() == 0 && requestor != null) {
- getQueueUpdateNotificationTargets(queueName).add(requestor);
- return null;
- } else {
- return queue.remove();
- }
+ public byte[] queuePop(String queueName) {
+ return getQueue(queueName).poll();
}
@Override
- public Set<NodeId> queuePush(String queueName, byte[] entry) {
- getQueue(queueName).add(entry);
- Set<NodeId> notifyList = ImmutableSet.copyOf(getQueueUpdateNotificationTargets(queueName));
- getQueueUpdateNotificationTargets(queueName).clear();
- return notifyList;
+ public void queuePush(String queueName, byte[] entry) {
+ getQueue(queueName).offer(entry);
}
@Override
@@ -289,10 +271,6 @@
return queues.computeIfAbsent(queueName, name -> new LinkedList<>());
}
- private Set<NodeId> getQueueUpdateNotificationTargets(String queueName) {
- return queueUpdateNotificationTargets.computeIfAbsent(queueName, name -> new HashSet<>());
- }
-
private boolean isUpdatePossible(DatabaseUpdate update) {
Versioned<byte[]> existingEntry = mapGet(update.mapName(), update.key());
switch (update.type()) {
diff --git a/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/DefaultDistributedQueue.java b/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/DefaultDistributedQueue.java
index c27774a..e4b2641 100644
--- a/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/DefaultDistributedQueue.java
+++ b/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/DefaultDistributedQueue.java
@@ -17,15 +17,16 @@
import com.google.common.collect.Sets;
import com.google.common.util.concurrent.Futures;
-import org.onosproject.cluster.NodeId;
+
+import org.onlab.util.SharedExecutors;
import org.onosproject.store.service.DistributedQueue;
import org.onosproject.store.service.Serializer;
+import java.util.List;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
-import java.util.function.Consumer;
-
import static com.google.common.base.Preconditions.checkNotNull;
+import static org.onosproject.store.consistent.impl.StateMachineUpdate.Target.QUEUE_PUSH;
/**
* DistributedQueue implementation that provides FIFO ordering semantics.
@@ -37,9 +38,7 @@
private final String name;
private final Database database;
private final Serializer serializer;
- private final NodeId localNodeId;
private final Set<CompletableFuture<E>> pendingFutures = Sets.newIdentityHashSet();
- private final Consumer<Set<NodeId>> notifyConsumers;
private static final String PRIMITIVE_NAME = "distributedQueue";
private static final String SIZE = "size";
@@ -53,66 +52,59 @@
public DefaultDistributedQueue(String name,
Database database,
Serializer serializer,
- NodeId localNodeId,
- boolean meteringEnabled,
- Consumer<Set<NodeId>> notifyConsumers) {
+ boolean meteringEnabled) {
this.name = checkNotNull(name, "queue name cannot be null");
this.database = checkNotNull(database, "database cannot be null");
this.serializer = checkNotNull(serializer, "serializer cannot be null");
- this.localNodeId = localNodeId;
- this.notifyConsumers = notifyConsumers;
this.monitor = new MeteringAgent(PRIMITIVE_NAME, name, meteringEnabled);
-
+ this.database.registerConsumer(update -> {
+ SharedExecutors.getSingleThreadExecutor().execute(() -> {
+ if (update.target() == QUEUE_PUSH) {
+ List<Object> input = update.input();
+ String queueName = (String) input.get(0);
+ if (queueName.equals(name)) {
+ tryPoll();
+ }
+ }
+ });
+ });
}
@Override
public long size() {
final MeteringAgent.Context timer = monitor.startTimer(SIZE);
- try {
- return Futures.getUnchecked(database.queueSize(name));
- } finally {
- timer.stop();
- }
+ return Futures.getUnchecked(database.queueSize(name).whenComplete((r, e) -> timer.stop()));
}
@Override
public void push(E entry) {
+ checkNotNull(entry, ERROR_NULL_ENTRY);
final MeteringAgent.Context timer = monitor.startTimer(PUSH);
- try {
- checkNotNull(entry, ERROR_NULL_ENTRY);
- Futures.getUnchecked(database.queuePush(name, serializer.encode(entry))
- .thenAccept(notifyConsumers)
- .thenApply(v -> null));
- } finally {
- timer.stop();
- }
+ Futures.getUnchecked(database.queuePush(name, serializer.encode(entry))
+ .whenComplete((r, e) -> timer.stop()));
}
@Override
public CompletableFuture<E> pop() {
final MeteringAgent.Context timer = monitor.startTimer(POP);
- return database.queuePop(name, localNodeId)
+ return database.queuePop(name)
+ .whenComplete((r, e) -> timer.stop())
.thenCompose(v -> {
if (v != null) {
return CompletableFuture.completedFuture(serializer.decode(v));
- } else {
- CompletableFuture<E> newPendingFuture = new CompletableFuture<>();
- pendingFutures.add(newPendingFuture);
- return newPendingFuture;
}
- })
- .whenComplete((r, e) -> timer.stop());
+ CompletableFuture<E> newPendingFuture = new CompletableFuture<>();
+ pendingFutures.add(newPendingFuture);
+ return newPendingFuture;
+ });
}
@Override
public E peek() {
final MeteringAgent.Context timer = monitor.startTimer(PEEK);
- try {
- return Futures.getUnchecked(database.queuePeek(name)
- .thenApply(v -> v != null ? serializer.decode(v) : null));
- } finally {
- timer.stop();
- }
+ return Futures.getUnchecked(database.queuePeek(name)
+ .thenApply(v -> v != null ? serializer.<E>decode(v) : null)
+ .whenComplete((r, e) -> timer.stop()));
}
public String name() {
@@ -122,7 +114,7 @@
protected void tryPoll() {
Set<CompletableFuture<E>> completedFutures = Sets.newHashSet();
for (CompletableFuture<E> future : pendingFutures) {
- E entry = Futures.getUnchecked(database.queuePop(name, localNodeId)
+ E entry = Futures.getUnchecked(database.queuePop(name)
.thenApply(v -> v != null ? serializer.decode(v) : null));
if (entry != null) {
future.complete(entry);
diff --git a/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/DefaultDistributedQueueBuilder.java b/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/DefaultDistributedQueueBuilder.java
index b463aff..d6654e2 100644
--- a/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/DefaultDistributedQueueBuilder.java
+++ b/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/DefaultDistributedQueueBuilder.java
@@ -15,15 +15,10 @@
*/
package org.onosproject.store.consistent.impl;
-import com.google.common.base.Charsets;
-import org.onosproject.cluster.NodeId;
import org.onosproject.store.service.DistributedQueue;
import org.onosproject.store.service.DistributedQueueBuilder;
import org.onosproject.store.service.Serializer;
-import java.util.Set;
-import java.util.function.Consumer;
-
import static com.google.common.base.Preconditions.checkArgument;
import static com.google.common.base.Preconditions.checkState;
@@ -40,8 +35,7 @@
private final DatabaseManager databaseManager;
private boolean metering = true;
- public DefaultDistributedQueueBuilder(
- DatabaseManager databaseManager) {
+ public DefaultDistributedQueueBuilder(DatabaseManager databaseManager) {
this.databaseManager = databaseManager;
}
@@ -78,18 +72,10 @@
@Override
public DistributedQueue<E> build() {
checkState(validInputs());
- Consumer<Set<NodeId>> notifyOthers = nodes -> databaseManager.clusterCommunicator.multicast(name,
- DatabaseManager.QUEUE_UPDATED_TOPIC,
- s -> s.getBytes(Charsets.UTF_8),
- nodes);
- DefaultDistributedQueue<E> queue = new DefaultDistributedQueue<>(
+ return new DefaultDistributedQueue<>(
name,
persistenceEnabled ? databaseManager.partitionedDatabase : databaseManager.inMemoryDatabase,
serializer,
- databaseManager.localNodeId,
- metering,
- notifyOthers);
- databaseManager.registerQueue(queue);
- return queue;
+ metering);
}
}
diff --git a/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/PartitionedDatabase.java b/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/PartitionedDatabase.java
index 09b3f59..a294681 100644
--- a/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/PartitionedDatabase.java
+++ b/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/PartitionedDatabase.java
@@ -28,7 +28,6 @@
import java.util.function.Consumer;
import java.util.stream.Collectors;
-import org.onosproject.cluster.NodeId;
import org.onosproject.store.service.DatabaseUpdate;
import org.onosproject.store.service.Transaction;
import org.onosproject.store.service.Versioned;
@@ -229,15 +228,15 @@
}
@Override
- public CompletableFuture<Set<NodeId>> queuePush(String queueName, byte[] entry) {
+ public CompletableFuture<Void> queuePush(String queueName, byte[] entry) {
checkState(isOpen.get(), DB_NOT_OPEN);
return partitioner.getPartition(queueName, queueName).queuePush(queueName, entry);
}
@Override
- public CompletableFuture<byte[]> queuePop(String queueName, NodeId nodeId) {
+ public CompletableFuture<byte[]> queuePop(String queueName) {
checkState(isOpen.get(), DB_NOT_OPEN);
- return partitioner.getPartition(queueName, queueName).queuePop(queueName, nodeId);
+ return partitioner.getPartition(queueName, queueName).queuePop(queueName);
}
@Override
diff --git a/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/StateMachineUpdate.java b/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/StateMachineUpdate.java
index 9b064b0..72356d0 100644
--- a/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/StateMachineUpdate.java
+++ b/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/StateMachineUpdate.java
@@ -29,7 +29,7 @@
/**
* Update is for a map.
*/
- MAP,
+ MAP_UPDATE,
/**
* Update is a transaction commit.
@@ -37,7 +37,12 @@
TX_COMMIT,
/**
- * Update is for a non-map data structure.
+ * Update is a queue push.
+ */
+ QUEUE_PUSH,
+
+ /**
+ * Update is for some other operation.
*/
OTHER
}
@@ -55,9 +60,11 @@
public Target target() {
// FIXME: This check is brittle
if (operationName.contains("mapUpdate")) {
- return Target.MAP;
+ return Target.MAP_UPDATE;
} else if (operationName.contains("commit") || operationName.contains("prepareAndCommit")) {
return Target.TX_COMMIT;
+ } else if (operationName.contains("queuePush")) {
+ return Target.QUEUE_PUSH;
} else {
return Target.OTHER;
}