Refactor AtomixConsistentMap to use separate operations per method call for better performance and control over operation semantics.
Change-Id: I948c5c73d4ab38c9c2b20f8c80ba01548f95dda6
diff --git a/core/api/src/main/java/org/onosproject/store/service/MapEvent.java b/core/api/src/main/java/org/onosproject/store/service/MapEvent.java
index 818e749..2e8aa61 100644
--- a/core/api/src/main/java/org/onosproject/store/service/MapEvent.java
+++ b/core/api/src/main/java/org/onosproject/store/service/MapEvent.java
@@ -62,12 +62,25 @@
* @param previousValue value that was replaced
*/
public MapEvent(String name, K key, Versioned<V> currentValue, Versioned<V> previousValue) {
+ this(currentValue != null ? previousValue != null ? Type.UPDATE : Type.INSERT : Type.REMOVE,
+ name, key, currentValue, previousValue);
+ }
+
+ /**
+ * Creates a new event object.
+ *
+ * @param type event type
+ * @param name map name
+ * @param key key the event concerns
+ * @param currentValue new value key is mapped to
+ * @param previousValue value that was replaced
+ */
+ public MapEvent(Type type, String name, K key, Versioned<V> currentValue, Versioned<V> previousValue) {
+ this.type = type;
this.name = name;
this.key = key;
this.newValue = currentValue;
this.oldValue = previousValue;
- this.type = currentValue != null ?
- previousValue != null ? Type.UPDATE : Type.INSERT : Type.REMOVE;
}
/**
diff --git a/core/api/src/test/java/org/onosproject/store/service/MapEventTest.java b/core/api/src/test/java/org/onosproject/store/service/MapEventTest.java
index 698498e..10311a9 100644
--- a/core/api/src/test/java/org/onosproject/store/service/MapEventTest.java
+++ b/core/api/src/test/java/org/onosproject/store/service/MapEventTest.java
@@ -30,11 +30,14 @@
private final Versioned<Integer> vStatsNew = new Versioned<>(2, 2);
private final Versioned<Integer> vStatsOld = new Versioned<>(1, 1);
- private final MapEvent<String, Integer> stats1 = new MapEvent<>("a", "1", vStatsNew, null);
+ private final MapEvent<String, Integer> stats1 =
+ new MapEvent<>(MapEvent.Type.INSERT, "a", "1", vStatsNew, null);
- private final MapEvent<String, Integer> stats2 = new MapEvent<>("a", "1", null, vStatsOld);
+ private final MapEvent<String, Integer> stats2 =
+ new MapEvent<>(MapEvent.Type.REMOVE, "a", "1", null, vStatsOld);
- private final MapEvent<String, Integer> stats3 = new MapEvent<>("a", "1", vStatsNew, vStatsOld);
+ private final MapEvent<String, Integer> stats3 =
+ new MapEvent<>(MapEvent.Type.UPDATE, "a", "1", vStatsNew, vStatsOld);
/**
* Tests the creation of the MapEvent object.
diff --git a/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/StorageNamespaces.java b/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/StorageNamespaces.java
index e0e3ad7..d6e5e91 100644
--- a/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/StorageNamespaces.java
+++ b/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/StorageNamespaces.java
@@ -79,7 +79,6 @@
import org.onosproject.store.primitives.resources.impl.AtomixConsistentMapOperations;
import org.onosproject.store.primitives.resources.impl.AtomixConsistentSetMultimapEvents;
import org.onosproject.store.primitives.resources.impl.AtomixConsistentSetMultimapOperations;
-import org.onosproject.store.primitives.resources.impl.AtomixConsistentTreeMapEvents;
import org.onosproject.store.primitives.resources.impl.AtomixConsistentTreeMapOperations;
import org.onosproject.store.primitives.resources.impl.AtomixCounterOperations;
import org.onosproject.store.primitives.resources.impl.AtomixDocumentTreeEvents;
@@ -165,7 +164,6 @@
.register(AtomixConsistentMapOperations.class)
.register(AtomixConsistentSetMultimapOperations.class)
.register(AtomixConsistentSetMultimapEvents.class)
- .register(AtomixConsistentTreeMapEvents.class)
.register(AtomixConsistentTreeMapOperations.class)
.register(AtomixCounterOperations.class)
.register(AtomixDocumentTreeEvents.class)
diff --git a/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/TranscodingAsyncConsistentMap.java b/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/TranscodingAsyncConsistentMap.java
index 52cf210..03e2f6d 100644
--- a/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/TranscodingAsyncConsistentMap.java
+++ b/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/TranscodingAsyncConsistentMap.java
@@ -338,7 +338,9 @@
@Override
public void event(MapEvent<K2, V2> event) {
- listener.event(new MapEvent<K1, V1>(event.name(),
+ listener.event(new MapEvent<K1, V1>(
+ event.type(),
+ event.name(),
keyDecoder.apply(event.key()),
event.newValue() != null ? event.newValue().map(valueDecoder) : null,
event.oldValue() != null ? event.oldValue().map(valueDecoder) : null));
diff --git a/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/TranscodingAsyncConsistentTreeMap.java b/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/TranscodingAsyncConsistentTreeMap.java
index 2f0683e..2db8382 100644
--- a/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/TranscodingAsyncConsistentTreeMap.java
+++ b/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/TranscodingAsyncConsistentTreeMap.java
@@ -400,6 +400,7 @@
@Override
public void event(MapEvent<String, V2> event) {
listener.event(new MapEvent<String, V1>(
+ event.type(),
event.name(),
event.key(),
event.newValue() != null ?
diff --git a/core/store/primitives/src/main/java/org/onosproject/store/primitives/resources/impl/AtomixConsistentMap.java b/core/store/primitives/src/main/java/org/onosproject/store/primitives/resources/impl/AtomixConsistentMap.java
index f3a1ea2..5b59fb2 100644
--- a/core/store/primitives/src/main/java/org/onosproject/store/primitives/resources/impl/AtomixConsistentMap.java
+++ b/core/store/primitives/src/main/java/org/onosproject/store/primitives/resources/impl/AtomixConsistentMap.java
@@ -24,29 +24,31 @@
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executor;
-import java.util.concurrent.atomic.AtomicReference;
import java.util.function.BiFunction;
import java.util.function.Predicate;
import io.atomix.protocols.raft.proxy.RaftProxy;
import org.onlab.util.KryoNamespace;
-import org.onlab.util.Match;
-import org.onlab.util.Tools;
import org.onosproject.store.primitives.MapUpdate;
import org.onosproject.store.primitives.TransactionId;
import org.onosproject.store.primitives.resources.impl.AtomixConsistentMapOperations.ContainsKey;
import org.onosproject.store.primitives.resources.impl.AtomixConsistentMapOperations.ContainsValue;
import org.onosproject.store.primitives.resources.impl.AtomixConsistentMapOperations.Get;
import org.onosproject.store.primitives.resources.impl.AtomixConsistentMapOperations.GetOrDefault;
+import org.onosproject.store.primitives.resources.impl.AtomixConsistentMapOperations.Put;
+import org.onosproject.store.primitives.resources.impl.AtomixConsistentMapOperations.Remove;
+import org.onosproject.store.primitives.resources.impl.AtomixConsistentMapOperations.RemoveValue;
+import org.onosproject.store.primitives.resources.impl.AtomixConsistentMapOperations.RemoveVersion;
+import org.onosproject.store.primitives.resources.impl.AtomixConsistentMapOperations.Replace;
+import org.onosproject.store.primitives.resources.impl.AtomixConsistentMapOperations.ReplaceValue;
+import org.onosproject.store.primitives.resources.impl.AtomixConsistentMapOperations.ReplaceVersion;
import org.onosproject.store.primitives.resources.impl.AtomixConsistentMapOperations.TransactionBegin;
import org.onosproject.store.primitives.resources.impl.AtomixConsistentMapOperations.TransactionCommit;
import org.onosproject.store.primitives.resources.impl.AtomixConsistentMapOperations.TransactionPrepare;
import org.onosproject.store.primitives.resources.impl.AtomixConsistentMapOperations.TransactionPrepareAndCommit;
import org.onosproject.store.primitives.resources.impl.AtomixConsistentMapOperations.TransactionRollback;
-import org.onosproject.store.primitives.resources.impl.AtomixConsistentMapOperations.UpdateAndGet;
import org.onosproject.store.serializers.KryoNamespaces;
import org.onosproject.store.service.AsyncConsistentMap;
-import org.onosproject.store.service.ConsistentMapException;
import org.onosproject.store.service.MapEvent;
import org.onosproject.store.service.MapEventListener;
import org.onosproject.store.service.Serializer;
@@ -68,10 +70,18 @@
import static org.onosproject.store.primitives.resources.impl.AtomixConsistentMapOperations.KEY_SET;
import static org.onosproject.store.primitives.resources.impl.AtomixConsistentMapOperations.PREPARE;
import static org.onosproject.store.primitives.resources.impl.AtomixConsistentMapOperations.PREPARE_AND_COMMIT;
+import static org.onosproject.store.primitives.resources.impl.AtomixConsistentMapOperations.PUT;
+import static org.onosproject.store.primitives.resources.impl.AtomixConsistentMapOperations.PUT_AND_GET;
+import static org.onosproject.store.primitives.resources.impl.AtomixConsistentMapOperations.PUT_IF_ABSENT;
+import static org.onosproject.store.primitives.resources.impl.AtomixConsistentMapOperations.REMOVE;
import static org.onosproject.store.primitives.resources.impl.AtomixConsistentMapOperations.REMOVE_LISTENER;
+import static org.onosproject.store.primitives.resources.impl.AtomixConsistentMapOperations.REMOVE_VALUE;
+import static org.onosproject.store.primitives.resources.impl.AtomixConsistentMapOperations.REMOVE_VERSION;
+import static org.onosproject.store.primitives.resources.impl.AtomixConsistentMapOperations.REPLACE;
+import static org.onosproject.store.primitives.resources.impl.AtomixConsistentMapOperations.REPLACE_VALUE;
+import static org.onosproject.store.primitives.resources.impl.AtomixConsistentMapOperations.REPLACE_VERSION;
import static org.onosproject.store.primitives.resources.impl.AtomixConsistentMapOperations.ROLLBACK;
import static org.onosproject.store.primitives.resources.impl.AtomixConsistentMapOperations.SIZE;
-import static org.onosproject.store.primitives.resources.impl.AtomixConsistentMapOperations.UPDATE_AND_GET;
import static org.onosproject.store.primitives.resources.impl.AtomixConsistentMapOperations.VALUES;
/**
@@ -82,6 +92,7 @@
.register(KryoNamespaces.BASIC)
.register(AtomixConsistentMapOperations.NAMESPACE)
.register(AtomixConsistentMapEvents.NAMESPACE)
+ .nextId(KryoNamespaces.BEGIN_USER_CUSTOM_ID + 100)
.build());
private final Map<MapEventListener<String, byte[]>, Executor> mapEventListeners = new ConcurrentHashMap<>();
@@ -96,6 +107,10 @@
});
}
+ protected Serializer serializer() {
+ return SERIALIZER;
+ }
+
private void handleEvent(List<MapEvent<String, byte[]>> events) {
events.forEach(event ->
mapEventListeners.forEach((listener, executor) -> executor.execute(() -> listener.event(event))));
@@ -103,164 +118,164 @@
@Override
public CompletableFuture<Boolean> isEmpty() {
- return proxy.invoke(IS_EMPTY, SERIALIZER::decode);
+ return proxy.invoke(IS_EMPTY, serializer()::decode);
}
@Override
public CompletableFuture<Integer> size() {
- return proxy.invoke(SIZE, SERIALIZER::decode);
+ return proxy.invoke(SIZE, serializer()::decode);
}
@Override
public CompletableFuture<Boolean> containsKey(String key) {
- return proxy.invoke(CONTAINS_KEY, SERIALIZER::encode, new ContainsKey(key), SERIALIZER::decode);
+ return proxy.invoke(CONTAINS_KEY, serializer()::encode, new ContainsKey(key), serializer()::decode);
}
@Override
public CompletableFuture<Boolean> containsValue(byte[] value) {
- return proxy.invoke(CONTAINS_VALUE, SERIALIZER::encode, new ContainsValue(value), SERIALIZER::decode);
+ return proxy.invoke(CONTAINS_VALUE, serializer()::encode, new ContainsValue(value), serializer()::decode);
}
@Override
public CompletableFuture<Versioned<byte[]>> get(String key) {
- return proxy.invoke(GET, SERIALIZER::encode, new Get(key), SERIALIZER::decode);
+ return proxy.invoke(GET, serializer()::encode, new Get(key), serializer()::decode);
}
@Override
public CompletableFuture<Versioned<byte[]>> getOrDefault(String key, byte[] defaultValue) {
return proxy.invoke(
GET_OR_DEFAULT,
- SERIALIZER::encode,
+ serializer()::encode,
new GetOrDefault(key, defaultValue),
- SERIALIZER::decode);
+ serializer()::decode);
}
@Override
public CompletableFuture<Set<String>> keySet() {
- return proxy.invoke(KEY_SET, SERIALIZER::decode);
+ return proxy.invoke(KEY_SET, serializer()::decode);
}
@Override
public CompletableFuture<Collection<Versioned<byte[]>>> values() {
- return proxy.invoke(VALUES, SERIALIZER::decode);
+ return proxy.invoke(VALUES, serializer()::decode);
}
@Override
public CompletableFuture<Set<Entry<String, Versioned<byte[]>>>> entrySet() {
- return proxy.invoke(ENTRY_SET, SERIALIZER::decode);
+ return proxy.invoke(ENTRY_SET, serializer()::decode);
}
@Override
@SuppressWarnings("unchecked")
public CompletableFuture<Versioned<byte[]>> put(String key, byte[] value) {
- return proxy.<UpdateAndGet, MapEntryUpdateResult<String, byte[]>>invoke(
- UPDATE_AND_GET,
- SERIALIZER::encode,
- new UpdateAndGet(key, value, Match.ANY, Match.ANY),
- SERIALIZER::decode)
- .whenComplete((r, e) -> throwIfLocked(r.status()))
- .thenApply(v -> v.oldValue());
+ return proxy.<Put, MapEntryUpdateResult<String, byte[]>>invoke(
+ PUT,
+ serializer()::encode,
+ new Put(key, value),
+ serializer()::decode)
+ .whenComplete((r, e) -> throwIfLocked(r))
+ .thenApply(v -> v.result());
}
@Override
@SuppressWarnings("unchecked")
public CompletableFuture<Versioned<byte[]>> putAndGet(String key, byte[] value) {
- return proxy.<UpdateAndGet, MapEntryUpdateResult<String, byte[]>>invoke(
- UPDATE_AND_GET,
- SERIALIZER::encode,
- new UpdateAndGet(key, value, Match.ANY, Match.ANY),
- SERIALIZER::decode)
- .whenComplete((r, e) -> throwIfLocked(r.status()))
- .thenApply(v -> v.newValue());
+ return proxy.<Put, MapEntryUpdateResult<String, byte[]>>invoke(
+ PUT_AND_GET,
+ serializer()::encode,
+ new Put(key, value),
+ serializer()::decode)
+ .whenComplete((r, e) -> throwIfLocked(r))
+ .thenApply(v -> v.result());
}
@Override
@SuppressWarnings("unchecked")
public CompletableFuture<Versioned<byte[]>> putIfAbsent(String key, byte[] value) {
- return proxy.<UpdateAndGet, MapEntryUpdateResult<String, byte[]>>invoke(
- UPDATE_AND_GET,
- SERIALIZER::encode,
- new UpdateAndGet(key, value, Match.NULL, Match.ANY),
- SERIALIZER::decode)
- .whenComplete((r, e) -> throwIfLocked(r.status()))
- .thenApply(v -> v.oldValue());
+ return proxy.<Put, MapEntryUpdateResult<String, byte[]>>invoke(
+ PUT_IF_ABSENT,
+ serializer()::encode,
+ new Put(key, value),
+ serializer()::decode)
+ .whenComplete((r, e) -> throwIfLocked(r))
+ .thenApply(v -> v.result());
}
@Override
@SuppressWarnings("unchecked")
public CompletableFuture<Versioned<byte[]>> remove(String key) {
- return proxy.<UpdateAndGet, MapEntryUpdateResult<String, byte[]>>invoke(
- UPDATE_AND_GET,
- SERIALIZER::encode,
- new UpdateAndGet(key, null, Match.ANY, Match.ANY),
- SERIALIZER::decode)
- .whenComplete((r, e) -> throwIfLocked(r.status()))
- .thenApply(v -> v.oldValue());
+ return proxy.<Remove, MapEntryUpdateResult<String, byte[]>>invoke(
+ REMOVE,
+ serializer()::encode,
+ new Remove(key),
+ serializer()::decode)
+ .whenComplete((r, e) -> throwIfLocked(r))
+ .thenApply(v -> v.result());
}
@Override
@SuppressWarnings("unchecked")
public CompletableFuture<Boolean> remove(String key, byte[] value) {
- return proxy.<UpdateAndGet, MapEntryUpdateResult<String, byte[]>>invoke(
- UPDATE_AND_GET,
- SERIALIZER::encode,
- new UpdateAndGet(key, null, Match.ifValue(value), Match.ANY),
- SERIALIZER::decode)
- .whenComplete((r, e) -> throwIfLocked(r.status()))
+ return proxy.<RemoveValue, MapEntryUpdateResult<String, byte[]>>invoke(
+ REMOVE_VALUE,
+ serializer()::encode,
+ new RemoveValue(key, value),
+ serializer()::decode)
+ .whenComplete((r, e) -> throwIfLocked(r))
.thenApply(v -> v.updated());
}
@Override
@SuppressWarnings("unchecked")
public CompletableFuture<Boolean> remove(String key, long version) {
- return proxy.<UpdateAndGet, MapEntryUpdateResult<String, byte[]>>invoke(
- UPDATE_AND_GET,
- SERIALIZER::encode,
- new UpdateAndGet(key, null, Match.ANY, Match.ifValue(version)),
- SERIALIZER::decode)
- .whenComplete((r, e) -> throwIfLocked(r.status()))
+ return proxy.<RemoveVersion, MapEntryUpdateResult<String, byte[]>>invoke(
+ REMOVE_VERSION,
+ serializer()::encode,
+ new RemoveVersion(key, version),
+ serializer()::decode)
+ .whenComplete((r, e) -> throwIfLocked(r))
.thenApply(v -> v.updated());
}
@Override
@SuppressWarnings("unchecked")
public CompletableFuture<Versioned<byte[]>> replace(String key, byte[] value) {
- return proxy.<UpdateAndGet, MapEntryUpdateResult<String, byte[]>>invoke(
- UPDATE_AND_GET,
- SERIALIZER::encode,
- new UpdateAndGet(key, value, Match.NOT_NULL, Match.ANY),
- SERIALIZER::decode)
- .whenComplete((r, e) -> throwIfLocked(r.status()))
- .thenApply(v -> v.oldValue());
+ return proxy.<Replace, MapEntryUpdateResult<String, byte[]>>invoke(
+ REPLACE,
+ serializer()::encode,
+ new Replace(key, value),
+ serializer()::decode)
+ .whenComplete((r, e) -> throwIfLocked(r))
+ .thenApply(v -> v.result());
}
@Override
@SuppressWarnings("unchecked")
public CompletableFuture<Boolean> replace(String key, byte[] oldValue, byte[] newValue) {
- return proxy.<UpdateAndGet, MapEntryUpdateResult<String, byte[]>>invoke(
- UPDATE_AND_GET,
- SERIALIZER::encode,
- new UpdateAndGet(key, newValue, Match.ifValue(oldValue), Match.ANY),
- SERIALIZER::decode)
- .whenComplete((r, e) -> throwIfLocked(r.status()))
+ return proxy.<ReplaceValue, MapEntryUpdateResult<String, byte[]>>invoke(
+ REPLACE_VALUE,
+ serializer()::encode,
+ new ReplaceValue(key, oldValue, newValue),
+ serializer()::decode)
+ .whenComplete((r, e) -> throwIfLocked(r))
.thenApply(v -> v.updated());
}
@Override
@SuppressWarnings("unchecked")
public CompletableFuture<Boolean> replace(String key, long oldVersion, byte[] newValue) {
- return proxy.<UpdateAndGet, MapEntryUpdateResult<String, byte[]>>invoke(
- UPDATE_AND_GET,
- SERIALIZER::encode,
- new UpdateAndGet(key, newValue, Match.ANY, Match.ifValue(oldVersion)),
- SERIALIZER::decode)
- .whenComplete((r, e) -> throwIfLocked(r.status()))
+ return proxy.<ReplaceVersion, MapEntryUpdateResult<String, byte[]>>invoke(
+ REPLACE_VERSION,
+ serializer()::encode,
+ new ReplaceVersion(key, oldVersion, newValue),
+ serializer()::decode)
+ .whenComplete((r, e) -> throwIfLocked(r))
.thenApply(v -> v.updated());
}
@Override
public CompletableFuture<Void> clear() {
- return proxy.<MapEntryUpdateResult.Status>invoke(CLEAR, SERIALIZER::decode)
+ return proxy.<MapEntryUpdateResult.Status>invoke(CLEAR, serializer()::decode)
.whenComplete((r, e) -> throwIfLocked(r))
.thenApply(v -> null);
}
@@ -277,37 +292,39 @@
return CompletableFuture.completedFuture(r1);
}
- AtomicReference<byte[]> computedValue = new AtomicReference<>();
- // if remappingFunction throws an exception, return the exception.
+ byte[] computedValue;
try {
- computedValue.set(remappingFunction.apply(key, existingValue));
+ computedValue = remappingFunction.apply(key, existingValue);
} catch (Exception e) {
CompletableFuture<Versioned<byte[]>> future = new CompletableFuture<>();
future.completeExceptionally(e);
return future;
}
- if (computedValue.get() == null && r1 == null) {
+
+ if (computedValue == null && r1 == null) {
return CompletableFuture.completedFuture(null);
}
- Match<byte[]> valueMatch = r1 == null ? Match.NULL : Match.ANY;
- Match<Long> versionMatch = r1 == null ? Match.ANY : Match.ifValue(r1.version());
- return proxy.<UpdateAndGet, MapEntryUpdateResult<String, byte[]>>invoke(
- UPDATE_AND_GET,
- SERIALIZER::encode,
- new UpdateAndGet(key,
- computedValue.get(),
- valueMatch,
- versionMatch),
- SERIALIZER::decode)
- .whenComplete((r, e) -> throwIfLocked(r.status()))
- .thenCompose(r -> {
- if (r.status() == MapEntryUpdateResult.Status.PRECONDITION_FAILED ||
- r.status() == MapEntryUpdateResult.Status.WRITE_LOCK) {
- return Tools.exceptionalFuture(new ConsistentMapException.ConcurrentModification());
- }
- return CompletableFuture.completedFuture(r);
- })
- .thenApply(v -> v.newValue());
+
+ if (r1 == null) {
+ return proxy.<Put, MapEntryUpdateResult<String, byte[]>>invoke(
+ PUT_IF_ABSENT,
+ serializer()::encode,
+ new Put(key, computedValue),
+ serializer()::decode)
+ .whenComplete((r, e) -> throwIfLocked(r))
+ .thenApply(result -> new Versioned<>(computedValue, result.version()));
+ } else if (computedValue == null) {
+ return remove(key, r1.version()).thenApply(v -> null);
+ } else {
+ return proxy.<ReplaceVersion, MapEntryUpdateResult<String, byte[]>>invoke(
+ REPLACE_VERSION,
+ serializer()::encode,
+ new ReplaceVersion(key, r1.version(), computedValue),
+ serializer()::decode)
+ .whenComplete((r, e) -> throwIfLocked(r))
+ .thenApply(result -> result.status() == MapEntryUpdateResult.Status.OK
+ ? new Versioned(computedValue, result.version()) : result.result());
+ }
});
}
@@ -330,6 +347,12 @@
return CompletableFuture.completedFuture(null);
}
+ private void throwIfLocked(MapEntryUpdateResult<String, byte[]> result) {
+ if (result != null) {
+ throwIfLocked(result.status());
+ }
+ }
+
private void throwIfLocked(MapEntryUpdateResult.Status status) {
if (status == MapEntryUpdateResult.Status.WRITE_LOCK) {
throw new ConcurrentModificationException("Cannot update map: Another transaction in progress");
@@ -340,9 +363,9 @@
public CompletableFuture<Version> begin(TransactionId transactionId) {
return proxy.<TransactionBegin, Long>invoke(
BEGIN,
- SERIALIZER::encode,
+ serializer()::encode,
new TransactionBegin(transactionId),
- SERIALIZER::decode)
+ serializer()::decode)
.thenApply(Version::new);
}
@@ -350,9 +373,9 @@
public CompletableFuture<Boolean> prepare(TransactionLog<MapUpdate<String, byte[]>> transactionLog) {
return proxy.<TransactionPrepare, PrepareResult>invoke(
PREPARE,
- SERIALIZER::encode,
+ serializer()::encode,
new TransactionPrepare(transactionLog),
- SERIALIZER::decode)
+ serializer()::decode)
.thenApply(v -> v == PrepareResult.OK);
}
@@ -360,9 +383,9 @@
public CompletableFuture<Boolean> prepareAndCommit(TransactionLog<MapUpdate<String, byte[]>> transactionLog) {
return proxy.<TransactionPrepareAndCommit, PrepareResult>invoke(
PREPARE_AND_COMMIT,
- SERIALIZER::encode,
+ serializer()::encode,
new TransactionPrepareAndCommit(transactionLog),
- SERIALIZER::decode)
+ serializer()::decode)
.thenApply(v -> v == PrepareResult.OK);
}
@@ -370,9 +393,9 @@
public CompletableFuture<Void> commit(TransactionId transactionId) {
return proxy.<TransactionCommit, CommitResult>invoke(
COMMIT,
- SERIALIZER::encode,
+ serializer()::encode,
new TransactionCommit(transactionId),
- SERIALIZER::decode)
+ serializer()::decode)
.thenApply(v -> null);
}
@@ -380,9 +403,9 @@
public CompletableFuture<Void> rollback(TransactionId transactionId) {
return proxy.invoke(
ROLLBACK,
- SERIALIZER::encode,
+ serializer()::encode,
new TransactionRollback(transactionId),
- SERIALIZER::decode)
+ serializer()::decode)
.thenApply(v -> null);
}
diff --git a/core/store/primitives/src/main/java/org/onosproject/store/primitives/resources/impl/AtomixConsistentMapOperations.java b/core/store/primitives/src/main/java/org/onosproject/store/primitives/resources/impl/AtomixConsistentMapOperations.java
index 2d81787..fb6ee9f 100644
--- a/core/store/primitives/src/main/java/org/onosproject/store/primitives/resources/impl/AtomixConsistentMapOperations.java
+++ b/core/store/primitives/src/main/java/org/onosproject/store/primitives/resources/impl/AtomixConsistentMapOperations.java
@@ -15,9 +15,9 @@
*/
package org.onosproject.store.primitives.resources.impl;
-import com.google.common.base.MoreObjects;
import io.atomix.protocols.raft.operation.OperationId;
import io.atomix.protocols.raft.operation.OperationType;
+import io.atomix.utils.ArraySizeHashPrinter;
import org.onlab.util.KryoNamespace;
import org.onlab.util.Match;
import org.onosproject.store.primitives.MapUpdate;
@@ -26,6 +26,7 @@
import org.onosproject.store.service.TransactionLog;
import org.onosproject.store.service.Versioned;
+import static com.google.common.base.MoreObjects.toStringHelper;
import static com.google.common.base.Preconditions.checkNotNull;
/**
@@ -41,7 +42,15 @@
KEY_SET("keySet", OperationType.QUERY),
VALUES("values", OperationType.QUERY),
ENTRY_SET("entrySet", OperationType.QUERY),
- UPDATE_AND_GET("updateAndGet", OperationType.COMMAND),
+ PUT("put", OperationType.COMMAND),
+ PUT_IF_ABSENT("putIfAbsent", OperationType.COMMAND),
+ PUT_AND_GET("putAndGet", OperationType.COMMAND),
+ REMOVE("remove", OperationType.COMMAND),
+ REMOVE_VALUE("removeValue", OperationType.COMMAND),
+ REMOVE_VERSION("removeVersion", OperationType.COMMAND),
+ REPLACE("replace", OperationType.COMMAND),
+ REPLACE_VALUE("replaceValue", OperationType.COMMAND),
+ REPLACE_VERSION("replaceVersion", OperationType.COMMAND),
CLEAR("clear", OperationType.COMMAND),
ADD_LISTENER("addListener", OperationType.COMMAND),
REMOVE_LISTENER("removeListener", OperationType.COMMAND),
@@ -76,7 +85,13 @@
.register(ContainsValue.class)
.register(Get.class)
.register(GetOrDefault.class)
- .register(UpdateAndGet.class)
+ .register(Put.class)
+ .register(Remove.class)
+ .register(RemoveValue.class)
+ .register(RemoveVersion.class)
+ .register(Replace.class)
+ .register(ReplaceValue.class)
+ .register(ReplaceVersion.class)
.register(TransactionBegin.class)
.register(TransactionPrepare.class)
.register(TransactionPrepareAndCommit.class)
@@ -103,7 +118,7 @@
public abstract static class MapOperation {
@Override
public String toString() {
- return MoreObjects.toStringHelper(getClass())
+ return toStringHelper(getClass())
.toString();
}
}
@@ -132,7 +147,7 @@
@Override
public String toString() {
- return MoreObjects.toStringHelper(getClass())
+ return toStringHelper(getClass())
.add("key", key)
.toString();
}
@@ -162,13 +177,77 @@
@Override
public String toString() {
- return MoreObjects.toStringHelper(getClass())
+ return toStringHelper(getClass())
.add("value", value)
.toString();
}
}
/**
+ * Abstract key/value operation.
+ */
+ @SuppressWarnings("serial")
+ public abstract static class KeyValueOperation extends KeyOperation {
+ protected byte[] value;
+
+ public KeyValueOperation() {
+ }
+
+ public KeyValueOperation(String key, byte[] value) {
+ super(key);
+ this.value = value;
+ }
+
+ /**
+ * Returns the value.
+ * @return value
+ */
+ public byte[] value() {
+ return value;
+ }
+
+ @Override
+ public String toString() {
+ return toStringHelper(getClass())
+ .add("key", key)
+ .add("value", ArraySizeHashPrinter.of(value))
+ .toString();
+ }
+ }
+
+ /**
+ * Abstract key/version operation.
+ */
+ @SuppressWarnings("serial")
+ public abstract static class KeyVersionOperation extends KeyOperation {
+ protected long version;
+
+ public KeyVersionOperation() {
+ }
+
+ public KeyVersionOperation(String key, long version) {
+ super(key);
+ this.version = version;
+ }
+
+ /**
+ * Returns the version.
+ * @return version
+ */
+ public long version() {
+ return version;
+ }
+
+ @Override
+ public String toString() {
+ return toStringHelper(getClass())
+ .add("key", key)
+ .add("version", version)
+ .toString();
+ }
+ }
+
+ /**
* Contains key command.
*/
@SuppressWarnings("serial")
@@ -195,6 +274,134 @@
}
/**
+ * Map put operation.
+ */
+ public static class Put extends KeyValueOperation {
+ public Put() {
+ }
+
+ public Put(String key, byte[] value) {
+ super(key, value);
+ }
+ }
+
+ /**
+ * Remove operation.
+ */
+ public static class Remove extends KeyOperation {
+ public Remove() {
+ }
+
+ public Remove(String key) {
+ super(key);
+ }
+ }
+
+ /**
+ * Remove if value match operation.
+ */
+ public static class RemoveValue extends KeyValueOperation {
+ public RemoveValue() {
+ }
+
+ public RemoveValue(String key, byte[] value) {
+ super(key, value);
+ }
+ }
+
+ /**
+ * Remove if version match operation.
+ */
+ public static class RemoveVersion extends KeyVersionOperation {
+ public RemoveVersion() {
+ }
+
+ public RemoveVersion(String key, long version) {
+ super(key, version);
+ }
+ }
+
+ /**
+ * Replace operation.
+ */
+ public static class Replace extends KeyValueOperation {
+ public Replace() {
+ }
+
+ public Replace(String key, byte[] value) {
+ super(key, value);
+ }
+ }
+
+ /**
+ * Replace by value operation.
+ */
+ public static class ReplaceValue extends KeyOperation {
+ private byte[] oldValue;
+ private byte[] newValue;
+
+ public ReplaceValue() {
+ }
+
+ public ReplaceValue(String key, byte[] oldValue, byte[] newValue) {
+ super(key);
+ this.oldValue = oldValue;
+ this.newValue = newValue;
+ }
+
+ public byte[] oldValue() {
+ return oldValue;
+ }
+
+ public byte[] newValue() {
+ return newValue;
+ }
+
+ @Override
+ public String toString() {
+ return toStringHelper(this)
+ .add("key", key)
+ .add("oldValue", ArraySizeHashPrinter.of(oldValue))
+ .add("newValue", ArraySizeHashPrinter.of(newValue))
+ .toString();
+ }
+ }
+
+ /**
+ * Replace by version operation.
+ */
+ public static class ReplaceVersion extends KeyOperation {
+ private long oldVersion;
+ private byte[] newValue;
+
+ public ReplaceVersion() {
+ }
+
+ public ReplaceVersion(String key, long oldVersion, byte[] newValue) {
+ super(key);
+ this.oldVersion = oldVersion;
+ this.newValue = newValue;
+ }
+
+ public long oldVersion() {
+ return oldVersion;
+ }
+
+ public byte[] newValue() {
+ return newValue;
+ }
+
+ @Override
+ public String toString() {
+ return toStringHelper(this)
+ .add("key", key)
+ .add("oldVersion", oldVersion)
+ .add("newValue", ArraySizeHashPrinter.of(newValue))
+ .toString();
+ }
+ }
+
+ /**
* Transaction begin command.
*/
public static class TransactionBegin extends MapOperation {
@@ -232,7 +439,7 @@
@Override
public String toString() {
- return MoreObjects.toStringHelper(getClass())
+ return toStringHelper(getClass())
.add("transactionLog", transactionLog)
.toString();
}
@@ -275,7 +482,7 @@
@Override
public String toString() {
- return MoreObjects.toStringHelper(getClass())
+ return toStringHelper(getClass())
.add("transactionId", transactionId)
.toString();
}
@@ -305,79 +512,13 @@
@Override
public String toString() {
- return MoreObjects.toStringHelper(getClass())
+ return toStringHelper(getClass())
.add("transactionId", transactionId)
.toString();
}
}
/**
- * Map update command.
- */
- @SuppressWarnings("serial")
- public static class UpdateAndGet extends MapOperation {
- private String key;
- private byte[] value;
- private Match<byte[]> valueMatch;
- private Match<Long> versionMatch;
-
- public UpdateAndGet() {
- }
-
- public UpdateAndGet(String key,
- byte[] value,
- Match<byte[]> valueMatch,
- Match<Long> versionMatch) {
- this.key = key;
- this.value = value;
- this.valueMatch = valueMatch;
- this.versionMatch = versionMatch;
- }
-
- /**
- * Returns the key.
- * @return key
- */
- public String key() {
- return this.key;
- }
-
- /**
- * Returns the value.
- * @return value
- */
- public byte[] value() {
- return this.value;
- }
-
- /**
- * Returns the value match.
- * @return value match
- */
- public Match<byte[]> valueMatch() {
- return this.valueMatch;
- }
-
- /**
- * Returns the version match.
- * @return version match
- */
- public Match<Long> versionMatch() {
- return this.versionMatch;
- }
-
- @Override
- public String toString() {
- return MoreObjects.toStringHelper(getClass())
- .add("key", key)
- .add("value", value)
- .add("valueMatch", valueMatch)
- .add("versionMatch", versionMatch)
- .toString();
- }
- }
-
- /**
* Get query.
*/
@SuppressWarnings("serial")
@@ -413,5 +554,13 @@
public byte[] defaultValue() {
return defaultValue;
}
+
+ @Override
+ public String toString() {
+ return toStringHelper(this)
+ .add("key", key)
+ .add("defaultValue", ArraySizeHashPrinter.of(defaultValue))
+ .toString();
+ }
}
}
diff --git a/core/store/primitives/src/main/java/org/onosproject/store/primitives/resources/impl/AtomixConsistentMapService.java b/core/store/primitives/src/main/java/org/onosproject/store/primitives/resources/impl/AtomixConsistentMapService.java
index 6d8c1b0..fe5a2a9 100644
--- a/core/store/primitives/src/main/java/org/onosproject/store/primitives/resources/impl/AtomixConsistentMapService.java
+++ b/core/store/primitives/src/main/java/org/onosproject/store/primitives/resources/impl/AtomixConsistentMapService.java
@@ -15,6 +15,7 @@
*/
package org.onosproject.store.primitives.resources.impl;
+import java.util.Arrays;
import java.util.Collection;
import java.util.HashMap;
import java.util.Iterator;
@@ -22,6 +23,7 @@
import java.util.List;
import java.util.Map;
import java.util.Set;
+import java.util.function.Predicate;
import java.util.stream.Collectors;
import com.google.common.base.Throwables;
@@ -42,12 +44,18 @@
import org.onosproject.store.primitives.resources.impl.AtomixConsistentMapOperations.ContainsValue;
import org.onosproject.store.primitives.resources.impl.AtomixConsistentMapOperations.Get;
import org.onosproject.store.primitives.resources.impl.AtomixConsistentMapOperations.GetOrDefault;
+import org.onosproject.store.primitives.resources.impl.AtomixConsistentMapOperations.Put;
+import org.onosproject.store.primitives.resources.impl.AtomixConsistentMapOperations.Remove;
+import org.onosproject.store.primitives.resources.impl.AtomixConsistentMapOperations.RemoveValue;
+import org.onosproject.store.primitives.resources.impl.AtomixConsistentMapOperations.RemoveVersion;
+import org.onosproject.store.primitives.resources.impl.AtomixConsistentMapOperations.Replace;
+import org.onosproject.store.primitives.resources.impl.AtomixConsistentMapOperations.ReplaceValue;
+import org.onosproject.store.primitives.resources.impl.AtomixConsistentMapOperations.ReplaceVersion;
import org.onosproject.store.primitives.resources.impl.AtomixConsistentMapOperations.TransactionBegin;
import org.onosproject.store.primitives.resources.impl.AtomixConsistentMapOperations.TransactionCommit;
import org.onosproject.store.primitives.resources.impl.AtomixConsistentMapOperations.TransactionPrepare;
import org.onosproject.store.primitives.resources.impl.AtomixConsistentMapOperations.TransactionPrepareAndCommit;
import org.onosproject.store.primitives.resources.impl.AtomixConsistentMapOperations.TransactionRollback;
-import org.onosproject.store.primitives.resources.impl.AtomixConsistentMapOperations.UpdateAndGet;
import org.onosproject.store.serializers.KryoNamespaces;
import org.onosproject.store.service.MapEvent;
import org.onosproject.store.service.Serializer;
@@ -69,14 +77,19 @@
import static org.onosproject.store.primitives.resources.impl.AtomixConsistentMapOperations.KEY_SET;
import static org.onosproject.store.primitives.resources.impl.AtomixConsistentMapOperations.PREPARE;
import static org.onosproject.store.primitives.resources.impl.AtomixConsistentMapOperations.PREPARE_AND_COMMIT;
+import static org.onosproject.store.primitives.resources.impl.AtomixConsistentMapOperations.PUT;
+import static org.onosproject.store.primitives.resources.impl.AtomixConsistentMapOperations.PUT_AND_GET;
+import static org.onosproject.store.primitives.resources.impl.AtomixConsistentMapOperations.PUT_IF_ABSENT;
+import static org.onosproject.store.primitives.resources.impl.AtomixConsistentMapOperations.REMOVE;
import static org.onosproject.store.primitives.resources.impl.AtomixConsistentMapOperations.REMOVE_LISTENER;
+import static org.onosproject.store.primitives.resources.impl.AtomixConsistentMapOperations.REMOVE_VALUE;
+import static org.onosproject.store.primitives.resources.impl.AtomixConsistentMapOperations.REMOVE_VERSION;
+import static org.onosproject.store.primitives.resources.impl.AtomixConsistentMapOperations.REPLACE;
+import static org.onosproject.store.primitives.resources.impl.AtomixConsistentMapOperations.REPLACE_VALUE;
+import static org.onosproject.store.primitives.resources.impl.AtomixConsistentMapOperations.REPLACE_VERSION;
import static org.onosproject.store.primitives.resources.impl.AtomixConsistentMapOperations.ROLLBACK;
import static org.onosproject.store.primitives.resources.impl.AtomixConsistentMapOperations.SIZE;
-import static org.onosproject.store.primitives.resources.impl.AtomixConsistentMapOperations.UPDATE_AND_GET;
import static org.onosproject.store.primitives.resources.impl.AtomixConsistentMapOperations.VALUES;
-import static org.onosproject.store.service.MapEvent.Type.INSERT;
-import static org.onosproject.store.service.MapEvent.Type.REMOVE;
-import static org.onosproject.store.service.MapEvent.Type.UPDATE;
/**
* State Machine for {@link AtomixConsistentMap} resource.
@@ -87,7 +100,7 @@
.register(KryoNamespaces.BASIC)
.register(AtomixConsistentMapOperations.NAMESPACE)
.register(AtomixConsistentMapEvents.NAMESPACE)
- .nextId(KryoNamespace.FLOATING_ID)
+ .nextId(KryoNamespaces.BEGIN_USER_CUSTOM_ID + 100)
.register(TransactionScope.class)
.register(TransactionLog.class)
.register(TransactionId.class)
@@ -96,56 +109,80 @@
.register(new HashMap().keySet().getClass())
.build());
- private Map<Long, RaftSession> listeners = new LinkedHashMap<>();
- private Map<String, MapEntryValue> mapEntries = new HashMap<>();
- private Set<String> preparedKeys = Sets.newHashSet();
- private Map<TransactionId, TransactionScope> activeTransactions = Maps.newHashMap();
- private long currentVersion;
+ protected Map<Long, RaftSession> listeners = new LinkedHashMap<>();
+ private Map<String, MapEntryValue> map;
+ protected Set<String> preparedKeys = Sets.newHashSet();
+ protected Map<TransactionId, TransactionScope> activeTransactions = Maps.newHashMap();
+ protected long currentVersion;
+
+ public AtomixConsistentMapService() {
+ map = createMap();
+ }
+
+ protected Map<String, MapEntryValue> createMap() {
+ return Maps.newHashMap();
+ }
+
+ protected Map<String, MapEntryValue> entries() {
+ return map;
+ }
+
+ protected Serializer serializer() {
+ return SERIALIZER;
+ }
@Override
public void snapshot(SnapshotWriter writer) {
- writer.writeObject(Sets.newHashSet(listeners.keySet()), SERIALIZER::encode);
- writer.writeObject(preparedKeys, SERIALIZER::encode);
- writer.writeObject(mapEntries, SERIALIZER::encode);
- writer.writeObject(activeTransactions, SERIALIZER::encode);
+ writer.writeObject(Sets.newHashSet(listeners.keySet()), serializer()::encode);
+ writer.writeObject(preparedKeys, serializer()::encode);
+ writer.writeObject(entries(), serializer()::encode);
+ writer.writeObject(activeTransactions, serializer()::encode);
writer.writeLong(currentVersion);
}
@Override
public void install(SnapshotReader reader) {
listeners = new LinkedHashMap<>();
- for (Long sessionId : reader.<Set<Long>>readObject(SERIALIZER::decode)) {
+ for (Long sessionId : reader.<Set<Long>>readObject(serializer()::decode)) {
listeners.put(sessionId, getSessions().getSession(sessionId));
}
- preparedKeys = reader.readObject(SERIALIZER::decode);
- mapEntries = reader.readObject(SERIALIZER::decode);
- activeTransactions = reader.readObject(SERIALIZER::decode);
+ preparedKeys = reader.readObject(serializer()::decode);
+ map = reader.readObject(serializer()::decode);
+ activeTransactions = reader.readObject(serializer()::decode);
currentVersion = reader.readLong();
}
@Override
protected void configure(RaftServiceExecutor executor) {
// Listeners
- executor.register(ADD_LISTENER, this::listen);
- executor.register(REMOVE_LISTENER, this::unlisten);
+ executor.register(ADD_LISTENER, (Commit<Void> c) -> listen(c.session()));
+ executor.register(REMOVE_LISTENER, (Commit<Void> c) -> unlisten(c.session()));
// Queries
- executor.register(CONTAINS_KEY, SERIALIZER::decode, this::containsKey, SERIALIZER::encode);
- executor.register(CONTAINS_VALUE, SERIALIZER::decode, this::containsValue, SERIALIZER::encode);
- executor.register(ENTRY_SET, this::entrySet, SERIALIZER::encode);
- executor.register(GET, SERIALIZER::decode, this::get, SERIALIZER::encode);
- executor.register(GET_OR_DEFAULT, SERIALIZER::decode, this::getOrDefault, SERIALIZER::encode);
- executor.register(IS_EMPTY, this::isEmpty, SERIALIZER::encode);
- executor.register(KEY_SET, this::keySet, SERIALIZER::encode);
- executor.register(SIZE, this::size, SERIALIZER::encode);
- executor.register(VALUES, this::values, SERIALIZER::encode);
+ executor.register(CONTAINS_KEY, serializer()::decode, this::containsKey, serializer()::encode);
+ executor.register(CONTAINS_VALUE, serializer()::decode, this::containsValue, serializer()::encode);
+ executor.register(ENTRY_SET, (Commit<Void> c) -> entrySet(), serializer()::encode);
+ executor.register(GET, serializer()::decode, this::get, serializer()::encode);
+ executor.register(GET_OR_DEFAULT, serializer()::decode, this::getOrDefault, serializer()::encode);
+ executor.register(IS_EMPTY, (Commit<Void> c) -> isEmpty(), serializer()::encode);
+ executor.register(KEY_SET, (Commit<Void> c) -> keySet(), serializer()::encode);
+ executor.register(SIZE, (Commit<Void> c) -> size(), serializer()::encode);
+ executor.register(VALUES, (Commit<Void> c) -> values(), serializer()::encode);
// Commands
- executor.register(UPDATE_AND_GET, SERIALIZER::decode, this::updateAndGet, SERIALIZER::encode);
- executor.register(CLEAR, this::clear, SERIALIZER::encode);
- executor.register(BEGIN, SERIALIZER::decode, this::begin, SERIALIZER::encode);
- executor.register(PREPARE, SERIALIZER::decode, this::prepare, SERIALIZER::encode);
- executor.register(PREPARE_AND_COMMIT, SERIALIZER::decode, this::prepareAndCommit, SERIALIZER::encode);
- executor.register(COMMIT, SERIALIZER::decode, this::commit, SERIALIZER::encode);
- executor.register(ROLLBACK, SERIALIZER::decode, this::rollback, SERIALIZER::encode);
+ executor.register(PUT, serializer()::decode, this::put, serializer()::encode);
+ executor.register(PUT_IF_ABSENT, serializer()::decode, this::putIfAbsent, serializer()::encode);
+ executor.register(PUT_AND_GET, serializer()::decode, this::putAndGet, serializer()::encode);
+ executor.register(REMOVE, serializer()::decode, this::remove, serializer()::encode);
+ executor.register(REMOVE_VALUE, serializer()::decode, this::removeValue, serializer()::encode);
+ executor.register(REMOVE_VERSION, serializer()::decode, this::removeVersion, serializer()::encode);
+ executor.register(REPLACE, serializer()::decode, this::replace, serializer()::encode);
+ executor.register(REPLACE_VALUE, serializer()::decode, this::replaceValue, serializer()::encode);
+ executor.register(REPLACE_VERSION, serializer()::decode, this::replaceVersion, serializer()::encode);
+ executor.register(CLEAR, (Commit<Void> c) -> clear(), serializer()::encode);
+ executor.register(BEGIN, serializer()::decode, this::begin, serializer()::encode);
+ executor.register(PREPARE, serializer()::decode, this::prepare, serializer()::encode);
+ executor.register(PREPARE_AND_COMMIT, serializer()::decode, this::prepareAndCommit, serializer()::encode);
+ executor.register(COMMIT, serializer()::decode, this::commit, serializer()::encode);
+ executor.register(ROLLBACK, serializer()::decode, this::rollback, serializer()::encode);
}
/**
@@ -155,7 +192,7 @@
* @return {@code true} if map contains key
*/
protected boolean containsKey(Commit<? extends ContainsKey> commit) {
- MapEntryValue value = mapEntries.get(commit.value().key());
+ MapEntryValue value = entries().get(commit.value().key());
return value != null && value.type() != MapEntryValue.Type.TOMBSTONE;
}
@@ -167,7 +204,7 @@
*/
protected boolean containsValue(Commit<? extends ContainsValue> commit) {
Match<byte[]> valueMatch = Match.ifValue(commit.value().value());
- return mapEntries.values().stream()
+ return entries().values().stream()
.filter(value -> value.type() != MapEntryValue.Type.TOMBSTONE)
.anyMatch(value -> valueMatch.matches(value.value()));
}
@@ -179,7 +216,7 @@
* @return value mapped to key
*/
protected Versioned<byte[]> get(Commit<? extends Get> commit) {
- return toVersioned(mapEntries.get(commit.value().key()));
+ return toVersioned(entries().get(commit.value().key()));
}
/**
@@ -189,7 +226,7 @@
* @return value mapped to key
*/
protected Versioned<byte[]> getOrDefault(Commit<? extends GetOrDefault> commit) {
- MapEntryValue value = mapEntries.get(commit.value().key());
+ MapEntryValue value = entries().get(commit.value().key());
if (value == null) {
return new Versioned<>(commit.value().defaultValue(), 0);
} else if (value.type() == MapEntryValue.Type.TOMBSTONE) {
@@ -200,13 +237,12 @@
}
/**
- * Handles a count commit.
+ * Handles a size commit.
*
- * @param commit size commit
* @return number of entries in map
*/
- protected int size(Commit<Void> commit) {
- return (int) mapEntries.values().stream()
+ protected int size() {
+ return (int) entries().values().stream()
.filter(value -> value.type() != MapEntryValue.Type.TOMBSTONE)
.count();
}
@@ -214,22 +250,20 @@
/**
* Handles an is empty commit.
*
- * @param commit isEmpty commit
* @return {@code true} if map is empty
*/
- protected boolean isEmpty(Commit<Void> commit) {
- return mapEntries.values().stream()
+ protected boolean isEmpty() {
+ return entries().values().stream()
.noneMatch(value -> value.type() != MapEntryValue.Type.TOMBSTONE);
}
/**
* Handles a keySet commit.
*
- * @param commit keySet commit
* @return set of keys in map
*/
- protected Set<String> keySet(Commit<Void> commit) {
- return mapEntries.entrySet().stream()
+ protected Set<String> keySet() {
+ return entries().entrySet().stream()
.filter(entry -> entry.getValue().type() != MapEntryValue.Type.TOMBSTONE)
.map(Map.Entry::getKey)
.collect(Collectors.toSet());
@@ -238,11 +272,10 @@
/**
* Handles a values commit.
*
- * @param commit values commit
* @return collection of values in map
*/
- protected Collection<Versioned<byte[]>> values(Commit<Void> commit) {
- return mapEntries.entrySet().stream()
+ protected Collection<Versioned<byte[]>> values() {
+ return entries().entrySet().stream()
.filter(entry -> entry.getValue().type() != MapEntryValue.Type.TOMBSTONE)
.map(entry -> toVersioned(entry.getValue()))
.collect(Collectors.toList());
@@ -251,103 +284,311 @@
/**
* Handles a entry set commit.
*
- * @param commit entrySet commit
* @return set of map entries
*/
- protected Set<Map.Entry<String, Versioned<byte[]>>> entrySet(Commit<Void> commit) {
- return mapEntries.entrySet().stream()
+ protected Set<Map.Entry<String, Versioned<byte[]>>> entrySet() {
+ return entries().entrySet().stream()
.filter(entry -> entry.getValue().type() != MapEntryValue.Type.TOMBSTONE)
.map(e -> Maps.immutableEntry(e.getKey(), toVersioned(e.getValue())))
.collect(Collectors.toSet());
}
/**
- * Handles a update and get commit.
+ * Returns a boolean indicating whether the given MapEntryValues are equal.
*
- * @param commit updateAndGet commit
- * @return update result
+ * @param oldValue the first value to compare
+ * @param newValue the second value to compare
+ * @return indicates whether the two values are equal
*/
- protected MapEntryUpdateResult<String, byte[]> updateAndGet(Commit<? extends UpdateAndGet> commit) {
- try {
- MapEntryUpdateResult.Status updateStatus = validate(commit.value());
- String key = commit.value().key();
- MapEntryValue oldCommitValue = mapEntries.get(commit.value().key());
- Versioned<byte[]> oldMapValue = toVersioned(oldCommitValue);
+ protected boolean valuesEqual(MapEntryValue oldValue, MapEntryValue newValue) {
+ return (oldValue == null && newValue == null)
+ || (oldValue != null && newValue != null && valuesEqual(oldValue.value(), newValue.value()));
+ }
- if (updateStatus != MapEntryUpdateResult.Status.OK) {
- return new MapEntryUpdateResult<>(updateStatus, "", key, oldMapValue, oldMapValue);
- }
+ /**
+ * Returns a boolean indicating whether the given entry values are equal.
+ *
+ * @param oldValue the first value to compare
+ * @param newValue the second value to compare
+ * @return indicates whether the two values are equal
+ */
+ protected boolean valuesEqual(byte[] oldValue, byte[] newValue) {
+ return (oldValue == null && newValue == null)
+ || (oldValue != null && newValue != null && Arrays.equals(oldValue, newValue));
+ }
- byte[] newValue = commit.value().value();
- currentVersion = commit.index();
- Versioned<byte[]> newMapValue = newValue == null ? null
- : new Versioned<>(newValue, currentVersion);
+ /**
+ * Returns a boolean indicating whether the given MapEntryValue is null or a tombstone.
+ *
+ * @param value the value to check
+ * @return indicates whether the given value is null or is a tombstone
+ */
+ protected boolean valueIsNull(MapEntryValue value) {
+ return value == null || value.type() == MapEntryValue.Type.TOMBSTONE;
+ }
- MapEvent.Type updateType = newValue == null ? REMOVE
- : oldCommitValue == null ? INSERT : UPDATE;
-
- // If a value existed in the map, remove and discard the value to ensure disk can be freed.
- if (updateType == REMOVE || updateType == UPDATE) {
- mapEntries.remove(key);
- }
-
- // If this is an insert/update commit, add the commit to the map entries.
- if (updateType == INSERT || updateType == UPDATE) {
- mapEntries.put(key, new MapEntryValue(
- MapEntryValue.Type.VALUE,
+ /**
+ * Handles a put commit.
+ *
+ * @param commit put commit
+ * @return map entry update result
+ */
+ protected MapEntryUpdateResult<String, byte[]> put(Commit<? extends Put> commit) {
+ String key = commit.value().key();
+ MapEntryValue oldValue = entries().get(key);
+ MapEntryValue newValue = new MapEntryValue(MapEntryValue.Type.VALUE, commit.index(), commit.value().value());
+ if (valueIsNull(oldValue)) {
+ if (preparedKeys.contains(key)) {
+ return new MapEntryUpdateResult<>(
+ MapEntryUpdateResult.Status.WRITE_LOCK,
commit.index(),
- commit.value().value()));
- } else if (!activeTransactions.isEmpty()) {
- // If this is a delete but transactions are currently running, ensure tombstones are retained
- // for version checks.
- mapEntries.put(key, new MapEntryValue(MapEntryValue.Type.TOMBSTONE, commit.index(), null));
+ key,
+ toVersioned(oldValue));
}
-
- publish(Lists.newArrayList(new MapEvent<>("", key, newMapValue, oldMapValue)));
- return new MapEntryUpdateResult<>(updateStatus, "", key, oldMapValue, newMapValue);
- } catch (Exception e) {
- getLogger().error("State machine operation failed", e);
- throw Throwables.propagate(e);
+ entries().put(commit.value().key(),
+ new MapEntryValue(MapEntryValue.Type.VALUE, newValue.version(), newValue.value()));
+ Versioned<byte[]> result = toVersioned(oldValue);
+ publish(new MapEvent<>(MapEvent.Type.INSERT, "", key, toVersioned(newValue), result));
+ return new MapEntryUpdateResult<>(MapEntryUpdateResult.Status.OK, commit.index(), key, result);
+ } else if (!valuesEqual(oldValue, newValue)) {
+ if (preparedKeys.contains(key)) {
+ return new MapEntryUpdateResult<>(
+ MapEntryUpdateResult.Status.WRITE_LOCK,
+ commit.index(),
+ key,
+ toVersioned(oldValue));
+ }
+ entries().put(commit.value().key(),
+ new MapEntryValue(MapEntryValue.Type.VALUE, newValue.version(), newValue.value()));
+ Versioned<byte[]> result = toVersioned(oldValue);
+ publish(new MapEvent<>(MapEvent.Type.UPDATE, "", key, toVersioned(newValue), result));
+ return new MapEntryUpdateResult<>(MapEntryUpdateResult.Status.OK, commit.index(), key, result);
}
+ return new MapEntryUpdateResult<>(MapEntryUpdateResult.Status.NOOP, commit.index(), key, toVersioned(oldValue));
+ }
+
+ /**
+ * Handles a putIfAbsent commit.
+ *
+ * @param commit putIfAbsent commit
+ * @return map entry update result
+ */
+ protected MapEntryUpdateResult<String, byte[]> putIfAbsent(Commit<? extends Put> commit) {
+ String key = commit.value().key();
+ MapEntryValue oldValue = entries().get(key);
+ if (valueIsNull(oldValue)) {
+ if (preparedKeys.contains(key)) {
+ return new MapEntryUpdateResult<>(
+ MapEntryUpdateResult.Status.WRITE_LOCK,
+ commit.index(),
+ key,
+ toVersioned(oldValue));
+ }
+ MapEntryValue newValue = new MapEntryValue(
+ MapEntryValue.Type.VALUE,
+ commit.index(),
+ commit.value().value());
+ entries().put(commit.value().key(),
+ new MapEntryValue(MapEntryValue.Type.VALUE, newValue.version(), newValue.value()));
+ Versioned<byte[]> result = toVersioned(newValue);
+ publish(new MapEvent<>(MapEvent.Type.INSERT, "", key, result, null));
+ return new MapEntryUpdateResult<>(MapEntryUpdateResult.Status.OK, commit.index(), key, null);
+ }
+ return new MapEntryUpdateResult<>(MapEntryUpdateResult.Status.NOOP, commit.index(), key, toVersioned(oldValue));
+ }
+
+ /**
+ * Handles a putAndGet commit.
+ *
+ * @param commit putAndGet commit
+ * @return map entry update result
+ */
+ protected MapEntryUpdateResult<String, byte[]> putAndGet(Commit<? extends Put> commit) {
+ String key = commit.value().key();
+ MapEntryValue oldValue = entries().get(key);
+ MapEntryValue newValue = new MapEntryValue(MapEntryValue.Type.VALUE, commit.index(), commit.value().value());
+ if (valueIsNull(oldValue)) {
+ if (preparedKeys.contains(key)) {
+ return new MapEntryUpdateResult<>(
+ MapEntryUpdateResult.Status.WRITE_LOCK,
+ commit.index(),
+ key,
+ toVersioned(oldValue));
+ }
+ entries().put(commit.value().key(), newValue);
+ Versioned<byte[]> result = toVersioned(newValue);
+ publish(new MapEvent<>(MapEvent.Type.INSERT, "", key, result, null));
+ return new MapEntryUpdateResult<>(MapEntryUpdateResult.Status.OK, commit.index(), key, result);
+ } else if (!valuesEqual(oldValue, newValue)) {
+ if (preparedKeys.contains(key)) {
+ return new MapEntryUpdateResult<>(
+ MapEntryUpdateResult.Status.WRITE_LOCK,
+ commit.index(),
+ key,
+ toVersioned(oldValue));
+ }
+ entries().put(commit.value().key(), newValue);
+ Versioned<byte[]> result = toVersioned(newValue);
+ publish(new MapEvent<>(MapEvent.Type.UPDATE, "", key, result, toVersioned(oldValue)));
+ return new MapEntryUpdateResult<>(MapEntryUpdateResult.Status.OK, commit.index(), key, result);
+ }
+ return new MapEntryUpdateResult<>(MapEntryUpdateResult.Status.NOOP, commit.index(), key, toVersioned(oldValue));
+ }
+
+ /**
+ * Handles a remove commit.
+ *
+ * @param index the commit index
+ * @param key the key to remove
+ * @param predicate predicate to determine whether to remove the entry
+ * @return map entry update result
+ */
+ private MapEntryUpdateResult<String, byte[]> removeIf(long index, String key, Predicate<MapEntryValue> predicate) {
+ MapEntryValue value = entries().get(key);
+ if (value == null || !predicate.test(value)) {
+ return new MapEntryUpdateResult<>(MapEntryUpdateResult.Status.NOOP, index, key, null);
+ }
+ entries().remove(key);
+ if (!activeTransactions.isEmpty()) {
+ entries().put(key, new MapEntryValue(MapEntryValue.Type.TOMBSTONE, value.version, null));
+ }
+ Versioned<byte[]> result = toVersioned(value);
+ publish(Lists.newArrayList(new MapEvent<>(MapEvent.Type.REMOVE, "", key, null, result)));
+ return new MapEntryUpdateResult<>(MapEntryUpdateResult.Status.OK, index, key, result);
+ }
+
+ /**
+ * Handles a remove commit.
+ *
+ * @param commit remove commit
+ * @return map entry update result
+ */
+ protected MapEntryUpdateResult<String, byte[]> remove(Commit<? extends Remove> commit) {
+ return removeIf(commit.index(), commit.value().key(), v -> true);
+ }
+
+ /**
+ * Handles a removeValue commit.
+ *
+ * @param commit removeValue commit
+ * @return map entry update result
+ */
+ protected MapEntryUpdateResult<String, byte[]> removeValue(Commit<? extends RemoveValue> commit) {
+ return removeIf(commit.index(), commit.value().key(), v ->
+ valuesEqual(v, new MapEntryValue(MapEntryValue.Type.VALUE, commit.index(), commit.value().value())));
+ }
+
+ /**
+ * Handles a removeVersion commit.
+ *
+ * @param commit removeVersion commit
+ * @return map entry update result
+ */
+ protected MapEntryUpdateResult<String, byte[]> removeVersion(Commit<? extends RemoveVersion> commit) {
+ return removeIf(commit.index(), commit.value().key(), v -> v.version() == commit.value().version());
+ }
+
+ /**
+ * Handles a replace commit.
+ *
+ * @param index the commit index
+ * @param key the key to replace
+ * @param newValue the value with which to replace the key
+ * @param predicate a predicate to determine whether to replace the key
+ * @return map entry update result
+ */
+ private MapEntryUpdateResult<String, byte[]> replaceIf(
+ long index, String key, MapEntryValue newValue, Predicate<MapEntryValue> predicate) {
+ MapEntryValue oldValue = entries().get(key);
+ if (oldValue == null) {
+ return new MapEntryUpdateResult<>(MapEntryUpdateResult.Status.NOOP, index, key, null);
+ }
+ if (!predicate.test(oldValue)) {
+ return new MapEntryUpdateResult<>(MapEntryUpdateResult.Status.NOOP, index, key, toVersioned(oldValue));
+ }
+ entries().put(key, newValue);
+ Versioned<byte[]> result = toVersioned(oldValue);
+ publish(Lists.newArrayList(new MapEvent<>(MapEvent.Type.UPDATE, "", key, toVersioned(newValue), result)));
+ return new MapEntryUpdateResult<>(MapEntryUpdateResult.Status.OK, index, key, result);
+ }
+
+ /**
+ * Handles a replace commit.
+ *
+ * @param commit replace commit
+ * @return map entry update result
+ */
+ protected MapEntryUpdateResult<String, byte[]> replace(Commit<? extends Replace> commit) {
+ MapEntryValue value = new MapEntryValue(MapEntryValue.Type.VALUE, commit.index(), commit.value().value());
+ return replaceIf(commit.index(), commit.value().key(), value, v -> true);
+ }
+
+ /**
+ * Handles a replaceValue commit.
+ *
+ * @param commit replaceValue commit
+ * @return map entry update result
+ */
+ protected MapEntryUpdateResult<String, byte[]> replaceValue(Commit<? extends ReplaceValue> commit) {
+ MapEntryValue value = new MapEntryValue(MapEntryValue.Type.VALUE, commit.index(), commit.value().newValue());
+ return replaceIf(commit.index(), commit.value().key(), value,
+ v -> valuesEqual(v.value(), commit.value().oldValue()));
+ }
+
+ /**
+ * Handles a replaceVersion commit.
+ *
+ * @param commit replaceVersion commit
+ * @return map entry update result
+ */
+ protected MapEntryUpdateResult<String, byte[]> replaceVersion(Commit<? extends ReplaceVersion> commit) {
+ MapEntryValue value = new MapEntryValue(MapEntryValue.Type.VALUE, commit.index(), commit.value().newValue());
+ return replaceIf(commit.index(), commit.value().key(), value,
+ v -> v.version() == commit.value().oldVersion());
}
/**
* Handles a clear commit.
*
- * @param commit clear commit
* @return clear result
*/
- protected MapEntryUpdateResult.Status clear(Commit<Void> commit) {
- Iterator<Map.Entry<String, MapEntryValue>> iterator = mapEntries
- .entrySet().iterator();
+ protected MapEntryUpdateResult.Status clear() {
+ Iterator<Map.Entry<String, MapEntryValue>> iterator = entries().entrySet().iterator();
+ Map<String, MapEntryValue> entriesToAdd = new HashMap<>();
while (iterator.hasNext()) {
Map.Entry<String, MapEntryValue> entry = iterator.next();
String key = entry.getKey();
MapEntryValue value = entry.getValue();
- Versioned<byte[]> removedValue = new Versioned<>(value.value(),
- value.version());
- publish(Lists.newArrayList(new MapEvent<>("", key, null, removedValue)));
- iterator.remove();
+ if (!valueIsNull(value)) {
+ Versioned<byte[]> removedValue = new Versioned<>(value.value(), value.version());
+ publish(new MapEvent<>(MapEvent.Type.REMOVE, "", key, null, removedValue));
+ if (activeTransactions.isEmpty()) {
+ iterator.remove();
+ } else {
+ entriesToAdd.put(key, new MapEntryValue(MapEntryValue.Type.TOMBSTONE, value.version, null));
+ }
+ }
}
+ entries().putAll(entriesToAdd);
return MapEntryUpdateResult.Status.OK;
}
/**
* Handles a listen commit.
*
- * @param commit listen commit
+ * @param session listen session
*/
- protected void listen(Commit<Void> commit) {
- listeners.put(commit.session().sessionId().id(), commit.session());
+ protected void listen(RaftSession session) {
+ listeners.put(session.sessionId().id(), session);
}
/**
* Handles an unlisten commit.
*
- * @param commit unlisten commit
+ * @param session unlisten session
*/
- protected void unlisten(Commit<Void> commit) {
- listeners.remove(commit.session().sessionId().id());
+ protected void unlisten(RaftSession session) {
+ listeners.remove(session.sessionId().id());
}
/**
@@ -412,7 +653,7 @@
}
// Read the existing value from the map.
- MapEntryValue existingValue = mapEntries.get(key);
+ MapEntryValue existingValue = entries().get(key);
// Note: if the existing value is null, that means the key has not changed during the transaction,
// otherwise a tombstone would have been retained.
@@ -501,7 +742,7 @@
continue;
}
- MapEntryValue previousValue = mapEntries.remove(key);
+ MapEntryValue previousValue = entries().remove(key);
MapEntryValue newValue = null;
// If the record is not a delete, create a transactional commit.
@@ -512,11 +753,42 @@
newValue = new MapEntryValue(MapEntryValue.Type.TOMBSTONE, currentVersion, null);
}
- eventsToPublish.add(new MapEvent<>("", key, toVersioned(newValue), toVersioned(previousValue)));
-
+ MapEvent<String, byte[]> event;
if (newValue != null) {
- mapEntries.put(key, newValue);
+ entries().put(key, newValue);
+ if (!valueIsNull(newValue)) {
+ if (!valueIsNull(previousValue)) {
+ event = new MapEvent<>(
+ MapEvent.Type.UPDATE,
+ "",
+ key,
+ toVersioned(newValue),
+ toVersioned(previousValue));
+ } else {
+ event = new MapEvent<>(
+ MapEvent.Type.INSERT,
+ "",
+ key,
+ toVersioned(newValue),
+ null);
+ }
+ } else {
+ event = new MapEvent<>(
+ MapEvent.Type.REMOVE,
+ "",
+ key,
+ null,
+ toVersioned(previousValue));
+ }
+ } else {
+ event = new MapEvent<>(
+ MapEvent.Type.REMOVE,
+ "",
+ key,
+ null,
+ toVersioned(previousValue));
}
+ eventsToPublish.add(event);
}
publish(eventsToPublish);
return CommitResult.OK;
@@ -557,7 +829,7 @@
*/
private void discardTombstones() {
if (activeTransactions.isEmpty()) {
- Iterator<Map.Entry<String, MapEntryValue>> iterator = mapEntries.entrySet().iterator();
+ Iterator<Map.Entry<String, MapEntryValue>> iterator = entries().entrySet().iterator();
while (iterator.hasNext()) {
MapEntryValue value = iterator.next().getValue();
if (value.type() == MapEntryValue.Type.TOMBSTONE) {
@@ -568,7 +840,7 @@
long lowWaterMark = activeTransactions.values().stream()
.mapToLong(TransactionScope::version)
.min().getAsLong();
- Iterator<Map.Entry<String, MapEntryValue>> iterator = mapEntries.entrySet().iterator();
+ Iterator<Map.Entry<String, MapEntryValue>> iterator = entries().entrySet().iterator();
while (iterator.hasNext()) {
MapEntryValue value = iterator.next().getValue();
if (value.type() == MapEntryValue.Type.TOMBSTONE && value.version < lowWaterMark) {
@@ -579,46 +851,32 @@
}
/**
- * Computes the update status that would result if the specified update were to applied to
- * the state machine.
- *
- * @param update update
- * @return status
- */
- private MapEntryUpdateResult.Status validate(UpdateAndGet update) {
- MapEntryValue existingValue = mapEntries.get(update.key());
- boolean isEmpty = existingValue == null || existingValue.type() == MapEntryValue.Type.TOMBSTONE;
- if (isEmpty && update.value() == null) {
- return MapEntryUpdateResult.Status.NOOP;
- }
- if (preparedKeys.contains(update.key())) {
- return MapEntryUpdateResult.Status.WRITE_LOCK;
- }
- byte[] existingRawValue = isEmpty ? null : existingValue.value();
- Long existingVersion = isEmpty ? null : existingValue.version();
- return update.valueMatch().matches(existingRawValue)
- && update.versionMatch().matches(existingVersion) ? MapEntryUpdateResult.Status.OK
- : MapEntryUpdateResult.Status.PRECONDITION_FAILED;
- }
-
- /**
* Utility for turning a {@code MapEntryValue} to {@code Versioned}.
* @param value map entry value
* @return versioned instance
*/
- private Versioned<byte[]> toVersioned(MapEntryValue value) {
+ protected Versioned<byte[]> toVersioned(MapEntryValue value) {
return value != null && value.type() != MapEntryValue.Type.TOMBSTONE
? new Versioned<>(value.value(), value.version()) : null;
}
/**
+ * Publishes an event to listeners.
+ *
+ * @param event event to publish
+ */
+ private void publish(MapEvent<String, byte[]> event) {
+ publish(Lists.newArrayList(event));
+ }
+
+ /**
* Publishes events to listeners.
*
* @param events list of map event to publish
*/
private void publish(List<MapEvent<String, byte[]>> events) {
listeners.values().forEach(session -> {
- session.publish(CHANGE, SERIALIZER::encode, events);
+ session.publish(CHANGE, serializer()::encode, events);
});
}
@@ -639,7 +897,7 @@
/**
* Interface implemented by map values.
*/
- private static class MapEntryValue {
+ protected static class MapEntryValue {
protected final Type type;
protected final long version;
protected final byte[] value;
@@ -689,7 +947,7 @@
/**
* Map transaction scope.
*/
- private static final class TransactionScope {
+ protected static final class TransactionScope {
private final long version;
private final TransactionLog<MapUpdate<String, byte[]>> transactionLog;
diff --git a/core/store/primitives/src/main/java/org/onosproject/store/primitives/resources/impl/AtomixConsistentTreeMap.java b/core/store/primitives/src/main/java/org/onosproject/store/primitives/resources/impl/AtomixConsistentTreeMap.java
index 8f91b3d..e11333d 100644
--- a/core/store/primitives/src/main/java/org/onosproject/store/primitives/resources/impl/AtomixConsistentTreeMap.java
+++ b/core/store/primitives/src/main/java/org/onosproject/store/primitives/resources/impl/AtomixConsistentTreeMap.java
@@ -16,63 +16,37 @@
package org.onosproject.store.primitives.resources.impl;
-import java.util.Collection;
-import java.util.ConcurrentModificationException;
-import java.util.List;
+import java.util.HashMap;
import java.util.Map;
import java.util.NavigableMap;
import java.util.NavigableSet;
-import java.util.Set;
+import java.util.TreeMap;
import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.Executor;
-import java.util.concurrent.atomic.AtomicReference;
-import java.util.function.BiFunction;
-import java.util.function.Predicate;
-import com.google.common.collect.Maps;
import io.atomix.protocols.raft.proxy.RaftProxy;
import org.onlab.util.KryoNamespace;
-import org.onlab.util.Match;
-import org.onosproject.store.primitives.MapUpdate;
import org.onosproject.store.primitives.TransactionId;
import org.onosproject.store.primitives.resources.impl.AtomixConsistentTreeMapOperations.FloorEntry;
import org.onosproject.store.primitives.resources.impl.AtomixConsistentTreeMapOperations.HigherEntry;
import org.onosproject.store.primitives.resources.impl.AtomixConsistentTreeMapOperations.LowerEntry;
import org.onosproject.store.serializers.KryoNamespaces;
import org.onosproject.store.service.AsyncConsistentTreeMap;
-import org.onosproject.store.service.MapEvent;
-import org.onosproject.store.service.MapEventListener;
import org.onosproject.store.service.Serializer;
import org.onosproject.store.service.TransactionLog;
-import org.onosproject.store.service.Version;
import org.onosproject.store.service.Versioned;
-import static org.onosproject.store.primitives.resources.impl.AtomixConsistentTreeMapEvents.CHANGE;
-import static org.onosproject.store.primitives.resources.impl.AtomixConsistentTreeMapOperations.ADD_LISTENER;
import static org.onosproject.store.primitives.resources.impl.AtomixConsistentTreeMapOperations.CEILING_ENTRY;
import static org.onosproject.store.primitives.resources.impl.AtomixConsistentTreeMapOperations.CEILING_KEY;
-import static org.onosproject.store.primitives.resources.impl.AtomixConsistentTreeMapOperations.CLEAR;
-import static org.onosproject.store.primitives.resources.impl.AtomixConsistentTreeMapOperations.CONTAINS_KEY;
-import static org.onosproject.store.primitives.resources.impl.AtomixConsistentTreeMapOperations.CONTAINS_VALUE;
import static org.onosproject.store.primitives.resources.impl.AtomixConsistentTreeMapOperations.CeilingEntry;
import static org.onosproject.store.primitives.resources.impl.AtomixConsistentTreeMapOperations.CeilingKey;
-import static org.onosproject.store.primitives.resources.impl.AtomixConsistentTreeMapOperations.ContainsKey;
-import static org.onosproject.store.primitives.resources.impl.AtomixConsistentTreeMapOperations.ContainsValue;
-import static org.onosproject.store.primitives.resources.impl.AtomixConsistentTreeMapOperations.ENTRY_SET;
import static org.onosproject.store.primitives.resources.impl.AtomixConsistentTreeMapOperations.FIRST_ENTRY;
import static org.onosproject.store.primitives.resources.impl.AtomixConsistentTreeMapOperations.FIRST_KEY;
import static org.onosproject.store.primitives.resources.impl.AtomixConsistentTreeMapOperations.FLOOR_ENTRY;
import static org.onosproject.store.primitives.resources.impl.AtomixConsistentTreeMapOperations.FLOOR_KEY;
import static org.onosproject.store.primitives.resources.impl.AtomixConsistentTreeMapOperations.FloorKey;
-import static org.onosproject.store.primitives.resources.impl.AtomixConsistentTreeMapOperations.GET;
-import static org.onosproject.store.primitives.resources.impl.AtomixConsistentTreeMapOperations.GET_OR_DEFAULT;
-import static org.onosproject.store.primitives.resources.impl.AtomixConsistentTreeMapOperations.Get;
-import static org.onosproject.store.primitives.resources.impl.AtomixConsistentTreeMapOperations.GetOrDefault;
import static org.onosproject.store.primitives.resources.impl.AtomixConsistentTreeMapOperations.HIGHER_ENTRY;
import static org.onosproject.store.primitives.resources.impl.AtomixConsistentTreeMapOperations.HIGHER_KEY;
import static org.onosproject.store.primitives.resources.impl.AtomixConsistentTreeMapOperations.HigherKey;
-import static org.onosproject.store.primitives.resources.impl.AtomixConsistentTreeMapOperations.IS_EMPTY;
-import static org.onosproject.store.primitives.resources.impl.AtomixConsistentTreeMapOperations.KEY_SET;
import static org.onosproject.store.primitives.resources.impl.AtomixConsistentTreeMapOperations.LAST_ENTRY;
import static org.onosproject.store.primitives.resources.impl.AtomixConsistentTreeMapOperations.LAST_KEY;
import static org.onosproject.store.primitives.resources.impl.AtomixConsistentTreeMapOperations.LOWER_ENTRY;
@@ -80,338 +54,105 @@
import static org.onosproject.store.primitives.resources.impl.AtomixConsistentTreeMapOperations.LowerKey;
import static org.onosproject.store.primitives.resources.impl.AtomixConsistentTreeMapOperations.POLL_FIRST_ENTRY;
import static org.onosproject.store.primitives.resources.impl.AtomixConsistentTreeMapOperations.POLL_LAST_ENTRY;
-import static org.onosproject.store.primitives.resources.impl.AtomixConsistentTreeMapOperations.REMOVE_LISTENER;
-import static org.onosproject.store.primitives.resources.impl.AtomixConsistentTreeMapOperations.SIZE;
-import static org.onosproject.store.primitives.resources.impl.AtomixConsistentTreeMapOperations.UPDATE_AND_GET;
-import static org.onosproject.store.primitives.resources.impl.AtomixConsistentTreeMapOperations.UpdateAndGet;
-import static org.onosproject.store.primitives.resources.impl.AtomixConsistentTreeMapOperations.VALUES;
/**
* Implementation of {@link AsyncConsistentTreeMap}.
*/
-public class AtomixConsistentTreeMap extends AbstractRaftPrimitive implements AsyncConsistentTreeMap<byte[]> {
+public class AtomixConsistentTreeMap extends AtomixConsistentMap implements AsyncConsistentTreeMap<byte[]> {
private static final Serializer SERIALIZER = Serializer.using(KryoNamespace.newBuilder()
.register(KryoNamespaces.BASIC)
+ .register(AtomixConsistentMapOperations.NAMESPACE)
.register(AtomixConsistentTreeMapOperations.NAMESPACE)
- .register(AtomixConsistentTreeMapEvents.NAMESPACE)
+ .register(AtomixConsistentMapEvents.NAMESPACE)
+ .nextId(KryoNamespaces.BEGIN_USER_CUSTOM_ID + 150)
+ .register(AtomixConsistentMapService.TransactionScope.class)
+ .register(TransactionLog.class)
+ .register(TransactionId.class)
+ .register(AtomixConsistentMapService.MapEntryValue.class)
+ .register(AtomixConsistentMapService.MapEntryValue.Type.class)
+ .register(new HashMap().keySet().getClass())
+ .register(TreeMap.class)
.build());
- private final Map<MapEventListener<String, byte[]>, Executor>
- mapEventListeners = Maps.newConcurrentMap();
-
public AtomixConsistentTreeMap(RaftProxy proxy) {
super(proxy);
- proxy.addEventListener(CHANGE, SERIALIZER::decode, this::handleEvent);
- }
-
- private void handleEvent(List<MapEvent<String, byte[]>> events) {
- events.forEach(event -> mapEventListeners.
- forEach((listener, executor) ->
- executor.execute(() ->
- listener.event(event))));
}
@Override
- public CompletableFuture<Boolean> isEmpty() {
- return proxy.invoke(IS_EMPTY, SERIALIZER::decode);
- }
-
- @Override
- public CompletableFuture<Integer> size() {
- return proxy.invoke(SIZE, SERIALIZER::decode);
- }
-
- @Override
- public CompletableFuture<Boolean> containsKey(String key) {
- return proxy.invoke(CONTAINS_KEY, SERIALIZER::encode, new ContainsKey(key), SERIALIZER::decode);
- }
-
- @Override
- public CompletableFuture<Boolean> containsValue(byte[] value) {
- return proxy.invoke(CONTAINS_VALUE, SERIALIZER::encode, new ContainsValue(value), SERIALIZER::decode);
- }
-
- @Override
- public CompletableFuture<Versioned<byte[]>> get(String key) {
- return proxy.invoke(GET, SERIALIZER::encode, new Get(key), SERIALIZER::decode);
- }
-
- @Override
- public CompletableFuture<Versioned<byte[]>> getOrDefault(String key, byte[] defaultValue) {
- return proxy.invoke(
- GET_OR_DEFAULT,
- SERIALIZER::encode,
- new GetOrDefault(key, defaultValue),
- SERIALIZER::decode);
- }
-
- @Override
- public CompletableFuture<Set<String>> keySet() {
- return proxy.invoke(KEY_SET, SERIALIZER::decode);
- }
-
- @Override
- public CompletableFuture<Collection<Versioned<byte[]>>> values() {
- return proxy.invoke(VALUES, SERIALIZER::decode);
- }
-
- @Override
- public CompletableFuture<Set<Map.Entry<String, Versioned<byte[]>>>> entrySet() {
- return proxy.invoke(ENTRY_SET, SERIALIZER::decode);
- }
-
- @Override
- @SuppressWarnings("unchecked")
- public CompletableFuture<Versioned<byte[]>> put(String key, byte[] value) {
- return proxy.<UpdateAndGet, MapEntryUpdateResult<String, byte[]>>invoke(
- UPDATE_AND_GET,
- SERIALIZER::encode,
- new UpdateAndGet(key, value, Match.ANY, Match.ANY),
- SERIALIZER::decode)
- .whenComplete((r, e) -> throwIfLocked(r.status()))
- .thenApply(v -> v.oldValue());
- }
-
- @Override
- @SuppressWarnings("unchecked")
- public CompletableFuture<Versioned<byte[]>> putAndGet(String key, byte[] value) {
- return proxy.<UpdateAndGet, MapEntryUpdateResult<String, byte[]>>invoke(
- UPDATE_AND_GET,
- SERIALIZER::encode,
- new UpdateAndGet(key, value, Match.ANY, Match.ANY),
- SERIALIZER::decode)
- .whenComplete((r, e) -> throwIfLocked(r.status()))
- .thenApply(v -> v.newValue());
- }
-
- @Override
- @SuppressWarnings("unchecked")
- public CompletableFuture<Versioned<byte[]>> putIfAbsent(String key, byte[] value) {
- return proxy.<UpdateAndGet, MapEntryUpdateResult<String, byte[]>>invoke(
- UPDATE_AND_GET,
- SERIALIZER::encode,
- new UpdateAndGet(key, value, Match.NULL, Match.ANY),
- SERIALIZER::decode)
- .whenComplete((r, e) -> throwIfLocked(r.status()))
- .thenApply(v -> v.oldValue());
- }
-
- @Override
- @SuppressWarnings("unchecked")
- public CompletableFuture<Versioned<byte[]>> remove(String key) {
- return proxy.<UpdateAndGet, MapEntryUpdateResult<String, byte[]>>invoke(
- UPDATE_AND_GET,
- SERIALIZER::encode,
- new UpdateAndGet(key, null, Match.ANY, Match.ANY),
- SERIALIZER::decode)
- .whenComplete((r, e) -> throwIfLocked(r.status()))
- .thenApply(v -> v.oldValue());
- }
-
- @Override
- @SuppressWarnings("unchecked")
- public CompletableFuture<Boolean> remove(String key, byte[] value) {
- return proxy.<UpdateAndGet, MapEntryUpdateResult<String, byte[]>>invoke(
- UPDATE_AND_GET,
- SERIALIZER::encode,
- new UpdateAndGet(key, null, Match.ifValue(value), Match.ANY),
- SERIALIZER::decode)
- .whenComplete((r, e) -> throwIfLocked(r.status()))
- .thenApply(v -> v.updated());
- }
-
- @Override
- @SuppressWarnings("unchecked")
- public CompletableFuture<Boolean> remove(String key, long version) {
- return proxy.<UpdateAndGet, MapEntryUpdateResult<String, byte[]>>invoke(
- UPDATE_AND_GET,
- SERIALIZER::encode,
- new UpdateAndGet(key, null, Match.ANY, Match.ifValue(version)),
- SERIALIZER::decode)
- .whenComplete((r, e) -> throwIfLocked(r.status()))
- .thenApply(v -> v.updated());
- }
-
- @Override
- @SuppressWarnings("unchecked")
- public CompletableFuture<Versioned<byte[]>> replace(String key, byte[] value) {
- return proxy.<UpdateAndGet, MapEntryUpdateResult<String, byte[]>>invoke(
- UPDATE_AND_GET,
- SERIALIZER::encode,
- new UpdateAndGet(key, value, Match.NOT_NULL, Match.ANY),
- SERIALIZER::decode)
- .whenComplete((r, e) -> throwIfLocked(r.status()))
- .thenApply(v -> v.oldValue());
- }
-
- @Override
- @SuppressWarnings("unchecked")
- public CompletableFuture<Boolean> replace(String key, byte[] oldValue, byte[] newValue) {
- return proxy.<UpdateAndGet, MapEntryUpdateResult<String, byte[]>>invoke(
- UPDATE_AND_GET,
- SERIALIZER::encode,
- new UpdateAndGet(key, newValue, Match.ifValue(oldValue), Match.ANY),
- SERIALIZER::decode)
- .whenComplete((r, e) -> throwIfLocked(r.status()))
- .thenApply(v -> v.updated());
- }
-
- @Override
- @SuppressWarnings("unchecked")
- public CompletableFuture<Boolean> replace(String key, long oldVersion, byte[] newValue) {
- return proxy.<UpdateAndGet, MapEntryUpdateResult<String, byte[]>>invoke(
- UPDATE_AND_GET,
- SERIALIZER::encode,
- new UpdateAndGet(key, newValue, Match.ANY, Match.ifValue(oldVersion)),
- SERIALIZER::decode)
- .whenComplete((r, e) -> throwIfLocked(r.status()))
- .thenApply(v -> v.updated());
- }
-
- @Override
- public CompletableFuture<Void> clear() {
- return proxy.<MapEntryUpdateResult.Status>invoke(CLEAR, SERIALIZER::decode)
- .whenComplete((r, e) -> throwIfLocked(r))
- .thenApply(v -> null);
- }
-
- @Override
- @SuppressWarnings("unchecked")
- public CompletableFuture<Versioned<byte[]>> computeIf(String key,
- Predicate<? super byte[]> condition,
- BiFunction<? super String,
- ? super byte[],
- ? extends byte[]> remappingFunction) {
- return get(key).thenCompose(r1 -> {
- byte[] existingValue = r1 == null ? null : r1.value();
-
- if (!condition.test(existingValue)) {
- return CompletableFuture.completedFuture(r1);
- }
-
- AtomicReference<byte[]> computedValue = new AtomicReference<byte[]>();
- try {
- computedValue.set(remappingFunction.apply(key, existingValue));
- } catch (Exception e) {
- CompletableFuture<Versioned<byte[]>> future = new CompletableFuture<>();
- future.completeExceptionally(e);
- return future;
- }
- if (computedValue.get() == null && r1 == null) {
- return CompletableFuture.completedFuture(null);
- }
- Match<byte[]> valueMatch = r1 == null ? Match.NULL : Match.ANY;
- Match<Long> versionMatch = r1 == null ? Match.ANY : Match.ifValue(r1.version());
- return proxy.<UpdateAndGet, MapEntryUpdateResult<String, byte[]>>invoke(
- UPDATE_AND_GET,
- SERIALIZER::encode,
- new UpdateAndGet(key, computedValue.get(), valueMatch, versionMatch),
- SERIALIZER::decode)
- .whenComplete((r, e) -> throwIfLocked(r.status()))
- .thenApply(v -> v.newValue());
- });
- }
-
- @Override
- public CompletableFuture<Void> addListener(
- MapEventListener<String, byte[]> listener, Executor executor) {
- if (mapEventListeners.isEmpty()) {
- return proxy.invoke(ADD_LISTENER).thenRun(() ->
- mapEventListeners.put(listener,
- executor));
- } else {
- mapEventListeners.put(listener, executor);
- return CompletableFuture.completedFuture(null);
- }
- }
-
- @Override
- public synchronized CompletableFuture<Void> removeListener(MapEventListener<String, byte[]> listener) {
- if (mapEventListeners.remove(listener) != null &&
- mapEventListeners.isEmpty()) {
- return proxy.invoke(REMOVE_LISTENER)
- .thenApply(v -> null);
- }
- return CompletableFuture.completedFuture(null);
- }
-
-
- private void throwIfLocked(MapEntryUpdateResult.Status status) {
- if (status == MapEntryUpdateResult.Status.WRITE_LOCK) {
- throw new ConcurrentModificationException("Cannot update TreeMap: another update is in progress.");
- }
+ protected Serializer serializer() {
+ return SERIALIZER;
}
@Override
public CompletableFuture<String> firstKey() {
- return proxy.invoke(FIRST_KEY, SERIALIZER::decode);
+ return proxy.invoke(FIRST_KEY, serializer()::decode);
}
@Override
public CompletableFuture<String> lastKey() {
- return proxy.invoke(LAST_KEY, SERIALIZER::decode);
+ return proxy.invoke(LAST_KEY, serializer()::decode);
}
@Override
public CompletableFuture<Map.Entry<String, Versioned<byte[]>>> ceilingEntry(String key) {
- return proxy.invoke(CEILING_ENTRY, SERIALIZER::encode, new CeilingEntry(key), SERIALIZER::decode);
+ return proxy.invoke(CEILING_ENTRY, serializer()::encode, new CeilingEntry(key), serializer()::decode);
}
@Override
public CompletableFuture<Map.Entry<String, Versioned<byte[]>>> floorEntry(String key) {
- return proxy.invoke(FLOOR_ENTRY, SERIALIZER::encode, new FloorEntry(key), SERIALIZER::decode);
+ return proxy.invoke(FLOOR_ENTRY, serializer()::encode, new FloorEntry(key), serializer()::decode);
}
@Override
public CompletableFuture<Map.Entry<String, Versioned<byte[]>>> higherEntry(
String key) {
- return proxy.invoke(HIGHER_ENTRY, SERIALIZER::encode, new HigherEntry(key), SERIALIZER::decode);
+ return proxy.invoke(HIGHER_ENTRY, serializer()::encode, new HigherEntry(key), serializer()::decode);
}
@Override
public CompletableFuture<Map.Entry<String, Versioned<byte[]>>> lowerEntry(
String key) {
- return proxy.invoke(LOWER_ENTRY, SERIALIZER::encode, new LowerEntry(key), SERIALIZER::decode);
+ return proxy.invoke(LOWER_ENTRY, serializer()::encode, new LowerEntry(key), serializer()::decode);
}
@Override
public CompletableFuture<Map.Entry<String, Versioned<byte[]>>> firstEntry() {
- return proxy.invoke(FIRST_ENTRY, SERIALIZER::decode);
+ return proxy.invoke(FIRST_ENTRY, serializer()::decode);
}
@Override
public CompletableFuture<Map.Entry<String, Versioned<byte[]>>> lastEntry() {
- return proxy.invoke(LAST_ENTRY, SERIALIZER::decode);
+ return proxy.invoke(LAST_ENTRY, serializer()::decode);
}
@Override
public CompletableFuture<Map.Entry<String, Versioned<byte[]>>> pollFirstEntry() {
- return proxy.invoke(POLL_FIRST_ENTRY, SERIALIZER::decode);
+ return proxy.invoke(POLL_FIRST_ENTRY, serializer()::decode);
}
@Override
public CompletableFuture<Map.Entry<String, Versioned<byte[]>>> pollLastEntry() {
- return proxy.invoke(POLL_LAST_ENTRY, SERIALIZER::decode);
+ return proxy.invoke(POLL_LAST_ENTRY, serializer()::decode);
}
@Override
public CompletableFuture<String> lowerKey(String key) {
- return proxy.invoke(LOWER_KEY, SERIALIZER::encode, new LowerKey(key), SERIALIZER::decode);
+ return proxy.invoke(LOWER_KEY, serializer()::encode, new LowerKey(key), serializer()::decode);
}
@Override
public CompletableFuture<String> floorKey(String key) {
- return proxy.invoke(FLOOR_KEY, SERIALIZER::encode, new FloorKey(key), SERIALIZER::decode);
+ return proxy.invoke(FLOOR_KEY, serializer()::encode, new FloorKey(key), serializer()::decode);
}
@Override
public CompletableFuture<String> ceilingKey(String key) {
- return proxy.invoke(CEILING_KEY, SERIALIZER::encode, new CeilingKey(key), SERIALIZER::decode);
+ return proxy.invoke(CEILING_KEY, serializer()::encode, new CeilingKey(key), serializer()::decode);
}
@Override
public CompletableFuture<String> higherKey(String key) {
- return proxy.invoke(HIGHER_KEY, SERIALIZER::encode, new HigherKey(key), SERIALIZER::decode);
+ return proxy.invoke(HIGHER_KEY, serializer()::encode, new HigherKey(key), serializer()::decode);
}
@Override
@@ -425,29 +166,4 @@
boolean inclusiveLower) {
throw new UnsupportedOperationException("This operation is not yet supported.");
}
-
- @Override
- public CompletableFuture<Version> begin(TransactionId transactionId) {
- throw new UnsupportedOperationException("This operation is not yet supported.");
- }
-
- @Override
- public CompletableFuture<Boolean> prepare(TransactionLog<MapUpdate<String, byte[]>> transactionLog) {
- throw new UnsupportedOperationException("This operation is not yet supported.");
- }
-
- @Override
- public CompletableFuture<Boolean> prepareAndCommit(TransactionLog<MapUpdate<String, byte[]>> transactionLog) {
- throw new UnsupportedOperationException("This operation is not yet supported.");
- }
-
- @Override
- public CompletableFuture<Void> commit(TransactionId transactionId) {
- throw new UnsupportedOperationException("This operation is not yet supported.");
- }
-
- @Override
- public CompletableFuture<Void> rollback(TransactionId transactionId) {
- throw new UnsupportedOperationException("This operation is not yet supported.");
- }
}
\ No newline at end of file
diff --git a/core/store/primitives/src/main/java/org/onosproject/store/primitives/resources/impl/AtomixConsistentTreeMapEvents.java b/core/store/primitives/src/main/java/org/onosproject/store/primitives/resources/impl/AtomixConsistentTreeMapEvents.java
deleted file mode 100644
index ab3b972..0000000
--- a/core/store/primitives/src/main/java/org/onosproject/store/primitives/resources/impl/AtomixConsistentTreeMapEvents.java
+++ /dev/null
@@ -1,45 +0,0 @@
-/*
- * Copyright 2017-present Open Networking Laboratory
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.onosproject.store.primitives.resources.impl;
-
-import io.atomix.protocols.raft.event.EventType;
-import org.onlab.util.KryoNamespace;
-import org.onosproject.store.serializers.KryoNamespaces;
-import org.onosproject.store.service.MapEvent;
-
-/**
- * Atomix consistent tree map events.
- */
-public enum AtomixConsistentTreeMapEvents implements EventType {
- CHANGE("change");
-
- private final String id;
-
- AtomixConsistentTreeMapEvents(String id) {
- this.id = id;
- }
-
- @Override
- public String id() {
- return id;
- }
-
- public static final KryoNamespace NAMESPACE = KryoNamespace.newBuilder()
- .nextId(KryoNamespaces.BEGIN_USER_CUSTOM_ID + 50)
- .register(MapEvent.class)
- .register(MapEvent.Type.class)
- .build("AtomixConsistentTreeMapEvents");
-}
diff --git a/core/store/primitives/src/main/java/org/onosproject/store/primitives/resources/impl/AtomixConsistentTreeMapOperations.java b/core/store/primitives/src/main/java/org/onosproject/store/primitives/resources/impl/AtomixConsistentTreeMapOperations.java
index c6b8d88..c44a63e 100644
--- a/core/store/primitives/src/main/java/org/onosproject/store/primitives/resources/impl/AtomixConsistentTreeMapOperations.java
+++ b/core/store/primitives/src/main/java/org/onosproject/store/primitives/resources/impl/AtomixConsistentTreeMapOperations.java
@@ -34,15 +34,6 @@
* state machine operations.
*/
public enum AtomixConsistentTreeMapOperations implements OperationId {
- CONTAINS_KEY("containsKey", OperationType.QUERY),
- CONTAINS_VALUE("containsValue", OperationType.QUERY),
- ENTRY_SET("entrySet", OperationType.QUERY),
- GET("get", OperationType.QUERY),
- GET_OR_DEFAULT("getOrDefault", OperationType.QUERY),
- IS_EMPTY("isEmpty", OperationType.QUERY),
- KEY_SET("keySet", OperationType.QUERY),
- SIZE("size", OperationType.QUERY),
- VALUES("values", OperationType.QUERY),
SUB_MAP("subMap", OperationType.QUERY),
FIRST_KEY("firstKey", OperationType.QUERY),
LAST_KEY("lastKey", OperationType.QUERY),
@@ -57,11 +48,7 @@
CEILING_ENTRY("ceilingEntry", OperationType.QUERY),
CEILING_KEY("ceilingKey", OperationType.QUERY),
HIGHER_ENTRY("higherEntry", OperationType.QUERY),
- HIGHER_KEY("higherKey", OperationType.QUERY),
- UPDATE_AND_GET("updateAndGet", OperationType.COMMAND),
- CLEAR("clear", OperationType.COMMAND),
- ADD_LISTENER("addListener", OperationType.COMMAND),
- REMOVE_LISTENER("removeListener", OperationType.COMMAND);
+ HIGHER_KEY("higherKey", OperationType.QUERY);
private final String id;
private final OperationType type;
@@ -83,11 +70,7 @@
public static final KryoNamespace NAMESPACE = KryoNamespace.newBuilder()
.register(KryoNamespaces.BASIC)
- .nextId(KryoNamespaces.BEGIN_USER_CUSTOM_ID)
- .register(ContainsKey.class)
- .register(ContainsValue.class)
- .register(Get.class)
- .register(GetOrDefault.class)
+ .nextId(KryoNamespaces.BEGIN_USER_CUSTOM_ID + 100)
.register(LowerKey.class)
.register(LowerEntry.class)
.register(HigherKey.class)
@@ -96,11 +79,7 @@
.register(FloorEntry.class)
.register(CeilingKey.class)
.register(CeilingEntry.class)
- .register(UpdateAndGet.class)
- .register(Match.class)
.register(Versioned.class)
- .register(MapEntryUpdateResult.class)
- .register(MapEntryUpdateResult.Status.class)
.register(AbstractMap.SimpleImmutableEntry.class)
.register(Maps.immutableEntry("", "").getClass())
.build("AtomixConsistentTreeMapOperations");
diff --git a/core/store/primitives/src/main/java/org/onosproject/store/primitives/resources/impl/AtomixConsistentTreeMapService.java b/core/store/primitives/src/main/java/org/onosproject/store/primitives/resources/impl/AtomixConsistentTreeMapService.java
index b9e7135..f089051 100644
--- a/core/store/primitives/src/main/java/org/onosproject/store/primitives/resources/impl/AtomixConsistentTreeMapService.java
+++ b/core/store/primitives/src/main/java/org/onosproject/store/primitives/resources/impl/AtomixConsistentTreeMapService.java
@@ -16,61 +16,36 @@
package org.onosproject.store.primitives.resources.impl;
-import java.util.AbstractMap.SimpleImmutableEntry;
-import java.util.Collection;
import java.util.HashMap;
-import java.util.Iterator;
-import java.util.List;
import java.util.Map;
import java.util.NavigableMap;
-import java.util.Set;
import java.util.TreeMap;
-import java.util.stream.Collectors;
-import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
-import com.google.common.collect.Sets;
-import io.atomix.protocols.raft.service.AbstractRaftService;
import io.atomix.protocols.raft.service.Commit;
import io.atomix.protocols.raft.service.RaftServiceExecutor;
import io.atomix.protocols.raft.session.RaftSession;
-import io.atomix.protocols.raft.storage.snapshot.SnapshotReader;
-import io.atomix.protocols.raft.storage.snapshot.SnapshotWriter;
import org.onlab.util.KryoNamespace;
-import org.onlab.util.Match;
+import org.onosproject.store.primitives.TransactionId;
import org.onosproject.store.serializers.KryoNamespaces;
-import org.onosproject.store.service.MapEvent;
import org.onosproject.store.service.Serializer;
+import org.onosproject.store.service.TransactionLog;
import org.onosproject.store.service.Versioned;
-import static org.onosproject.store.primitives.resources.impl.AtomixConsistentTreeMapEvents.CHANGE;
-import static org.onosproject.store.primitives.resources.impl.AtomixConsistentTreeMapOperations.ADD_LISTENER;
import static org.onosproject.store.primitives.resources.impl.AtomixConsistentTreeMapOperations.CEILING_ENTRY;
import static org.onosproject.store.primitives.resources.impl.AtomixConsistentTreeMapOperations.CEILING_KEY;
-import static org.onosproject.store.primitives.resources.impl.AtomixConsistentTreeMapOperations.CLEAR;
-import static org.onosproject.store.primitives.resources.impl.AtomixConsistentTreeMapOperations.CONTAINS_KEY;
-import static org.onosproject.store.primitives.resources.impl.AtomixConsistentTreeMapOperations.CONTAINS_VALUE;
import static org.onosproject.store.primitives.resources.impl.AtomixConsistentTreeMapOperations.CeilingEntry;
import static org.onosproject.store.primitives.resources.impl.AtomixConsistentTreeMapOperations.CeilingKey;
-import static org.onosproject.store.primitives.resources.impl.AtomixConsistentTreeMapOperations.ContainsKey;
-import static org.onosproject.store.primitives.resources.impl.AtomixConsistentTreeMapOperations.ContainsValue;
-import static org.onosproject.store.primitives.resources.impl.AtomixConsistentTreeMapOperations.ENTRY_SET;
import static org.onosproject.store.primitives.resources.impl.AtomixConsistentTreeMapOperations.FIRST_ENTRY;
import static org.onosproject.store.primitives.resources.impl.AtomixConsistentTreeMapOperations.FIRST_KEY;
import static org.onosproject.store.primitives.resources.impl.AtomixConsistentTreeMapOperations.FLOOR_ENTRY;
import static org.onosproject.store.primitives.resources.impl.AtomixConsistentTreeMapOperations.FLOOR_KEY;
import static org.onosproject.store.primitives.resources.impl.AtomixConsistentTreeMapOperations.FloorEntry;
import static org.onosproject.store.primitives.resources.impl.AtomixConsistentTreeMapOperations.FloorKey;
-import static org.onosproject.store.primitives.resources.impl.AtomixConsistentTreeMapOperations.GET;
-import static org.onosproject.store.primitives.resources.impl.AtomixConsistentTreeMapOperations.GET_OR_DEFAULT;
-import static org.onosproject.store.primitives.resources.impl.AtomixConsistentTreeMapOperations.Get;
-import static org.onosproject.store.primitives.resources.impl.AtomixConsistentTreeMapOperations.GetOrDefault;
import static org.onosproject.store.primitives.resources.impl.AtomixConsistentTreeMapOperations.HIGHER_ENTRY;
import static org.onosproject.store.primitives.resources.impl.AtomixConsistentTreeMapOperations.HIGHER_KEY;
import static org.onosproject.store.primitives.resources.impl.AtomixConsistentTreeMapOperations.HigherEntry;
import static org.onosproject.store.primitives.resources.impl.AtomixConsistentTreeMapOperations.HigherKey;
-import static org.onosproject.store.primitives.resources.impl.AtomixConsistentTreeMapOperations.IS_EMPTY;
-import static org.onosproject.store.primitives.resources.impl.AtomixConsistentTreeMapOperations.KEY_SET;
import static org.onosproject.store.primitives.resources.impl.AtomixConsistentTreeMapOperations.LAST_ENTRY;
import static org.onosproject.store.primitives.resources.impl.AtomixConsistentTreeMapOperations.LAST_KEY;
import static org.onosproject.store.primitives.resources.impl.AtomixConsistentTreeMapOperations.LOWER_ENTRY;
@@ -79,307 +54,134 @@
import static org.onosproject.store.primitives.resources.impl.AtomixConsistentTreeMapOperations.LowerKey;
import static org.onosproject.store.primitives.resources.impl.AtomixConsistentTreeMapOperations.POLL_FIRST_ENTRY;
import static org.onosproject.store.primitives.resources.impl.AtomixConsistentTreeMapOperations.POLL_LAST_ENTRY;
-import static org.onosproject.store.primitives.resources.impl.AtomixConsistentTreeMapOperations.REMOVE_LISTENER;
-import static org.onosproject.store.primitives.resources.impl.AtomixConsistentTreeMapOperations.SIZE;
import static org.onosproject.store.primitives.resources.impl.AtomixConsistentTreeMapOperations.SUB_MAP;
import static org.onosproject.store.primitives.resources.impl.AtomixConsistentTreeMapOperations.SubMap;
-import static org.onosproject.store.primitives.resources.impl.AtomixConsistentTreeMapOperations.UPDATE_AND_GET;
-import static org.onosproject.store.primitives.resources.impl.AtomixConsistentTreeMapOperations.UpdateAndGet;
-import static org.onosproject.store.primitives.resources.impl.AtomixConsistentTreeMapOperations.VALUES;
-import static org.onosproject.store.primitives.resources.impl.MapEntryUpdateResult.Status;
/**
* State machine corresponding to {@link AtomixConsistentTreeMap} backed by a
* {@link TreeMap}.
*/
-public class AtomixConsistentTreeMapService extends AbstractRaftService {
+public class AtomixConsistentTreeMapService extends AtomixConsistentMapService {
private static final Serializer SERIALIZER = Serializer.using(KryoNamespace.newBuilder()
.register(KryoNamespaces.BASIC)
+ .register(AtomixConsistentMapOperations.NAMESPACE)
.register(AtomixConsistentTreeMapOperations.NAMESPACE)
- .register(AtomixConsistentTreeMapEvents.NAMESPACE)
- .register(TreeMapEntryValue.class)
- .register(new HashMap<>().keySet().getClass())
+ .register(AtomixConsistentMapEvents.NAMESPACE)
+ .nextId(KryoNamespaces.BEGIN_USER_CUSTOM_ID + 150)
+ .register(TransactionScope.class)
+ .register(TransactionLog.class)
+ .register(TransactionId.class)
+ .register(MapEntryValue.class)
+ .register(MapEntryValue.Type.class)
+ .register(new HashMap().keySet().getClass())
.register(TreeMap.class)
.build());
- private final Map<Long, RaftSession> listeners = Maps.newHashMap();
- private TreeMap<String, TreeMapEntryValue> tree = Maps.newTreeMap();
- private final Set<String> preparedKeys = Sets.newHashSet();
-
@Override
- public void snapshot(SnapshotWriter writer) {
- writer.writeObject(Sets.newHashSet(listeners.keySet()), SERIALIZER::encode);
- writer.writeObject(preparedKeys, SERIALIZER::encode);
- writer.writeObject(tree, SERIALIZER::encode);
+ protected TreeMap<String, MapEntryValue> createMap() {
+ return Maps.newTreeMap();
}
@Override
- public void install(SnapshotReader reader) {
- listeners.clear();
- for (long sessionId : reader.<Set<Long>>readObject(SERIALIZER::decode)) {
- listeners.put(sessionId, getSessions().getSession(sessionId));
- }
+ protected TreeMap<String, MapEntryValue> entries() {
+ return (TreeMap<String, MapEntryValue>) super.entries();
+ }
- preparedKeys.clear();
- preparedKeys.addAll(reader.readObject(SERIALIZER::decode));
-
- tree.clear();
- tree.putAll(reader.readObject(SERIALIZER::decode));
+ @Override
+ protected Serializer serializer() {
+ return SERIALIZER;
}
@Override
public void configure(RaftServiceExecutor executor) {
- // Listeners
- executor.register(ADD_LISTENER, this::listen);
- executor.register(REMOVE_LISTENER, this::unlisten);
- // Queries
- executor.register(CONTAINS_KEY, SERIALIZER::decode, this::containsKey, SERIALIZER::encode);
- executor.register(CONTAINS_VALUE, SERIALIZER::decode, this::containsValue, SERIALIZER::encode);
- executor.register(ENTRY_SET, this::entrySet, SERIALIZER::encode);
- executor.register(GET, SERIALIZER::decode, this::get, SERIALIZER::encode);
- executor.register(GET_OR_DEFAULT, SERIALIZER::decode, this::getOrDefault, SERIALIZER::encode);
- executor.register(IS_EMPTY, this::isEmpty, SERIALIZER::encode);
- executor.register(KEY_SET, this::keySet, SERIALIZER::encode);
- executor.register(SIZE, this::size, SERIALIZER::encode);
- executor.register(VALUES, this::values, SERIALIZER::encode);
- executor.register(SUB_MAP, SERIALIZER::decode, this::subMap, SERIALIZER::encode);
- executor.register(FIRST_KEY, this::firstKey, SERIALIZER::encode);
- executor.register(LAST_KEY, this::lastKey, SERIALIZER::encode);
- executor.register(FIRST_ENTRY, this::firstEntry, SERIALIZER::encode);
- executor.register(LAST_ENTRY, this::lastEntry, SERIALIZER::encode);
- executor.register(POLL_FIRST_ENTRY, this::pollFirstEntry, SERIALIZER::encode);
- executor.register(POLL_LAST_ENTRY, this::pollLastEntry, SERIALIZER::encode);
- executor.register(LOWER_ENTRY, SERIALIZER::decode, this::lowerEntry, SERIALIZER::encode);
- executor.register(LOWER_KEY, SERIALIZER::decode, this::lowerKey, SERIALIZER::encode);
- executor.register(FLOOR_ENTRY, SERIALIZER::decode, this::floorEntry, SERIALIZER::encode);
- executor.register(FLOOR_KEY, SERIALIZER::decode, this::floorKey, SERIALIZER::encode);
- executor.register(CEILING_ENTRY, SERIALIZER::decode, this::ceilingEntry, SERIALIZER::encode);
- executor.register(CEILING_KEY, SERIALIZER::decode, this::ceilingKey, SERIALIZER::encode);
- executor.register(HIGHER_ENTRY, SERIALIZER::decode, this::higherEntry, SERIALIZER::encode);
- executor.register(HIGHER_KEY, SERIALIZER::decode, this::higherKey, SERIALIZER::encode);
-
- // Commands
- executor.register(UPDATE_AND_GET, SERIALIZER::decode, this::updateAndGet, SERIALIZER::encode);
- executor.register(CLEAR, this::clear, SERIALIZER::encode);
+ super.configure(executor);
+ executor.register(SUB_MAP, serializer()::decode, this::subMap, serializer()::encode);
+ executor.register(FIRST_KEY, (Commit<Void> c) -> firstKey(), serializer()::encode);
+ executor.register(LAST_KEY, (Commit<Void> c) -> lastKey(), serializer()::encode);
+ executor.register(FIRST_ENTRY, (Commit<Void> c) -> firstEntry(), serializer()::encode);
+ executor.register(LAST_ENTRY, (Commit<Void> c) -> lastEntry(), serializer()::encode);
+ executor.register(POLL_FIRST_ENTRY, (Commit<Void> c) -> pollFirstEntry(), serializer()::encode);
+ executor.register(POLL_LAST_ENTRY, (Commit<Void> c) -> pollLastEntry(), serializer()::encode);
+ executor.register(LOWER_ENTRY, serializer()::decode, this::lowerEntry, serializer()::encode);
+ executor.register(LOWER_KEY, serializer()::decode, this::lowerKey, serializer()::encode);
+ executor.register(FLOOR_ENTRY, serializer()::decode, this::floorEntry, serializer()::encode);
+ executor.register(FLOOR_KEY, serializer()::decode, this::floorKey, serializer()::encode);
+ executor.register(CEILING_ENTRY, serializer()::decode, this::ceilingEntry, serializer()::encode);
+ executor.register(CEILING_KEY, serializer()::decode, this::ceilingKey, serializer()::encode);
+ executor.register(HIGHER_ENTRY, serializer()::decode, this::higherEntry, serializer()::encode);
+ executor.register(HIGHER_KEY, serializer()::decode, this::higherKey, serializer()::encode);
}
- protected boolean containsKey(Commit<? extends ContainsKey> commit) {
- return toVersioned(tree.get((commit.value().key()))) != null;
- }
-
- protected boolean containsValue(Commit<? extends ContainsValue> commit) {
- Match<byte[]> valueMatch = Match
- .ifValue(commit.value().value());
- return tree.values().stream().anyMatch(
- value -> valueMatch.matches(value.value()));
- }
-
- protected Versioned<byte[]> get(Commit<? extends Get> commit) {
- return toVersioned(tree.get(commit.value().key()));
- }
-
- protected Versioned<byte[]> getOrDefault(Commit<? extends GetOrDefault> commit) {
- Versioned<byte[]> value = toVersioned(tree.get(commit.value().key()));
- return value != null ? value : new Versioned<>(commit.value().defaultValue(), 0);
- }
-
- protected int size(Commit<Void> commit) {
- return tree.size();
- }
-
- protected boolean isEmpty(Commit<Void> commit) {
- return tree.isEmpty();
- }
-
- protected Set<String> keySet(Commit<Void> commit) {
- return tree.keySet().stream().collect(Collectors.toSet());
- }
-
- protected Collection<Versioned<byte[]>> values(Commit<Void> commit) {
- return tree.values().stream().map(this::toVersioned)
- .collect(Collectors.toList());
- }
-
- protected Set<Map.Entry<String, Versioned<byte[]>>> entrySet(Commit<Void> commit) {
- return tree
- .entrySet()
- .stream()
- .map(e -> Maps.immutableEntry(e.getKey(),
- toVersioned(e.getValue())))
- .collect(Collectors.toSet());
- }
-
- protected MapEntryUpdateResult<String, byte[]> updateAndGet(Commit<? extends UpdateAndGet> commit) {
- Status updateStatus = validate(commit.value());
- String key = commit.value().key();
- TreeMapEntryValue oldCommitValue = tree.get(commit.value().key());
- Versioned<byte[]> oldTreeValue = toVersioned(oldCommitValue);
-
- if (updateStatus != Status.OK) {
- return new MapEntryUpdateResult<>(updateStatus, "", key,
- oldTreeValue, oldTreeValue);
- }
-
- byte[] newValue = commit.value().value();
- long newVersion = commit.index();
- Versioned<byte[]> newTreeValue = newValue == null ? null
- : new Versioned<byte[]>(newValue, newVersion);
-
- MapEvent.Type updateType = newValue == null ? MapEvent.Type.REMOVE
- : oldCommitValue == null ? MapEvent.Type.INSERT :
- MapEvent.Type.UPDATE;
- if (updateType == MapEvent.Type.REMOVE ||
- updateType == MapEvent.Type.UPDATE) {
- tree.remove(key);
- }
- if (updateType == MapEvent.Type.INSERT ||
- updateType == MapEvent.Type.UPDATE) {
- tree.put(key, new TreeMapEntryValue(newVersion, commit.value().value()));
- }
- publish(Lists.newArrayList(new MapEvent<>("", key, newTreeValue,
- oldTreeValue)));
- return new MapEntryUpdateResult<>(updateStatus, "", key, oldTreeValue,
- newTreeValue);
- }
-
- protected Status clear(Commit<Void> commit) {
- Iterator<Map.Entry<String, TreeMapEntryValue>> iterator = tree
- .entrySet()
- .iterator();
- while (iterator.hasNext()) {
- Map.Entry<String, TreeMapEntryValue> entry = iterator.next();
- String key = entry.getKey();
- TreeMapEntryValue value = entry.getValue();
- Versioned<byte[]> removedValue =
- new Versioned<byte[]>(value.value(),
- value.version());
- publish(Lists.newArrayList(new MapEvent<>("", key, null,
- removedValue)));
- iterator.remove();
- }
- return Status.OK;
- }
-
- protected void listen(Commit<Void> commit) {
- listeners.put(commit.session().sessionId().id(), commit.session());
- }
-
- protected void unlisten(Commit<Void> commit) {
- closeListener(commit.session().sessionId().id());
- }
-
- private Status validate(UpdateAndGet update) {
- TreeMapEntryValue existingValue = tree.get(update.key());
- if (existingValue == null && update.value() == null) {
- return Status.NOOP;
- }
- if (preparedKeys.contains(update.key())) {
- return Status.WRITE_LOCK;
- }
- byte[] existingRawValue = existingValue == null ? null :
- existingValue.value();
- Long existingVersion = existingValue == null ? null :
- existingValue.version();
- return update.valueMatch().matches(existingRawValue)
- && update.versionMatch().matches(existingVersion) ?
- Status.OK
- : Status.PRECONDITION_FAILED;
- }
-
- protected NavigableMap<String, TreeMapEntryValue> subMap(
+ protected NavigableMap<String, MapEntryValue> subMap(
Commit<? extends SubMap> commit) {
// Do not support this until lazy communication is possible. At present
// it transmits up to the entire map.
- SubMap<String, TreeMapEntryValue> subMap = commit.value();
- return tree.subMap(subMap.fromKey(), subMap.isInclusiveFrom(),
+ SubMap<String, MapEntryValue> subMap = commit.value();
+ return entries().subMap(subMap.fromKey(), subMap.isInclusiveFrom(),
subMap.toKey(), subMap.isInclusiveTo());
}
- protected String firstKey(Commit<Void> commit) {
- if (tree.isEmpty()) {
- return null;
- }
- return tree.firstKey();
+ protected String firstKey() {
+ return isEmpty() ? null : entries().firstKey();
}
- protected String lastKey(Commit<Void> commit) {
- return tree.isEmpty() ? null : tree.lastKey();
+ protected String lastKey() {
+ return isEmpty() ? null : entries().lastKey();
}
protected Map.Entry<String, Versioned<byte[]>> higherEntry(Commit<? extends HigherEntry> commit) {
- if (tree.isEmpty()) {
- return null;
- }
- return toVersionedEntry(
- tree.higherEntry(commit.value().key()));
+ return isEmpty() ? null : toVersionedEntry(entries().higherEntry(commit.value().key()));
}
- protected Map.Entry<String, Versioned<byte[]>> firstEntry(Commit<Void> commit) {
- if (tree.isEmpty()) {
- return null;
- }
- return toVersionedEntry(tree.firstEntry());
+ protected Map.Entry<String, Versioned<byte[]>> firstEntry() {
+ return isEmpty() ? null : toVersionedEntry(entries().firstEntry());
}
- protected Map.Entry<String, Versioned<byte[]>> lastEntry(Commit<Void> commit) {
- if (tree.isEmpty()) {
- return null;
- }
- return toVersionedEntry(tree.lastEntry());
+ protected Map.Entry<String, Versioned<byte[]>> lastEntry() {
+ return isEmpty() ? null : toVersionedEntry(entries().lastEntry());
}
- protected Map.Entry<String, Versioned<byte[]>> pollFirstEntry(Commit<Void> commit) {
- return toVersionedEntry(tree.pollFirstEntry());
+ protected Map.Entry<String, Versioned<byte[]>> pollFirstEntry() {
+ return toVersionedEntry(entries().pollFirstEntry());
}
- protected Map.Entry<String, Versioned<byte[]>> pollLastEntry(Commit<Void> commit) {
- return toVersionedEntry(tree.pollLastEntry());
+ protected Map.Entry<String, Versioned<byte[]>> pollLastEntry() {
+ return toVersionedEntry(entries().pollLastEntry());
}
protected Map.Entry<String, Versioned<byte[]>> lowerEntry(Commit<? extends LowerEntry> commit) {
- return toVersionedEntry(tree.lowerEntry(commit.value().key()));
+ return toVersionedEntry(entries().lowerEntry(commit.value().key()));
}
protected String lowerKey(Commit<? extends LowerKey> commit) {
- return tree.lowerKey(commit.value().key());
+ return entries().lowerKey(commit.value().key());
}
protected Map.Entry<String, Versioned<byte[]>> floorEntry(Commit<? extends FloorEntry> commit) {
- return toVersionedEntry(tree.floorEntry(commit.value().key()));
+ return toVersionedEntry(entries().floorEntry(commit.value().key()));
}
protected String floorKey(Commit<? extends FloorKey> commit) {
- return tree.floorKey(commit.value().key());
+ return entries().floorKey(commit.value().key());
}
protected Map.Entry<String, Versioned<byte[]>> ceilingEntry(Commit<CeilingEntry> commit) {
- return toVersionedEntry(
- tree.ceilingEntry(commit.value().key()));
+ return toVersionedEntry(entries().ceilingEntry(commit.value().key()));
}
protected String ceilingKey(Commit<CeilingKey> commit) {
- return tree.ceilingKey(commit.value().key());
+ return entries().ceilingKey(commit.value().key());
}
protected String higherKey(Commit<HigherKey> commit) {
- return tree.higherKey(commit.value().key());
- }
-
- private Versioned<byte[]> toVersioned(TreeMapEntryValue value) {
- return value == null ? null :
- new Versioned<byte[]>(value.value(), value.version());
+ return entries().higherKey(commit.value().key());
}
private Map.Entry<String, Versioned<byte[]>> toVersionedEntry(
- Map.Entry<String, TreeMapEntryValue> entry) {
- //FIXME is this the best type of entry to return?
- return entry == null ? null : new SimpleImmutableEntry<>(
- entry.getKey(), toVersioned(entry.getValue()));
- }
-
- private void publish(List<MapEvent<String, byte[]>> events) {
- listeners.values().forEach(session -> session.publish(CHANGE, SERIALIZER::encode, events));
+ Map.Entry<String, MapEntryValue> entry) {
+ return entry == null || valueIsNull(entry.getValue())
+ ? null : Maps.immutableEntry(entry.getKey(), toVersioned(entry.getValue()));
}
@Override
@@ -395,22 +197,4 @@
private void closeListener(Long sessionId) {
listeners.remove(sessionId);
}
-
- private static class TreeMapEntryValue {
- private final long version;
- private final byte[] value;
-
- public TreeMapEntryValue(long version, byte[] value) {
- this.version = version;
- this.value = value;
- }
-
- public byte[] value() {
- return value;
- }
-
- public long version() {
- return version;
- }
- }
}
\ No newline at end of file
diff --git a/core/store/primitives/src/main/java/org/onosproject/store/primitives/resources/impl/MapEntryUpdateResult.java b/core/store/primitives/src/main/java/org/onosproject/store/primitives/resources/impl/MapEntryUpdateResult.java
index 63235d6..7773db7 100644
--- a/core/store/primitives/src/main/java/org/onosproject/store/primitives/resources/impl/MapEntryUpdateResult.java
+++ b/core/store/primitives/src/main/java/org/onosproject/store/primitives/resources/impl/MapEntryUpdateResult.java
@@ -15,12 +15,8 @@
*/
package org.onosproject.store.primitives.resources.impl;
-import java.util.function.Function;
-
-import org.onosproject.store.service.MapEvent;
-import org.onosproject.store.service.Versioned;
-
import com.google.common.base.MoreObjects;
+import org.onosproject.store.service.Versioned;
/**
* Result of a map entry update operation.
@@ -57,18 +53,16 @@
PRECONDITION_FAILED
}
- private final String mapName;
- private Status status;
+ private final Status status;
+ private final long version;
private final K key;
- private final Versioned<V> oldValue;
- private final Versioned<V> newValue;
+ private final Versioned<V> result;
- public MapEntryUpdateResult(Status status, String mapName, K key, Versioned<V> oldValue, Versioned<V> newValue) {
+ public MapEntryUpdateResult(Status status, long version, K key, Versioned<V> result) {
this.status = status;
- this.mapName = mapName;
+ this.version = version;
this.key = key;
- this.oldValue = oldValue;
- this.newValue = newValue;
+ this.result = result;
}
/**
@@ -80,14 +74,6 @@
}
/**
- * Returns the map name.
- * @return map name
- */
- public String mapName() {
- return mapName;
- }
-
- /**
* Returns the update status.
* @return update status
*/
@@ -96,65 +82,27 @@
}
/**
- * Returns the map key.
- * @return key
+ * Returns the result version.
+ * @return result version
*/
- public K key() {
- return key;
+ public long version() {
+ return version;
}
/**
- * Returns the old value.
- * @return the previous value associated with key if updated was successful, otherwise current value
+ * Returns the value.
+ * @return the value associated with key if updated was successful, otherwise current value
*/
- public Versioned<V> oldValue() {
- return oldValue;
- }
-
- /**
- * Returns the new value after update.
- * @return if updated was unsuccessful, this is same as old value
- */
- public Versioned<V> newValue() {
- return newValue;
- }
-
- /**
- * Maps to another instance with different key and value types.
- * @param keyTransform transformer to use for transcoding keys
- * @param valueMapper mapper to use for transcoding values
- * @return new instance
- * @param <K1> key type of returned {@code MapEntryUpdateResult}
- * @param <V1> value type of returned {@code MapEntryUpdateResult}
- */
- public <K1, V1> MapEntryUpdateResult<K1, V1> map(Function<K, K1> keyTransform, Function<V, V1> valueMapper) {
- return new MapEntryUpdateResult<>(status,
- mapName,
- keyTransform.apply(key),
- oldValue == null ? null : oldValue.map(valueMapper),
- newValue == null ? null : newValue.map(valueMapper));
- }
-
- /**
- * Return the map event that will be generated as a result of this update.
- * @return map event. if update was unsuccessful, this returns {@code null}
- */
- public MapEvent<K, V> toMapEvent() {
- if (!updated()) {
- return null;
- } else {
- return new MapEvent<>(mapName(), key(), newValue, oldValue);
- }
+ public Versioned<V> result() {
+ return result;
}
@Override
public String toString() {
return MoreObjects.toStringHelper(MapEntryUpdateResult.class)
- .add("mapName", mapName)
.add("status", status)
.add("key", key)
- .add("oldValue", oldValue)
- .add("newValue", newValue)
+ .add("result", result)
.toString();
}
}
diff --git a/core/store/primitives/src/test/java/org/onosproject/store/primitives/resources/impl/AtomixConsistentMapServiceTest.java b/core/store/primitives/src/test/java/org/onosproject/store/primitives/resources/impl/AtomixConsistentMapServiceTest.java
index 098c193..ecd8086 100644
--- a/core/store/primitives/src/test/java/org/onosproject/store/primitives/resources/impl/AtomixConsistentMapServiceTest.java
+++ b/core/store/primitives/src/test/java/org/onosproject/store/primitives/resources/impl/AtomixConsistentMapServiceTest.java
@@ -26,14 +26,14 @@
import io.atomix.storage.StorageLevel;
import io.atomix.time.WallClockTimestamp;
import org.junit.Test;
-import org.onlab.util.Match;
import org.onosproject.store.service.Versioned;
import static org.easymock.EasyMock.mock;
import static org.junit.Assert.assertArrayEquals;
import static org.junit.Assert.assertNotNull;
import static org.onosproject.store.primitives.resources.impl.AtomixConsistentMapOperations.GET;
-import static org.onosproject.store.primitives.resources.impl.AtomixConsistentMapOperations.UPDATE_AND_GET;
+import static org.onosproject.store.primitives.resources.impl.AtomixConsistentMapOperations.PUT;
+import static org.onosproject.store.primitives.resources.impl.AtomixConsistentMapOperations.Put;
/**
* Consistent map service test.
@@ -49,10 +49,10 @@
Snapshot snapshot = store.newSnapshot(ServiceId.from(1), 2, new WallClockTimestamp());
AtomixConsistentMapService service = new AtomixConsistentMapService();
- service.updateAndGet(new DefaultCommit<>(
+ service.put(new DefaultCommit<>(
2,
- UPDATE_AND_GET,
- new AtomixConsistentMapOperations.UpdateAndGet("foo", "Hello world!".getBytes(), Match.ANY, Match.ANY),
+ PUT,
+ new Put("foo", "Hello world!".getBytes()),
mock(RaftSessionContext.class),
System.currentTimeMillis()));
diff --git a/core/store/primitives/src/test/java/org/onosproject/store/primitives/resources/impl/AtomixConsistentMapTest.java b/core/store/primitives/src/test/java/org/onosproject/store/primitives/resources/impl/AtomixConsistentMapTest.java
index e858b3f..7b26f8b 100644
--- a/core/store/primitives/src/test/java/org/onosproject/store/primitives/resources/impl/AtomixConsistentMapTest.java
+++ b/core/store/primitives/src/test/java/org/onosproject/store/primitives/resources/impl/AtomixConsistentMapTest.java
@@ -255,7 +255,7 @@
map.computeIfPresent("bar", (k, v) -> value2).thenAccept(result -> {
assertNull(result);
- });
+ }).join();
map.computeIfPresent("foo", (k, v) -> value3).thenAccept(result -> {
assertTrue(Arrays.equals(Versioned.valueOrElse(result, null), value3));
diff --git a/core/store/primitives/src/test/java/org/onosproject/store/primitives/resources/impl/AtomixConsistentTreeMapServiceTest.java b/core/store/primitives/src/test/java/org/onosproject/store/primitives/resources/impl/AtomixConsistentTreeMapServiceTest.java
deleted file mode 100644
index 04698e0..0000000
--- a/core/store/primitives/src/test/java/org/onosproject/store/primitives/resources/impl/AtomixConsistentTreeMapServiceTest.java
+++ /dev/null
@@ -1,80 +0,0 @@
-/*
- * Copyright 2017-present Open Networking Laboratory
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.onosproject.store.primitives.resources.impl;
-
-import io.atomix.protocols.raft.service.ServiceId;
-import io.atomix.protocols.raft.service.impl.DefaultCommit;
-import io.atomix.protocols.raft.session.impl.RaftSessionContext;
-import io.atomix.protocols.raft.storage.RaftStorage;
-import io.atomix.protocols.raft.storage.snapshot.Snapshot;
-import io.atomix.protocols.raft.storage.snapshot.SnapshotReader;
-import io.atomix.protocols.raft.storage.snapshot.SnapshotStore;
-import io.atomix.protocols.raft.storage.snapshot.SnapshotWriter;
-import io.atomix.storage.StorageLevel;
-import io.atomix.time.WallClockTimestamp;
-import org.junit.Test;
-import org.onlab.util.Match;
-import org.onosproject.store.service.Versioned;
-
-import static org.easymock.EasyMock.mock;
-import static org.junit.Assert.assertArrayEquals;
-import static org.junit.Assert.assertNotNull;
-import static org.onosproject.store.primitives.resources.impl.AtomixConsistentTreeMapOperations.GET;
-import static org.onosproject.store.primitives.resources.impl.AtomixConsistentTreeMapOperations.UPDATE_AND_GET;
-
-/**
- * Consistent tree map service test.
- */
-public class AtomixConsistentTreeMapServiceTest {
- @Test
- @SuppressWarnings("unchecked")
- public void testSnapshot() throws Exception {
- SnapshotStore store = new SnapshotStore(RaftStorage.newBuilder()
- .withPrefix("test")
- .withStorageLevel(StorageLevel.MEMORY)
- .build());
- Snapshot snapshot = store.newSnapshot(ServiceId.from(1), 2, new WallClockTimestamp());
-
- AtomixConsistentTreeMapService service = new AtomixConsistentTreeMapService();
- service.updateAndGet(new DefaultCommit<>(
- 2,
- UPDATE_AND_GET,
- new AtomixConsistentTreeMapOperations.UpdateAndGet(
- "foo", "Hello world!".getBytes(), Match.ANY, Match.ANY),
- mock(RaftSessionContext.class),
- System.currentTimeMillis()));
-
- try (SnapshotWriter writer = snapshot.openWriter()) {
- service.snapshot(writer);
- }
-
- snapshot.complete();
-
- service = new AtomixConsistentTreeMapService();
- try (SnapshotReader reader = snapshot.openReader()) {
- service.install(reader);
- }
-
- Versioned<byte[]> value = service.get(new DefaultCommit<>(
- 2,
- GET,
- new AtomixConsistentTreeMapOperations.Get("foo"),
- mock(RaftSessionContext.class),
- System.currentTimeMillis()));
- assertNotNull(value);
- assertArrayEquals("Hello world!".getBytes(), value.value());
- }
-}
diff --git a/core/store/primitives/src/test/java/org/onosproject/store/primitives/resources/impl/AtomixConsistentTreeMapTest.java b/core/store/primitives/src/test/java/org/onosproject/store/primitives/resources/impl/AtomixConsistentTreeMapTest.java
index f1de625..7e8eec7 100644
--- a/core/store/primitives/src/test/java/org/onosproject/store/primitives/resources/impl/AtomixConsistentTreeMapTest.java
+++ b/core/store/primitives/src/test/java/org/onosproject/store/primitives/resources/impl/AtomixConsistentTreeMapTest.java
@@ -180,7 +180,6 @@
map.putAndGet(key, allValues.get(allKeys.indexOf(key))).thenAccept(secondResult -> {
assertArrayEquals(allValues.get(allKeys.indexOf(key)), firstResult.value());
assertArrayEquals(allValues.get(allKeys.indexOf(key)), secondResult.value());
- assertTrue((firstResult.version() + 1) == secondResult.version());
});
}).join());
diff --git a/core/store/primitives/src/test/java/org/onosproject/store/primitives/resources/impl/AtomixTestBase.java b/core/store/primitives/src/test/java/org/onosproject/store/primitives/resources/impl/AtomixTestBase.java
index e117ba5..dbfee3c 100644
--- a/core/store/primitives/src/test/java/org/onosproject/store/primitives/resources/impl/AtomixTestBase.java
+++ b/core/store/primitives/src/test/java/org/onosproject/store/primitives/resources/impl/AtomixTestBase.java
@@ -176,7 +176,6 @@
.register(AtomixConsistentMapOperations.class)
.register(AtomixConsistentSetMultimapOperations.class)
.register(AtomixConsistentSetMultimapEvents.class)
- .register(AtomixConsistentTreeMapEvents.class)
.register(AtomixConsistentTreeMapOperations.class)
.register(AtomixCounterOperations.class)
.register(AtomixDocumentTreeEvents.class)