Notification support for Consistent datastructures (ConsitentMap and DistributedSet)

Change-Id: If74cdc411c79c42c7643420e6369cf656849bb6a
diff --git a/core/api/src/main/java/org/onosproject/store/service/AsyncConsistentMap.java b/core/api/src/main/java/org/onosproject/store/service/AsyncConsistentMap.java
index 8a88ed6..1779256 100644
--- a/core/api/src/main/java/org/onosproject/store/service/AsyncConsistentMap.java
+++ b/core/api/src/main/java/org/onosproject/store/service/AsyncConsistentMap.java
@@ -288,4 +288,19 @@
      * @return optional updated value. Will be empty if update did not happen.
      */
     CompletableFuture<Optional<Versioned<V>>> replaceAndGet(K key, long oldVersion, V newValue);
+
+    /**
+     * Registers the specified listener to be notified whenever the map is updated.
+     *
+     * @param listener listener to notify about map events
+     */
+    void addListener(MapEventListener<K, V> listener);
+
+    /**
+     * Unregisters the specified listener such that it will no longer
+     * receive map change notifications.
+     *
+     * @param listener listener to unregister
+     */
+    void removeListener(MapEventListener<K, V> listener);
 }
diff --git a/core/api/src/main/java/org/onosproject/store/service/ConsistentMap.java b/core/api/src/main/java/org/onosproject/store/service/ConsistentMap.java
index 2893509..59f72a9 100644
--- a/core/api/src/main/java/org/onosproject/store/service/ConsistentMap.java
+++ b/core/api/src/main/java/org/onosproject/store/service/ConsistentMap.java
@@ -289,4 +289,19 @@
      * @return optional new value. Will be empty if replace did not happen
      */
     Optional<Versioned<V>> replaceAndGet(K key, long oldVersion, V newValue);
+
+    /**
+     * Registers the specified listener to be notified whenever the map is updated.
+     *
+     * @param listener listener to notify about map events
+     */
+    void addListener(MapEventListener<K, V> listener);
+
+    /**
+     * Unregisters the specified listener such that it will no longer
+     * receive map change notifications.
+     *
+     * @param listener listener to unregister
+     */
+    void removeListener(MapEventListener<K, V> listener);
 }
diff --git a/core/api/src/main/java/org/onosproject/store/service/DistributedSet.java b/core/api/src/main/java/org/onosproject/store/service/DistributedSet.java
new file mode 100644
index 0000000..4c9116a
--- /dev/null
+++ b/core/api/src/main/java/org/onosproject/store/service/DistributedSet.java
@@ -0,0 +1,26 @@
+package org.onosproject.store.service;
+
+import java.util.Set;
+
+/**
+ * A distributed collection designed for holding unique elements.
+ *
+ * @param <E> set entry type
+ */
+public interface DistributedSet<E> extends Set<E> {
+
+    /**
+     * Registers the specified listener to be notified whenever
+     * the set is updated.
+     *
+     * @param listener listener to notify about set update events
+     */
+    void addListener(SetEventListener<E> listener);
+
+    /**
+     * Unregisters the specified listener.
+     *
+     * @param listener listener to unregister.
+     */
+    void removeListener(SetEventListener<E> listener);
+}
diff --git a/core/api/src/main/java/org/onosproject/store/service/SetBuilder.java b/core/api/src/main/java/org/onosproject/store/service/DistributedSetBuilder.java
similarity index 60%
rename from core/api/src/main/java/org/onosproject/store/service/SetBuilder.java
rename to core/api/src/main/java/org/onosproject/store/service/DistributedSetBuilder.java
index 3f28065f..c5608ce 100644
--- a/core/api/src/main/java/org/onosproject/store/service/SetBuilder.java
+++ b/core/api/src/main/java/org/onosproject/store/service/DistributedSetBuilder.java
@@ -15,14 +15,12 @@
  */
 package org.onosproject.store.service;
 
-import java.util.Set;
-
 /**
  * Builder for distributed set.
  *
  * @param <E> type set elements.
  */
