Implement listeners for ConsistentMultimap.
Change-Id: Ica07d444c18af8ba7a9bbb120623512def572a48
diff --git a/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/CatalystSerializers.java b/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/CatalystSerializers.java
index d676ab1..2c4ee73 100644
--- a/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/CatalystSerializers.java
+++ b/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/CatalystSerializers.java
@@ -51,6 +51,7 @@
import org.onosproject.store.service.DocumentTreeEvent;
import org.onosproject.store.service.MapEvent;
import org.onosproject.store.service.TransactionLog;
+import org.onosproject.store.service.MultimapEvent;
import org.onosproject.store.service.Task;
import org.onosproject.store.service.Versioned;
import org.onosproject.store.service.WorkQueueStats;
@@ -100,6 +101,8 @@
serializer.register(TransactionLog.class, factory);
serializer.register(Versioned.class, factory);
serializer.register(MapEvent.class, factory);
+ serializer.register(MultimapEvent.class, factory);
+ serializer.register(MultimapEvent.Type.class, factory);
serializer.register(Task.class, factory);
serializer.register(WorkQueueStats.class, factory);
serializer.register(DocumentPath.class, factory);
diff --git a/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/DelegatingAsyncConsistentMultimap.java b/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/DelegatingAsyncConsistentMultimap.java
index 5bd37c5..3db4dce 100644
--- a/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/DelegatingAsyncConsistentMultimap.java
+++ b/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/DelegatingAsyncConsistentMultimap.java
@@ -16,14 +16,17 @@
package org.onosproject.store.primitives.impl;
+import com.google.common.collect.Multiset;
+import org.onosproject.store.service.AsyncConsistentMultimap;
+import org.onosproject.store.service.MultimapEventListener;
+import org.onosproject.store.service.Versioned;
+
import java.util.Collection;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
-import com.google.common.collect.Multiset;
-import org.onosproject.store.service.AsyncConsistentMultimap;
-import org.onosproject.store.service.Versioned;
+import java.util.concurrent.Executor;
/**
* {@code AsyncConsistentMultimap} that merely delegates control to
@@ -133,6 +136,16 @@
}
@Override
+ public CompletableFuture<Void> addListener(MultimapEventListener<K, V> listener, Executor executor) {
+ return delegateMap.addListener(listener, executor);
+ }
+
+ @Override
+ public CompletableFuture<Void> removeListener(MultimapEventListener<K, V> listener) {
+ return delegateMap.removeListener(listener);
+ }
+
+ @Override
public CompletableFuture<Map<K, Collection<V>>> asMap() {
return delegateMap.asMap();
}
diff --git a/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/ExecutingAsyncConsistentMultimap.java b/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/ExecutingAsyncConsistentMultimap.java
index 6ec0a69..2045586 100644
--- a/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/ExecutingAsyncConsistentMultimap.java
+++ b/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/ExecutingAsyncConsistentMultimap.java
@@ -15,16 +15,17 @@
*/
package org.onosproject.store.primitives.impl;
+import com.google.common.collect.Multiset;
+import org.onosproject.store.service.AsyncConsistentMultimap;
+import org.onosproject.store.service.MultimapEventListener;
+import org.onosproject.store.service.Versioned;
+
import java.util.Collection;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;
-import com.google.common.collect.Multiset;
-import org.onosproject.store.service.AsyncConsistentMultimap;
-import org.onosproject.store.service.Versioned;
-
/**
* {@link org.onosproject.store.service.AsyncConsistentMultimap} that executes asynchronous callbacks on a provided
* {@link Executor}.
@@ -125,6 +126,16 @@
}
@Override
+ public CompletableFuture<Void> addListener(MultimapEventListener<K, V> listener, Executor executor) {
+ return asyncFuture(delegateMap.addListener(listener, executor));
+ }
+
+ @Override
+ public CompletableFuture<Void> removeListener(MultimapEventListener<K, V> listener) {
+ return asyncFuture(delegateMap.removeListener(listener));
+ }
+
+ @Override
public CompletableFuture<Map<K, Collection<V>>> asMap() {
return asyncFuture(delegateMap.asMap());
}
diff --git a/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/TranscodingAsyncConsistentMap.java b/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/TranscodingAsyncConsistentMap.java
index c2f167a..52cf210 100644
--- a/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/TranscodingAsyncConsistentMap.java
+++ b/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/TranscodingAsyncConsistentMap.java
@@ -16,6 +16,17 @@
package org.onosproject.store.primitives.impl;
+import com.google.common.collect.Maps;
+import org.onlab.util.Tools;
+import org.onosproject.store.primitives.MapUpdate;
+import org.onosproject.store.primitives.TransactionId;
+import org.onosproject.store.service.AsyncConsistentMap;
+import org.onosproject.store.service.MapEvent;
+import org.onosproject.store.service.MapEventListener;
+import org.onosproject.store.service.TransactionLog;
+import org.onosproject.store.service.Version;
+import org.onosproject.store.service.Versioned;
+
import java.util.Collection;
import java.util.Map;
import java.util.Map.Entry;
@@ -28,18 +39,6 @@
import java.util.function.Predicate;
import java.util.stream.Collectors;
-import org.onlab.util.Tools;
-import org.onosproject.store.primitives.MapUpdate;
-import org.onosproject.store.primitives.TransactionId;
-import org.onosproject.store.service.AsyncConsistentMap;
-import org.onosproject.store.service.MapEvent;
-import org.onosproject.store.service.MapEventListener;
-import org.onosproject.store.service.TransactionLog;
-import org.onosproject.store.service.Version;
-import org.onosproject.store.service.Versioned;
-
-import com.google.common.collect.Maps;
-
/**
* An {@code AsyncConsistentMap} that maps its operations to operations on a
* differently typed {@code AsyncConsistentMap} by transcoding operation inputs and outputs.
@@ -259,11 +258,13 @@
@Override
public CompletableFuture<Void> removeListener(MapEventListener<K1, V1> listener) {
- InternalBackingMapEventListener backingMapListener = listeners.remove(listener);
- if (backingMapListener != null) {
- return backingMap.removeListener(backingMapListener);
- } else {
- return CompletableFuture.completedFuture(null);
+ synchronized (listeners) {
+ InternalBackingMapEventListener backingMapListener = listeners.remove(listener);
+ if (backingMapListener != null) {
+ return backingMap.removeListener(backingMapListener);
+ } else {
+ return CompletableFuture.completedFuture(null);
+ }
}
}
diff --git a/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/TranscodingAsyncConsistentMultimap.java b/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/TranscodingAsyncConsistentMultimap.java
index 666ac1f..9d699d9 100644
--- a/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/TranscodingAsyncConsistentMultimap.java
+++ b/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/TranscodingAsyncConsistentMultimap.java
@@ -21,6 +21,8 @@
import com.google.common.collect.Multiset;
import org.onlab.util.Tools;
import org.onosproject.store.service.AsyncConsistentMultimap;
+import org.onosproject.store.service.MultimapEvent;
+import org.onosproject.store.service.MultimapEventListener;
import org.onosproject.store.service.Versioned;
import java.util.Collection;
@@ -28,6 +30,7 @@
import java.util.Map;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.Executor;
import java.util.function.BiConsumer;
import java.util.function.BinaryOperator;
import java.util.function.Consumer;
@@ -60,6 +63,8 @@
Versioned<Collection<? extends V1>>> versionedValueCollectionDecode;
private final Function<Collection<? extends V1>, Collection<V2>>
valueCollectionEncode;
+ private final Map<MultimapEventListener<K1, V1>, InternalBackingMultimapEventListener> listeners =
+ Maps.newIdentityHashMap();
public TranscodingAsyncConsistentMultimap(
AsyncConsistentMultimap<K2, V2> backingMap,
@@ -244,6 +249,27 @@
}
@Override
+ public CompletableFuture<Void> addListener(MultimapEventListener<K1, V1> listener, Executor executor) {
+ synchronized (listeners) {
+ InternalBackingMultimapEventListener backingMapListener =
+ listeners.computeIfAbsent(listener, k -> new InternalBackingMultimapEventListener(listener));
+ return backingMap.addListener(backingMapListener, executor);
+ }
+ }
+
+ @Override
+ public CompletableFuture<Void> removeListener(MultimapEventListener<K1, V1> listener) {
+ synchronized (listeners) {
+ InternalBackingMultimapEventListener backingMapListener = listeners.remove(listener);
+ if (backingMapListener != null) {
+ return backingMap.removeListener(backingMapListener);
+ } else {
+ return CompletableFuture.completedFuture(null);
+ }
+ }
+ }
+
+ @Override
public void addStatusChangeListener(Consumer<Status> listener) {
backingMap.addStatusChangeListener(listener);
}
@@ -290,4 +316,27 @@
return EnumSet.of(Characteristics.UNORDERED);
}
}
+
+ /**
+ * Adapts a listener typed on this map's key/value types ({@code K1}, {@code V1}) so that it can be
+ * registered on the backing map; each backing event is decoded before being forwarded.
+ */
+ private class InternalBackingMultimapEventListener implements MultimapEventListener<K2, V2> {
+
+ private final MultimapEventListener<K1, V1> listener;
+
+ InternalBackingMultimapEventListener(MultimapEventListener<K1, V1> listener) {
+ this.listener = listener;
+ }
+
+ @Override
+ public void event(MultimapEvent<K2, V2> event) {
+ // NOTE(review): the state machine publishes events with either newValue or oldValue set to null,
+ // so this assumes valueDecoder tolerates a null argument - TODO confirm.
+ listener.event(new MultimapEvent<>(event.name(),
+ keyDecoder.apply(event.key()),
+ valueDecoder.apply(event.newValue()),
+ valueDecoder.apply(event.oldValue())));
+ }
+ }
}
diff --git a/core/store/primitives/src/main/java/org/onosproject/store/primitives/resources/impl/AtomixConsistentMultimapCommands.java b/core/store/primitives/src/main/java/org/onosproject/store/primitives/resources/impl/AtomixConsistentMultimapCommands.java
index d765466..b8e5d4c 100644
--- a/core/store/primitives/src/main/java/org/onosproject/store/primitives/resources/impl/AtomixConsistentMultimapCommands.java
+++ b/core/store/primitives/src/main/java/org/onosproject/store/primitives/resources/impl/AtomixConsistentMultimapCommands.java
@@ -563,6 +563,44 @@
}
/**
+ * Command that registers the submitting session as a listener for multimap change events.
+ */
+ @SuppressWarnings("serial")
+ public static class Listen implements Command<Void>, CatalystSerializable {
+ @Override
+ public void writeObject(BufferOutput<?> buffer, Serializer serializer) {
+ }
+
+ @Override
+ public void readObject(BufferInput<?> buffer, Serializer serializer) {
+ }
+
+ @Override
+ public CompactionMode compaction() {
+ return CompactionMode.QUORUM;
+ }
+ }
+
+ /**
+ * Command that unregisters the submitting session from multimap change events.
+ */
+ @SuppressWarnings("serial")
+ public static class Unlisten implements Command<Void>, CatalystSerializable {
+ @Override
+ public void writeObject(BufferOutput<?> buffer, Serializer serializer) {
+ }
+
+ @Override
+ public void readObject(BufferInput<?> buffer, Serializer serializer) {
+ }
+
+ @Override
+ public CompactionMode compaction() {
+ return CompactionMode.TOMBSTONE;
+ }
+ }
+
+ /**
* Multimap command type resolver.
*/
@SuppressWarnings("serial")
@@ -584,6 +622,8 @@
registry.register(Put.class, -1012);
registry.register(RemoveAll.class, -1013);
registry.register(MultiRemove.class, -1014);
+ registry.register(Listen.class, -1015);
+ registry.register(Unlisten.class, -1016);
}
}
}
diff --git a/core/store/primitives/src/main/java/org/onosproject/store/primitives/resources/impl/AtomixConsistentSetMultimap.java b/core/store/primitives/src/main/java/org/onosproject/store/primitives/resources/impl/AtomixConsistentSetMultimap.java
index e446a67..22a0345 100644
--- a/core/store/primitives/src/main/java/org/onosproject/store/primitives/resources/impl/AtomixConsistentSetMultimap.java
+++ b/core/store/primitives/src/main/java/org/onosproject/store/primitives/resources/impl/AtomixConsistentSetMultimap.java
@@ -22,14 +22,19 @@
import io.atomix.resource.AbstractResource;
import io.atomix.resource.ResourceTypeInfo;
import org.onosproject.store.service.AsyncConsistentMultimap;
+import org.onosproject.store.service.MultimapEvent;
+import org.onosproject.store.service.MultimapEventListener;
import org.onosproject.store.service.Versioned;
import java.util.Collection;
import java.util.ConcurrentModificationException;
+import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.Executor;
import static org.onosproject.store.primitives.resources.impl.AtomixConsistentMultimapCommands.Clear;
import static org.onosproject.store.primitives.resources.impl.AtomixConsistentMultimapCommands.ContainsEntry;
@@ -40,13 +45,16 @@
import static org.onosproject.store.primitives.resources.impl.AtomixConsistentMultimapCommands.IsEmpty;
import static org.onosproject.store.primitives.resources.impl.AtomixConsistentMultimapCommands.KeySet;
import static org.onosproject.store.primitives.resources.impl.AtomixConsistentMultimapCommands.Keys;
+import static org.onosproject.store.primitives.resources.impl.AtomixConsistentMultimapCommands.Listen;
import static org.onosproject.store.primitives.resources.impl.AtomixConsistentMultimapCommands.MultiRemove;
import static org.onosproject.store.primitives.resources.impl.AtomixConsistentMultimapCommands.Put;
import static org.onosproject.store.primitives.resources.impl.AtomixConsistentMultimapCommands.RemoveAll;
import static org.onosproject.store.primitives.resources.impl.AtomixConsistentMultimapCommands.Replace;
import static org.onosproject.store.primitives.resources.impl.AtomixConsistentMultimapCommands.Size;
+import static org.onosproject.store.primitives.resources.impl.AtomixConsistentMultimapCommands.Unlisten;
import static org.onosproject.store.primitives.resources.impl.AtomixConsistentMultimapCommands.Values;
+
/**
* Set based implementation of the {@link AsyncConsistentMultimap}.
* <p>
@@ -57,6 +65,10 @@
extends AbstractResource<AtomixConsistentSetMultimap>
implements AsyncConsistentMultimap<String, byte[]> {
+ private final Map<MultimapEventListener<String, byte[]>, Executor> mapEventListeners = new ConcurrentHashMap<>();
+
+ public static final String CHANGE_SUBJECT = "multimapChangeEvents";
+
public AtomixConsistentSetMultimap(CopycatClient client,
Properties properties) {
super(client, properties);
@@ -64,8 +76,20 @@
@Override
public CompletableFuture<AtomixConsistentSetMultimap> open() {
- return super.open();
- //TODO
+ return super.open().thenApply(result -> {
+ client.onStateChange(state -> {
+ if (state == CopycatClient.State.CONNECTED && isListening()) {
+ client.submit(new Listen());
+ }
+ });
+ client.onEvent(CHANGE_SUBJECT, this::handleEvent);
+ return result;
+ });
+ }
+
+ private void handleEvent(List<MultimapEvent<String, byte[]>> events) {
+ events.forEach(event ->
+ mapEventListeners.forEach((listener, executor) -> executor.execute(() -> listener.event(event))));
}
@Override
@@ -158,6 +182,24 @@
}
@Override
+ public CompletableFuture<Void> addListener(MultimapEventListener<String, byte[]> listener, Executor executor) {
+ if (mapEventListeners.isEmpty()) {
+ return client.submit(new Listen()).thenRun(() -> mapEventListeners.put(listener, executor));
+ } else {
+ mapEventListeners.put(listener, executor);
+ return CompletableFuture.completedFuture(null);
+ }
+ }
+
+ @Override
+ public CompletableFuture<Void> removeListener(MultimapEventListener<String, byte[]> listener) {
+ if (mapEventListeners.remove(listener) != null && mapEventListeners.isEmpty()) {
+ return client.submit(new Unlisten()).thenApply(v -> null);
+ }
+ return CompletableFuture.completedFuture(null);
+ }
+
+ @Override
public CompletableFuture<Map<String, Collection<byte[]>>> asMap() {
throw new UnsupportedOperationException("Expensive operation.");
}
@@ -178,4 +220,8 @@
"in progress");
}
}
+
+ private boolean isListening() {
+ return !mapEventListeners.isEmpty();
+ }
}
diff --git a/core/store/primitives/src/main/java/org/onosproject/store/primitives/resources/impl/AtomixConsistentSetMultimapState.java b/core/store/primitives/src/main/java/org/onosproject/store/primitives/resources/impl/AtomixConsistentSetMultimapState.java
index 5110187..47972d1 100644
--- a/core/store/primitives/src/main/java/org/onosproject/store/primitives/resources/impl/AtomixConsistentSetMultimapState.java
+++ b/core/store/primitives/src/main/java/org/onosproject/store/primitives/resources/impl/AtomixConsistentSetMultimapState.java
@@ -27,12 +27,14 @@
import io.atomix.copycat.server.Commit;
import io.atomix.copycat.server.Snapshottable;
import io.atomix.copycat.server.StateMachineExecutor;
+import io.atomix.copycat.server.session.ServerSession;
import io.atomix.copycat.server.session.SessionListener;
import io.atomix.copycat.server.storage.snapshot.SnapshotReader;
import io.atomix.copycat.server.storage.snapshot.SnapshotWriter;
import io.atomix.resource.ResourceStateMachine;
import org.onlab.util.CountDownCompleter;
import org.onlab.util.Match;
+import org.onosproject.store.service.MultimapEvent;
import org.onosproject.store.service.Versioned;
import org.slf4j.Logger;
@@ -40,6 +42,8 @@
import java.util.Collection;
import java.util.Comparator;
import java.util.EnumSet;
+import java.util.HashMap;
+import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.Set;
@@ -61,12 +65,14 @@
import static org.onosproject.store.primitives.resources.impl.AtomixConsistentMultimapCommands.IsEmpty;
import static org.onosproject.store.primitives.resources.impl.AtomixConsistentMultimapCommands.KeySet;
import static org.onosproject.store.primitives.resources.impl.AtomixConsistentMultimapCommands.Keys;
+import static org.onosproject.store.primitives.resources.impl.AtomixConsistentMultimapCommands.Listen;
import static org.onosproject.store.primitives.resources.impl.AtomixConsistentMultimapCommands.MultiRemove;
import static org.onosproject.store.primitives.resources.impl.AtomixConsistentMultimapCommands.MultimapCommand;
import static org.onosproject.store.primitives.resources.impl.AtomixConsistentMultimapCommands.Put;
import static org.onosproject.store.primitives.resources.impl.AtomixConsistentMultimapCommands.RemoveAll;
import static org.onosproject.store.primitives.resources.impl.AtomixConsistentMultimapCommands.Replace;
import static org.onosproject.store.primitives.resources.impl.AtomixConsistentMultimapCommands.Size;
+import static org.onosproject.store.primitives.resources.impl.AtomixConsistentMultimapCommands.Unlisten;
import static org.onosproject.store.primitives.resources.impl.AtomixConsistentMultimapCommands.Values;
import static org.slf4j.LoggerFactory.getLogger;
@@ -78,7 +84,7 @@
private final Logger log = getLogger(getClass());
private final AtomicLong globalVersion = new AtomicLong(1);
- //TODO Add listener map here
+ private final Map<Long, Commit<? extends Listen>> listeners = new HashMap<>();
private final Map<String, MapEntryValue> backingMap = Maps.newHashMap();
public AtomixConsistentSetMultimapState(Properties properties) {
@@ -110,6 +116,8 @@
executor.register(MultiRemove.class, this::multiRemove);
executor.register(Put.class, this::put);
executor.register(Replace.class, this::replace);
+ executor.register(Listen.class, this::listen);
+ executor.register(Unlisten.class, this::unlisten);
}
/**
@@ -313,12 +321,20 @@
*/
protected Versioned<Collection<? extends byte[]>> removeAll(
Commit<? extends RemoveAll> commit) {
- if (!backingMap.containsKey(commit.operation().key())) {
+ String key = commit.operation().key();
+
+ if (!backingMap.containsKey(key)) {
commit.close();
return new Versioned<>(Sets.newHashSet(), -1);
- } else {
- return backingMap.get(commit.operation().key()).addCommit(commit);
}
+
+ Versioned<Collection<? extends byte[]>> removedValues =
+ backingMap.get(key).addCommit(commit);
+ publish(removedValues.value().stream()
+ .map(value -> new MultimapEvent<String, byte[]>(
+ "", key, null, value))
+ .collect(Collectors.toList()));
+ return removedValues;
}
/**
@@ -328,14 +344,26 @@
* @return true if any change results, else false
*/
protected boolean multiRemove(Commit<? extends MultiRemove> commit) {
- if (!backingMap.containsKey(commit.operation().key())) {
+ String key = commit.operation().key();
+
+ if (!backingMap.containsKey(key)) {
commit.close();
return false;
- } else {
- return (backingMap
- .get(commit.operation().key())
- .addCommit(commit)) != null;
}
+
+ Versioned<Collection<? extends byte[]>> removedValues = backingMap
+ .get(key)
+ .addCommit(commit);
+
+ if (removedValues != null) {
+ publish(removedValues.value().stream()
+ .map(value -> new MultimapEvent<String, byte[]>(
+ "", key, null, value))
+ .collect(Collectors.toList()));
+ return true;
+ }
+
+ return false;
}
/**
@@ -345,16 +373,27 @@
* @return true if this commit results in a change, else false
*/
protected boolean put(Commit<? extends Put> commit) {
+ String key = commit.operation().key();
if (commit.operation().values().isEmpty()) {
return false;
}
- if (!backingMap.containsKey(commit.operation().key())) {
- backingMap.put(commit.operation().key(),
- new NonTransactionalCommit(1));
+ if (!backingMap.containsKey(key)) {
+ backingMap.put(key, new NonTransactionalCommit(1));
}
- return backingMap
- .get(commit.operation().key())
- .addCommit(commit) != null;
+
+ Versioned<Collection<? extends byte[]>> addedValues = backingMap
+ .get(key)
+ .addCommit(commit);
+
+ if (addedValues != null) {
+ publish(addedValues.value().stream()
+ .map(value -> new MultimapEvent<String, byte[]>(
+ "", key, value, null))
+ .collect(Collectors.toList()));
+ return true;
+ }
+
+ return false;
}
protected Versioned<Collection<? extends byte[]>> replace(
@@ -366,6 +405,56 @@
return backingMap.get(commit.operation().key()).addCommit(commit);
}
+ /**
+ * Handles a listen commit.
+ *
+ * @param commit listen commit
+ */
+ protected void listen(Commit<? extends Listen> commit) {
+ Long sessionId = commit.session().id();
+ if (listeners.putIfAbsent(sessionId, commit) != null) {
+ commit.close();
+ return;
+ }
+ commit.session()
+ .onStateChange(
+ state -> {
+ if (state == ServerSession.State.CLOSED
+ || state == ServerSession.State.EXPIRED) {
+ Commit<? extends Listen> listener = listeners.remove(sessionId);
+ if (listener != null) {
+ listener.close();
+ }
+ }
+ });
+ }
+
+ /**
+ * Handles an unlisten commit.
+ *
+ * @param commit unlisten commit
+ */
+ protected void unlisten(Commit<? extends Unlisten> commit) {
+ try {
+ Commit<? extends Listen> listener = listeners.remove(commit.session().id());
+ if (listener != null) {
+ listener.close();
+ }
+ } finally {
+ commit.close();
+ }
+ }
+
+ /**
+ * Publishes events to listeners.
+ *
+ * @param events list of multimap events to publish
+ */
+ private void publish(List<MultimapEvent<String, byte[]>> events) {
+ listeners.values().forEach(commit ->
+ commit.session().publish(AtomixConsistentSetMultimap.CHANGE_SUBJECT, events));
+ }
+
private interface MapEntryValue {
/**
diff --git a/core/store/serializers/src/main/java/org/onosproject/store/serializers/KryoNamespaces.java b/core/store/serializers/src/main/java/org/onosproject/store/serializers/KryoNamespaces.java
index 7d1a9d1..2cfd0c4 100644
--- a/core/store/serializers/src/main/java/org/onosproject/store/serializers/KryoNamespaces.java
+++ b/core/store/serializers/src/main/java/org/onosproject/store/serializers/KryoNamespaces.java
@@ -20,7 +20,6 @@
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.ImmutableSet;
import com.google.common.collect.Maps;
-
import org.onlab.packet.ChassisId;
import org.onlab.packet.EthType;
import org.onlab.packet.Ip4Address;
@@ -214,6 +213,7 @@
import org.onosproject.store.primitives.TransactionId;
import org.onosproject.store.service.MapEvent;
import org.onosproject.store.service.TransactionLog;
+import org.onosproject.store.service.MultimapEvent;
import org.onosproject.store.service.SetEvent;
import org.onosproject.store.service.Task;
import org.onosproject.store.service.Versioned;
@@ -528,6 +528,8 @@
.register(Versioned.class)
.register(MapEvent.class)
.register(MapEvent.Type.class)
+ .register(MultimapEvent.class)
+ .register(MultimapEvent.Type.class)
.register(SetEvent.class)
.register(SetEvent.Type.class)
.register(GroupId.class)