Notification support for Consistent datastructures (ConsitentMap and DistributedSet)

Change-Id: If74cdc411c79c42c7643420e6369cf656849bb6a
diff --git a/core/api/src/main/java/org/onosproject/store/service/AsyncConsistentMap.java b/core/api/src/main/java/org/onosproject/store/service/AsyncConsistentMap.java
index 8a88ed6..1779256 100644
--- a/core/api/src/main/java/org/onosproject/store/service/AsyncConsistentMap.java
+++ b/core/api/src/main/java/org/onosproject/store/service/AsyncConsistentMap.java
@@ -288,4 +288,19 @@
      * @return optional updated value. Will be empty if update did not happen.
      */
     CompletableFuture<Optional<Versioned<V>>> replaceAndGet(K key, long oldVersion, V newValue);
+
+    /**
+     * Registers the specified listener to be notified whenever the map is updated.
+     *
+     * @param listener listener to notify about map events
+     */
+    void addListener(MapEventListener<K, V> listener);
+
+    /**
+     * Unregisters the specified listener such that it will no longer
+     * receive map change notifications.
+     *
+     * @param listener listener to unregister
+     */
+    void removeListener(MapEventListener<K, V> listener);
 }
diff --git a/core/api/src/main/java/org/onosproject/store/service/ConsistentMap.java b/core/api/src/main/java/org/onosproject/store/service/ConsistentMap.java
index 2893509..59f72a9 100644
--- a/core/api/src/main/java/org/onosproject/store/service/ConsistentMap.java
+++ b/core/api/src/main/java/org/onosproject/store/service/ConsistentMap.java
@@ -289,4 +289,19 @@
      * @return optional new value. Will be empty if replace did not happen
      */
     Optional<Versioned<V>> replaceAndGet(K key, long oldVersion, V newValue);
+
+    /**
+     * Registers the specified listener to be notified whenever the map is updated.
+     *
+     * @param listener listener to notify about map events
+     */
+    void addListener(MapEventListener<K, V> listener);
+
+    /**
+     * Unregisters the specified listener such that it will no longer
+     * receive map change notifications.
+     *
+     * @param listener listener to unregister
+     */
+    void removeListener(MapEventListener<K, V> listener);
 }
diff --git a/core/api/src/main/java/org/onosproject/store/service/DistributedSet.java b/core/api/src/main/java/org/onosproject/store/service/DistributedSet.java
new file mode 100644
index 0000000..4c9116a
--- /dev/null
+++ b/core/api/src/main/java/org/onosproject/store/service/DistributedSet.java
@@ -0,0 +1,26 @@
+package org.onosproject.store.service;
+
+import java.util.Set;
+
+/**
+ * A distributed collection designed for holding unique elements.
+ *
+ * @param <E> set entry type
+ */
+public interface DistributedSet<E> extends Set<E> {
+
+    /**
+     * Registers the specified listener to be notified whenever
+     * the set is updated.
+     *
+     * @param listener listener to notify about set update events
+     */
+    void addListener(SetEventListener<E> listener);
+
+    /**
+     * Unregisters the specified listener.
+     *
+     * @param listener listener to unregister.
+     */
+    void removeListener(SetEventListener<E> listener);
+}
diff --git a/core/api/src/main/java/org/onosproject/store/service/SetBuilder.java b/core/api/src/main/java/org/onosproject/store/service/DistributedSetBuilder.java
similarity index 60%
rename from core/api/src/main/java/org/onosproject/store/service/SetBuilder.java
rename to core/api/src/main/java/org/onosproject/store/service/DistributedSetBuilder.java
index 3f28065f..c5608ce 100644
--- a/core/api/src/main/java/org/onosproject/store/service/SetBuilder.java
+++ b/core/api/src/main/java/org/onosproject/store/service/DistributedSetBuilder.java
@@ -15,14 +15,12 @@
  */
 package org.onosproject.store.service;
 
-import java.util.Set;
-
 /**
  * Builder for distributed set.
  *
  * @param <E> type set elements.
  */
