Notification support for Consistent data structures (ConsistentMap and DistributedSet)
Change-Id: If74cdc411c79c42c7643420e6369cf656849bb6a
diff --git a/core/api/src/main/java/org/onosproject/store/service/AsyncConsistentMap.java b/core/api/src/main/java/org/onosproject/store/service/AsyncConsistentMap.java
index 8a88ed6..1779256 100644
--- a/core/api/src/main/java/org/onosproject/store/service/AsyncConsistentMap.java
+++ b/core/api/src/main/java/org/onosproject/store/service/AsyncConsistentMap.java
@@ -288,4 +288,19 @@
* @return optional updated value. Will be empty if update did not happen.
*/
CompletableFuture<Optional<Versioned<V>>> replaceAndGet(K key, long oldVersion, V newValue);
+
+ /**
+ * Registers the specified listener to be notified whenever the map is updated.
+ *
+ * @param listener listener to notify about map events
+ */
+ void addListener(MapEventListener<K, V> listener);
+
+ /**
+ * Unregisters the specified listener such that it will no longer
+ * receive map change notifications.
+ *
+ * @param listener listener to unregister
+ */
+ void removeListener(MapEventListener<K, V> listener);
}
diff --git a/core/api/src/main/java/org/onosproject/store/service/ConsistentMap.java b/core/api/src/main/java/org/onosproject/store/service/ConsistentMap.java
index 2893509..59f72a9 100644
--- a/core/api/src/main/java/org/onosproject/store/service/ConsistentMap.java
+++ b/core/api/src/main/java/org/onosproject/store/service/ConsistentMap.java
@@ -289,4 +289,19 @@
* @return optional new value. Will be empty if replace did not happen
*/
Optional<Versioned<V>> replaceAndGet(K key, long oldVersion, V newValue);
+
+ /**
+ * Registers the specified listener to be notified whenever the map is updated.
+ *
+ * @param listener listener to notify about map events
+ */
+ void addListener(MapEventListener<K, V> listener);
+
+ /**
+ * Unregisters the specified listener such that it will no longer
+ * receive map change notifications.
+ *
+ * @param listener listener to unregister
+ */
+ void removeListener(MapEventListener<K, V> listener);
}
diff --git a/core/api/src/main/java/org/onosproject/store/service/DistributedSet.java b/core/api/src/main/java/org/onosproject/store/service/DistributedSet.java
new file mode 100644
index 0000000..4c9116a
--- /dev/null
+++ b/core/api/src/main/java/org/onosproject/store/service/DistributedSet.java
@@ -0,0 +1,26 @@
+package org.onosproject.store.service;
+
+import java.util.Set;
+
+/**
+ * A distributed collection designed for holding unique elements.
+ *
+ * @param <E> set entry type
+ */
+public interface DistributedSet<E> extends Set<E> {
+
+ /**
+ * Registers the specified listener to be notified whenever
+ * the set is updated.
+ *
+ * @param listener listener to notify about set update events
+ */
+ void addListener(SetEventListener<E> listener);
+
+ /**
+ * Unregisters the specified listener so that it is no longer notified of set updates.
+ *
+ * @param listener listener to unregister
+ */
+ void removeListener(SetEventListener<E> listener);
+}
diff --git a/core/api/src/main/java/org/onosproject/store/service/SetBuilder.java b/core/api/src/main/java/org/onosproject/store/service/DistributedSetBuilder.java
similarity index 60%
rename from core/api/src/main/java/org/onosproject/store/service/SetBuilder.java
rename to core/api/src/main/java/org/onosproject/store/service/DistributedSetBuilder.java
index 3f28065f..c5608ce 100644
--- a/core/api/src/main/java/org/onosproject/store/service/SetBuilder.java
+++ b/core/api/src/main/java/org/onosproject/store/service/DistributedSetBuilder.java
@@ -15,14 +15,12 @@
*/
package org.onosproject.store.service;
-import java.util.Set;
-
/**
* Builder for distributed set.
*
* @param <E> type set elements.
*/
-public interface SetBuilder<E> {
+public interface DistributedSetBuilder<E> {
/**
* Sets the name of the set.
@@ -34,9 +32,9 @@
* </p>
*
* @param name name of the set
- * @return this SetBuilder
+ * @return this DistributedSetBuilder
*/
- SetBuilder<E> withName(String name);
+ DistributedSetBuilder<E> withName(String name);
/**
* Sets a serializer that can be used to serialize
@@ -48,18 +46,36 @@
* </p>
*
* @param serializer serializer
- * @return this SetBuilder
+ * @return this DistributedSetBuilder
*/
- SetBuilder<E> withSerializer(Serializer serializer);
+ DistributedSetBuilder<E> withSerializer(Serializer serializer);
/**
* Disables set updates.
* <p>
* Attempt to update the built set will throw {@code UnsupportedOperationException}.
*
- * @return this SetBuilder
+ * @return this DistributedSetBuilder
*/
- SetBuilder<E> withUpdatesDisabled();
+ DistributedSetBuilder<E> withUpdatesDisabled();
+
+ /**
+ * Disables distribution of set entries across multiple database partitions.
+ * <p>
+ * When partitioning is disabled, the returned set will have a single partition
+ * that spans the entire cluster. Furthermore, the changes made to the set are
+ * ephemeral and do not survive a full cluster restart.
+ * </p>
+ * <p>
+ * Disabling partitions is more appropriate when the returned set is used for
+ * simple coordination activities and not for long term data persistence.
+ * </p>
+ * <p>
+ * Note: By default partitions are enabled and entries in the set are durable.
+ * </p>
+ * @return this DistributedSetBuilder
+ */
+ DistributedSetBuilder<E> withPartitionsDisabled();
/**
* Builds an set based on the configuration options
@@ -68,5 +84,5 @@
* @return new set
* @throws java.lang.RuntimeException if a mandatory parameter is missing
*/
- Set<E> build();
+ DistributedSet<E> build();
}
diff --git a/core/api/src/main/java/org/onosproject/store/service/MapEvent.java b/core/api/src/main/java/org/onosproject/store/service/MapEvent.java
new file mode 100644
index 0000000..74225ac
--- /dev/null
+++ b/core/api/src/main/java/org/onosproject/store/service/MapEvent.java
@@ -0,0 +1,120 @@
+package org.onosproject.store.service;
+
+import java.util.Objects;
+
+import com.google.common.base.MoreObjects;
+
+/**
+ * Representation of a ConsistentMap update notification.
+ *
+ * @param <K> key type
+ * @param <V> value type
+ */
+public class MapEvent<K, V> {
+
+    /**
+     * MapEvent type.
+     */
+    public enum Type {
+        /**
+         * Entry inserted into the map.
+         */
+        INSERT,
+
+        /**
+         * Existing map entry updated.
+         */
+        UPDATE,
+
+        /**
+         * Entry removed from map.
+         */
+        REMOVE
+    }
+
+    private final String name;
+    private final Type type;
+    private final K key;
+    private final Versioned<V> value;
+
+    /**
+     * Creates a new event object.
+     *
+     * @param name map name
+     * @param type the type of the event
+     * @param key the key the event concerns
+     * @param value the value related to the key, or null for remove events
+     */
+    public MapEvent(String name, Type type, K key, Versioned<V> value) {
+        this.name = name;
+        this.type = type;
+        this.key = key;
+        this.value = value;
+    }
+
+    /**
+     * Returns the map name.
+     *
+     * @return name of map
+     */
+    public String name() {
+        return name;
+    }
+
+    /**
+     * Returns the type of the event.
+     *
+     * @return the type of the event
+     */
+    public Type type() {
+        return type;
+    }
+
+    /**
+     * Returns the key this event concerns.
+     *
+     * @return the key
+     */
+    public K key() {
+        return key;
+    }
+
+    /**
+     * Returns the value associated with this event. If type is REMOVE,
+     * this is the value that was removed. If type is INSERT/UPDATE, this is
+     * the new value.
+     *
+     * @return the value
+     */
+    public Versioned<V> value() {
+        return value;
+    }
+
+    @Override
+    public boolean equals(Object o) {
+        if (!(o instanceof MapEvent)) {
+            return false;
+        }
+
+        MapEvent<?, ?> that = (MapEvent<?, ?>) o;
+        return Objects.equals(this.name, that.name) &&
+                Objects.equals(this.type, that.type) &&
+                Objects.equals(this.key, that.key) &&
+                Objects.equals(this.value, that.value);
+    }
+
+    @Override
+    public int hashCode() {
+        return Objects.hash(name, type, key, value);
+    }
+
+    @Override
+    public String toString() {
+        return MoreObjects.toStringHelper(getClass())
+                .add("name", name)
+                .add("type", type)
+                .add("key", key)
+                .add("value", value)
+                .toString();
+    }
+}
diff --git a/core/api/src/main/java/org/onosproject/store/service/MapEventListener.java b/core/api/src/main/java/org/onosproject/store/service/MapEventListener.java
new file mode 100644
index 0000000..2856306
--- /dev/null
+++ b/core/api/src/main/java/org/onosproject/store/service/MapEventListener.java
@@ -0,0 +1,13 @@
+package org.onosproject.store.service;
+
+/**
+ * Listener to be notified about updates to a ConsistentMap.
+ */
+public interface MapEventListener<K, V> {
+ /**
+ * Reacts to the specified map update event.
+ *
+ * @param event the event
+ */
+ void event(MapEvent<K, V> event);
+}
diff --git a/core/api/src/main/java/org/onosproject/store/service/Serializer.java b/core/api/src/main/java/org/onosproject/store/service/Serializer.java
index 5e1b84e..0b00a68 100644
--- a/core/api/src/main/java/org/onosproject/store/service/Serializer.java
+++ b/core/api/src/main/java/org/onosproject/store/service/Serializer.java
@@ -16,8 +16,13 @@
package org.onosproject.store.service;
+import java.util.Arrays;
+import java.util.List;
+
import org.onlab.util.KryoNamespace;
+import com.google.common.collect.Lists;
+
/**
* Interface for serialization for store artifacts.
*/
@@ -45,16 +50,32 @@
* @return Serializer instance
*/
static Serializer using(KryoNamespace kryo) {
+ return using(Arrays.asList(kryo));
+ }
+
+ static Serializer using(List<KryoNamespace> namespaces, Class<?>... classes) {
+ KryoNamespace.Builder builder = new KryoNamespace.Builder();
+ namespaces.forEach(builder::register);
+ Lists.newArrayList(classes).forEach(builder::register);
+ builder.register(MapEvent.class, MapEvent.Type.class);
+ KryoNamespace namespace = builder.build();
return new Serializer() {
@Override
public <T> byte[] encode(T object) {
- return kryo.serialize(object);
+ return namespace.serialize(object);
}
@Override
public <T> T decode(byte[] bytes) {
- return kryo.deserialize(bytes);
+ return namespace.deserialize(bytes);
}
};
}
+
+ static Serializer forTypes(Class<?>... classes) {
+ return using(KryoNamespace.newBuilder()
+ .register(classes)
+ .register(MapEvent.class, MapEvent.Type.class)
+ .build());
+ }
}
diff --git a/core/api/src/main/java/org/onosproject/store/service/SetEvent.java b/core/api/src/main/java/org/onosproject/store/service/SetEvent.java
new file mode 100644
index 0000000..1296a8b
--- /dev/null
+++ b/core/api/src/main/java/org/onosproject/store/service/SetEvent.java
@@ -0,0 +1,98 @@
+package org.onosproject.store.service;
+
+import java.util.Objects;
+
+import com.google.common.base.MoreObjects;
+
+/**
+ * Representation of a DistributedSet update notification.
+ *
+ * @param <E> element type
+ */
+public class SetEvent<E> {
+
+    /**
+     * SetEvent type.
+     */
+    public enum Type {
+        /**
+         * Entry added to the set.
+         */
+        ADD,
+
+        /**
+         * Entry removed from the set.
+         */
+        REMOVE
+    }
+
+    private final String name;
+    private final Type type;
+    private final E entry;
+
+    /**
+     * Creates a new event object.
+     *
+     * @param name set name
+     * @param type the type of the event
+     * @param entry the entry the event concerns
+     */
+    public SetEvent(String name, Type type, E entry) {
+        this.name = name;
+        this.type = type;
+        this.entry = entry;
+    }
+
+    /**
+     * Returns the set name.
+     *
+     * @return name of set
+     */
+    public String name() {
+        return name;
+    }
+
+    /**
+     * Returns the type of the event.
+     *
+     * @return the type of the event
+     */
+    public Type type() {
+        return type;
+    }
+
+    /**
+     * Returns the entry this event concerns.
+     *
+     * @return the entry
+     */
+    public E entry() {
+        return entry;
+    }
+
+    @Override
+    public boolean equals(Object o) {
+        if (!(o instanceof SetEvent)) {
+            return false;
+        }
+
+        SetEvent<?> that = (SetEvent<?>) o;
+        return Objects.equals(this.name, that.name) &&
+                Objects.equals(this.type, that.type) &&
+                Objects.equals(this.entry, that.entry);
+    }
+
+    @Override
+    public int hashCode() {
+        return Objects.hash(name, type, entry);
+    }
+
+    @Override
+    public String toString() {
+        return MoreObjects.toStringHelper(getClass())
+                .add("name", name)
+                .add("type", type)
+                .add("entry", entry)
+                .toString();
+    }
+}
diff --git a/core/api/src/main/java/org/onosproject/store/service/SetEventListener.java b/core/api/src/main/java/org/onosproject/store/service/SetEventListener.java
new file mode 100644
index 0000000..c9ffea1
--- /dev/null
+++ b/core/api/src/main/java/org/onosproject/store/service/SetEventListener.java
@@ -0,0 +1,13 @@
+package org.onosproject.store.service;
+
+/**
+ * Listener to be notified about updates to a DistributedSet.
+ */
+public interface SetEventListener<E> {
+ /**
+ * Reacts to the specified set update event.
+ *
+ * @param event the event
+ */
+ void event(SetEvent<E> event);
+}
diff --git a/core/api/src/main/java/org/onosproject/store/service/StorageService.java b/core/api/src/main/java/org/onosproject/store/service/StorageService.java
index b836ce8..55a07ed 100644
--- a/core/api/src/main/java/org/onosproject/store/service/StorageService.java
+++ b/core/api/src/main/java/org/onosproject/store/service/StorageService.java
@@ -53,7 +53,7 @@
* @param <E> set element type
* @return builder for an distributed set
*/
- <E> SetBuilder<E> setBuilder();
+ <E> DistributedSetBuilder<E> setBuilder();
/**
* Creates a new AtomicCounterBuilder.