-public interface SetBuilder<E> {
+public interface DistributedSetBuilder<E> {
 
     /**
      * Sets the name of the set.
@@ -34,9 +32,9 @@
      * </p>
      *
      * @param name name of the set
-     * @return this SetBuilder
+     * @return this DistributedSetBuilder
      */
-    SetBuilder<E> withName(String name);
+    DistributedSetBuilder<E> withName(String name);
 
     /**
      * Sets a serializer that can be used to serialize
@@ -48,18 +46,36 @@
      * </p>
      *
      * @param serializer serializer
-     * @return this SetBuilder
+     * @return this DistributedSetBuilder
      */
-    SetBuilder<E> withSerializer(Serializer serializer);
+    DistributedSetBuilder<E> withSerializer(Serializer serializer);
 
     /**
      * Disables set updates.
      * <p>
      * Attempt to update the built set will throw {@code UnsupportedOperationException}.
      *
-     * @return this SetBuilder
+     * @return this DistributedSetBuilder
      */
-    SetBuilder<E> withUpdatesDisabled();
+    DistributedSetBuilder<E> withUpdatesDisabled();
+
+    /**
+     * Disables distribution of set entries across multiple database partitions.
+     * <p>
+     * When partitioning is disabled, the returned set will have a single partition
+     * that spans the entire cluster. Furthermore, the changes made to the set are
+     * ephemeral and do not survive a full cluster restart.
+     * </p>
+     * <p>
+     * Disabling partitions is more appropriate when the returned set is used for
+     * simple coordination activities and not for long term data persistence.
+     * </p>
+     * <p>
+     * Note: By default partitions are enabled and entries in the set are durable.
+     * </p>
+     * @return this DistributedSetBuilder
+     */
+    DistributedSetBuilder<E> withPartitionsDisabled();
 
     /**
      * Builds an set based on the configuration options
@@ -68,5 +84,5 @@
      * @return new set
      * @throws java.lang.RuntimeException if a mandatory parameter is missing
      */
-    Set<E> build();
+    DistributedSet<E> build();
 }
diff --git a/core/api/src/main/java/org/onosproject/store/service/MapEvent.java b/core/api/src/main/java/org/onosproject/store/service/MapEvent.java
new file mode 100644
index 0000000..74225ac
--- /dev/null
+++ b/core/api/src/main/java/org/onosproject/store/service/MapEvent.java
@@ -0,0 +1,120 @@
+package org.onosproject.store.service;
+
+import java.util.Objects;
+
+import com.google.common.base.MoreObjects;
+
+/**
+ * Representation of a ConsistentMap update notification.
+ *
+ * @param <K> key type
+ * @param <V> value type
+ */
+public class MapEvent<K, V> {
+
+    /**
+     * MapEvent type.
+     */
+    public enum Type {
+        /**
+         * Entry inserted into the map.
+         */
+        INSERT,
+
+        /**
+         * Existing map entry updated.
+         */
+        UPDATE,
+
+        /**
+         * Entry removed from map.
+         */
+        REMOVE
+    }
+
+    private final String name;
+    private final Type type;
+    private final K key;
+    private final Versioned<V> value;
+
+    /**
+     * Creates a new event object.
+     *
+     * @param name map name
+     * @param type the type of the event
+     * @param key the key the event concerns
+     * @param value the value related to the key, or null for remove events
+     */
+    public MapEvent(String name, Type type, K key, Versioned<V> value) {
+        this.name = name;
+        this.type = type;
+        this.key = key;
+        this.value = value;
+    }
+
+    /**
+     * Returns the map name.
+     *
+     * @return name of map
+     */
+    public String name() {
+        return name;
+    }
+
+    /**
+     * Returns the type of the event.
+     *
+     * @return the type of the event
+     */
+    public Type type() {
+        return type;
+    }
+
+    /**
+     * Returns the key this event concerns.
+     *
+     * @return the key
+     */
+    public K key() {
+        return key;
+    }
+
+    /**
+     * Returns the value associated with this event. If type is REMOVE,
+     * this is the value that was removed. If type is INSERT/UPDATE, this is
+     * the new value.
+     *
+     * @return the value
+     */
+    public Versioned<V> value() {
+        return value;
+    }
+
+    @Override
+    public boolean equals(Object o) {
+        if (!(o instanceof MapEvent)) {
+            return false;
+        }
+
+        MapEvent<K, V> that = (MapEvent) o;
+        return Objects.equals(this.name, that.name) &&
+                Objects.equals(this.type, that.type) &&
+                Objects.equals(this.key, that.key) &&
+                Objects.equals(this.value, that.value);
+    }
+
+    @Override
+    public int hashCode() {
+        return Objects.hash(type, key, value);
+    }
+
+    @Override
+    public String toString() {
+        return MoreObjects.toStringHelper(getClass())
+                .add("name", name)
+                .add("type", type)
+                .add("key", key)
+                .add("value", value)
+                .toString();
+    }
+}
diff --git a/core/api/src/main/java/org/onosproject/store/service/MapEventListener.java b/core/api/src/main/java/org/onosproject/store/service/MapEventListener.java
new file mode 100644
index 0000000..2856306
--- /dev/null
+++ b/core/api/src/main/java/org/onosproject/store/service/MapEventListener.java
@@ -0,0 +1,13 @@
+package org.onosproject.store.service;
+
+/**
+ * Listener to be notified about updates to a ConsitentMap.
+ */
+public interface MapEventListener<K, V> {
+    /**
+     * Reacts to the specified event.
+     *
+     * @param event the event
+     */
+    void event(MapEvent<K, V> event);
+}
diff --git a/core/api/src/main/java/org/onosproject/store/service/Serializer.java b/core/api/src/main/java/org/onosproject/store/service/Serializer.java
index 5e1b84e..0b00a68 100644
--- a/core/api/src/main/java/org/onosproject/store/service/Serializer.java
+++ b/core/api/src/main/java/org/onosproject/store/service/Serializer.java
@@ -16,8 +16,13 @@
 
 package org.onosproject.store.service;
 
+import java.util.Arrays;
+import java.util.List;
+
 import org.onlab.util.KryoNamespace;
 
+import com.google.common.collect.Lists;
+
 /**
  * Interface for serialization for store artifacts.
  */
@@ -45,16 +50,32 @@
      * @return Serializer instance
      */
     static Serializer using(KryoNamespace kryo) {
+        return using(Arrays.asList(kryo));
+    }
+
+    static Serializer using(List<KryoNamespace> namespaces, Class<?>... classes) {
+        KryoNamespace.Builder builder = new KryoNamespace.Builder();
+        namespaces.forEach(builder::register);
+        Lists.newArrayList(classes).forEach(builder::register);
+        builder.register(MapEvent.class, MapEvent.Type.class);
+        KryoNamespace namespace = builder.build();
         return new Serializer() {
             @Override
             public <T> byte[] encode(T object) {
-                return kryo.serialize(object);
+                return namespace.serialize(object);
             }
 
             @Override
             public <T> T decode(byte[] bytes) {
-                return kryo.deserialize(bytes);
+                return namespace.deserialize(bytes);
             }
         };
     }
+
+    static Serializer forTypes(Class<?>... classes) {
+        return using(KryoNamespace.newBuilder()
+                                  .register(classes)
+                                  .register(MapEvent.class, MapEvent.Type.class)
+                                  .build());
+    }
 }
