Notification support for Consistent datastructures (ConsitentMap and DistributedSet)
Change-Id: If74cdc411c79c42c7643420e6369cf656849bb6a
diff --git a/core/api/src/main/java/org/onosproject/store/service/AsyncConsistentMap.java b/core/api/src/main/java/org/onosproject/store/service/AsyncConsistentMap.java
index 8a88ed6..1779256 100644
--- a/core/api/src/main/java/org/onosproject/store/service/AsyncConsistentMap.java
+++ b/core/api/src/main/java/org/onosproject/store/service/AsyncConsistentMap.java
@@ -288,4 +288,19 @@
* @return optional updated value. Will be empty if update did not happen.
*/
CompletableFuture<Optional<Versioned<V>>> replaceAndGet(K key, long oldVersion, V newValue);
+
+ /**
+ * Registers the specified listener to be notified whenever the map is updated.
+ *
+ * @param listener listener to notify about map events
+ */
+ void addListener(MapEventListener<K, V> listener);
+
+ /**
+ * Unregisters the specified listener such that it will no longer
+ * receive map change notifications.
+ *
+ * @param listener listener to unregister
+ */
+ void removeListener(MapEventListener<K, V> listener);
}
diff --git a/core/api/src/main/java/org/onosproject/store/service/ConsistentMap.java b/core/api/src/main/java/org/onosproject/store/service/ConsistentMap.java
index 2893509..59f72a9 100644
--- a/core/api/src/main/java/org/onosproject/store/service/ConsistentMap.java
+++ b/core/api/src/main/java/org/onosproject/store/service/ConsistentMap.java
@@ -289,4 +289,19 @@
* @return optional new value. Will be empty if replace did not happen
*/
Optional<Versioned<V>> replaceAndGet(K key, long oldVersion, V newValue);
+
+ /**
+ * Registers the specified listener to be notified whenever the map is updated.
+ *
+ * @param listener listener to notify about map events
+ */
+ void addListener(MapEventListener<K, V> listener);
+
+ /**
+ * Unregisters the specified listener such that it will no longer
+ * receive map change notifications.
+ *
+ * @param listener listener to unregister
+ */
+ void removeListener(MapEventListener<K, V> listener);
}
diff --git a/core/api/src/main/java/org/onosproject/store/service/DistributedSet.java b/core/api/src/main/java/org/onosproject/store/service/DistributedSet.java
new file mode 100644
index 0000000..4c9116a
--- /dev/null
+++ b/core/api/src/main/java/org/onosproject/store/service/DistributedSet.java
@@ -0,0 +1,26 @@
+package org.onosproject.store.service;
+
+import java.util.Set;
+
+/**
+ * A distributed collection designed for holding unique elements.
+ *
+ * @param <E> set entry type
+ */
+public interface DistributedSet<E> extends Set<E> {
+
+ /**
+ * Registers the specified listener to be notified whenever
+ * the set is updated.
+ *
+ * @param listener listener to notify about set update events
+ */
+ void addListener(SetEventListener<E> listener);
+
+ /**
+ * Unregisters the specified listener.
+ *
+ * @param listener listener to unregister.
+ */
+ void removeListener(SetEventListener<E> listener);
+}
diff --git a/core/api/src/main/java/org/onosproject/store/service/SetBuilder.java b/core/api/src/main/java/org/onosproject/store/service/DistributedSetBuilder.java
similarity index 60%
rename from core/api/src/main/java/org/onosproject/store/service/SetBuilder.java
rename to core/api/src/main/java/org/onosproject/store/service/DistributedSetBuilder.java
index 3f28065f..c5608ce 100644
--- a/core/api/src/main/java/org/onosproject/store/service/SetBuilder.java
+++ b/core/api/src/main/java/org/onosproject/store/service/DistributedSetBuilder.java
@@ -15,14 +15,12 @@
*/
package org.onosproject.store.service;
-import java.util.Set;
-
/**
* Builder for distributed set.
*
* @param <E> type set elements.
*/
-public interface SetBuilder<E> {
+public interface DistributedSetBuilder<E> {
/**
* Sets the name of the set.
@@ -34,9 +32,9 @@
* </p>
*
* @param name name of the set
- * @return this SetBuilder
+ * @return this DistributedSetBuilder
*/
- SetBuilder<E> withName(String name);
+ DistributedSetBuilder<E> withName(String name);
/**
* Sets a serializer that can be used to serialize
@@ -48,18 +46,36 @@
* </p>
*
* @param serializer serializer
- * @return this SetBuilder
+ * @return this DistributedSetBuilder
*/
- SetBuilder<E> withSerializer(Serializer serializer);
+ DistributedSetBuilder<E> withSerializer(Serializer serializer);
/**
* Disables set updates.
* <p>
* Attempt to update the built set will throw {@code UnsupportedOperationException}.
*
- * @return this SetBuilder
+ * @return this DistributedSetBuilder
*/
- SetBuilder<E> withUpdatesDisabled();
+ DistributedSetBuilder<E> withUpdatesDisabled();
+
+ /**
+ * Disables distribution of set entries across multiple database partitions.
+ * <p>
+ * When partitioning is disabled, the returned set will have a single partition
+ * that spans the entire cluster. Furthermore, the changes made to the set are
+ * ephemeral and do not survive a full cluster restart.
+ * </p>
+ * <p>
+ * Disabling partitions is more appropriate when the returned set is used for
+ * simple coordination activities and not for long term data persistence.
+ * </p>
+ * <p>
+ * Note: By default partitions are enabled and entries in the set are durable.
+ * </p>
+ * @return this DistributedSetBuilder
+ */
+ DistributedSetBuilder<E> withPartitionsDisabled();
/**
* Builds an set based on the configuration options
@@ -68,5 +84,5 @@
* @return new set
* @throws java.lang.RuntimeException if a mandatory parameter is missing
*/
- Set<E> build();
+ DistributedSet<E> build();
}
diff --git a/core/api/src/main/java/org/onosproject/store/service/MapEvent.java b/core/api/src/main/java/org/onosproject/store/service/MapEvent.java
new file mode 100644
index 0000000..74225ac
--- /dev/null
+++ b/core/api/src/main/java/org/onosproject/store/service/MapEvent.java
@@ -0,0 +1,120 @@
+package org.onosproject.store.service;
+
+import java.util.Objects;
+
+import com.google.common.base.MoreObjects;
+
+/**
+ * Representation of a ConsistentMap update notification.
+ *
+ * @param <K> key type
+ * @param <V> value type
+ */
+public class MapEvent<K, V> {
+
+ /**
+ * MapEvent type.
+ */
+ public enum Type {
+ /**
+ * Entry inserted into the map.
+ */
+ INSERT,
+
+ /**
+ * Existing map entry updated.
+ */
+ UPDATE,
+
+ /**
+ * Entry removed from map.
+ */
+ REMOVE
+ }
+
+ private final String name;
+ private final Type type;
+ private final K key;
+ private final Versioned<V> value;
+
+ /**
+ * Creates a new event object.
+ *
+ * @param name map name
+ * @param type the type of the event
+ * @param key the key the event concerns
+ * @param value the value related to the key, or null for remove events
+ */
+ public MapEvent(String name, Type type, K key, Versioned<V> value) {
+ this.name = name;
+ this.type = type;
+ this.key = key;
+ this.value = value;
+ }
+
+ /**
+ * Returns the map name.
+ *
+ * @return name of map
+ */
+ public String name() {
+ return name;
+ }
+
+ /**
+ * Returns the type of the event.
+ *
+ * @return the type of the event
+ */
+ public Type type() {
+ return type;
+ }
+
+ /**
+ * Returns the key this event concerns.
+ *
+ * @return the key
+ */
+ public K key() {
+ return key;
+ }
+
+ /**
+ * Returns the value associated with this event. If type is REMOVE,
+ * this is the value that was removed. If type is INSERT/UPDATE, this is
+ * the new value.
+ *
+ * @return the value
+ */
+ public Versioned<V> value() {
+ return value;
+ }
+
+ @Override
+ public boolean equals(Object o) {
+ if (!(o instanceof MapEvent)) {
+ return false;
+ }
+
+ MapEvent<K, V> that = (MapEvent) o;
+ return Objects.equals(this.name, that.name) &&
+ Objects.equals(this.type, that.type) &&
+ Objects.equals(this.key, that.key) &&
+ Objects.equals(this.value, that.value);
+ }
+
+ @Override
+ public int hashCode() {
+ return Objects.hash(type, key, value);
+ }
+
+ @Override
+ public String toString() {
+ return MoreObjects.toStringHelper(getClass())
+ .add("name", name)
+ .add("type", type)
+ .add("key", key)
+ .add("value", value)
+ .toString();
+ }
+}
diff --git a/core/api/src/main/java/org/onosproject/store/service/MapEventListener.java b/core/api/src/main/java/org/onosproject/store/service/MapEventListener.java
new file mode 100644
index 0000000..2856306
--- /dev/null
+++ b/core/api/src/main/java/org/onosproject/store/service/MapEventListener.java
@@ -0,0 +1,13 @@
+package org.onosproject.store.service;
+
+/**
+ * Listener to be notified about updates to a ConsitentMap.
+ */
+public interface MapEventListener<K, V> {
+ /**
+ * Reacts to the specified event.
+ *
+ * @param event the event
+ */
+ void event(MapEvent<K, V> event);
+}
diff --git a/core/api/src/main/java/org/onosproject/store/service/Serializer.java b/core/api/src/main/java/org/onosproject/store/service/Serializer.java
index 5e1b84e..0b00a68 100644
--- a/core/api/src/main/java/org/onosproject/store/service/Serializer.java
+++ b/core/api/src/main/java/org/onosproject/store/service/Serializer.java
@@ -16,8 +16,13 @@
package org.onosproject.store.service;
+import java.util.Arrays;
+import java.util.List;
+
import org.onlab.util.KryoNamespace;
+import com.google.common.collect.Lists;
+
/**
* Interface for serialization for store artifacts.
*/
@@ -45,16 +50,32 @@
* @return Serializer instance
*/
static Serializer using(KryoNamespace kryo) {
+ return using(Arrays.asList(kryo));
+ }
+
+ static Serializer using(List<KryoNamespace> namespaces, Class<?>... classes) {
+ KryoNamespace.Builder builder = new KryoNamespace.Builder();
+ namespaces.forEach(builder::register);
+ Lists.newArrayList(classes).forEach(builder::register);
+ builder.register(MapEvent.class, MapEvent.Type.class);
+ KryoNamespace namespace = builder.build();
return new Serializer() {
@Override
public <T> byte[] encode(T object) {
- return kryo.serialize(object);
+ return namespace.serialize(object);
}
@Override
public <T> T decode(byte[] bytes) {
- return kryo.deserialize(bytes);
+ return namespace.deserialize(bytes);
}
};
}
+
+ static Serializer forTypes(Class<?>... classes) {
+ return using(KryoNamespace.newBuilder()
+ .register(classes)
+ .register(MapEvent.class, MapEvent.Type.class)
+ .build());
+ }
}
diff --git a/core/api/src/main/java/org/onosproject/store/service/SetEvent.java b/core/api/src/main/java/org/onosproject/store/service/SetEvent.java
new file mode 100644
index 0000000..1296a8b
--- /dev/null
+++ b/core/api/src/main/java/org/onosproject/store/service/SetEvent.java
@@ -0,0 +1,98 @@
+package org.onosproject.store.service;
+
+import java.util.Objects;
+
+import com.google.common.base.MoreObjects;
+
+/**
+ * Representation of a DistributedSet update notification.
+ *
+ * @param <E> element type
+ */
+public class SetEvent<E> {
+
+ /**
+ * SetEvent type.
+ */
+ public enum Type {
+ /**
+ * Entry added to the set.
+ */
+ ADD,
+
+ /**
+ * Entry removed from the set.
+ */
+ REMOVE
+ }
+
+ private final String name;
+ private final Type type;
+ private final E entry;
+
+ /**
+ * Creates a new event object.
+ *
+ * @param name set name
+ * @param type the type of the event
+ * @param entry the entry the event concerns
+ */
+ public SetEvent(String name, Type type, E entry) {
+ this.name = name;
+ this.type = type;
+ this.entry = entry;
+ }
+
+ /**
+ * Returns the set name.
+ *
+ * @return name of set
+ */
+ public String name() {
+ return name;
+ }
+
+ /**
+ * Returns the type of the event.
+ *
+ * @return the type of the event
+ */
+ public Type type() {
+ return type;
+ }
+
+ /**
+ * Returns the entry this event concerns.
+ *
+ * @return the entry
+ */
+ public E entry() {
+ return entry;
+ }
+
+ @Override
+ public boolean equals(Object o) {
+ if (!(o instanceof SetEvent)) {
+ return false;
+ }
+
+ SetEvent<E> that = (SetEvent) o;
+ return Objects.equals(this.name, that.name) &&
+ Objects.equals(this.type, that.type) &&
+ Objects.equals(this.entry, that.entry);
+ }
+
+ @Override
+ public int hashCode() {
+ return Objects.hash(name, type, entry);
+ }
+
+ @Override
+ public String toString() {
+ return MoreObjects.toStringHelper(getClass())
+ .add("name", name)
+ .add("type", type)
+ .add("entry", entry)
+ .toString();
+ }
+}
diff --git a/core/api/src/main/java/org/onosproject/store/service/SetEventListener.java b/core/api/src/main/java/org/onosproject/store/service/SetEventListener.java
new file mode 100644
index 0000000..c9ffea1
--- /dev/null
+++ b/core/api/src/main/java/org/onosproject/store/service/SetEventListener.java
@@ -0,0 +1,13 @@
+package org.onosproject.store.service;
+
+/**
+ * Listener to be notified about updates to a DistributedSet.
+ */
+public interface SetEventListener<E> {
+ /**
+ * Reacts to the specified event.
+ *
+ * @param event the event
+ */
+ void event(SetEvent<E> event);
+}
diff --git a/core/api/src/main/java/org/onosproject/store/service/StorageService.java b/core/api/src/main/java/org/onosproject/store/service/StorageService.java
index b836ce8..55a07ed 100644
--- a/core/api/src/main/java/org/onosproject/store/service/StorageService.java
+++ b/core/api/src/main/java/org/onosproject/store/service/StorageService.java
@@ -53,7 +53,7 @@
* @param <E> set element type
* @return builder for an distributed set
*/
- <E> SetBuilder<E> setBuilder();
+ <E> DistributedSetBuilder<E> setBuilder();
/**
* Creates a new AtomicCounterBuilder.
diff --git a/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/DatabaseManager.java b/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/DatabaseManager.java
index 47031b0..a222627 100644
--- a/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/DatabaseManager.java
+++ b/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/DatabaseManager.java
@@ -42,19 +42,24 @@
import org.apache.felix.scr.annotations.Reference;
import org.apache.felix.scr.annotations.ReferenceCardinality;
import org.apache.felix.scr.annotations.Service;
+
+import static org.onlab.util.Tools.groupedThreads;
+
import org.onosproject.cluster.ClusterService;
import org.onosproject.core.IdGenerator;
import org.onosproject.store.cluster.impl.ClusterDefinitionManager;
import org.onosproject.store.cluster.impl.NodeInfo;
import org.onosproject.store.cluster.messaging.ClusterCommunicationService;
+import org.onosproject.store.cluster.messaging.MessageSubject;
import org.onosproject.store.ecmap.EventuallyConsistentMapBuilderImpl;
import org.onosproject.store.service.AtomicCounterBuilder;
import org.onosproject.store.service.ConsistentMapBuilder;
import org.onosproject.store.service.ConsistentMapException;
import org.onosproject.store.service.EventuallyConsistentMapBuilder;
+import org.onosproject.store.service.MapEvent;
import org.onosproject.store.service.MapInfo;
import org.onosproject.store.service.PartitionInfo;
-import org.onosproject.store.service.SetBuilder;
+import org.onosproject.store.service.DistributedSetBuilder;
import org.onosproject.store.service.StorageAdminService;
import org.onosproject.store.service.StorageService;
import org.onosproject.store.service.Transaction;
@@ -69,6 +74,7 @@
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
@@ -93,12 +99,16 @@
private static final int DATABASE_OPERATION_TIMEOUT_MILLIS = 5000;
private ClusterCoordinator coordinator;
- private PartitionedDatabase partitionedDatabase;
- private Database inMemoryDatabase;
+ protected PartitionedDatabase partitionedDatabase;
+ protected Database inMemoryDatabase;
private TransactionManager transactionManager;
private final IdGenerator transactionIdGenerator = () -> RandomUtils.nextLong();
+ private ExecutorService eventDispatcher;
+
+ private final Set<DefaultAsyncConsistentMap> maps = Sets.newCopyOnWriteArraySet();
+
@Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
protected ClusterService clusterService;
@@ -187,7 +197,10 @@
Futures.getUnchecked(status);
- transactionManager = new TransactionManager(partitionedDatabase);
+ transactionManager = new TransactionManager(partitionedDatabase, consistentMapBuilder());
+
+ eventDispatcher = Executors.newSingleThreadExecutor(
+ groupedThreads("onos/store/manager", "map-event-dispatcher"));
log.info("Started");
}
@@ -213,13 +226,14 @@
log.info("Successfully closed databases.");
}
});
+ maps.forEach(map -> clusterCommunicator.removeSubscriber(mapUpdatesSubject(map.name())));
+ eventDispatcher.shutdown();
log.info("Stopped");
}
@Override
public TransactionContextBuilder transactionContextBuilder() {
- return new DefaultTransactionContextBuilder(
- inMemoryDatabase, partitionedDatabase, transactionIdGenerator.getNewId());
+ return new DefaultTransactionContextBuilder(this, transactionIdGenerator.getNewId());
}
@Override
@@ -296,12 +310,12 @@
@Override
public <K, V> ConsistentMapBuilder<K, V> consistentMapBuilder() {
- return new DefaultConsistentMapBuilder<>(inMemoryDatabase, partitionedDatabase);
+ return new DefaultConsistentMapBuilder<>(this);
}
@Override
- public <E> SetBuilder<E> setBuilder() {
- return new DefaultSetBuilder<>(partitionedDatabase);
+ public <E> DistributedSetBuilder<E> setBuilder() {
+ return new DefaultDistributedSetBuilder<>(this);
}
@Override
@@ -370,4 +384,20 @@
public void redriveTransactions() {
getTransactions().stream().forEach(transactionManager::execute);
}
+
+ protected <K, V> void registerMap(DefaultAsyncConsistentMap<K, V> map) {
+ // TODO: Support different local instances of the same map.
+ if (!maps.add(map)) {
+ throw new IllegalStateException("Map by name " + map.name() + " already exists");
+ }
+
+ clusterCommunicator.<MapEvent<K, V>>addSubscriber(mapUpdatesSubject(map.name()),
+ map.serializer()::decode,
+ map::notifyLocalListeners,
+ eventDispatcher);
+ }
+
+ protected static MessageSubject mapUpdatesSubject(String mapName) {
+ return new MessageSubject(mapName + "-map-updates");
+ }
}
\ No newline at end of file
diff --git a/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/DefaultAsyncConsistentMap.java b/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/DefaultAsyncConsistentMap.java
index 3fb6f4a..4bc93bc 100644
--- a/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/DefaultAsyncConsistentMap.java
+++ b/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/DefaultAsyncConsistentMap.java
@@ -17,6 +17,7 @@
package org.onosproject.store.consistent.impl;
import static com.google.common.base.Preconditions.*;
+import static org.slf4j.LoggerFactory.getLogger;
import java.util.Collection;
import java.util.Map;
@@ -24,8 +25,10 @@
import java.util.Objects;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.CopyOnWriteArraySet;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.BiFunction;
+import java.util.function.Consumer;
import java.util.function.Function;
import java.util.function.Predicate;
import java.util.stream.Collectors;
@@ -36,8 +39,11 @@
import org.onlab.util.Tools;
import org.onosproject.store.service.AsyncConsistentMap;
import org.onosproject.store.service.ConsistentMapException;
+import org.onosproject.store.service.MapEvent;
+import org.onosproject.store.service.MapEventListener;
import org.onosproject.store.service.Serializer;
import org.onosproject.store.service.Versioned;
+import org.slf4j.Logger;
import com.google.common.cache.CacheBuilder;
import com.google.common.cache.CacheLoader;
@@ -56,6 +62,11 @@
private final Database database;
private final Serializer serializer;
private final boolean readOnly;
+ private final Consumer<MapEvent<K, V>> eventPublisher;
+
+ private final Set<MapEventListener<K, V>> listeners = new CopyOnWriteArraySet<>();
+
+ private final Logger log = getLogger(getClass());
private static final String ERROR_NULL_KEY = "Key cannot be null";
private static final String ERROR_NULL_VALUE = "Null values are not allowed";
@@ -77,11 +88,29 @@
public DefaultAsyncConsistentMap(String name,
Database database,
Serializer serializer,
- boolean readOnly) {
+ boolean readOnly,
+ Consumer<MapEvent<K, V>> eventPublisher) {
this.name = checkNotNull(name, "map name cannot be null");
this.database = checkNotNull(database, "database cannot be null");
this.serializer = checkNotNull(serializer, "serializer cannot be null");
this.readOnly = readOnly;
+ this.eventPublisher = eventPublisher;
+ }
+
+ /**
+ * Returns this map name.
+ * @return map name
+ */
+ public String name() {
+ return name;
+ }
+
+ /**
+ * Returns the serializer for map entries.
+ * @return map entry serializer
+ */
+ public Serializer serializer() {
+ return serializer;
}
@Override
@@ -139,6 +168,7 @@
checkNotNull(key, ERROR_NULL_KEY);
checkNotNull(condition, "predicate function cannot be null");
checkNotNull(remappingFunction, "Remapping function cannot be null");
+ AtomicReference<MapEvent<K, V>> mapEvent = new AtomicReference<>();
return get(key).thenCompose(r1 -> {
V existingValue = r1 == null ? null : r1.value();
// if the condition evaluates to false, return existing value.
@@ -160,6 +190,7 @@
if (r1 != null) {
return remove(key, r1.version()).thenApply(result -> {
if (result) {
+ mapEvent.set(new MapEvent<>(name, MapEvent.Type.REMOVE, key, r1));
return null;
} else {
throw new ConsistentMapException.ConcurrentModification();
@@ -174,6 +205,7 @@
return replaceAndGet(key, r1.version(), computedValue.get())
.thenApply(v -> {
if (v.isPresent()) {
+ mapEvent.set(new MapEvent<>(name, MapEvent.Type.UPDATE, key, v.get()));
return v.get();
} else {
throw new ConsistentMapException.ConcurrentModification();
@@ -184,12 +216,13 @@
if (!result.isPresent()) {
throw new ConsistentMapException.ConcurrentModification();
} else {
+ mapEvent.set(new MapEvent<>(name, MapEvent.Type.INSERT, key, result.get()));
return result.get();
}
});
}
}
- });
+ }).whenComplete((result, error) -> notifyListeners(mapEvent.get()));
}
@Override
@@ -370,4 +403,35 @@
throw new UnsupportedOperationException();
}
}
+
+ @Override
+ public void addListener(MapEventListener<K, V> listener) {
+ listeners.add(listener);
+ }
+
+ @Override
+ public void removeListener(MapEventListener<K, V> listener) {
+ listeners.remove(listener);
+ }
+
+ protected void notifyListeners(MapEvent<K, V> event) {
+ try {
+ if (event != null) {
+ notifyLocalListeners(event);
+ notifyRemoteListeners(event);
+ }
+ } catch (Exception e) {
+ log.warn("Failure notifying listeners about {}", event, e);
+ }
+ }
+
+ protected void notifyLocalListeners(MapEvent<K, V> event) {
+ listeners.forEach(listener -> listener.event(event));
+ }
+
+ protected void notifyRemoteListeners(MapEvent<K, V> event) {
+ if (eventPublisher != null) {
+ eventPublisher.accept(event);
+ }
+ }
}
\ No newline at end of file
diff --git a/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/DefaultConsistentMap.java b/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/DefaultConsistentMap.java
index b85dfa2..7995d8f 100644
--- a/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/DefaultConsistentMap.java
+++ b/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/DefaultConsistentMap.java
@@ -28,10 +28,9 @@
import java.util.function.Predicate;
import java.util.Set;
-import org.onosproject.store.service.AsyncConsistentMap;
import org.onosproject.store.service.ConsistentMap;
import org.onosproject.store.service.ConsistentMapException;
-import org.onosproject.store.service.Serializer;
+import org.onosproject.store.service.MapEventListener;
import org.onosproject.store.service.Versioned;
/**
@@ -45,13 +44,14 @@
private static final int OPERATION_TIMEOUT_MILLIS = 5000;
- private final AsyncConsistentMap<K, V> asyncMap;
+ private final DefaultAsyncConsistentMap<K, V> asyncMap;
- public DefaultConsistentMap(String name,
- Database database,
- Serializer serializer,
- boolean readOnly) {
- asyncMap = new DefaultAsyncConsistentMap<>(name, database, serializer, readOnly);
+ public String name() {
+ return asyncMap.name();
+ }
+
+ public DefaultConsistentMap(DefaultAsyncConsistentMap<K, V> asyncMap) {
+ this.asyncMap = asyncMap;
}
@Override
@@ -190,4 +190,14 @@
}
}
}
+
+ @Override
+ public void addListener(MapEventListener<K, V> listener) {
+ asyncMap.addListener(listener);
+ }
+
+ @Override
+ public void removeListener(MapEventListener<K, V> listener) {
+ asyncMap.addListener(listener);
+ }
}
\ No newline at end of file
diff --git a/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/DefaultConsistentMapBuilder.java b/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/DefaultConsistentMapBuilder.java
index 534b59f..24db74f 100644
--- a/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/DefaultConsistentMapBuilder.java
+++ b/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/DefaultConsistentMapBuilder.java
@@ -6,6 +6,7 @@
import org.onosproject.store.service.AsyncConsistentMap;
import org.onosproject.store.service.ConsistentMap;
import org.onosproject.store.service.ConsistentMapBuilder;
+import org.onosproject.store.service.MapEvent;
import org.onosproject.store.service.Serializer;
/**
@@ -20,12 +21,10 @@
private String name;
private boolean partitionsEnabled = true;
private boolean readOnly = false;
- private final Database partitionedDatabase;
- private final Database inMemoryDatabase;
+ private final DatabaseManager manager;
- public DefaultConsistentMapBuilder(Database inMemoryDatabase, Database partitionedDatabase) {
- this.inMemoryDatabase = inMemoryDatabase;
- this.partitionedDatabase = partitionedDatabase;
+ public DefaultConsistentMapBuilder(DatabaseManager manager) {
+ this.manager = manager;
}
@Override
@@ -60,21 +59,25 @@
@Override
public ConsistentMap<K, V> build() {
- checkState(validInputs());
- return new DefaultConsistentMap<>(
- name,
- partitionsEnabled ? partitionedDatabase : inMemoryDatabase,
- serializer,
- readOnly);
+ return new DefaultConsistentMap<>(buildAndRegisterMap());
}
@Override
public AsyncConsistentMap<K, V> buildAsyncMap() {
+ return buildAndRegisterMap();
+ }
+
+ private DefaultAsyncConsistentMap<K, V> buildAndRegisterMap() {
checkState(validInputs());
- return new DefaultAsyncConsistentMap<>(
+ DefaultAsyncConsistentMap<K, V> asyncMap = new DefaultAsyncConsistentMap<>(
name,
- partitionsEnabled ? partitionedDatabase : inMemoryDatabase,
+ partitionsEnabled ? manager.partitionedDatabase : manager.inMemoryDatabase,
serializer,
- readOnly);
+ readOnly,
+ event -> manager.clusterCommunicator.<MapEvent<K, V>>broadcast(event,
+ DatabaseManager.mapUpdatesSubject(name),
+ serializer::encode));
+ manager.registerMap(asyncMap);
+ return asyncMap;
}
}
\ No newline at end of file
diff --git a/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/DefaultDistributedSet.java b/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/DefaultDistributedSet.java
index adc9dcd..ba5ff95 100644
--- a/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/DefaultDistributedSet.java
+++ b/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/DefaultDistributedSet.java
@@ -17,11 +17,17 @@
import java.util.Collection;
import java.util.Iterator;
+import java.util.Map;
import java.util.Set;
import org.onosproject.store.service.ConsistentMap;
-import org.onosproject.store.service.Serializer;
+import org.onosproject.store.service.DistributedSet;
+import org.onosproject.store.service.MapEvent;
+import org.onosproject.store.service.MapEventListener;
+import org.onosproject.store.service.SetEvent;
+import org.onosproject.store.service.SetEventListener;
+import com.google.common.collect.Maps;
import com.google.common.collect.Sets;
/**
@@ -29,12 +35,15 @@
* @param <E> set element type
*/
-public class DefaultDistributedSet<E> implements Set<E> {
+public class DefaultDistributedSet<E> implements DistributedSet<E> {
+ private final String name;
private final ConsistentMap<E, Boolean> backingMap;
+ private final Map<SetEventListener<E>, MapEventListener<E, Boolean>> listenerMapping = Maps.newIdentityHashMap();
- public DefaultDistributedSet(String name, Database database, Serializer serializer, boolean readOnly) {
- backingMap = new DefaultConsistentMap<>(name, database, serializer, readOnly);
+ public DefaultDistributedSet(String name, ConsistentMap<E, Boolean> backingMap) {
+ this.name = name;
+ this.backingMap = backingMap;
}
@Override
@@ -76,7 +85,7 @@
@SuppressWarnings("unchecked")
@Override
public boolean remove(Object o) {
- return backingMap.remove((E) o, true);
+ return backingMap.remove((E) o) != null;
}
@Override
@@ -119,4 +128,26 @@
public void clear() {
backingMap.clear();
}
+
+ @Override
+ public void addListener(SetEventListener<E> listener) {
+ MapEventListener<E, Boolean> mapEventListener = mapEvent -> {
+ if (mapEvent.type() == MapEvent.Type.INSERT) {
+ listener.event(new SetEvent<>(name, SetEvent.Type.ADD, mapEvent.key()));
+ } else if (mapEvent.type() == MapEvent.Type.REMOVE) {
+ listener.event(new SetEvent<>(name, SetEvent.Type.REMOVE, mapEvent.key()));
+ }
+ };
+ if (listenerMapping.putIfAbsent(listener, mapEventListener) == null) {
+ backingMap.addListener(mapEventListener);
+ }
+ }
+
+ @Override
+ public void removeListener(SetEventListener<E> listener) {
+ MapEventListener<E, Boolean> mapEventListener = listenerMapping.remove(listener);
+ if (mapEventListener != null) {
+ backingMap.removeListener(mapEventListener);
+ }
+ }
}
diff --git a/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/DefaultDistributedSetBuilder.java b/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/DefaultDistributedSetBuilder.java
new file mode 100644
index 0000000..57ec232
--- /dev/null
+++ b/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/DefaultDistributedSetBuilder.java
@@ -0,0 +1,66 @@
+/*
+ * Copyright 2015 Open Networking Laboratory
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.onosproject.store.consistent.impl;
+
+import org.onosproject.store.service.ConsistentMapBuilder;
+import org.onosproject.store.service.DistributedSet;
+import org.onosproject.store.service.Serializer;
+import org.onosproject.store.service.DistributedSetBuilder;
+
+/**
+ * Default distributed set builder.
+ *
+ * @param <E> type for set elements
+ */
+public class DefaultDistributedSetBuilder<E> implements DistributedSetBuilder<E> {
+
+ private String name;
+ private ConsistentMapBuilder<E, Boolean> mapBuilder;
+
+ public DefaultDistributedSetBuilder(DatabaseManager manager) {
+ this.mapBuilder = manager.consistentMapBuilder();
+ }
+
+ @Override
+ public DistributedSetBuilder<E> withName(String name) {
+ mapBuilder.withName(name);
+ this.name = name;
+ return this;
+ }
+
+ @Override
+ public DistributedSetBuilder<E> withSerializer(Serializer serializer) {
+ mapBuilder.withSerializer(serializer);
+ return this;
+ }
+
+ @Override
+ public DistributedSetBuilder<E> withUpdatesDisabled() {
+ mapBuilder.withUpdatesDisabled();
+ return this;
+ }
+
+ @Override
+ public DistributedSetBuilder<E> withPartitionsDisabled() {
+ mapBuilder.withPartitionsDisabled();
+ return this;
+ }
+
+ @Override
+ public DistributedSet<E> build() {
+ return new DefaultDistributedSet<E>(name, mapBuilder.build());
+ }
+}
diff --git a/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/DefaultSetBuilder.java b/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/DefaultSetBuilder.java
deleted file mode 100644
index 566953f..0000000
--- a/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/DefaultSetBuilder.java
+++ /dev/null
@@ -1,72 +0,0 @@
-/*
- * Copyright 2015 Open Networking Laboratory
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.onosproject.store.consistent.impl;
-
-import static com.google.common.base.Preconditions.checkArgument;
-import static com.google.common.base.Preconditions.checkNotNull;
-import static com.google.common.base.Preconditions.checkState;
-
-import java.util.Set;
-
-import org.onosproject.store.service.Serializer;
-import org.onosproject.store.service.SetBuilder;
-
-/**
- * Default Set builder.
- *
- * @param <E> type for set elements
- */
-public class DefaultSetBuilder<E> implements SetBuilder<E> {
-
- private Serializer serializer;
- private String name;
- private final Database database;
- private boolean readOnly;
-
- public DefaultSetBuilder(Database database) {
- this.database = checkNotNull(database);
- }
-
- @Override
- public SetBuilder<E> withName(String name) {
- checkArgument(name != null && !name.isEmpty());
- this.name = name;
- return this;
- }
-
- @Override
- public SetBuilder<E> withSerializer(Serializer serializer) {
- checkArgument(serializer != null);
- this.serializer = serializer;
- return this;
- }
-
- @Override
- public SetBuilder<E> withUpdatesDisabled() {
- readOnly = true;
- return this;
- }
-
- private boolean validInputs() {
- return name != null && serializer != null;
- }
-
- @Override
- public Set<E> build() {
- checkState(validInputs());
- return new DefaultDistributedSet<>(name, database, serializer, readOnly);
- }
-}
diff --git a/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/DefaultTransactionContext.java b/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/DefaultTransactionContext.java
index 5570e30..b0ab575 100644
--- a/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/DefaultTransactionContext.java
+++ b/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/DefaultTransactionContext.java
@@ -18,9 +18,11 @@
import java.util.List;
import java.util.Map;
+import java.util.function.Supplier;
import static com.google.common.base.Preconditions.*;
+import org.onosproject.store.service.ConsistentMapBuilder;
import org.onosproject.store.service.DatabaseUpdate;
import org.onosproject.store.service.Serializer;
import org.onosproject.store.service.TransactionContext;
@@ -41,10 +43,14 @@
private boolean isOpen = false;
private final Database database;
private final long transactionId;
+ private final Supplier<ConsistentMapBuilder> mapBuilderSupplier;
- public DefaultTransactionContext(Database database, long transactionId) {
- this.database = checkNotNull(database);
+ public DefaultTransactionContext(long transactionId,
+ Database database,
+ Supplier<ConsistentMapBuilder> mapBuilderSupplier) {
this.transactionId = transactionId;
+ this.database = checkNotNull(database);
+ this.mapBuilderSupplier = checkNotNull(mapBuilderSupplier);
}
@Override
@@ -72,7 +78,7 @@
checkNotNull(serializer);
return txMaps.computeIfAbsent(mapName, name -> new DefaultTransactionalMap<>(
name,
- new DefaultConsistentMap<>(name, database, serializer, false),
+ mapBuilderSupplier.get().withName(name).withSerializer(serializer).build(),
this,
serializer));
}
@@ -85,6 +91,7 @@
List<DatabaseUpdate> updates = Lists.newLinkedList();
txMaps.values()
.forEach(m -> { updates.addAll(m.prepareDatabaseUpdates()); });
+ // FIXME: Updates made via transactional context currently do not result in notifications. (ONOS-2097)
database.prepareAndCommit(new DefaultTransaction(transactionId, updates));
} catch (Exception e) {
abort();
diff --git a/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/DefaultTransactionContextBuilder.java b/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/DefaultTransactionContextBuilder.java
index a3484cf..16e22aa 100644
--- a/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/DefaultTransactionContextBuilder.java
+++ b/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/DefaultTransactionContextBuilder.java
@@ -10,14 +10,11 @@
public class DefaultTransactionContextBuilder implements TransactionContextBuilder {
private boolean partitionsEnabled = true;
- private final Database partitionedDatabase;
- private final Database inMemoryDatabase;
+ private final DatabaseManager manager;
private final long transactionId;
- public DefaultTransactionContextBuilder(
- Database inMemoryDatabase, Database partitionedDatabase, long transactionId) {
- this.partitionedDatabase = partitionedDatabase;
- this.inMemoryDatabase = inMemoryDatabase;
+ public DefaultTransactionContextBuilder(DatabaseManager manager, long transactionId) {
+ this.manager = manager;
this.transactionId = transactionId;
}
@@ -30,8 +27,9 @@
@Override
public TransactionContext build() {
return new DefaultTransactionContext(
- partitionsEnabled ? partitionedDatabase : inMemoryDatabase,
- transactionId);
+ transactionId,
+ partitionsEnabled ? manager.partitionedDatabase : manager.inMemoryDatabase,
+ () -> partitionsEnabled ? manager.consistentMapBuilder()
+ : manager.consistentMapBuilder().withPartitionsDisabled());
}
-
}
diff --git a/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/PartitionedDatabase.java b/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/PartitionedDatabase.java
index 2903e0a..7b279dc 100644
--- a/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/PartitionedDatabase.java
+++ b/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/PartitionedDatabase.java
@@ -50,6 +50,7 @@
private final List<Database> partitions;
private final AtomicBoolean isOpen = new AtomicBoolean(false);
private static final String DB_NOT_OPEN = "Partitioned Database is not open";
+ private TransactionManager transactionManager;
public PartitionedDatabase(
String name,
@@ -285,7 +286,10 @@
subTransactions.entrySet().iterator().next();
return entry.getKey().prepareAndCommit(entry.getValue());
} else {
- return new TransactionManager(this).execute(transaction);
+ if (transactionManager != null) {
+ throw new IllegalStateException("TransactionManager is not initialized");
+ }
+ return transactionManager.execute(transaction);
}
}
@@ -387,4 +391,8 @@
perPartitionUpdates.forEach((k, v) -> subTransactions.put(k, new DefaultTransaction(transaction.id(), v)));
return subTransactions;
}
+
+ protected void setTransactionManager(TransactionManager tranasactionManager) {
+ this.transactionManager = transactionManager;
+ }
}
diff --git a/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/TransactionManager.java b/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/TransactionManager.java
index d8e9930..245ae1a 100644
--- a/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/TransactionManager.java
+++ b/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/TransactionManager.java
@@ -17,6 +17,7 @@
import static com.google.common.base.Preconditions.checkNotNull;
+import java.util.Arrays;
import java.util.Collection;
import java.util.concurrent.CompletableFuture;
import java.util.stream.Collectors;
@@ -26,6 +27,7 @@
import org.onlab.util.KryoNamespace;
import org.onosproject.store.serializers.KryoNamespaces;
import org.onosproject.store.service.AsyncConsistentMap;
+import org.onosproject.store.service.ConsistentMapBuilder;
import org.onosproject.store.service.DatabaseUpdate;
import org.onosproject.store.service.Serializer;
import org.onosproject.store.service.Transaction;
@@ -49,7 +51,7 @@
.register(ImmutablePair.class)
.build();
- private final Serializer serializer = Serializer.using(KRYO_NAMESPACE);
+ private final Serializer serializer = Serializer.using(Arrays.asList(KRYO_NAMESPACE));
private final Database database;
private final AsyncConsistentMap<Long, Transaction> transactions;
@@ -58,9 +60,11 @@
*
* @param database database
*/
- public TransactionManager(Database database) {
+ public TransactionManager(Database database, ConsistentMapBuilder<Long, Transaction> mapBuilder) {
this.database = checkNotNull(database, "database cannot be null");
- this.transactions = new DefaultAsyncConsistentMap<>("onos-transactions", this.database, serializer, false);
+ this.transactions = mapBuilder.withName("onos-transactions")
+ .withSerializer(serializer)
+ .buildAsyncMap();
}
/**
diff --git a/core/store/serializers/src/main/java/org/onosproject/store/serializers/KryoNamespaces.java b/core/store/serializers/src/main/java/org/onosproject/store/serializers/KryoNamespaces.java
index 0ceef4b..f930f8c 100644
--- a/core/store/serializers/src/main/java/org/onosproject/store/serializers/KryoNamespaces.java
+++ b/core/store/serializers/src/main/java/org/onosproject/store/serializers/KryoNamespaces.java
@@ -170,6 +170,8 @@
import org.onosproject.net.resource.link.MplsLabelResourceAllocation;
import org.onosproject.net.resource.link.MplsLabelResourceRequest;
import org.onosproject.store.Timestamp;
+import org.onosproject.store.service.MapEvent;
+import org.onosproject.store.service.SetEvent;
import org.onosproject.store.service.Versioned;
import java.net.URI;
@@ -408,6 +410,10 @@
.register(new HostLocationSerializer(), HostLocation.class)
.register(new DefaultOutboundPacketSerializer(), DefaultOutboundPacket.class)
.register(Versioned.class)
+ .register(MapEvent.class)
+ .register(MapEvent.Type.class)
+ .register(SetEvent.class)
+ .register(SetEvent.Type.class)
.register(DefaultGroupId.class)
.register(Annotations.class)
.register(OmsPort.class)