-public interface SetBuilder<E> {
+public interface DistributedSetBuilder<E> {
 
     /**
      * Sets the name of the set.
@@ -34,9 +32,9 @@
      * </p>
      *
      * @param name name of the set
-     * @return this SetBuilder
+     * @return this DistributedSetBuilder
      */
-    SetBuilder<E> withName(String name);
+    DistributedSetBuilder<E> withName(String name);
 
     /**
      * Sets a serializer that can be used to serialize
@@ -48,18 +46,36 @@
      * </p>
      *
      * @param serializer serializer
-     * @return this SetBuilder
+     * @return this DistributedSetBuilder
      */
-    SetBuilder<E> withSerializer(Serializer serializer);
+    DistributedSetBuilder<E> withSerializer(Serializer serializer);
 
     /**
      * Disables set updates.
      * <p>
      * Attempt to update the built set will throw {@code UnsupportedOperationException}.
      *
-     * @return this SetBuilder
+     * @return this DistributedSetBuilder
      */
-    SetBuilder<E> withUpdatesDisabled();
+    DistributedSetBuilder<E> withUpdatesDisabled();
+
+    /**
+     * Disables distribution of set entries across multiple database partitions.
+     * <p>
+     * When partitioning is disabled, the returned set will have a single partition
+     * that spans the entire cluster. Furthermore, the changes made to the set are
+     * ephemeral and do not survive a full cluster restart.
+     * </p>
+     * <p>
+     * Disabling partitions is more appropriate when the returned set is used for
+     * simple coordination activities and not for long term data persistence.
+     * </p>
+     * <p>
+     * Note: By default partitions are enabled and entries in the set are durable.
+     * </p>
+     * @return this DistributedSetBuilder
+     */
+    DistributedSetBuilder<E> withPartitionsDisabled();
 
     /**
      * Builds an set based on the configuration options
@@ -68,5 +84,5 @@
      * @return new set
      * @throws java.lang.RuntimeException if a mandatory parameter is missing
      */
-    Set<E> build();
+    DistributedSet<E> build();
 }
diff --git a/core/api/src/main/java/org/onosproject/store/service/MapEvent.java b/core/api/src/main/java/org/onosproject/store/service/MapEvent.java
new file mode 100644
index 0000000..74225ac
--- /dev/null
+++ b/core/api/src/main/java/org/onosproject/store/service/MapEvent.java
@@ -0,0 +1,120 @@
+package org.onosproject.store.service;
+
+import java.util.Objects;
+
+import com.google.common.base.MoreObjects;
+
+/**
+ * Representation of a ConsistentMap update notification.
+ *
+ * @param <K> key type
+ * @param <V> value type
+ */
+public class MapEvent<K, V> {
+
+    /**
+     * MapEvent type.
+     */
+    public enum Type {
+        /**
+         * Entry inserted into the map.
+         */
+        INSERT,
+
+        /**
+         * Existing map entry updated.
+         */
+        UPDATE,
+
+        /**
+         * Entry removed from map.
+         */
+        REMOVE
+    }
+
+    private final String name;
+    private final Type type;
+    private final K key;
+    private final Versioned<V> value;
+
+    /**
+     * Creates a new event object.
+     *
+     * @param name map name
+     * @param type the type of the event
+     * @param key the key the event concerns
+     * @param value the value related to the key, or null for remove events
+     */
+    public MapEvent(String name, Type type, K key, Versioned<V> value) {
+        this.name = name;
+        this.type = type;
+        this.key = key;
+        this.value = value;
+    }
+
+    /**
+     * Returns the map name.
+     *
+     * @return name of map
+     */
+    public String name() {
+        return name;
+    }
+
+    /**
+     * Returns the type of the event.
+     *
+     * @return the type of the event
+     */
+    public Type type() {
+        return type;
+    }
+
+    /**
+     * Returns the key this event concerns.
+     *
+     * @return the key
+     */
+    public K key() {
+        return key;
+    }
+
+    /**
+     * Returns the value associated with this event. If type is REMOVE,
+     * this is the value that was removed. If type is INSERT/UPDATE, this is
+     * the new value.
+     *
+     * @return the value
+     */
+    public Versioned<V> value() {
+        return value;
+    }
+
+    @Override
+    public boolean equals(Object o) {
+        if (!(o instanceof MapEvent)) {
+            return false;
+        }
+
+        MapEvent<K, V> that = (MapEvent) o;
+        return Objects.equals(this.name, that.name) &&
+                Objects.equals(this.type, that.type) &&
+                Objects.equals(this.key, that.key) &&
+                Objects.equals(this.value, that.value);
+    }
+
+    @Override
+    public int hashCode() {
+        return Objects.hash(type, key, value);
+    }
+
+    @Override
+    public String toString() {
+        return MoreObjects.toStringHelper(getClass())
+                .add("name", name)
+                .add("type", type)
+                .add("key", key)
+                .add("value", value)
+                .toString();
+    }
+}
diff --git a/core/api/src/main/java/org/onosproject/store/service/MapEventListener.java b/core/api/src/main/java/org/onosproject/store/service/MapEventListener.java
new file mode 100644
index 0000000..2856306
--- /dev/null
+++ b/core/api/src/main/java/org/onosproject/store/service/MapEventListener.java
@@ -0,0 +1,13 @@
+package org.onosproject.store.service;
+
+/**
+ * Listener to be notified about updates to a ConsitentMap.
+ */
+public interface MapEventListener<K, V> {
+    /**
+     * Reacts to the specified event.
+     *
+     * @param event the event
+     */
+    void event(MapEvent<K, V> event);
+}
diff --git a/core/api/src/main/java/org/onosproject/store/service/Serializer.java b/core/api/src/main/java/org/onosproject/store/service/Serializer.java
index 5e1b84e..0b00a68 100644
--- a/core/api/src/main/java/org/onosproject/store/service/Serializer.java
+++ b/core/api/src/main/java/org/onosproject/store/service/Serializer.java
@@ -16,8 +16,13 @@
 
 package org.onosproject.store.service;
 
+import java.util.Arrays;
+import java.util.List;
+
 import org.onlab.util.KryoNamespace;
 
+import com.google.common.collect.Lists;
+
 /**
  * Interface for serialization for store artifacts.
  */
@@ -45,16 +50,32 @@
      * @return Serializer instance
      */
     static Serializer using(KryoNamespace kryo) {
+        return using(Arrays.asList(kryo));
+    }
+
+    static Serializer using(List<KryoNamespace> namespaces, Class<?>... classes) {
+        KryoNamespace.Builder builder = new KryoNamespace.Builder();
+        namespaces.forEach(builder::register);
+        Lists.newArrayList(classes).forEach(builder::register);
+        builder.register(MapEvent.class, MapEvent.Type.class);
+        KryoNamespace namespace = builder.build();
         return new Serializer() {
             @Override
             public <T> byte[] encode(T object) {
-                return kryo.serialize(object);
+                return namespace.serialize(object);
             }
 
             @Override
             public <T> T decode(byte[] bytes) {
-                return kryo.deserialize(bytes);
+                return namespace.deserialize(bytes);
             }
         };
     }
+
+    static Serializer forTypes(Class<?>... classes) {
+        return using(KryoNamespace.newBuilder()
+                                  .register(classes)
+                                  .register(MapEvent.class, MapEvent.Type.class)
+                                  .build());
+    }
 }