diff --git a/core/api/src/main/java/org/onosproject/store/service/SetEvent.java b/core/api/src/main/java/org/onosproject/store/service/SetEvent.java
new file mode 100644
index 0000000..1296a8b
--- /dev/null
+++ b/core/api/src/main/java/org/onosproject/store/service/SetEvent.java
@@ -0,0 +1,98 @@
+package org.onosproject.store.service;
+
+import java.util.Objects;
+
+import com.google.common.base.MoreObjects;
+
+/**
+ * Representation of a DistributedSet update notification.
+ *
+ * @param <E> element type
+ */
+public class SetEvent<E> {
+
+    /**
+     * SetEvent type.
+     */
+    public enum Type {
+        /**
+         * Entry added to the set.
+         */
+        ADD,
+
+        /**
+         * Entry removed from the set.
+         */
+        REMOVE
+    }
+
+    private final String name;
+    private final Type type;
+    private final E entry;
+
+    /**
+     * Creates a new event object.
+     *
+     * @param name set name
+     * @param type the type of the event
+     * @param entry the entry the event concerns
+     */
+    public SetEvent(String name, Type type, E entry) {
+        this.name = name;
+        this.type = type;
+        this.entry = entry;
+    }
+
+    /**
+     * Returns the set name.
+     *
+     * @return name of set
+     */
+    public String name() {
+        return name;
+    }
+
+    /**
+     * Returns the type of the event.
+     *
+     * @return the type of the event
+     */
+    public Type type() {
+        return type;
+    }
+
+    /**
+     * Returns the entry this event concerns.
+     *
+     * @return the entry
+     */
+    public E entry() {
+        return entry;
+    }
+
+    @Override
+    public boolean equals(Object o) {
+        if (!(o instanceof SetEvent)) {
+            return false;
+        }
+
+        SetEvent<E> that = (SetEvent) o;
+        return Objects.equals(this.name, that.name) &&
+                Objects.equals(this.type, that.type) &&
+                Objects.equals(this.entry, that.entry);
+    }
+
+    @Override
+    public int hashCode() {
+        return Objects.hash(name, type, entry);
+    }
+
+    @Override
+    public String toString() {
+        return MoreObjects.toStringHelper(getClass())
+                .add("name", name)
+                .add("type", type)
+                .add("entry", entry)
+                .toString();
+    }
+}
diff --git a/core/api/src/main/java/org/onosproject/store/service/SetEventListener.java b/core/api/src/main/java/org/onosproject/store/service/SetEventListener.java
new file mode 100644
index 0000000..c9ffea1
--- /dev/null
+++ b/core/api/src/main/java/org/onosproject/store/service/SetEventListener.java
@@ -0,0 +1,13 @@
+package org.onosproject.store.service;
+
+/**
+ * Listener to be notified about updates to a DistributedSet.
+ */
+public interface SetEventListener<E> {
+    /**
+     * Reacts to the specified event.
+     *
+     * @param event the event
+     */
+    void event(SetEvent<E> event);
+}
diff --git a/core/api/src/main/java/org/onosproject/store/service/StorageService.java b/core/api/src/main/java/org/onosproject/store/service/StorageService.java
index b836ce8..55a07ed 100644
--- a/core/api/src/main/java/org/onosproject/store/service/StorageService.java
+++ b/core/api/src/main/java/org/onosproject/store/service/StorageService.java
@@ -53,7 +53,7 @@
      * @param <E> set element type
      * @return builder for an distributed set
      */
