Add consistently ordered notification support for the single-partition scenario.

Change-Id: I6d959fafb879aa89885c2fb758aa73efd4b47cb0
diff --git a/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/DefaultDatabase.java b/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/DefaultDatabase.java
index 9e48051..4b96f3f 100644
--- a/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/DefaultDatabase.java
+++ b/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/DefaultDatabase.java
@@ -21,23 +21,29 @@
 import net.kuujo.copycat.resource.internal.ResourceManager;
 import net.kuujo.copycat.state.internal.DefaultStateMachine;
 import net.kuujo.copycat.util.concurrent.Futures;
+import net.kuujo.copycat.util.function.TriConsumer;
 
 import java.util.Collection;
 import java.util.Map;
 import java.util.Set;
 import java.util.concurrent.CompletableFuture;
+import java.util.function.Consumer;
 import java.util.function.Supplier;
 
 import org.onosproject.cluster.NodeId;
 import org.onosproject.store.service.Transaction;
 import org.onosproject.store.service.Versioned;
 
+import com.google.common.collect.Sets;
+
 /**
  * Default database.
  */
 public class DefaultDatabase extends AbstractResource<Database> implements Database {
     private final StateMachine<DatabaseState<String, byte[]>> stateMachine;
     private DatabaseProxy<String, byte[]> proxy;
+    private final Set<Consumer<StateMachineUpdate>> consumers = Sets.newCopyOnWriteArraySet();
+    private final TriConsumer<String, Object, Object> watcher = new InternalStateMachineWatcher();
 
     @SuppressWarnings({ "unchecked", "rawtypes" })
     public DefaultDatabase(ResourceManager context) {
@@ -46,6 +52,14 @@
                 DatabaseState.class,
                 DefaultDatabaseState.class,
                 DefaultDatabase.class.getClassLoader());
+        this.stateMachine.addStartupTask(() -> {
+            stateMachine.registerWatcher(watcher);
+            return CompletableFuture.completedFuture(null);
+        });
+        this.stateMachine.addShutdownTask(() -> {
+            stateMachine.unregisterWatcher(watcher);
+            return CompletableFuture.completedFuture(null);
+        });
     }
 
     /**
@@ -209,4 +223,27 @@
         }
         return false;
     }
+
+    @Override
+    public void registerConsumer(Consumer<StateMachineUpdate> consumer) {
+        consumers.add(consumer); // consumers is a Set: registering the same consumer again changes nothing
+    }
+
+    @Override
+    public void unregisterConsumer(Consumer<StateMachineUpdate> consumer) {
+        consumers.remove(consumer); // no effect if this consumer is not currently registered
+    }
+
+    @Override
+    public boolean hasChangeNotificationSupport() {
+        return true; // unconditional: this class forwards watcher callbacks to registered consumers
+    }
+
+    private class InternalStateMachineWatcher implements TriConsumer<String, Object, Object> {
+        @Override // wraps the three callback arguments and hands them to every registered consumer
+        public void accept(String name, Object input, Object output) {
+            StateMachineUpdate update = new StateMachineUpdate(name, input, output); // shared by all consumers
+            consumers.forEach(consumer -> consumer.accept(update)); // a throwing consumer stops the rest
+        }
+    }
 }