Refactor DistributedPacketStore to store packet requests in a ConsistentMultimap
Change-Id: Ia4a93c47fee726009673e99609b2f8800807e675
diff --git a/core/store/primitives/src/main/java/org/onosproject/store/primitives/resources/impl/AtomixConsistentSetMultimap.java b/core/store/primitives/src/main/java/org/onosproject/store/primitives/resources/impl/AtomixConsistentSetMultimap.java
index 44cc614..39d49fe 100644
--- a/core/store/primitives/src/main/java/org/onosproject/store/primitives/resources/impl/AtomixConsistentSetMultimap.java
+++ b/core/store/primitives/src/main/java/org/onosproject/store/primitives/resources/impl/AtomixConsistentSetMultimap.java
@@ -61,9 +61,11 @@
import static org.onosproject.store.primitives.resources.impl.AtomixConsistentSetMultimapOperations.KEY_SET;
import static org.onosproject.store.primitives.resources.impl.AtomixConsistentSetMultimapOperations.MultiRemove;
import static org.onosproject.store.primitives.resources.impl.AtomixConsistentSetMultimapOperations.PUT;
+import static org.onosproject.store.primitives.resources.impl.AtomixConsistentSetMultimapOperations.PUT_AND_GET;
import static org.onosproject.store.primitives.resources.impl.AtomixConsistentSetMultimapOperations.Put;
import static org.onosproject.store.primitives.resources.impl.AtomixConsistentSetMultimapOperations.REMOVE;
import static org.onosproject.store.primitives.resources.impl.AtomixConsistentSetMultimapOperations.REMOVE_ALL;
+import static org.onosproject.store.primitives.resources.impl.AtomixConsistentSetMultimapOperations.REMOVE_AND_GET;
import static org.onosproject.store.primitives.resources.impl.AtomixConsistentSetMultimapOperations.REMOVE_LISTENER;
import static org.onosproject.store.primitives.resources.impl.AtomixConsistentSetMultimapOperations.REPLACE;
import static org.onosproject.store.primitives.resources.impl.AtomixConsistentSetMultimapOperations.RemoveAll;
@@ -148,6 +150,15 @@
}
@Override
+ public CompletableFuture<Versioned<Collection<? extends byte[]>>> putAndGet(String key, byte[] value) {
+ return proxy.invoke(
+ PUT_AND_GET,
+ SERIALIZER::encode,
+ new Put(key, Lists.newArrayList(value), null),
+ SERIALIZER::decode);
+ }
+
+ @Override
public CompletableFuture<Boolean> remove(String key, byte[] value) {
return proxy.invoke(REMOVE, SERIALIZER::encode, new MultiRemove(key,
Lists.newArrayList(value),
@@ -155,6 +166,13 @@
}
@Override
+ public CompletableFuture<Versioned<Collection<? extends byte[]>>> removeAndGet(String key, byte[] value) {
+ return proxy.invoke(REMOVE_AND_GET, SERIALIZER::encode, new MultiRemove(key,
+ Lists.newArrayList(value),
+ null), SERIALIZER::decode);
+ }
+
+ @Override
public CompletableFuture<Boolean> removeAll(String key, Collection<? extends byte[]> values) {
return proxy.invoke(
REMOVE,
diff --git a/core/store/primitives/src/main/java/org/onosproject/store/primitives/resources/impl/AtomixConsistentSetMultimapOperations.java b/core/store/primitives/src/main/java/org/onosproject/store/primitives/resources/impl/AtomixConsistentSetMultimapOperations.java
index 82372a2..6ad93b6 100644
--- a/core/store/primitives/src/main/java/org/onosproject/store/primitives/resources/impl/AtomixConsistentSetMultimapOperations.java
+++ b/core/store/primitives/src/main/java/org/onosproject/store/primitives/resources/impl/AtomixConsistentSetMultimapOperations.java
@@ -45,7 +45,9 @@
VALUES(OperationType.QUERY),
ENTRIES(OperationType.QUERY),
PUT(OperationType.COMMAND),
+ PUT_AND_GET(OperationType.COMMAND),
REMOVE(OperationType.COMMAND),
+ REMOVE_AND_GET(OperationType.COMMAND),
REMOVE_ALL(OperationType.COMMAND),
REPLACE(OperationType.COMMAND),
CLEAR(OperationType.COMMAND),
diff --git a/core/store/primitives/src/main/java/org/onosproject/store/primitives/resources/impl/AtomixConsistentSetMultimapService.java b/core/store/primitives/src/main/java/org/onosproject/store/primitives/resources/impl/AtomixConsistentSetMultimapService.java
index 2312d8f..5a93bef 100644
--- a/core/store/primitives/src/main/java/org/onosproject/store/primitives/resources/impl/AtomixConsistentSetMultimapService.java
+++ b/core/store/primitives/src/main/java/org/onosproject/store/primitives/resources/impl/AtomixConsistentSetMultimapService.java
@@ -77,9 +77,11 @@
import static org.onosproject.store.primitives.resources.impl.AtomixConsistentSetMultimapOperations.MultiRemove;
import static org.onosproject.store.primitives.resources.impl.AtomixConsistentSetMultimapOperations.MultimapOperation;
import static org.onosproject.store.primitives.resources.impl.AtomixConsistentSetMultimapOperations.PUT;
+import static org.onosproject.store.primitives.resources.impl.AtomixConsistentSetMultimapOperations.PUT_AND_GET;
import static org.onosproject.store.primitives.resources.impl.AtomixConsistentSetMultimapOperations.Put;
import static org.onosproject.store.primitives.resources.impl.AtomixConsistentSetMultimapOperations.REMOVE;
import static org.onosproject.store.primitives.resources.impl.AtomixConsistentSetMultimapOperations.REMOVE_ALL;
+import static org.onosproject.store.primitives.resources.impl.AtomixConsistentSetMultimapOperations.REMOVE_AND_GET;
import static org.onosproject.store.primitives.resources.impl.AtomixConsistentSetMultimapOperations.REMOVE_LISTENER;
import static org.onosproject.store.primitives.resources.impl.AtomixConsistentSetMultimapOperations.REPLACE;
import static org.onosproject.store.primitives.resources.impl.AtomixConsistentSetMultimapOperations.RemoveAll;
@@ -153,7 +155,9 @@
executor.register(GET, serializer::decode, this::get, serializer::encode);
executor.register(REMOVE_ALL, serializer::decode, this::removeAll, serializer::encode);
executor.register(REMOVE, serializer::decode, this::multiRemove, serializer::encode);
+ executor.register(REMOVE_AND_GET, serializer::decode, this::removeAndGet, serializer::encode);
executor.register(PUT, serializer::decode, this::put, serializer::encode);
+ executor.register(PUT_AND_GET, serializer::decode, this::putAndGet, serializer::encode);
executor.register(REPLACE, serializer::decode, this::replace, serializer::encode);
executor.register(ADD_LISTENER, this::listen);
executor.register(REMOVE_LISTENER, this::unlisten);
@@ -352,8 +356,8 @@
}
Versioned<Collection<? extends byte[]>> removedValues = backingMap
- .get(key)
- .addCommit(commit);
+ .get(key)
+ .addCommit(commit);
if (removedValues != null) {
if (removedValues.value().isEmpty()) {
@@ -361,9 +365,9 @@
}
publish(removedValues.value().stream()
- .map(value -> new MultimapEvent<String, byte[]>(
- "", key, null, value))
- .collect(Collectors.toList()));
+ .map(value -> new MultimapEvent<String, byte[]>(
+ "", key, null, value))
+ .collect(Collectors.toList()));
return true;
}
@@ -371,6 +375,37 @@
}
/**
+ * Handles a removeAndGet commit.
+ *
+ * @param commit multiRemove commit
+ * @return the updated values or null if the values are empty
+ */
+ protected Versioned<Collection<? extends byte[]>> removeAndGet(Commit<? extends MultiRemove> commit) {
+ String key = commit.value().key();
+
+ if (!backingMap.containsKey(key)) {
+ return null;
+ }
+
+ Versioned<Collection<? extends byte[]>> removedValues = backingMap
+ .get(key)
+ .addCommit(commit);
+
+ if (removedValues != null) {
+ if (removedValues.value().isEmpty()) {
+ backingMap.remove(key);
+ }
+
+ publish(removedValues.value().stream()
+ .map(value -> new MultimapEvent<String, byte[]>(
+ "", key, null, value))
+ .collect(Collectors.toList()));
+ }
+
+ return toVersioned(backingMap.get(key));
+ }
+
+ /**
* Handles a put commit, returns true if any change results from this
* commit.
* @param commit a put commit
@@ -386,20 +421,49 @@
}
Versioned<Collection<? extends byte[]>> addedValues = backingMap
- .get(key)
- .addCommit(commit);
+ .get(key)
+ .addCommit(commit);
if (addedValues != null) {
publish(addedValues.value().stream()
- .map(value -> new MultimapEvent<String, byte[]>(
- "", key, value, null))
- .collect(Collectors.toList()));
+ .map(value -> new MultimapEvent<String, byte[]>(
+ "", key, value, null))
+ .collect(Collectors.toList()));
return true;
}
return false;
}
+ /**
+ * Handles a putAndGet commit.
+ *
+ * @param commit a put commit
+ * @return the updated values
+ */
+ protected Versioned<Collection<? extends byte[]>> putAndGet(Commit<? extends Put> commit) {
+ String key = commit.value().key();
+ if (commit.value().values().isEmpty()) {
+ return null;
+ }
+ if (!backingMap.containsKey(key)) {
+ backingMap.put(key, new NonTransactionalCommit());
+ }
+
+ Versioned<Collection<? extends byte[]>> addedValues = backingMap
+ .get(key)
+ .addCommit(commit);
+
+ if (addedValues != null) {
+ publish(addedValues.value().stream()
+ .map(value -> new MultimapEvent<String, byte[]>(
+ "", key, value, null))
+ .collect(Collectors.toList()));
+ }
+
+ return toVersioned(backingMap.get(key));
+ }
+
protected Versioned<Collection<? extends byte[]>> replace(
Commit<? extends Replace> commit) {
if (!backingMap.containsKey(commit.value().key())) {