-    <E> SetBuilder<E> setBuilder();
+    <E> DistributedSetBuilder<E> setBuilder();
 
     /**
      * Creates a new AtomicCounterBuilder.
diff --git a/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/DatabaseManager.java b/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/DatabaseManager.java
index 47031b0..a222627 100644
--- a/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/DatabaseManager.java
+++ b/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/DatabaseManager.java
@@ -42,19 +42,24 @@
 import org.apache.felix.scr.annotations.Reference;
 import org.apache.felix.scr.annotations.ReferenceCardinality;
 import org.apache.felix.scr.annotations.Service;
+
+import static org.onlab.util.Tools.groupedThreads;
+
 import org.onosproject.cluster.ClusterService;
 import org.onosproject.core.IdGenerator;
 import org.onosproject.store.cluster.impl.ClusterDefinitionManager;
 import org.onosproject.store.cluster.impl.NodeInfo;
 import org.onosproject.store.cluster.messaging.ClusterCommunicationService;
+import org.onosproject.store.cluster.messaging.MessageSubject;
 import org.onosproject.store.ecmap.EventuallyConsistentMapBuilderImpl;
 import org.onosproject.store.service.AtomicCounterBuilder;
 import org.onosproject.store.service.ConsistentMapBuilder;
 import org.onosproject.store.service.ConsistentMapException;
 import org.onosproject.store.service.EventuallyConsistentMapBuilder;
+import org.onosproject.store.service.MapEvent;
 import org.onosproject.store.service.MapInfo;
 import org.onosproject.store.service.PartitionInfo;
-import org.onosproject.store.service.SetBuilder;
+import org.onosproject.store.service.DistributedSetBuilder;
 import org.onosproject.store.service.StorageAdminService;
 import org.onosproject.store.service.StorageService;
 import org.onosproject.store.service.Transaction;
@@ -69,6 +74,7 @@
 import java.util.Set;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.TimeoutException;
@@ -93,12 +99,16 @@
     private static final int DATABASE_OPERATION_TIMEOUT_MILLIS = 5000;
 
     private ClusterCoordinator coordinator;
-    private PartitionedDatabase partitionedDatabase;
-    private Database inMemoryDatabase;
+    protected PartitionedDatabase partitionedDatabase;
+    protected Database inMemoryDatabase;
 
     private TransactionManager transactionManager;
     private final IdGenerator transactionIdGenerator = () -> RandomUtils.nextLong();
 
+    private ExecutorService eventDispatcher;
+
+    private final Set<DefaultAsyncConsistentMap> maps = Sets.newCopyOnWriteArraySet();
+
     @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
     protected ClusterService clusterService;
 
@@ -187,7 +197,10 @@
 
         Futures.getUnchecked(status);
 
-        transactionManager = new TransactionManager(partitionedDatabase);
+        transactionManager = new TransactionManager(partitionedDatabase, consistentMapBuilder());
+
+        eventDispatcher = Executors.newSingleThreadExecutor(
+                groupedThreads("onos/store/manager", "map-event-dispatcher"));
         log.info("Started");
     }
 
@@ -213,13 +226,14 @@
                     log.info("Successfully closed databases.");
                 }
             });
+        maps.forEach(map -> clusterCommunicator.removeSubscriber(mapUpdatesSubject(map.name())));
+        eventDispatcher.shutdown();
         log.info("Stopped");
     }
 
     @Override
     public TransactionContextBuilder transactionContextBuilder() {
-        return new DefaultTransactionContextBuilder(
-                inMemoryDatabase, partitionedDatabase, transactionIdGenerator.getNewId());
+        return new DefaultTransactionContextBuilder(this, transactionIdGenerator.getNewId());
     }
 
     @Override
@@ -296,12 +310,12 @@
 
     @Override
     public <K, V> ConsistentMapBuilder<K, V> consistentMapBuilder() {
-        return new DefaultConsistentMapBuilder<>(inMemoryDatabase, partitionedDatabase);
+        return new DefaultConsistentMapBuilder<>(this);
     }
 
     @Override
-    public <E> SetBuilder<E> setBuilder() {
-        return new DefaultSetBuilder<>(partitionedDatabase);
+    public <E> DistributedSetBuilder<E> setBuilder() {
+        return new DefaultDistributedSetBuilder<>(this);
     }
 
     @Override
@@ -370,4 +384,20 @@
     public void redriveTransactions() {
         getTransactions().stream().forEach(transactionManager::execute);
     }
+
+    protected <K, V> void registerMap(DefaultAsyncConsistentMap<K, V> map) {
+        // TODO: Support different local instances of the same map.
+        if (!maps.add(map)) {
+            throw new IllegalStateException("Map by name " + map.name() + " already exists");
+        }
+
+        clusterCommunicator.<MapEvent<K, V>>addSubscriber(mapUpdatesSubject(map.name()),
+                map.serializer()::decode,
+                map::notifyLocalListeners,
+                eventDispatcher);
+    }
+
+    protected static MessageSubject mapUpdatesSubject(String mapName) {
+        return new MessageSubject(mapName + "-map-updates");
+    }
 }
\ No newline at end of file
diff --git a/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/DefaultAsyncConsistentMap.java b/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/DefaultAsyncConsistentMap.java
index 3fb6f4a..4bc93bc 100644
--- a/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/DefaultAsyncConsistentMap.java
+++ b/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/DefaultAsyncConsistentMap.java
@@ -17,6 +17,7 @@
 package org.onosproject.store.consistent.impl;
 
 import static com.google.common.base.Preconditions.*;
+import static org.slf4j.LoggerFactory.getLogger;
 
 import java.util.Collection;
 import java.util.Map;
@@ -24,8 +25,10 @@
 import java.util.Objects;
 import java.util.Optional;
 import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.CopyOnWriteArraySet;
 import java.util.concurrent.atomic.AtomicReference;
 import java.util.function.BiFunction;
+import java.util.function.Consumer;
 import java.util.function.Function;
 import java.util.function.Predicate;
 import java.util.stream.Collectors;
@@ -36,8 +39,11 @@
 import org.onlab.util.Tools;
 import org.onosproject.store.service.AsyncConsistentMap;
 import org.onosproject.store.service.ConsistentMapException;
+import org.onosproject.store.service.MapEvent;
+import org.onosproject.store.service.MapEventListener;
 import org.onosproject.store.service.Serializer;
 import org.onosproject.store.service.Versioned;
+import org.slf4j.Logger;
 
 import com.google.common.cache.CacheBuilder;
 import com.google.common.cache.CacheLoader;
@@ -56,6 +62,11 @@
     private final Database database;
     private final Serializer serializer;
     private final boolean readOnly;
+    private final Consumer<MapEvent<K, V>> eventPublisher;
+
+    private final Set<MapEventListener<K, V>> listeners = new CopyOnWriteArraySet<>();
+
+    private final Logger log = getLogger(getClass());
 
     private static final String ERROR_NULL_KEY = "Key cannot be null";
     private static final String ERROR_NULL_VALUE = "Null values are not allowed";
@@ -77,11 +88,29 @@
     public DefaultAsyncConsistentMap(String name,
             Database database,
             Serializer serializer,
-            boolean readOnly) {
+            boolean readOnly,
+            Consumer<MapEvent<K, V>> eventPublisher) {
         this.name = checkNotNull(name, "map name cannot be null");
         this.database = checkNotNull(database, "database cannot be null");
         this.serializer = checkNotNull(serializer, "serializer cannot be null");
         this.readOnly = readOnly;
+        this.eventPublisher = eventPublisher;
+    }
+
+    /**
+     * Returns this map name.
+     * @return map name
+     */
+    public String name() {
+        return name;
+    }
+
+    /**
+     * Returns the serializer for map entries.
+     * @return map entry serializer
+     */
+    public Serializer serializer() {
+        return serializer;
     }
 
     @Override
