Consistently ordered notification support for single partition scenario.
Change-Id: I6d959fafb879aa89885c2fb758aa73efd4b47cb0
diff --git a/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/DefaultDatabase.java b/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/DefaultDatabase.java
index 9e48051..4b96f3f 100644
--- a/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/DefaultDatabase.java
+++ b/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/DefaultDatabase.java
@@ -21,23 +21,29 @@
import net.kuujo.copycat.resource.internal.ResourceManager;
import net.kuujo.copycat.state.internal.DefaultStateMachine;
import net.kuujo.copycat.util.concurrent.Futures;
+import net.kuujo.copycat.util.function.TriConsumer;
import java.util.Collection;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
+import java.util.function.Consumer;
import java.util.function.Supplier;
import org.onosproject.cluster.NodeId;
import org.onosproject.store.service.Transaction;
import org.onosproject.store.service.Versioned;
+import com.google.common.collect.Sets;
+
/**
* Default database.
*/
public class DefaultDatabase extends AbstractResource<Database> implements Database {
private final StateMachine<DatabaseState<String, byte[]>> stateMachine;
private DatabaseProxy<String, byte[]> proxy;
+ private final Set<Consumer<StateMachineUpdate>> consumers = Sets.newCopyOnWriteArraySet();
+ private final TriConsumer<String, Object, Object> watcher = new InternalStateMachineWatcher();
@SuppressWarnings({ "unchecked", "rawtypes" })
public DefaultDatabase(ResourceManager context) {
@@ -46,6 +52,14 @@
DatabaseState.class,
DefaultDatabaseState.class,
DefaultDatabase.class.getClassLoader());
+ this.stateMachine.addStartupTask(() -> {
+ stateMachine.registerWatcher(watcher);
+ return CompletableFuture.completedFuture(null);
+ });
+ this.stateMachine.addShutdownTask(() -> {
+ stateMachine.unregisterWatcher(watcher);
+ return CompletableFuture.completedFuture(null);
+ });
}
/**
@@ -209,4 +223,27 @@
}
return false;
}
+
+ @Override
+ public void registerConsumer(Consumer<StateMachineUpdate> consumer) {
+ consumers.add(consumer);
+ }
+
+ @Override
+ public void unregisterConsumer(Consumer<StateMachineUpdate> consumer) {
+ consumers.remove(consumer);
+ }
+
+ @Override
+ public boolean hasChangeNotificationSupport() {
+ return true;
+ }
+
+ private class InternalStateMachineWatcher implements TriConsumer<String, Object, Object> {
+ @Override
+ public void accept(String name, Object input, Object output) {
+ StateMachineUpdate update = new StateMachineUpdate(name, input, output);
+ consumers.forEach(consumer -> consumer.accept(update));
+ }
+ }
}