Implement listeners for ConsistentMultimap.
Change-Id: Ica07d444c18af8ba7a9bbb120623512def572a48
diff --git a/core/store/primitives/src/main/java/org/onosproject/store/primitives/resources/impl/AtomixConsistentSetMultimapState.java b/core/store/primitives/src/main/java/org/onosproject/store/primitives/resources/impl/AtomixConsistentSetMultimapState.java
index 5110187..47972d1 100644
--- a/core/store/primitives/src/main/java/org/onosproject/store/primitives/resources/impl/AtomixConsistentSetMultimapState.java
+++ b/core/store/primitives/src/main/java/org/onosproject/store/primitives/resources/impl/AtomixConsistentSetMultimapState.java
@@ -27,12 +27,14 @@
import io.atomix.copycat.server.Commit;
import io.atomix.copycat.server.Snapshottable;
import io.atomix.copycat.server.StateMachineExecutor;
+import io.atomix.copycat.server.session.ServerSession;
import io.atomix.copycat.server.session.SessionListener;
import io.atomix.copycat.server.storage.snapshot.SnapshotReader;
import io.atomix.copycat.server.storage.snapshot.SnapshotWriter;
import io.atomix.resource.ResourceStateMachine;
import org.onlab.util.CountDownCompleter;
import org.onlab.util.Match;
+import org.onosproject.store.service.MultimapEvent;
import org.onosproject.store.service.Versioned;
import org.slf4j.Logger;
@@ -40,6 +42,8 @@
import java.util.Collection;
import java.util.Comparator;
import java.util.EnumSet;
+import java.util.HashMap;
+import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.Set;
@@ -61,12 +65,14 @@
import static org.onosproject.store.primitives.resources.impl.AtomixConsistentMultimapCommands.IsEmpty;
import static org.onosproject.store.primitives.resources.impl.AtomixConsistentMultimapCommands.KeySet;
import static org.onosproject.store.primitives.resources.impl.AtomixConsistentMultimapCommands.Keys;
+import static org.onosproject.store.primitives.resources.impl.AtomixConsistentMultimapCommands.Listen;
import static org.onosproject.store.primitives.resources.impl.AtomixConsistentMultimapCommands.MultiRemove;
import static org.onosproject.store.primitives.resources.impl.AtomixConsistentMultimapCommands.MultimapCommand;
import static org.onosproject.store.primitives.resources.impl.AtomixConsistentMultimapCommands.Put;
import static org.onosproject.store.primitives.resources.impl.AtomixConsistentMultimapCommands.RemoveAll;
import static org.onosproject.store.primitives.resources.impl.AtomixConsistentMultimapCommands.Replace;
import static org.onosproject.store.primitives.resources.impl.AtomixConsistentMultimapCommands.Size;
+import static org.onosproject.store.primitives.resources.impl.AtomixConsistentMultimapCommands.Unlisten;
import static org.onosproject.store.primitives.resources.impl.AtomixConsistentMultimapCommands.Values;
import static org.slf4j.LoggerFactory.getLogger;
@@ -78,7 +84,7 @@
private final Logger log = getLogger(getClass());
private final AtomicLong globalVersion = new AtomicLong(1);
- //TODO Add listener map here
+ private final Map<Long, Commit<? extends Listen>> listeners = new HashMap<>();
private final Map<String, MapEntryValue> backingMap = Maps.newHashMap();
public AtomixConsistentSetMultimapState(Properties properties) {
@@ -110,6 +116,8 @@
executor.register(MultiRemove.class, this::multiRemove);
executor.register(Put.class, this::put);
executor.register(Replace.class, this::replace);
+ executor.register(Listen.class, this::listen);
+ executor.register(Unlisten.class, this::unlisten);
}
/**
@@ -313,12 +321,20 @@
*/
protected Versioned<Collection<? extends byte[]>> removeAll(
Commit<? extends RemoveAll> commit) {
- if (!backingMap.containsKey(commit.operation().key())) {
+ String key = commit.operation().key();
+
+ if (!backingMap.containsKey(key)) {
commit.close();
return new Versioned<>(Sets.newHashSet(), -1);
- } else {
- return backingMap.get(commit.operation().key()).addCommit(commit);
}
+
+ Versioned<Collection<? extends byte[]>> removedValues =
+ backingMap.get(key).addCommit(commit);
+ publish(removedValues.value().stream()
+ .map(value -> new MultimapEvent<String, byte[]>(
+ "", key, null, value))
+ .collect(Collectors.toList()));
+ return removedValues;
}
/**
@@ -328,14 +344,26 @@
* @return true if any change results, else false
*/
protected boolean multiRemove(Commit<? extends MultiRemove> commit) {
- if (!backingMap.containsKey(commit.operation().key())) {
+ String key = commit.operation().key();
+
+ if (!backingMap.containsKey(key)) {
commit.close();
return false;
- } else {
- return (backingMap
- .get(commit.operation().key())
- .addCommit(commit)) != null;
}
+
+ Versioned<Collection<? extends byte[]>> removedValues = backingMap
+ .get(key)
+ .addCommit(commit);
+
+ if (removedValues != null) {
+ publish(removedValues.value().stream()
+ .map(value -> new MultimapEvent<String, byte[]>(
+ "", key, null, value))
+ .collect(Collectors.toList()));
+ return true;
+ }
+
+ return false;
}
/**
@@ -345,16 +373,27 @@
* @return true if this commit results in a change, else false
*/
protected boolean put(Commit<? extends Put> commit) {
+ String key = commit.operation().key();
if (commit.operation().values().isEmpty()) {
return false;
}
- if (!backingMap.containsKey(commit.operation().key())) {
- backingMap.put(commit.operation().key(),
- new NonTransactionalCommit(1));
+ if (!backingMap.containsKey(key)) {
+ backingMap.put(key, new NonTransactionalCommit(1));
}
- return backingMap
- .get(commit.operation().key())
- .addCommit(commit) != null;
+
+ Versioned<Collection<? extends byte[]>> addedValues = backingMap
+ .get(key)
+ .addCommit(commit);
+
+ if (addedValues != null) {
+ publish(addedValues.value().stream()
+ .map(value -> new MultimapEvent<String, byte[]>(
+ "", key, value, null))
+ .collect(Collectors.toList()));
+ return true;
+ }
+
+ return false;
}
protected Versioned<Collection<? extends byte[]>> replace(
@@ -366,6 +405,56 @@
return backingMap.get(commit.operation().key()).addCommit(commit);
}
+ /**
+ * Handles a listen commit.
+ *
+ * @param commit listen commit
+ */
+ protected void listen(Commit<? extends Listen> commit) {
+ Long sessionId = commit.session().id();
+ if (listeners.putIfAbsent(sessionId, commit) != null) {
+ commit.close();
+ return;
+ }
+ commit.session()
+ .onStateChange(
+ state -> {
+ if (state == ServerSession.State.CLOSED
+ || state == ServerSession.State.EXPIRED) {
+ Commit<? extends Listen> listener = listeners.remove(sessionId);
+ if (listener != null) {
+ listener.close();
+ }
+ }
+ });
+ }
+
+ /**
+ * Handles an unlisten commit.
+ *
+ * @param commit unlisten commit
+ */
+ protected void unlisten(Commit<? extends Unlisten> commit) {
+ try {
+ Commit<? extends Listen> listener = listeners.remove(commit.session().id());
+ if (listener != null) {
+ listener.close();
+ }
+ } finally {
+ commit.close();
+ }
+ }
+
+ /**
+ * Publishes events to listeners.
+ *
+     * @param events list of multimap events to publish
+ */
+ private void publish(List<MultimapEvent<String, byte[]>> events) {
+ listeners.values().forEach(commit ->
+ commit.session().publish(AtomixConsistentSetMultimap.CHANGE_SUBJECT, events));
+ }
+
private interface MapEntryValue {
/**