@@ -139,6 +168,7 @@
         checkNotNull(key, ERROR_NULL_KEY);
         checkNotNull(condition, "predicate function cannot be null");
         checkNotNull(remappingFunction, "Remapping function cannot be null");
+        AtomicReference<MapEvent<K, V>> mapEvent = new AtomicReference<>();
         return get(key).thenCompose(r1 -> {
             V existingValue = r1 == null ? null : r1.value();
             // if the condition evaluates to false, return existing value.
@@ -160,6 +190,7 @@
                 if (r1 != null) {
                     return remove(key, r1.version()).thenApply(result -> {
                         if (result) {
+                            mapEvent.set(new MapEvent<>(name, MapEvent.Type.REMOVE, key, r1));
                             return null;
                         } else {
                             throw new ConsistentMapException.ConcurrentModification();
@@ -174,6 +205,7 @@
                     return replaceAndGet(key, r1.version(), computedValue.get())
                             .thenApply(v -> {
                                 if (v.isPresent()) {
+                                    mapEvent.set(new MapEvent<>(name, MapEvent.Type.UPDATE, key, v.get()));
                                     return v.get();
                                 } else {
                                     throw new ConsistentMapException.ConcurrentModification();
@@ -184,12 +216,13 @@
                         if (!result.isPresent()) {
                             throw new ConsistentMapException.ConcurrentModification();
                         } else {
+                            mapEvent.set(new MapEvent<>(name, MapEvent.Type.INSERT, key, result.get()));
                             return result.get();
                         }
                     });
                 }
             }
-        });
+        }).whenComplete((result, error) -> notifyListeners(mapEvent.get()));
     }
 
     @Override
@@ -370,4 +403,35 @@
             throw new UnsupportedOperationException();
         }
     }
+
+    @Override
+    public void addListener(MapEventListener<K, V> listener) {
+        listeners.add(listener);
+    }
+
+    @Override
+    public void removeListener(MapEventListener<K, V> listener) {
+        listeners.remove(listener);
+    }
+
+    protected void notifyListeners(MapEvent<K, V> event) {
+        try {
+            if (event != null) {
+                notifyLocalListeners(event);
+                notifyRemoteListeners(event);
+            }
+        } catch (Exception e) {
+            log.warn("Failure notifying listeners about {}", event, e);
+        }
+    }
+
+    protected void notifyLocalListeners(MapEvent<K, V> event) {
+        listeners.forEach(listener -> listener.event(event));
+    }
+
+    protected void notifyRemoteListeners(MapEvent<K, V> event) {
+        if (eventPublisher != null) {
+            eventPublisher.accept(event);
+        }
+    }
 }
\ No newline at end of file
diff --git a/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/DefaultConsistentMap.java b/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/DefaultConsistentMap.java
index b85dfa2..7995d8f 100644
--- a/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/DefaultConsistentMap.java
+++ b/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/DefaultConsistentMap.java
@@ -28,10 +28,9 @@
 import java.util.function.Predicate;
 import java.util.Set;
 
-import org.onosproject.store.service.AsyncConsistentMap;
 import org.onosproject.store.service.ConsistentMap;
 import org.onosproject.store.service.ConsistentMapException;
