Consistently ordered notification support for single partition scenario.

Change-Id: I6d959fafb879aa89885c2fb758aa73efd4b47cb0
diff --git a/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/Database.java b/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/Database.java
index a04a592..d48e74e 100644
--- a/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/Database.java
+++ b/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/Database.java
@@ -17,6 +17,8 @@
 package org.onosproject.store.consistent.impl;
 
 
+import java.util.function.Consumer;
+
 import net.kuujo.copycat.cluster.ClusterConfig;
 import net.kuujo.copycat.cluster.internal.coordinator.ClusterCoordinator;
 import net.kuujo.copycat.cluster.internal.coordinator.CoordinatorConfig;
@@ -81,4 +83,22 @@
       .addStartupTask(() -> coordinator.open().thenApply(v -> null))
       .addShutdownTask(coordinator::close);
   }
+
+  /**
+   * Tells whether the database supports change notifications.
+   * @return true if notifications are supported; false otherwise
+   */
+  boolean hasChangeNotificationSupport();
+
+  /**
+   * Registers a new consumer of StateMachineUpdates.
+   * @param consumer consumer to register
+   */
+  void registerConsumer(Consumer<StateMachineUpdate> consumer);
+
+  /**
+   * Unregisters a consumer of StateMachineUpdates.
+   * @param consumer consumer to unregister
+   */
+  void unregisterConsumer(Consumer<StateMachineUpdate> consumer);
 }
\ No newline at end of file
diff --git a/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/DefaultAsyncConsistentMap.java b/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/DefaultAsyncConsistentMap.java
index f42857f..7bf30cc 100644
--- a/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/DefaultAsyncConsistentMap.java
+++ b/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/DefaultAsyncConsistentMap.java
@@ -34,8 +34,12 @@
 import java.util.Set;
 
 import org.onlab.util.HexString;
+import org.onlab.util.SharedExecutors;
 import org.onlab.util.Tools;
 import org.onosproject.core.ApplicationId;
+
+import static org.onosproject.store.consistent.impl.StateMachineUpdate.Target.MAP;
+
 import org.onosproject.store.service.AsyncConsistentMap;
 import org.onosproject.store.service.ConsistentMapException;
 import org.onosproject.store.service.MapEvent;
@@ -101,6 +105,17 @@
         this.readOnly = readOnly;
         this.purgeOnUninstall = purgeOnUninstall;
         this.eventPublisher = eventPublisher;
