Consistently ordered notification support for single partition scenario.
Change-Id: I6d959fafb879aa89885c2fb758aa73efd4b47cb0
diff --git a/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/Database.java b/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/Database.java
index a04a592..d48e74e 100644
--- a/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/Database.java
+++ b/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/Database.java
@@ -17,6 +17,8 @@
package org.onosproject.store.consistent.impl;
+import java.util.function.Consumer;
+
import net.kuujo.copycat.cluster.ClusterConfig;
import net.kuujo.copycat.cluster.internal.coordinator.ClusterCoordinator;
import net.kuujo.copycat.cluster.internal.coordinator.CoordinatorConfig;
@@ -81,4 +83,22 @@
.addStartupTask(() -> coordinator.open().thenApply(v -> null))
.addShutdownTask(coordinator::close);
}
+
+ /**
+ * Tells whether the database supports change notifications.
+ * @return true if notifications are supported; false otherwise
+ */
+ boolean hasChangeNotificationSupport();
+
+ /**
+ * Registers a new consumer of StateMachineUpdates.
+ * @param consumer consumer to register
+ */
+ void registerConsumer(Consumer<StateMachineUpdate> consumer);
+
+ /**
+ * Unregisters a consumer of StateMachineUpdates.
+ * @param consumer consumer to unregister
+ */
+ void unregisterConsumer(Consumer<StateMachineUpdate> consumer);
}
\ No newline at end of file
diff --git a/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/DefaultAsyncConsistentMap.java b/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/DefaultAsyncConsistentMap.java
index f42857f..7bf30cc 100644
--- a/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/DefaultAsyncConsistentMap.java
+++ b/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/DefaultAsyncConsistentMap.java
@@ -34,8 +34,12 @@
import java.util.Set;
import org.onlab.util.HexString;
+import org.onlab.util.SharedExecutors;
import org.onlab.util.Tools;
import org.onosproject.core.ApplicationId;
+
+import static org.onosproject.store.consistent.impl.StateMachineUpdate.Target.MAP;
+
import org.onosproject.store.service.AsyncConsistentMap;
import org.onosproject.store.service.ConsistentMapException;
import org.onosproject.store.service.MapEvent;
@@ -101,6 +105,17 @@
this.readOnly = readOnly;
this.purgeOnUninstall = purgeOnUninstall;
this.eventPublisher = eventPublisher;
+ this.database.registerConsumer(update -> {
+ SharedExecutors.getSingleThreadExecutor().execute(() -> {
+ if (update.target() == MAP) {
+ Result<UpdateResult<String, byte[]>> result = update.output();
+ if (result.success() && result.value().mapName().equals(name)) {
+ MapEvent<K, V> mapEvent = result.value().<K, V>map(this::dK, serializer::decode).toMapEvent();
+ notifyLocalListeners(mapEvent);
+ }
+ }
+ });
+ });
}
/**
@@ -322,7 +337,11 @@
value == null ? null : serializer.encode(value))
.thenApply(this::unwrapResult)
.thenApply(r -> r.<K, V>map(this::dK, serializer::decode))
- .whenComplete((r, e) -> notifyListeners(r != null ? r.toMapEvent() : null));
+ .whenComplete((r, e) -> {
+ if (r != null && e == null && !database.hasChangeNotificationSupport()) {
+ notifyListeners(r.toMapEvent());
+ }
+ });
}
private <T> T unwrapResult(Result<T> result) {
@@ -363,7 +382,9 @@
}
protected void notifyLocalListeners(MapEvent<K, V> event) {
- listeners.forEach(listener -> listener.event(event));
+ if (event != null) {
+ listeners.forEach(listener -> listener.event(event));
+ }
}
protected void notifyRemoteListeners(MapEvent<K, V> event) {
diff --git a/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/DefaultDatabase.java b/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/DefaultDatabase.java
index 9e48051..4b96f3f 100644
--- a/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/DefaultDatabase.java
+++ b/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/DefaultDatabase.java
@@ -21,23 +21,29 @@
import net.kuujo.copycat.resource.internal.ResourceManager;
import net.kuujo.copycat.state.internal.DefaultStateMachine;
import net.kuujo.copycat.util.concurrent.Futures;
+import net.kuujo.copycat.util.function.TriConsumer;
import java.util.Collection;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
+import java.util.function.Consumer;
import java.util.function.Supplier;
import org.onosproject.cluster.NodeId;
import org.onosproject.store.service.Transaction;
import org.onosproject.store.service.Versioned;
+import com.google.common.collect.Sets;
+
/**
* Default database.
*/
public class DefaultDatabase extends AbstractResource<Database> implements Database {
private final StateMachine<DatabaseState<String, byte[]>> stateMachine;
private DatabaseProxy<String, byte[]> proxy;
+ private final Set<Consumer<StateMachineUpdate>> consumers = Sets.newCopyOnWriteArraySet();
+ private final TriConsumer<String, Object, Object> watcher = new InternalStateMachineWatcher();
@SuppressWarnings({ "unchecked", "rawtypes" })
public DefaultDatabase(ResourceManager context) {
@@ -46,6 +52,14 @@
DatabaseState.class,
DefaultDatabaseState.class,
DefaultDatabase.class.getClassLoader());
+ this.stateMachine.addStartupTask(() -> {
+ stateMachine.registerWatcher(watcher);
+ return CompletableFuture.completedFuture(null);
+ });
+ this.stateMachine.addShutdownTask(() -> {
+ stateMachine.unregisterWatcher(watcher);
+ return CompletableFuture.completedFuture(null);
+ });
}
/**
@@ -209,4 +223,27 @@
}
return false;
}
+
+ @Override
+ public void registerConsumer(Consumer<StateMachineUpdate> consumer) {
+ consumers.add(consumer);
+ }
+
+ @Override
+ public void unregisterConsumer(Consumer<StateMachineUpdate> consumer) {
+ consumers.remove(consumer);
+ }
+
+ @Override
+ public boolean hasChangeNotificationSupport() {
+ return true;
+ }
+
+ private class InternalStateMachineWatcher implements TriConsumer<String, Object, Object> {
+ @Override
+ public void accept(String name, Object input, Object output) {
+ StateMachineUpdate update = new StateMachineUpdate(name, input, output);
+ consumers.forEach(consumer -> consumer.accept(update));
+ }
+ }
}
diff --git a/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/PartitionedDatabase.java b/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/PartitionedDatabase.java
index b0fa4fe..aa2d753 100644
--- a/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/PartitionedDatabase.java
+++ b/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/PartitionedDatabase.java
@@ -25,6 +25,7 @@
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
+import java.util.function.Consumer;
import java.util.stream.Collectors;
import org.onosproject.cluster.NodeId;
@@ -363,4 +364,17 @@
protected void setTransactionManager(TransactionManager transactionManager) {
this.transactionManager = transactionManager;
}
+
+ @Override
+ public boolean hasChangeNotificationSupport() {
+ return false;
+ }
+
+ @Override
+ public void registerConsumer(Consumer<StateMachineUpdate> consumer) {
+ }
+
+ @Override
+ public void unregisterConsumer(Consumer<StateMachineUpdate> consumer) {
+ }
}
diff --git a/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/StateMachineUpdate.java b/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/StateMachineUpdate.java
new file mode 100644
index 0000000..a17f317
--- /dev/null
+++ b/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/StateMachineUpdate.java
@@ -0,0 +1,51 @@
+package org.onosproject.store.consistent.impl;
+
+/**
+ * Representation of a state machine update.
+ */
+public class StateMachineUpdate {
+
+ /**
+ * Target data structure type this update is for.
+ */
+ enum Target {
+ /**
+ * Update is for a map.
+ */
+ MAP,
+
+ /**
+ * Update is for a non-map data structure.
+ */
+ OTHER
+ }
+
+ private final String operationName;
+ private final Object input;
+ private final Object output;
+
+ public StateMachineUpdate(String operationName, Object input, Object output) {
+ this.operationName = operationName;
+ this.input = input;
+ this.output = output;
+ }
+
+ public Target target() {
+ // FIXME: This check is brittle
+ if (operationName.contains("mapUpdate")) {
+ return Target.MAP;
+ } else {
+ return Target.OTHER;
+ }
+ }
+
+ @SuppressWarnings("unchecked")
+ public <T> T input() {
+ return (T) input;
+ }
+
+ @SuppressWarnings("unchecked")
+ public <T> T output() {
+ return (T) output;
+ }
+}
\ No newline at end of file