-import org.onosproject.store.service.Serializer;
+import org.onosproject.store.service.MapEventListener;
 import org.onosproject.store.service.Versioned;
 
 /**
@@ -45,13 +44,14 @@
 
     private static final int OPERATION_TIMEOUT_MILLIS = 5000;
 
-    private final AsyncConsistentMap<K, V> asyncMap;
+    private final DefaultAsyncConsistentMap<K, V> asyncMap;
 
-    public DefaultConsistentMap(String name,
-            Database database,
-            Serializer serializer,
-            boolean readOnly) {
-        asyncMap = new DefaultAsyncConsistentMap<>(name, database, serializer, readOnly);
+    public String name() {
+        return asyncMap.name();
+    }
+
+    public DefaultConsistentMap(DefaultAsyncConsistentMap<K, V> asyncMap) {
+        this.asyncMap = asyncMap;
     }
 
     @Override
@@ -190,4 +190,14 @@
             }
         }
     }
+
+    @Override
+    public void addListener(MapEventListener<K, V> listener) {
+        asyncMap.addListener(listener);
+    }
+
+    @Override
+    public void removeListener(MapEventListener<K, V> listener) {
+        asyncMap.addListener(listener);
+    }
 }
\ No newline at end of file
diff --git a/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/DefaultConsistentMapBuilder.java b/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/DefaultConsistentMapBuilder.java
index 534b59f..24db74f 100644
--- a/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/DefaultConsistentMapBuilder.java
+++ b/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/DefaultConsistentMapBuilder.java
@@ -6,6 +6,7 @@
 import org.onosproject.store.service.AsyncConsistentMap;
 import org.onosproject.store.service.ConsistentMap;
 import org.onosproject.store.service.ConsistentMapBuilder;
+import org.onosproject.store.service.MapEvent;
 import org.onosproject.store.service.Serializer;
 
 /**
@@ -20,12 +21,10 @@
     private String name;
     private boolean partitionsEnabled = true;
     private boolean readOnly = false;
-    private final Database partitionedDatabase;
-    private final Database inMemoryDatabase;
+    private final DatabaseManager manager;
 
-    public DefaultConsistentMapBuilder(Database inMemoryDatabase, Database partitionedDatabase) {
-        this.inMemoryDatabase = inMemoryDatabase;
-        this.partitionedDatabase = partitionedDatabase;
+    public DefaultConsistentMapBuilder(DatabaseManager manager) {
+        this.manager = manager;
     }
 
     @Override
@@ -60,21 +59,25 @@
 
     @Override
     public ConsistentMap<K, V> build() {
-        checkState(validInputs());
-        return new DefaultConsistentMap<>(
-                name,
-                partitionsEnabled ? partitionedDatabase : inMemoryDatabase,
-                serializer,
-                readOnly);
+        return new DefaultConsistentMap<>(buildAndRegisterMap());
     }
 
     @Override
     public AsyncConsistentMap<K, V> buildAsyncMap() {
+        return buildAndRegisterMap();
+    }
+
+    private DefaultAsyncConsistentMap<K, V> buildAndRegisterMap() {
         checkState(validInputs());
-        return new DefaultAsyncConsistentMap<>(
+        DefaultAsyncConsistentMap<K, V> asyncMap = new DefaultAsyncConsistentMap<>(
                 name,
-                partitionsEnabled ? partitionedDatabase : inMemoryDatabase,
+                partitionsEnabled ? manager.partitionedDatabase : manager.inMemoryDatabase,
                 serializer,
-                readOnly);
+                readOnly,
+                event -> manager.clusterCommunicator.<MapEvent<K, V>>broadcast(event,
+                        DatabaseManager.mapUpdatesSubject(name),
+                        serializer::encode));
+        manager.registerMap(asyncMap);
+        return asyncMap;
     }
 }
\ No newline at end of file
diff --git a/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/DefaultDistributedSet.java b/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/DefaultDistributedSet.java
index adc9dcd..ba5ff95 100644
--- a/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/DefaultDistributedSet.java
+++ b/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/DefaultDistributedSet.java
@@ -17,11 +17,17 @@
 
 import java.util.Collection;
 import java.util.Iterator;
+import java.util.Map;
 import java.util.Set;
 
 import org.onosproject.store.service.ConsistentMap;
-import org.onosproject.store.service.Serializer;
+import org.onosproject.store.service.DistributedSet;
+import org.onosproject.store.service.MapEvent;
+import org.onosproject.store.service.MapEventListener;
+import org.onosproject.store.service.SetEvent;
+import org.onosproject.store.service.SetEventListener;
 
+import com.google.common.collect.Maps;
 import com.google.common.collect.Sets;
 
 /**
@@ -29,12 +35,15 @@
 
  * @param <E> set element type
  */