+        this.database.registerConsumer(update -> {
+            SharedExecutors.getSingleThreadExecutor().execute(() -> {
+                if (update.target() == MAP) {
+                    Result<UpdateResult<String, byte[]>> result = update.output();
+                    if (result.success() && result.value().mapName().equals(name)) {
+                        MapEvent<K, V> mapEvent = result.value().<K, V>map(this::dK, serializer::decode).toMapEvent();
+                        notifyLocalListeners(mapEvent);
+                    }
+                }
+            });
+        });
     }
 
     /**
@@ -322,7 +337,11 @@
                     value == null ? null : serializer.encode(value))
                 .thenApply(this::unwrapResult)
                 .thenApply(r -> r.<K, V>map(this::dK, serializer::decode))
-                .whenComplete((r, e) -> notifyListeners(r != null ? r.toMapEvent() : null));
+                .whenComplete((r, e) -> {
+                    if (r != null && e == null && !database.hasChangeNotificationSupport()) {
+                        notifyListeners(r.toMapEvent());
+                    }
+                });
     }
 
     private <T> T unwrapResult(Result<T> result) {
@@ -363,7 +382,9 @@
     }
 
     protected void notifyLocalListeners(MapEvent<K, V> event) {
-        listeners.forEach(listener -> listener.event(event));
+        if (event != null) {
+            listeners.forEach(listener -> listener.event(event));
+        }
     }
 
     protected void notifyRemoteListeners(MapEvent<K, V> event) {
diff --git a/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/DefaultDatabase.java b/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/DefaultDatabase.java
index 9e48051..4b96f3f 100644
--- a/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/DefaultDatabase.java
+++ b/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/DefaultDatabase.java
@@ -21,23 +21,29 @@
 import net.kuujo.copycat.resource.internal.ResourceManager;
 import net.kuujo.copycat.state.internal.DefaultStateMachine;
 import net.kuujo.copycat.util.concurrent.Futures;
+import net.kuujo.copycat.util.function.TriConsumer;
 
 import java.util.Collection;
 import java.util.Map;
 import java.util.Set;
 import java.util.concurrent.CompletableFuture;
+import java.util.function.Consumer;
 import java.util.function.Supplier;
 
 import org.onosproject.cluster.NodeId;
 import org.onosproject.store.service.Transaction;
 import org.onosproject.store.service.Versioned;
 
+import com.google.common.collect.Sets;
+
 /**
  * Default database.
  */
 public class DefaultDatabase extends AbstractResource<Database> implements Database {
     private final StateMachine<DatabaseState<String, byte[]>> stateMachine;
     private DatabaseProxy<String, byte[]> proxy;
+    private final Set<Consumer<StateMachineUpdate>> consumers = Sets.newCopyOnWriteArraySet();
+    private final TriConsumer<String, Object, Object> watcher = new InternalStateMachineWatcher();
 
     @SuppressWarnings({ "unchecked", "rawtypes" })
     public DefaultDatabase(ResourceManager context) {
@@ -46,6 +52,14 @@
                 DatabaseState.class,
                 DefaultDatabaseState.class,
                 DefaultDatabase.class.getClassLoader());
+        this.stateMachine.addStartupTask(() -> {
+            stateMachine.registerWatcher(watcher);
+            return CompletableFuture.completedFuture(null);
+        });
+        this.stateMachine.addShutdownTask(() -> {
+            stateMachine.unregisterWatcher(watcher);
+            return CompletableFuture.completedFuture(null);
+        });
     }
 
     /**
@@ -209,4 +223,27 @@
         }
         return false;
     }
+
+    @Override
+    public void registerConsumer(Consumer<StateMachineUpdate> consumer) {
+        consumers.add(consumer);
+    }
+
+    @Override
+    public void unregisterConsumer(Consumer<StateMachineUpdate> consumer) {
+        consumers.remove(consumer);
+    }
+
+    @Override
+    public boolean hasChangeNotificationSupport() {
+        return true;
+    }
+
+    private class InternalStateMachineWatcher implements TriConsumer<String, Object, Object> {
+        @Override
+        public void accept(String name, Object input, Object output) {
+            StateMachineUpdate update = new StateMachineUpdate(name, input, output);
+            consumers.forEach(consumer -> consumer.accept(update));
+        }
+    }
 }
diff --git a/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/PartitionedDatabase.java b/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/PartitionedDatabase.java
index b0fa4fe..aa2d753 100644
--- a/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/PartitionedDatabase.java
+++ b/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/PartitionedDatabase.java
@@ -25,6 +25,7 @@
 import java.util.concurrent.CopyOnWriteArrayList;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicInteger;
+import java.util.function.Consumer;
 import java.util.stream.Collectors;
 
 import org.onosproject.cluster.NodeId;
@@ -363,4 +364,17 @@
     protected void setTransactionManager(TransactionManager transactionManager) {
         this.transactionManager = transactionManager;
     }
+
+    @Override
+    public boolean hasChangeNotificationSupport() {
+        return false;
+    }
+
+    @Override
+    public void registerConsumer(Consumer<StateMachineUpdate> consumer) {
+    }
+
+    @Override
+    public void unregisterConsumer(Consumer<StateMachineUpdate> consumer) {
+    }
 }
diff --git a/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/StateMachineUpdate.java b/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/StateMachineUpdate.java
new file mode 100644
index 0000000..a17f317
--- /dev/null
+++ b/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/StateMachineUpdate.java
@@ -0,0 +1,51 @@
+package org.onosproject.store.consistent.impl;
+
+/**
+ * Representation of a state machine update.
+ */
+public class StateMachineUpdate {
+
+    /**
+     * Target data structure type this update is for.
+     */
+    enum Target {
+        /**
+         * Update is for a map.
+         */
+        MAP,
+
+        /**
+         * Update is for a non-map data structure.
+         */
+        OTHER
+    }
+
+    private final String operationName;
+    private final Object input;
+    private final Object output;
+
+    public StateMachineUpdate(String operationName, Object input, Object output) {
+        this.operationName = operationName;
+        this.input = input;
+        this.output = output;
+    }
+
+    public Target target() {
+        // FIXME: This check is brittle
+        if (operationName.contains("mapUpdate")) {
+            return Target.MAP;
+        } else {
+            return Target.OTHER;
+        }
+    }
+
+    @SuppressWarnings("unchecked")
+    public <T> T input() {
+        return (T) input;
+    }
+
+    @SuppressWarnings("unchecked")
+    public <T> T output() {
+        return (T) output;
+    }
+}
\ No newline at end of file