diff --git a/core/api/src/main/java/org/onosproject/store/service/SetEvent.java b/core/api/src/main/java/org/onosproject/store/service/SetEvent.java
new file mode 100644
index 0000000..1296a8b
--- /dev/null
+++ b/core/api/src/main/java/org/onosproject/store/service/SetEvent.java
@@ -0,0 +1,98 @@
+package org.onosproject.store.service;
+
+import java.util.Objects;
+
+import com.google.common.base.MoreObjects;
+
+/**
+ * Representation of a DistributedSet update notification.
+ *
+ * @param <E> element type
+ */
+public class SetEvent<E> {
+
+    /**
+     * SetEvent type.
+     */
+    public enum Type {
+        /**
+         * Entry added to the set.
+         */
+        ADD,
+
+        /**
+         * Entry removed from the set.
+         */
+        REMOVE
+    }
+
+    private final String name;
+    private final Type type;
+    private final E entry;
+
+    /**
+     * Creates a new event object.
+     *
+     * @param name set name
+     * @param type the type of the event
+     * @param entry the entry the event concerns
+     */
+    public SetEvent(String name, Type type, E entry) {
+        this.name = name;
+        this.type = type;
+        this.entry = entry;
+    }
+
+    /**
+     * Returns the set name.
+     *
+     * @return name of set
+     */
+    public String name() {
+        return name;
+    }
+
+    /**
+     * Returns the type of the event.
+     *
+     * @return the type of the event
+     */
+    public Type type() {
+        return type;
+    }
+
+    /**
+     * Returns the entry this event concerns.
+     *
+     * @return the entry
+     */
+    public E entry() {
+        return entry;
+    }
+
+    @Override
+    public boolean equals(Object o) {
+        if (!(o instanceof SetEvent)) {
+            return false;
+        }
+
+        SetEvent<E> that = (SetEvent) o;
+        return Objects.equals(this.name, that.name) &&
+                Objects.equals(this.type, that.type) &&
+                Objects.equals(this.entry, that.entry);
+    }
+
+    @Override
+    public int hashCode() {
+        return Objects.hash(name, type, entry);
+    }
+
+    @Override
+    public String toString() {
+        return MoreObjects.toStringHelper(getClass())
+                .add("name", name)
+                .add("type", type)
+                .add("entry", entry)
+                .toString();
+    }
+}
diff --git a/core/api/src/main/java/org/onosproject/store/service/SetEventListener.java b/core/api/src/main/java/org/onosproject/store/service/SetEventListener.java
new file mode 100644
index 0000000..c9ffea1
--- /dev/null
+++ b/core/api/src/main/java/org/onosproject/store/service/SetEventListener.java
@@ -0,0 +1,13 @@
+package org.onosproject.store.service;
+
+/**
+ * Listener to be notified about updates to a DistributedSet.
+ */
+public interface SetEventListener<E> {
+    /**
+     * Reacts to the specified event.
+     *
+     * @param event the event
+     */
+    void event(SetEvent<E> event);
+}
diff --git a/core/api/src/main/java/org/onosproject/store/service/StorageService.java b/core/api/src/main/java/org/onosproject/store/service/StorageService.java
index b836ce8..55a07ed 100644
--- a/core/api/src/main/java/org/onosproject/store/service/StorageService.java
+++ b/core/api/src/main/java/org/onosproject/store/service/StorageService.java
@@ -53,7 +53,7 @@
      * @param <E> set element type
      * @return builder for an distributed set
      */
-    <E> SetBuilder<E> setBuilder();
+    <E> DistributedSetBuilder<E> setBuilder();
 
     /**
      * Creates a new AtomicCounterBuilder.