-public class DefaultDistributedSet<E> implements Set<E> {
+public class DefaultDistributedSet<E> implements DistributedSet<E> {
 
+    private final String name;
     private final ConsistentMap<E, Boolean> backingMap;
+    private final Map<SetEventListener<E>, MapEventListener<E, Boolean>> listenerMapping = Maps.newIdentityHashMap();
 
-    public DefaultDistributedSet(String name, Database database, Serializer serializer, boolean readOnly) {
-        backingMap = new DefaultConsistentMap<>(name, database, serializer, readOnly);
+    public DefaultDistributedSet(String name, ConsistentMap<E, Boolean> backingMap) {
+        this.name = name;
+        this.backingMap = backingMap;
     }
 
     @Override
@@ -76,7 +85,7 @@
     @SuppressWarnings("unchecked")
     @Override
     public boolean remove(Object o) {
-        return backingMap.remove((E) o, true);
+        return backingMap.remove((E) o) != null;
     }
 
     @Override
@@ -119,4 +128,26 @@
     public void clear() {
         backingMap.clear();
     }
+
+    @Override
+    public void addListener(SetEventListener<E> listener) {
+        MapEventListener<E, Boolean> mapEventListener = mapEvent -> {
+            if (mapEvent.type() == MapEvent.Type.INSERT) {
+                listener.event(new SetEvent<>(name, SetEvent.Type.ADD, mapEvent.key()));
+            } else if (mapEvent.type() == MapEvent.Type.REMOVE) {
+                listener.event(new SetEvent<>(name, SetEvent.Type.REMOVE, mapEvent.key()));
+            }
+        };
+        if (listenerMapping.putIfAbsent(listener, mapEventListener) == null) {
+            backingMap.addListener(mapEventListener);
+        }
+    }
+
+    @Override
+    public void removeListener(SetEventListener<E> listener) {
+        MapEventListener<E, Boolean> mapEventListener = listenerMapping.remove(listener);
+        if (mapEventListener != null) {
+            backingMap.removeListener(mapEventListener);
+        }
+    }
 }
diff --git a/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/DefaultDistributedSetBuilder.java b/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/DefaultDistributedSetBuilder.java
new file mode 100644
index 0000000..57ec232
--- /dev/null
+++ b/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/DefaultDistributedSetBuilder.java
@@ -0,0 +1,66 @@
+/*
+ * Copyright 2015 Open Networking Laboratory
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.onosproject.store.consistent.impl;
+
+import org.onosproject.store.service.ConsistentMapBuilder;
+import org.onosproject.store.service.DistributedSet;
+import org.onosproject.store.service.Serializer;
+import org.onosproject.store.service.DistributedSetBuilder;
+
+/**
+ * Default distributed set builder.
+ *
+ * @param <E> type for set elements
+ */
+public class DefaultDistributedSetBuilder<E> implements DistributedSetBuilder<E> {
+
+    private String name;
+    private ConsistentMapBuilder<E, Boolean>  mapBuilder;
+
+    public DefaultDistributedSetBuilder(DatabaseManager manager) {
+        this.mapBuilder = manager.consistentMapBuilder();
+    }
+
+    @Override
+    public DistributedSetBuilder<E> withName(String name) {
+        mapBuilder.withName(name);
+        this.name = name;
+        return this;
+    }
+
+    @Override
+    public DistributedSetBuilder<E> withSerializer(Serializer serializer) {
+        mapBuilder.withSerializer(serializer);
+        return this;
+    }
+
+    @Override
+    public DistributedSetBuilder<E> withUpdatesDisabled() {
+        mapBuilder.withUpdatesDisabled();
+        return this;
+    }
+
+    @Override
+    public DistributedSetBuilder<E> withPartitionsDisabled() {
+        mapBuilder.withPartitionsDisabled();
+        return this;
+    }
+
+    @Override
+    public DistributedSet<E> build() {
+        return new DefaultDistributedSet<E>(name, mapBuilder.build());
+    }
+}
diff --git a/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/DefaultSetBuilder.java b/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/DefaultSetBuilder.java
deleted file mode 100644
index 566953f..0000000
--- a/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/DefaultSetBuilder.java
+++ /dev/null
@@ -1,72 +0,0 @@
-/*
- * Copyright 2015 Open Networking Laboratory
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.onosproject.store.consistent.impl;
-
-import static com.google.common.base.Preconditions.checkArgument;
-import static com.google.common.base.Preconditions.checkNotNull;
-import static com.google.common.base.Preconditions.checkState;
-
-import java.util.Set;
-
-import org.onosproject.store.service.Serializer;
-import org.onosproject.store.service.SetBuilder;
-
-/**
- * Default Set builder.
- *
- * @param <E> type for set elements
- */
-public class DefaultSetBuilder<E> implements SetBuilder<E> {
-
-    private Serializer serializer;
-    private String name;
-    private final Database database;
-    private boolean readOnly;
-
-    public DefaultSetBuilder(Database database) {
-        this.database = checkNotNull(database);
-    }
-
-    @Override
-    public SetBuilder<E> withName(String name) {
-        checkArgument(name != null && !name.isEmpty());
-        this.name = name;
-        return this;
-    }
-
-    @Override
-    public SetBuilder<E> withSerializer(Serializer serializer) {
-        checkArgument(serializer != null);
-        this.serializer = serializer;
-        return this;
-    }
-
-    @Override
-    public SetBuilder<E> withUpdatesDisabled() {
-        readOnly = true;
-        return this;
-    }
-
-    private boolean validInputs() {
-        return name != null && serializer != null;
-    }
-
-    @Override
-    public Set<E> build() {
-        checkState(validInputs());
-        return new DefaultDistributedSet<>(name, database, serializer, readOnly);
-    }
-}
diff --git a/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/DefaultTransactionContext.java b/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/DefaultTransactionContext.java
index 5570e30..b0ab575 100644
--- a/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/DefaultTransactionContext.java
+++ b/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/DefaultTransactionContext.java
@@ -18,9 +18,11 @@
 
 import java.util.List;
 import java.util.Map;
+import java.util.function.Supplier;
 
 import static com.google.common.base.Preconditions.*;
 
+import org.onosproject.store.service.ConsistentMapBuilder;
 import org.onosproject.store.service.DatabaseUpdate;
 import org.onosproject.store.service.Serializer;
 import org.onosproject.store.service.TransactionContext;
@@ -41,10 +43,14 @@
     private boolean isOpen = false;
     private final Database database;
     private final long transactionId;
+    private final Supplier<ConsistentMapBuilder> mapBuilderSupplier;
 
-    public DefaultTransactionContext(Database database, long transactionId) {
-        this.database = checkNotNull(database);
+    public DefaultTransactionContext(long transactionId,
+            Database database,
+            Supplier<ConsistentMapBuilder> mapBuilderSupplier) {
         this.transactionId = transactionId;
+        this.database = checkNotNull(database);
+        this.mapBuilderSupplier = checkNotNull(mapBuilderSupplier);
     }
 
     @Override
@@ -72,7 +78,7 @@
         checkNotNull(serializer);
         return txMaps.computeIfAbsent(mapName, name -> new DefaultTransactionalMap<>(
                                 name,
-                                new DefaultConsistentMap<>(name, database, serializer, false),
+                                mapBuilderSupplier.get().withName(name).withSerializer(serializer).build(),
                                 this,
                                 serializer));
     }
@@ -85,6 +91,7 @@
             List<DatabaseUpdate> updates = Lists.newLinkedList();
             txMaps.values()
                   .forEach(m -> { updates.addAll(m.prepareDatabaseUpdates()); });
+            // FIXME: Updates made via transactional context currently do not result in notifications. (ONOS-2097)
             database.prepareAndCommit(new DefaultTransaction(transactionId, updates));
         } catch (Exception e) {
             abort();
diff --git a/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/DefaultTransactionContextBuilder.java b/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/DefaultTransactionContextBuilder.java
index a3484cf..16e22aa 100644
--- a/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/DefaultTransactionContextBuilder.java
+++ b/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/DefaultTransactionContextBuilder.java
@@ -10,14 +10,11 @@
 public class DefaultTransactionContextBuilder implements TransactionContextBuilder {
 
     private boolean partitionsEnabled = true;
-    private final Database partitionedDatabase;
-    private final Database inMemoryDatabase;
+    private final DatabaseManager manager;
     private final long transactionId;
 
-    public DefaultTransactionContextBuilder(
-            Database inMemoryDatabase, Database partitionedDatabase, long transactionId) {
-        this.partitionedDatabase = partitionedDatabase;
-        this.inMemoryDatabase = inMemoryDatabase;
+    public DefaultTransactionContextBuilder(DatabaseManager manager, long transactionId) {
+        this.manager = manager;
         this.transactionId = transactionId;
     }
 
@@ -30,8 +27,9 @@
     @Override
     public TransactionContext build() {
         return new DefaultTransactionContext(
-                partitionsEnabled ? partitionedDatabase : inMemoryDatabase,
-                transactionId);
+                transactionId,
+                partitionsEnabled ? manager.partitionedDatabase : manager.inMemoryDatabase,
+                () -> partitionsEnabled ? manager.consistentMapBuilder()
+                                        : manager.consistentMapBuilder().withPartitionsDisabled());
     }
-
 }
diff --git a/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/PartitionedDatabase.java b/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/PartitionedDatabase.java
index 2903e0a..7b279dc 100644
--- a/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/PartitionedDatabase.java
+++ b/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/PartitionedDatabase.java
@@ -50,6 +50,7 @@
     private final List<Database> partitions;
     private final AtomicBoolean isOpen = new AtomicBoolean(false);
     private static final String DB_NOT_OPEN = "Partitioned Database is not open";
+    private TransactionManager transactionManager;
 
     public PartitionedDatabase(
             String name,
@@ -285,7 +286,10 @@
                     subTransactions.entrySet().iterator().next();
             return entry.getKey().prepareAndCommit(entry.getValue());
         } else {
-            return new TransactionManager(this).execute(transaction);
+            if (transactionManager != null) {
+                throw new IllegalStateException("TransactionManager is not initialized");
+            }
+            return transactionManager.execute(transaction);
         }
     }
 
@@ -387,4 +391,8 @@
         perPartitionUpdates.forEach((k, v) -> subTransactions.put(k, new DefaultTransaction(transaction.id(), v)));
         return subTransactions;
     }
