Implement listeners for ConsistentMultimap.
Change-Id: Ica07d444c18af8ba7a9bbb120623512def572a48
diff --git a/core/api/src/main/java/org/onosproject/store/primitives/DefaultConsistentMultimap.java b/core/api/src/main/java/org/onosproject/store/primitives/DefaultConsistentMultimap.java
index e8142a4..c7c9f5d 100644
--- a/core/api/src/main/java/org/onosproject/store/primitives/DefaultConsistentMultimap.java
+++ b/core/api/src/main/java/org/onosproject/store/primitives/DefaultConsistentMultimap.java
@@ -21,6 +21,7 @@
import org.onosproject.store.service.AsyncConsistentMultimap;
import org.onosproject.store.service.ConsistentMapException;
import org.onosproject.store.service.ConsistentMultimap;
+import org.onosproject.store.service.MultimapEventListener;
import org.onosproject.store.service.Synchronous;
import org.onosproject.store.service.Versioned;
@@ -29,6 +30,7 @@
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Executor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
@@ -144,6 +146,16 @@
//FIXME implement this when a new version of ConsistentMapBackedJavaMap is made for multimaps
}
+ @Override
+ public void addListener(MultimapEventListener<K, V> listener, Executor executor) {
+ complete(asyncMultimap.addListener(listener, executor));
+ }
+
+ @Override
+ public void removeListener(MultimapEventListener<K, V> listener) {
+ complete(asyncMultimap.removeListener(listener));
+ }
+
private <T> T complete(CompletableFuture<T> future) {
try {
return future.get(operationTimeoutMillis, TimeUnit.MILLISECONDS);
diff --git a/core/api/src/main/java/org/onosproject/store/service/AsyncConsistentMultimap.java b/core/api/src/main/java/org/onosproject/store/service/AsyncConsistentMultimap.java
index 36d6d25..6a44e06 100644
--- a/core/api/src/main/java/org/onosproject/store/service/AsyncConsistentMultimap.java
+++ b/core/api/src/main/java/org/onosproject/store/service/AsyncConsistentMultimap.java
@@ -17,12 +17,14 @@
package org.onosproject.store.service;
import com.google.common.collect.Multiset;
+import com.google.common.util.concurrent.MoreExecutors;
import org.onosproject.store.primitives.DefaultConsistentMultimap;
import java.util.Collection;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.Executor;
/**
* Interface for a distributed multimap.
@@ -222,6 +224,34 @@
CompletableFuture<Collection<Map.Entry<K, V>>> entries();
/**
+ * Registers the specified listener to be notified whenever the map is updated.
+ *
+ * @param listener listener to notify about map events
+ * @return future that will be completed when the operation finishes
+ */
+ default CompletableFuture<Void> addListener(MultimapEventListener<K, V> listener) {
+ return addListener(listener, MoreExecutors.directExecutor());
+ }
+
+ /**
+ * Registers the specified listener to be notified whenever the map is updated.
+ *
+ * @param listener listener to notify about map events
+ * @param executor executor to use for handling incoming map events
+ * @return future that will be completed when the operation finishes
+ */
+ CompletableFuture<Void> addListener(MultimapEventListener<K, V> listener, Executor executor);
+
+ /**
+ * Unregisters the specified listener such that it will no longer
+ * receive map change notifications.
+ *
+ * @param listener listener to unregister
+ * @return future that will be completed when the operation finishes
+ */
+ CompletableFuture<Void> removeListener(MultimapEventListener<K, V> listener);
+
+ /**
* Returns a map of keys to collections of values that reflect the set of
* key-value pairs contained in the multimap, where the key value pairs
* would be the key paired with each of the values in the collection.
diff --git a/core/api/src/main/java/org/onosproject/store/service/ConsistentMultimap.java b/core/api/src/main/java/org/onosproject/store/service/ConsistentMultimap.java
index 8858369..bba38bb 100644
--- a/core/api/src/main/java/org/onosproject/store/service/ConsistentMultimap.java
+++ b/core/api/src/main/java/org/onosproject/store/service/ConsistentMultimap.java
@@ -17,10 +17,12 @@
package org.onosproject.store.service;
import com.google.common.collect.Multiset;
+import com.google.common.util.concurrent.MoreExecutors;
import java.util.Collection;
import java.util.Map;
import java.util.Set;
+import java.util.concurrent.Executor;
/**
* This provides a synchronous version of the functionality provided by
@@ -203,4 +205,29 @@
* empty.
*/
Map<K, Collection<V>> asMap();
+
+ /**
+ * Registers the specified listener to be notified whenever the map is updated.
+ *
+ * @param listener listener to notify about map events
+ */
+ default void addListener(MultimapEventListener<K, V> listener) {
+ addListener(listener, MoreExecutors.directExecutor());
+ }
+
+ /**
+ * Registers the specified listener to be notified whenever the map is updated.
+ *
+ * @param listener listener to notify about map events
+ * @param executor executor to use for handling incoming map events
+ */
+ void addListener(MultimapEventListener<K, V> listener, Executor executor);
+
+ /**
+ * Unregisters the specified listener such that it will no longer
+ * receive map change notifications.
+ *
+ * @param listener listener to unregister
+ */
+ void removeListener(MultimapEventListener<K, V> listener);
}
diff --git a/core/api/src/main/java/org/onosproject/store/service/MultimapEvent.java b/core/api/src/main/java/org/onosproject/store/service/MultimapEvent.java
new file mode 100644
index 0000000..ed56974
--- /dev/null
+++ b/core/api/src/main/java/org/onosproject/store/service/MultimapEvent.java
@@ -0,0 +1,145 @@
+/*
+ * Copyright 2017-present Open Networking Laboratory
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.onosproject.store.service;
+
+import com.google.common.base.MoreObjects;
+
+import java.util.Objects;
+
+/**
+ * Representation of a ConsistentMultimap update notification.
+ *
+ * @param <K> key type
+ * @param <V> value type
+ */
+public class MultimapEvent<K, V> {
+
+ /**
+ * MultimapEvent type.
+ */
+ public enum Type {
+ /**
+ * Entry inserted into the map.
+ */
+ INSERT,
+
+ /**
+ * Entry removed from map.
+ */
+ REMOVE
+ }
+
+ private final String name;
+ private final Type type;
+ private final K key;
+ private final V newValue;
+ private final V oldValue;
+
+ /**
+ * Creates a new event object.
+ *
+ * @param name map name
+ * @param key key the event concerns
+ * @param newValue new value key is mapped to
+ * @param oldValue previous value that was mapped to the key
+ */
+ public MultimapEvent(String name, K key, V newValue, V oldValue) {
+ this.name = name;
+ this.key = key;
+ this.newValue = newValue;
+ this.oldValue = oldValue;
+ this.type = newValue != null ? Type.INSERT : Type.REMOVE;
+ }
+
+ /**
+ * Returns the map name.
+ *
+ * @return name of map
+ */
+ public String name() {
+ return name;
+ }
+
+ /**
+ * Returns the type of the event.
+ *
+ * @return the type of event
+ */
+ public Type type() {
+ return type;
+ }
+
+ /**
+ * Returns the key this event concerns.
+ *
+ * @return the key
+ */
+ public K key() {
+ return key;
+ }
+
+ /**
+ * Returns the new value in the map associated with the key.
+ * If {@link #type()} returns {@code REMOVE},
+ * this method will return {@code null}.
+ *
+ * @return the new value for key
+ */
+ public V newValue() {
+ return newValue;
+ }
+
+ /**
+ * Returns the old value that was associated with the key.
+ * If {@link #type()} returns {@code INSERT}, this method will return
+ * {@code null}.
+ *
+ * @return the old value that was mapped to the key
+ */
+ public V oldValue() {
+ return oldValue;
+ }
+
+ @Override
+ public boolean equals(Object o) {
+ if (!(o instanceof MultimapEvent)) {
+ return false;
+ }
+
+ MultimapEvent<K, V> that = (MultimapEvent) o;
+ return Objects.equals(this.name, that.name) &&
+ Objects.equals(this.type, that.type) &&
+ Objects.equals(this.key, that.key) &&
+ Objects.equals(this.newValue, that.newValue) &&
+ Objects.equals(this.oldValue, that.oldValue);
+ }
+
+ @Override
+ public int hashCode() {
+ return Objects.hash(name, type, key, newValue, oldValue);
+ }
+
+ @Override
+ public String toString() {
+ return MoreObjects.toStringHelper(getClass())
+ .add("name", name)
+ .add("type", type)
+ .add("key", key)
+ .add("newValue", newValue)
+ .add("oldValue", oldValue)
+ .toString();
+ }
+}
diff --git a/core/api/src/main/java/org/onosproject/store/service/MultimapEventListener.java b/core/api/src/main/java/org/onosproject/store/service/MultimapEventListener.java
new file mode 100644
index 0000000..2445af9
--- /dev/null
+++ b/core/api/src/main/java/org/onosproject/store/service/MultimapEventListener.java
@@ -0,0 +1,29 @@
+/*
+ * Copyright 2017-present Open Networking Laboratory
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.onosproject.store.service;
+
+/**
+ * Listener to be notified about updates to a ConsistentMultimap.
+ */
+public interface MultimapEventListener<K, V> {
+
+ /**
+ * Reacts to the specified event.
+ *
+ * @param event the event
+ */
+ void event(MultimapEvent<K, V> event);
+}
diff --git a/core/api/src/test/java/org/onosproject/store/service/TestConsistentMultimap.java b/core/api/src/test/java/org/onosproject/store/service/TestConsistentMultimap.java
index 6bf6602..2e46d92 100644
--- a/core/api/src/test/java/org/onosproject/store/service/TestConsistentMultimap.java
+++ b/core/api/src/test/java/org/onosproject/store/service/TestConsistentMultimap.java
@@ -21,6 +21,7 @@
import java.util.Collection;
import java.util.Map;
import java.util.Set;
+import java.util.concurrent.Executor;
import java.util.concurrent.atomic.AtomicLong;
/**
@@ -134,6 +135,14 @@
}
@Override
+ public void addListener(MultimapEventListener<K, V> listener, Executor executor) {
+ }
+
+ @Override
+ public void removeListener(MultimapEventListener<K, V> listener) {
+ }
+
+ @Override
public String name() {
return this.name;
}
diff --git a/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/CatalystSerializers.java b/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/CatalystSerializers.java
index d676ab1..2c4ee73 100644
--- a/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/CatalystSerializers.java
+++ b/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/CatalystSerializers.java
@@ -51,6 +51,7 @@
import org.onosproject.store.service.DocumentTreeEvent;
import org.onosproject.store.service.MapEvent;
import org.onosproject.store.service.TransactionLog;
+import org.onosproject.store.service.MultimapEvent;
import org.onosproject.store.service.Task;
import org.onosproject.store.service.Versioned;
import org.onosproject.store.service.WorkQueueStats;
@@ -100,6 +101,8 @@
serializer.register(TransactionLog.class, factory);
serializer.register(Versioned.class, factory);
serializer.register(MapEvent.class, factory);
+ serializer.register(MultimapEvent.class, factory);
+ serializer.register(MultimapEvent.Type.class, factory);
serializer.register(Task.class, factory);
serializer.register(WorkQueueStats.class, factory);
serializer.register(DocumentPath.class, factory);
diff --git a/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/DelegatingAsyncConsistentMultimap.java b/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/DelegatingAsyncConsistentMultimap.java
index 5bd37c5..3db4dce 100644
--- a/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/DelegatingAsyncConsistentMultimap.java
+++ b/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/DelegatingAsyncConsistentMultimap.java
@@ -16,14 +16,17 @@
package org.onosproject.store.primitives.impl;
+import com.google.common.collect.Multiset;
+import org.onosproject.store.service.AsyncConsistentMultimap;
+import org.onosproject.store.service.MultimapEventListener;
+import org.onosproject.store.service.Versioned;
+
import java.util.Collection;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
-import com.google.common.collect.Multiset;
-import org.onosproject.store.service.AsyncConsistentMultimap;
-import org.onosproject.store.service.Versioned;
+import java.util.concurrent.Executor;
/**
* {@code AsyncConsistentMultimap} that merely delegates control to
@@ -133,6 +136,16 @@
}
@Override
+ public CompletableFuture<Void> addListener(MultimapEventListener<K, V> listener, Executor executor) {
+ return delegateMap.addListener(listener, executor);
+ }
+
+ @Override
+ public CompletableFuture<Void> removeListener(MultimapEventListener<K, V> listener) {
+ return delegateMap.removeListener(listener);
+ }
+
+ @Override
public CompletableFuture<Map<K, Collection<V>>> asMap() {
return delegateMap.asMap();
}
diff --git a/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/ExecutingAsyncConsistentMultimap.java b/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/ExecutingAsyncConsistentMultimap.java
index 6ec0a69..2045586 100644
--- a/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/ExecutingAsyncConsistentMultimap.java
+++ b/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/ExecutingAsyncConsistentMultimap.java
@@ -15,16 +15,17 @@
*/
package org.onosproject.store.primitives.impl;
+import com.google.common.collect.Multiset;
+import org.onosproject.store.service.AsyncConsistentMultimap;
+import org.onosproject.store.service.MultimapEventListener;
+import org.onosproject.store.service.Versioned;
+
import java.util.Collection;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;
-import com.google.common.collect.Multiset;
-import org.onosproject.store.service.AsyncConsistentMultimap;
-import org.onosproject.store.service.Versioned;
-
/**
* {@link org.onosproject.store.service.AsyncConsistentMultimap} that executes asynchronous callbacks on a provided
* {@link Executor}.
@@ -125,6 +126,16 @@
}
@Override
+ public CompletableFuture<Void> addListener(MultimapEventListener<K, V> listener, Executor executor) {
+ return asyncFuture(delegateMap.addListener(listener, executor));
+ }
+
+ @Override
+ public CompletableFuture<Void> removeListener(MultimapEventListener<K, V> listener) {
+ return asyncFuture(delegateMap.removeListener(listener));
+ }
+
+ @Override
public CompletableFuture<Map<K, Collection<V>>> asMap() {
return asyncFuture(delegateMap.asMap());
}
diff --git a/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/TranscodingAsyncConsistentMap.java b/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/TranscodingAsyncConsistentMap.java
index c2f167a..52cf210 100644
--- a/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/TranscodingAsyncConsistentMap.java
+++ b/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/TranscodingAsyncConsistentMap.java
@@ -16,6 +16,17 @@
package org.onosproject.store.primitives.impl;
+import com.google.common.collect.Maps;
+import org.onlab.util.Tools;
+import org.onosproject.store.primitives.MapUpdate;
+import org.onosproject.store.primitives.TransactionId;
+import org.onosproject.store.service.AsyncConsistentMap;
+import org.onosproject.store.service.MapEvent;
+import org.onosproject.store.service.MapEventListener;
+import org.onosproject.store.service.TransactionLog;
+import org.onosproject.store.service.Version;
+import org.onosproject.store.service.Versioned;
+
import java.util.Collection;
import java.util.Map;
import java.util.Map.Entry;
@@ -28,18 +39,6 @@
import java.util.function.Predicate;
import java.util.stream.Collectors;
-import org.onlab.util.Tools;
-import org.onosproject.store.primitives.MapUpdate;
-import org.onosproject.store.primitives.TransactionId;
-import org.onosproject.store.service.AsyncConsistentMap;
-import org.onosproject.store.service.MapEvent;
-import org.onosproject.store.service.MapEventListener;
-import org.onosproject.store.service.TransactionLog;
-import org.onosproject.store.service.Version;
-import org.onosproject.store.service.Versioned;
-
-import com.google.common.collect.Maps;
-
/**
* An {@code AsyncConsistentMap} that maps its operations to operations on a
* differently typed {@code AsyncConsistentMap} by transcoding operation inputs and outputs.
@@ -259,11 +258,13 @@
@Override
public CompletableFuture<Void> removeListener(MapEventListener<K1, V1> listener) {
- InternalBackingMapEventListener backingMapListener = listeners.remove(listener);
- if (backingMapListener != null) {
- return backingMap.removeListener(backingMapListener);
- } else {
- return CompletableFuture.completedFuture(null);
+ synchronized (listeners) {
+ InternalBackingMapEventListener backingMapListener = listeners.remove(listener);
+ if (backingMapListener != null) {
+ return backingMap.removeListener(backingMapListener);
+ } else {
+ return CompletableFuture.completedFuture(null);
+ }
}
}
diff --git a/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/TranscodingAsyncConsistentMultimap.java b/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/TranscodingAsyncConsistentMultimap.java
index 666ac1f..9d699d9 100644
--- a/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/TranscodingAsyncConsistentMultimap.java
+++ b/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/TranscodingAsyncConsistentMultimap.java
@@ -21,6 +21,8 @@
import com.google.common.collect.Multiset;
import org.onlab.util.Tools;
import org.onosproject.store.service.AsyncConsistentMultimap;
+import org.onosproject.store.service.MultimapEvent;
+import org.onosproject.store.service.MultimapEventListener;
import org.onosproject.store.service.Versioned;
import java.util.Collection;
@@ -28,6 +30,7 @@
import java.util.Map;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.Executor;
import java.util.function.BiConsumer;
import java.util.function.BinaryOperator;
import java.util.function.Consumer;
@@ -60,6 +63,8 @@
Versioned<Collection<? extends V1>>> versionedValueCollectionDecode;
private final Function<Collection<? extends V1>, Collection<V2>>
valueCollectionEncode;
+ private final Map<MultimapEventListener<K1, V1>, InternalBackingMultimapEventListener> listeners =
+ Maps.newIdentityHashMap();
public TranscodingAsyncConsistentMultimap(
AsyncConsistentMultimap<K2, V2> backingMap,
@@ -244,6 +249,27 @@
}
@Override
+ public CompletableFuture<Void> addListener(MultimapEventListener<K1, V1> listener, Executor executor) {
+ synchronized (listeners) {
+ InternalBackingMultimapEventListener backingMapListener =
+ listeners.computeIfAbsent(listener, k -> new InternalBackingMultimapEventListener(listener));
+ return backingMap.addListener(backingMapListener, executor);
+ }
+ }
+
+ @Override
+ public CompletableFuture<Void> removeListener(MultimapEventListener<K1, V1> listener) {
+ synchronized (listeners) {
+ InternalBackingMultimapEventListener backingMapListener = listeners.remove(listener);
+ if (backingMapListener != null) {
+ return backingMap.removeListener(backingMapListener);
+ } else {
+ return CompletableFuture.completedFuture(null);
+ }
+ }
+ }
+
+ @Override
public void addStatusChangeListener(Consumer<Status> listener) {
backingMap.addStatusChangeListener(listener);
}
@@ -290,4 +316,21 @@
return EnumSet.of(Characteristics.UNORDERED);
}
}
+
+ private class InternalBackingMultimapEventListener implements MultimapEventListener<K2, V2> {
+
+ private final MultimapEventListener<K1, V1> listener;
+
+ InternalBackingMultimapEventListener(MultimapEventListener<K1, V1> listener) {
+ this.listener = listener;
+ }
+
+ @Override
+ public void event(MultimapEvent<K2, V2> event) {
+ listener.event(new MultimapEvent(event.name(),
+ keyDecoder.apply(event.key()),
+ valueDecoder.apply(event.newValue()),
+ valueDecoder.apply(event.oldValue())));
+ }
+ }
}
diff --git a/core/store/primitives/src/main/java/org/onosproject/store/primitives/resources/impl/AtomixConsistentMultimapCommands.java b/core/store/primitives/src/main/java/org/onosproject/store/primitives/resources/impl/AtomixConsistentMultimapCommands.java
index d765466..b8e5d4c 100644
--- a/core/store/primitives/src/main/java/org/onosproject/store/primitives/resources/impl/AtomixConsistentMultimapCommands.java
+++ b/core/store/primitives/src/main/java/org/onosproject/store/primitives/resources/impl/AtomixConsistentMultimapCommands.java
@@ -563,6 +563,44 @@
}
/**
+ * Change listen.
+ */
+ @SuppressWarnings("serial")
+ public static class Listen implements Command<Void>, CatalystSerializable {
+ @Override
+ public void writeObject(BufferOutput<?> buffer, Serializer serializer) {
+ }
+
+ @Override
+ public void readObject(BufferInput<?> buffer, Serializer serializer) {
+ }
+
+ @Override
+ public CompactionMode compaction() {
+ return CompactionMode.QUORUM;
+ }
+ }
+
+ /**
+ * Change unlisten.
+ */
+ @SuppressWarnings("serial")
+ public static class Unlisten implements Command<Void>, CatalystSerializable {
+ @Override
+ public void writeObject(BufferOutput<?> buffer, Serializer serializer) {
+ }
+
+ @Override
+ public void readObject(BufferInput<?> buffer, Serializer serializer) {
+ }
+
+ @Override
+ public CompactionMode compaction() {
+ return CompactionMode.TOMBSTONE;
+ }
+ }
+
+ /**
* Multimap command type resolver.
*/
@SuppressWarnings("serial")
@@ -584,6 +622,8 @@
registry.register(Put.class, -1012);
registry.register(RemoveAll.class, -1013);
registry.register(MultiRemove.class, -1014);
+ registry.register(Listen.class, -1015);
+ registry.register(Unlisten.class, -1016);
}
}
}
diff --git a/core/store/primitives/src/main/java/org/onosproject/store/primitives/resources/impl/AtomixConsistentSetMultimap.java b/core/store/primitives/src/main/java/org/onosproject/store/primitives/resources/impl/AtomixConsistentSetMultimap.java
index e446a67..22a0345 100644
--- a/core/store/primitives/src/main/java/org/onosproject/store/primitives/resources/impl/AtomixConsistentSetMultimap.java
+++ b/core/store/primitives/src/main/java/org/onosproject/store/primitives/resources/impl/AtomixConsistentSetMultimap.java
@@ -22,14 +22,19 @@
import io.atomix.resource.AbstractResource;
import io.atomix.resource.ResourceTypeInfo;
import org.onosproject.store.service.AsyncConsistentMultimap;
+import org.onosproject.store.service.MultimapEvent;
+import org.onosproject.store.service.MultimapEventListener;
import org.onosproject.store.service.Versioned;
import java.util.Collection;
import java.util.ConcurrentModificationException;
+import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.Executor;
import static org.onosproject.store.primitives.resources.impl.AtomixConsistentMultimapCommands.Clear;
import static org.onosproject.store.primitives.resources.impl.AtomixConsistentMultimapCommands.ContainsEntry;
@@ -40,13 +45,16 @@
import static org.onosproject.store.primitives.resources.impl.AtomixConsistentMultimapCommands.IsEmpty;
import static org.onosproject.store.primitives.resources.impl.AtomixConsistentMultimapCommands.KeySet;
import static org.onosproject.store.primitives.resources.impl.AtomixConsistentMultimapCommands.Keys;
+import static org.onosproject.store.primitives.resources.impl.AtomixConsistentMultimapCommands.Listen;
import static org.onosproject.store.primitives.resources.impl.AtomixConsistentMultimapCommands.MultiRemove;
import static org.onosproject.store.primitives.resources.impl.AtomixConsistentMultimapCommands.Put;
import static org.onosproject.store.primitives.resources.impl.AtomixConsistentMultimapCommands.RemoveAll;
import static org.onosproject.store.primitives.resources.impl.AtomixConsistentMultimapCommands.Replace;
import static org.onosproject.store.primitives.resources.impl.AtomixConsistentMultimapCommands.Size;
+import static org.onosproject.store.primitives.resources.impl.AtomixConsistentMultimapCommands.Unlisten;
import static org.onosproject.store.primitives.resources.impl.AtomixConsistentMultimapCommands.Values;
+
/**
* Set based implementation of the {@link AsyncConsistentMultimap}.
* <p>
@@ -57,6 +65,10 @@
extends AbstractResource<AtomixConsistentSetMultimap>
implements AsyncConsistentMultimap<String, byte[]> {
+ private final Map<MultimapEventListener<String, byte[]>, Executor> mapEventListeners = new ConcurrentHashMap<>();
+
+ public static final String CHANGE_SUBJECT = "multimapChangeEvents";
+
public AtomixConsistentSetMultimap(CopycatClient client,
Properties properties) {
super(client, properties);
@@ -64,8 +76,20 @@
@Override
public CompletableFuture<AtomixConsistentSetMultimap> open() {
- return super.open();
- //TODO
+ return super.open().thenApply(result -> {
+ client.onStateChange(state -> {
+ if (state == CopycatClient.State.CONNECTED && isListening()) {
+ client.submit(new Listen());
+ }
+ });
+ client.onEvent(CHANGE_SUBJECT, this::handleEvent);
+ return result;
+ });
+ }
+
+ private void handleEvent(List<MultimapEvent<String, byte[]>> events) {
+ events.forEach(event ->
+ mapEventListeners.forEach((listener, executor) -> executor.execute(() -> listener.event(event))));
}
@Override
@@ -158,6 +182,24 @@
}
@Override
+ public CompletableFuture<Void> addListener(MultimapEventListener<String, byte[]> listener, Executor executor) {
+ if (mapEventListeners.isEmpty()) {
+ return client.submit(new Listen()).thenRun(() -> mapEventListeners.put(listener, executor));
+ } else {
+ mapEventListeners.put(listener, executor);
+ return CompletableFuture.completedFuture(null);
+ }
+ }
+
+ @Override
+ public CompletableFuture<Void> removeListener(MultimapEventListener<String, byte[]> listener) {
+ if (mapEventListeners.remove(listener) != null && mapEventListeners.isEmpty()) {
+ return client.submit(new Unlisten()).thenApply(v -> null);
+ }
+ return CompletableFuture.completedFuture(null);
+ }
+
+ @Override
public CompletableFuture<Map<String, Collection<byte[]>>> asMap() {
throw new UnsupportedOperationException("Expensive operation.");
}
@@ -178,4 +220,8 @@
"in progress");
}
}
+
+ private boolean isListening() {
+ return !mapEventListeners.isEmpty();
+ }
}
diff --git a/core/store/primitives/src/main/java/org/onosproject/store/primitives/resources/impl/AtomixConsistentSetMultimapState.java b/core/store/primitives/src/main/java/org/onosproject/store/primitives/resources/impl/AtomixConsistentSetMultimapState.java
index 5110187..47972d1 100644
--- a/core/store/primitives/src/main/java/org/onosproject/store/primitives/resources/impl/AtomixConsistentSetMultimapState.java
+++ b/core/store/primitives/src/main/java/org/onosproject/store/primitives/resources/impl/AtomixConsistentSetMultimapState.java
@@ -27,12 +27,14 @@
import io.atomix.copycat.server.Commit;
import io.atomix.copycat.server.Snapshottable;
import io.atomix.copycat.server.StateMachineExecutor;
+import io.atomix.copycat.server.session.ServerSession;
import io.atomix.copycat.server.session.SessionListener;
import io.atomix.copycat.server.storage.snapshot.SnapshotReader;
import io.atomix.copycat.server.storage.snapshot.SnapshotWriter;
import io.atomix.resource.ResourceStateMachine;
import org.onlab.util.CountDownCompleter;
import org.onlab.util.Match;
+import org.onosproject.store.service.MultimapEvent;
import org.onosproject.store.service.Versioned;
import org.slf4j.Logger;
@@ -40,6 +42,8 @@
import java.util.Collection;
import java.util.Comparator;
import java.util.EnumSet;
+import java.util.HashMap;
+import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.Set;
@@ -61,12 +65,14 @@
import static org.onosproject.store.primitives.resources.impl.AtomixConsistentMultimapCommands.IsEmpty;
import static org.onosproject.store.primitives.resources.impl.AtomixConsistentMultimapCommands.KeySet;
import static org.onosproject.store.primitives.resources.impl.AtomixConsistentMultimapCommands.Keys;
+import static org.onosproject.store.primitives.resources.impl.AtomixConsistentMultimapCommands.Listen;
import static org.onosproject.store.primitives.resources.impl.AtomixConsistentMultimapCommands.MultiRemove;
import static org.onosproject.store.primitives.resources.impl.AtomixConsistentMultimapCommands.MultimapCommand;
import static org.onosproject.store.primitives.resources.impl.AtomixConsistentMultimapCommands.Put;
import static org.onosproject.store.primitives.resources.impl.AtomixConsistentMultimapCommands.RemoveAll;
import static org.onosproject.store.primitives.resources.impl.AtomixConsistentMultimapCommands.Replace;
import static org.onosproject.store.primitives.resources.impl.AtomixConsistentMultimapCommands.Size;
+import static org.onosproject.store.primitives.resources.impl.AtomixConsistentMultimapCommands.Unlisten;
import static org.onosproject.store.primitives.resources.impl.AtomixConsistentMultimapCommands.Values;
import static org.slf4j.LoggerFactory.getLogger;
@@ -78,7 +84,7 @@
private final Logger log = getLogger(getClass());
private final AtomicLong globalVersion = new AtomicLong(1);
- //TODO Add listener map here
+ private final Map<Long, Commit<? extends Listen>> listeners = new HashMap<>();
private final Map<String, MapEntryValue> backingMap = Maps.newHashMap();
public AtomixConsistentSetMultimapState(Properties properties) {
@@ -110,6 +116,8 @@
executor.register(MultiRemove.class, this::multiRemove);
executor.register(Put.class, this::put);
executor.register(Replace.class, this::replace);
+ executor.register(Listen.class, this::listen);
+ executor.register(Unlisten.class, this::unlisten);
}
/**
@@ -313,12 +321,20 @@
*/
protected Versioned<Collection<? extends byte[]>> removeAll(
Commit<? extends RemoveAll> commit) {
- if (!backingMap.containsKey(commit.operation().key())) {
+ String key = commit.operation().key();
+
+ if (!backingMap.containsKey(key)) {
commit.close();
return new Versioned<>(Sets.newHashSet(), -1);
- } else {
- return backingMap.get(commit.operation().key()).addCommit(commit);
}
+
+ Versioned<Collection<? extends byte[]>> removedValues =
+ backingMap.get(key).addCommit(commit);
+ publish(removedValues.value().stream()
+ .map(value -> new MultimapEvent<String, byte[]>(
+ "", key, null, value))
+ .collect(Collectors.toList()));
+ return removedValues;
}
/**
@@ -328,14 +344,26 @@
* @return true if any change results, else false
*/
protected boolean multiRemove(Commit<? extends MultiRemove> commit) {
- if (!backingMap.containsKey(commit.operation().key())) {
+ String key = commit.operation().key();
+
+ if (!backingMap.containsKey(key)) {
commit.close();
return false;
- } else {
- return (backingMap
- .get(commit.operation().key())
- .addCommit(commit)) != null;
}
+
+ Versioned<Collection<? extends byte[]>> removedValues = backingMap
+ .get(key)
+ .addCommit(commit);
+
+ if (removedValues != null) {
+ publish(removedValues.value().stream()
+ .map(value -> new MultimapEvent<String, byte[]>(
+ "", key, null, value))
+ .collect(Collectors.toList()));
+ return true;
+ }
+
+ return false;
}
/**
@@ -345,16 +373,27 @@
* @return true if this commit results in a change, else false
*/
protected boolean put(Commit<? extends Put> commit) {
+ String key = commit.operation().key();
if (commit.operation().values().isEmpty()) {
return false;
}
- if (!backingMap.containsKey(commit.operation().key())) {
- backingMap.put(commit.operation().key(),
- new NonTransactionalCommit(1));
+ if (!backingMap.containsKey(key)) {
+ backingMap.put(key, new NonTransactionalCommit(1));
}
- return backingMap
- .get(commit.operation().key())
- .addCommit(commit) != null;
+
+ Versioned<Collection<? extends byte[]>> addedValues = backingMap
+ .get(key)
+ .addCommit(commit);
+
+ if (addedValues != null) {
+ publish(addedValues.value().stream()
+ .map(value -> new MultimapEvent<String, byte[]>(
+ "", key, value, null))
+ .collect(Collectors.toList()));
+ return true;
+ }
+
+ return false;
}
protected Versioned<Collection<? extends byte[]>> replace(
@@ -366,6 +405,56 @@
return backingMap.get(commit.operation().key()).addCommit(commit);
}
+ /**
+ * Handles a listen commit.
+ *
+ * @param commit listen commit
+ */
+ protected void listen(Commit<? extends Listen> commit) {
+ Long sessionId = commit.session().id();
+ if (listeners.putIfAbsent(sessionId, commit) != null) {
+ commit.close();
+ return;
+ }
+ commit.session()
+ .onStateChange(
+ state -> {
+ if (state == ServerSession.State.CLOSED
+ || state == ServerSession.State.EXPIRED) {
+ Commit<? extends Listen> listener = listeners.remove(sessionId);
+ if (listener != null) {
+ listener.close();
+ }
+ }
+ });
+ }
+
+ /**
+ * Handles an unlisten commit.
+ *
+ * @param commit unlisten commit
+ */
+ protected void unlisten(Commit<? extends Unlisten> commit) {
+ try {
+ Commit<? extends Listen> listener = listeners.remove(commit.session().id());
+ if (listener != null) {
+ listener.close();
+ }
+ } finally {
+ commit.close();
+ }
+ }
+
+ /**
+ * Publishes events to listeners.
+ *
+ * @param events list of map event to publish
+ */
+ private void publish(List<MultimapEvent<String, byte[]>> events) {
+ listeners.values().forEach(commit ->
+ commit.session().publish(AtomixConsistentSetMultimap.CHANGE_SUBJECT, events));
+ }
+
private interface MapEntryValue {
/**
diff --git a/core/store/serializers/src/main/java/org/onosproject/store/serializers/KryoNamespaces.java b/core/store/serializers/src/main/java/org/onosproject/store/serializers/KryoNamespaces.java
index 7d1a9d1..2cfd0c4 100644
--- a/core/store/serializers/src/main/java/org/onosproject/store/serializers/KryoNamespaces.java
+++ b/core/store/serializers/src/main/java/org/onosproject/store/serializers/KryoNamespaces.java
@@ -20,7 +20,6 @@
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.ImmutableSet;
import com.google.common.collect.Maps;
-
import org.onlab.packet.ChassisId;
import org.onlab.packet.EthType;
import org.onlab.packet.Ip4Address;
@@ -214,6 +213,7 @@
import org.onosproject.store.primitives.TransactionId;
import org.onosproject.store.service.MapEvent;
import org.onosproject.store.service.TransactionLog;
+import org.onosproject.store.service.MultimapEvent;
import org.onosproject.store.service.SetEvent;
import org.onosproject.store.service.Task;
import org.onosproject.store.service.Versioned;
@@ -528,6 +528,8 @@
.register(Versioned.class)
.register(MapEvent.class)
.register(MapEvent.Type.class)
+ .register(MultimapEvent.class)
+ .register(MultimapEvent.Type.class)
.register(SetEvent.class)
.register(SetEvent.Type.class)
.register(GroupId.class)