Implement listeners for ConsistentMultimap.
Change-Id: Ica07d444c18af8ba7a9bbb120623512def572a48
diff --git a/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/CatalystSerializers.java b/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/CatalystSerializers.java
index d676ab1..2c4ee73 100644
--- a/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/CatalystSerializers.java
+++ b/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/CatalystSerializers.java
@@ -51,6 +51,7 @@
import org.onosproject.store.service.DocumentTreeEvent;
import org.onosproject.store.service.MapEvent;
import org.onosproject.store.service.TransactionLog;
+import org.onosproject.store.service.MultimapEvent;
import org.onosproject.store.service.Task;
import org.onosproject.store.service.Versioned;
import org.onosproject.store.service.WorkQueueStats;
@@ -100,6 +101,8 @@
serializer.register(TransactionLog.class, factory);
serializer.register(Versioned.class, factory);
serializer.register(MapEvent.class, factory);
+ serializer.register(MultimapEvent.class, factory);
+ serializer.register(MultimapEvent.Type.class, factory);
serializer.register(Task.class, factory);
serializer.register(WorkQueueStats.class, factory);
serializer.register(DocumentPath.class, factory);
diff --git a/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/DelegatingAsyncConsistentMultimap.java b/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/DelegatingAsyncConsistentMultimap.java
index 5bd37c5..3db4dce 100644
--- a/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/DelegatingAsyncConsistentMultimap.java
+++ b/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/DelegatingAsyncConsistentMultimap.java
@@ -16,14 +16,17 @@
package org.onosproject.store.primitives.impl;
+import com.google.common.collect.Multiset;
+import org.onosproject.store.service.AsyncConsistentMultimap;
+import org.onosproject.store.service.MultimapEventListener;
+import org.onosproject.store.service.Versioned;
+
import java.util.Collection;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
-import com.google.common.collect.Multiset;
-import org.onosproject.store.service.AsyncConsistentMultimap;
-import org.onosproject.store.service.Versioned;
+import java.util.concurrent.Executor;
/**
* {@code AsyncConsistentMultimap} that merely delegates control to
@@ -133,6 +136,16 @@
}
@Override
+    public CompletableFuture<Void> addListener(MultimapEventListener<K, V> listener, Executor executor) {
+        // Plain pass-through: the wrapped multimap performs the registration and
+        // its future is handed back to the caller untouched.
+        CompletableFuture<Void> registration = delegateMap.addListener(listener, executor);
+        return registration;
+    }
+
+    @Override
+    public CompletableFuture<Void> removeListener(MultimapEventListener<K, V> listener) {
+        // Plain pass-through: the wrapped multimap performs the removal and
+        // its future is handed back to the caller untouched.
+        CompletableFuture<Void> removal = delegateMap.removeListener(listener);
+        return removal;
+    }
+
+ @Override
public CompletableFuture<Map<K, Collection<V>>> asMap() {
return delegateMap.asMap();
}
diff --git a/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/ExecutingAsyncConsistentMultimap.java b/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/ExecutingAsyncConsistentMultimap.java
index 6ec0a69..2045586 100644
--- a/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/ExecutingAsyncConsistentMultimap.java
+++ b/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/ExecutingAsyncConsistentMultimap.java
@@ -15,16 +15,17 @@
*/
package org.onosproject.store.primitives.impl;
+import com.google.common.collect.Multiset;
+import org.onosproject.store.service.AsyncConsistentMultimap;
+import org.onosproject.store.service.MultimapEventListener;
+import org.onosproject.store.service.Versioned;
+
import java.util.Collection;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;
-import com.google.common.collect.Multiset;
-import org.onosproject.store.service.AsyncConsistentMultimap;
-import org.onosproject.store.service.Versioned;
-
/**
* {@link org.onosproject.store.service.AsyncConsistentMultimap} that executes asynchronous callbacks on a provided
* {@link Executor}.
@@ -125,6 +126,16 @@
}
@Override
+    public CompletableFuture<Void> addListener(MultimapEventListener<K, V> listener, Executor executor) {
+        // Register on the wrapped multimap first, then route the resulting future
+        // through asyncFuture like the other operations of this class (e.g. asMap).
+        // NOTE(review): asyncFuture is defined outside this hunk; presumably it moves
+        // completion callbacks onto this map's executor, per the class Javadoc.
+        CompletableFuture<Void> registration = delegateMap.addListener(listener, executor);
+        return asyncFuture(registration);
+    }
+
+    @Override
+    public CompletableFuture<Void> removeListener(MultimapEventListener<K, V> listener) {
+        // Unregister on the wrapped multimap first, then route the resulting future
+        // through asyncFuture like the other operations of this class (e.g. asMap).
+        CompletableFuture<Void> removal = delegateMap.removeListener(listener);
+        return asyncFuture(removal);
+    }
+
+ @Override
public CompletableFuture<Map<K, Collection<V>>> asMap() {
return asyncFuture(delegateMap.asMap());
}
diff --git a/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/TranscodingAsyncConsistentMap.java b/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/TranscodingAsyncConsistentMap.java
index c2f167a..52cf210 100644
--- a/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/TranscodingAsyncConsistentMap.java
+++ b/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/TranscodingAsyncConsistentMap.java
@@ -16,6 +16,17 @@
package org.onosproject.store.primitives.impl;
+import com.google.common.collect.Maps;
+import org.onlab.util.Tools;
+import org.onosproject.store.primitives.MapUpdate;
+import org.onosproject.store.primitives.TransactionId;
+import org.onosproject.store.service.AsyncConsistentMap;
+import org.onosproject.store.service.MapEvent;
+import org.onosproject.store.service.MapEventListener;
+import org.onosproject.store.service.TransactionLog;
+import org.onosproject.store.service.Version;
+import org.onosproject.store.service.Versioned;
+
import java.util.Collection;
import java.util.Map;
import java.util.Map.Entry;
@@ -28,18 +39,6 @@
import java.util.function.Predicate;
import java.util.stream.Collectors;
-import org.onlab.util.Tools;
-import org.onosproject.store.primitives.MapUpdate;
-import org.onosproject.store.primitives.TransactionId;
-import org.onosproject.store.service.AsyncConsistentMap;
-import org.onosproject.store.service.MapEvent;
-import org.onosproject.store.service.MapEventListener;
-import org.onosproject.store.service.TransactionLog;
-import org.onosproject.store.service.Version;
-import org.onosproject.store.service.Versioned;
-
-import com.google.common.collect.Maps;
-
/**
* An {@code AsyncConsistentMap} that maps its operations to operations on a
* differently typed {@code AsyncConsistentMap} by transcoding operation inputs and outputs.
@@ -259,11 +258,13 @@
@Override
public CompletableFuture<Void> removeListener(MapEventListener<K1, V1> listener) {
- InternalBackingMapEventListener backingMapListener = listeners.remove(listener);
- if (backingMapListener != null) {
- return backingMap.removeListener(backingMapListener);
- } else {
- return CompletableFuture.completedFuture(null);
+        // The lookup-and-remove plus the backing-map call run under the listeners monitor.
+        synchronized (listeners) {
+            InternalBackingMapEventListener wrapper = listeners.remove(listener);
+            if (wrapper == null) {
+                // No wrapper was recorded for this listener: nothing to detach downstream.
+                return CompletableFuture.completedFuture(null);
+            }
+            return backingMap.removeListener(wrapper);
}
}
diff --git a/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/TranscodingAsyncConsistentMultimap.java b/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/TranscodingAsyncConsistentMultimap.java
index 666ac1f..9d699d9 100644
--- a/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/TranscodingAsyncConsistentMultimap.java
+++ b/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/TranscodingAsyncConsistentMultimap.java
@@ -21,6 +21,8 @@
import com.google.common.collect.Multiset;
import org.onlab.util.Tools;
import org.onosproject.store.service.AsyncConsistentMultimap;
+import org.onosproject.store.service.MultimapEvent;
+import org.onosproject.store.service.MultimapEventListener;
import org.onosproject.store.service.Versioned;
import java.util.Collection;
@@ -28,6 +30,7 @@
import java.util.Map;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.Executor;
import java.util.function.BiConsumer;
import java.util.function.BinaryOperator;
import java.util.function.Consumer;
@@ -60,6 +63,8 @@
Versioned<Collection<? extends V1>>> versionedValueCollectionDecode;
private final Function<Collection<? extends V1>, Collection<V2>>
valueCollectionEncode;
+ private final Map<MultimapEventListener<K1, V1>, InternalBackingMultimapEventListener> listeners =
+ Maps.newIdentityHashMap();
public TranscodingAsyncConsistentMultimap(
AsyncConsistentMultimap<K2, V2> backingMap,
@@ -244,6 +249,27 @@
}
@Override
+    public CompletableFuture<Void> addListener(MultimapEventListener<K1, V1> listener, Executor executor) {
+        synchronized (listeners) {
+            // Reuse the wrapper already recorded for this listener, or create and record
+            // one; the mapping function receives the key, which is the listener itself.
+            InternalBackingMultimapEventListener wrapper =
+                    listeners.computeIfAbsent(listener, InternalBackingMultimapEventListener::new);
+            // The backing multimap only ever sees the wrapper, never the caller's listener.
+            CompletableFuture<Void> registration = backingMap.addListener(wrapper, executor);
+            return registration;
+        }
+    }
+
+    @Override
+    public CompletableFuture<Void> removeListener(MultimapEventListener<K1, V1> listener) {
+        synchronized (listeners) {
+            InternalBackingMultimapEventListener wrapper = listeners.remove(listener);
+            if (wrapper == null) {
+                // No wrapper was recorded for this listener: nothing to detach from the
+                // backing multimap, so report immediate completion.
+                return CompletableFuture.completedFuture(null);
+            }
+            return backingMap.removeListener(wrapper);
+        }
+    }
+
+ @Override
public void addStatusChangeListener(Consumer<Status> listener) {
backingMap.addStatusChangeListener(listener);
}
@@ -290,4 +316,21 @@
return EnumSet.of(Characteristics.UNORDERED);
}
}
+
+    /**
+     * Adapter registered on the backing multimap on behalf of a caller-supplied listener.
+     * It receives events typed with the backing key/value types ({@code K2}/{@code V2}),
+     * decodes key and values, and forwards an event typed {@code K1}/{@code V1}.
+     */
+    private class InternalBackingMultimapEventListener implements MultimapEventListener<K2, V2> {
+
+        /** The caller's listener that receives the decoded events. */
+        private final MultimapEventListener<K1, V1> listener;
+
+        /**
+         * Creates an adapter that forwards decoded events to the given listener.
+         *
+         * @param listener listener to notify with decoded events
+         */
+        InternalBackingMultimapEventListener(MultimapEventListener<K1, V1> listener) {
+            this.listener = listener;
+        }
+
+        /**
+         * Decodes the backing event's key, new value and old value and forwards the
+         * result, under the same name, to the wrapped listener.
+         *
+         * @param event event emitted by the backing multimap
+         */
+        @Override
+        public void event(MultimapEvent<K2, V2> event) {
+            // Parameterize the forwarded event explicitly: a raw MultimapEvent would
+            // bypass generic type checking of the decoded key and values.
+            // NOTE(review): newValue and oldValue both go through valueDecoder
+            // unconditionally; confirm the decoder tolerates null if either side of
+            // an event can be absent.
+            listener.event(new MultimapEvent<K1, V1>(event.name(),
+                    keyDecoder.apply(event.key()),
+                    valueDecoder.apply(event.newValue()),
+                    valueDecoder.apply(event.oldValue())));
+        }
+    }
}