+
+    protected void setTransactionManager(TransactionManager tranasactionManager) {
+        this.transactionManager = transactionManager;
+    }
 }
diff --git a/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/TransactionManager.java b/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/TransactionManager.java
index d8e9930..245ae1a 100644
--- a/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/TransactionManager.java
+++ b/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/TransactionManager.java
@@ -17,6 +17,7 @@
 
 import static com.google.common.base.Preconditions.checkNotNull;
 
+import java.util.Arrays;
 import java.util.Collection;
 import java.util.concurrent.CompletableFuture;
 import java.util.stream.Collectors;
@@ -26,6 +27,7 @@
 import org.onlab.util.KryoNamespace;
 import org.onosproject.store.serializers.KryoNamespaces;
 import org.onosproject.store.service.AsyncConsistentMap;
+import org.onosproject.store.service.ConsistentMapBuilder;
 import org.onosproject.store.service.DatabaseUpdate;
 import org.onosproject.store.service.Serializer;
 import org.onosproject.store.service.Transaction;
@@ -49,7 +51,7 @@
             .register(ImmutablePair.class)
             .build();
 
-    private final Serializer serializer = Serializer.using(KRYO_NAMESPACE);
+    private final Serializer serializer = Serializer.using(Arrays.asList(KRYO_NAMESPACE));
     private final Database database;
     private final AsyncConsistentMap<Long, Transaction> transactions;
 
@@ -58,9 +60,11 @@
      *
      * @param database database
      */
-    public TransactionManager(Database database) {
+    public TransactionManager(Database database, ConsistentMapBuilder<Long, Transaction> mapBuilder) {
         this.database = checkNotNull(database, "database cannot be null");
-        this.transactions = new DefaultAsyncConsistentMap<>("onos-transactions", this.database, serializer, false);
+        this.transactions = mapBuilder.withName("onos-transactions")
+                                      .withSerializer(serializer)
+                                      .buildAsyncMap();
     }
 
     /**
diff --git a/core/store/serializers/src/main/java/org/onosproject/store/serializers/KryoNamespaces.java b/core/store/serializers/src/main/java/org/onosproject/store/serializers/KryoNamespaces.java
index 0ceef4b..f930f8c 100644
--- a/core/store/serializers/src/main/java/org/onosproject/store/serializers/KryoNamespaces.java
+++ b/core/store/serializers/src/main/java/org/onosproject/store/serializers/KryoNamespaces.java
@@ -170,6 +170,8 @@
 import org.onosproject.net.resource.link.MplsLabelResourceAllocation;
 import org.onosproject.net.resource.link.MplsLabelResourceRequest;
 import org.onosproject.store.Timestamp;
+import org.onosproject.store.service.MapEvent;
+import org.onosproject.store.service.SetEvent;
 import org.onosproject.store.service.Versioned;
 
 import java.net.URI;
@@ -408,6 +410,10 @@
             .register(new HostLocationSerializer(), HostLocation.class)
             .register(new DefaultOutboundPacketSerializer(), DefaultOutboundPacket.class)
             .register(Versioned.class)
+            .register(MapEvent.class)
+            .register(MapEvent.Type.class)
+            .register(SetEvent.class)
+            .register(SetEvent.Type.class)
             .register(DefaultGroupId.class)
             .register(Annotations.class)
             .register(OmsPort.class)