[ONOS-7547] Implement support for backwards/forward compatibility of ConsistentMap values on read
Change-Id: Ifffb6a883ec4ee6aa5587da58a51c1e90694e5ea
diff --git a/core/api/src/main/java/org/onosproject/store/primitives/DistributedPrimitiveOptions.java b/core/api/src/main/java/org/onosproject/store/primitives/DistributedPrimitiveOptions.java
index c576375..1691086 100644
--- a/core/api/src/main/java/org/onosproject/store/primitives/DistributedPrimitiveOptions.java
+++ b/core/api/src/main/java/org/onosproject/store/primitives/DistributedPrimitiveOptions.java
@@ -38,8 +38,8 @@
private boolean meteringDisabled = false;
private boolean readOnly = false;
private boolean relaxedReadConsistency = false;
- private int revisionNumber = 1;
- private RevisionType revisionType = RevisionType.NONE;
+ private Version version;
+ private RevisionType revisionType;
public DistributedPrimitiveOptions(DistributedPrimitive.Type type) {
this.type = type;
@@ -85,17 +85,7 @@
* @return this builder
*/
public O withVersion(Version version) {
- return withRevisionNumber(version.toInt());
- }
-
- /**
- * Sets the primitive revision.
- *
- * @param revision the primitive revision
- * @return this builder
- */
- public O withRevisionNumber(int revision) {
- this.revisionNumber = revision;
+ this.version = version;
return (O) this;
}
@@ -212,12 +202,12 @@
}
/**
- * Returns the primitive revision number.
+ * Returns the primitive version.
*
- * @return the primitive revision number
+ * @return the primitive version
*/
- public final int revision() {
- return revisionNumber;
+ public final Version version() {
+ return version;
}
/**
diff --git a/core/api/src/main/java/org/onosproject/store/service/ConsistentMapOptions.java b/core/api/src/main/java/org/onosproject/store/service/ConsistentMapOptions.java
index 587789d..4dacab8 100644
--- a/core/api/src/main/java/org/onosproject/store/service/ConsistentMapOptions.java
+++ b/core/api/src/main/java/org/onosproject/store/service/ConsistentMapOptions.java
@@ -15,6 +15,8 @@
*/
package org.onosproject.store.service;
+import java.util.function.BiFunction;
+
import org.onosproject.store.primitives.DistributedPrimitiveOptions;
/**
@@ -28,6 +30,7 @@
private boolean nullValues = false;
private boolean purgeOnUninstall = false;
+ protected BiFunction<V, org.onosproject.core.Version, V> compatibilityFunction;
public ConsistentMapOptions() {
super(DistributedPrimitive.Type.CONSISTENT_MAP);
@@ -54,6 +57,19 @@
}
/**
+ * Sets a compatibility function on the map.
+ *
+ * @param compatibilityFunction the compatibility function
+ * @return the consistent map builder
+ */
+ @SuppressWarnings("unchecked")
+ public O withCompatibilityFunction(
+ BiFunction<V, org.onosproject.core.Version, V> compatibilityFunction) {
+ this.compatibilityFunction = compatibilityFunction;
+ return (O) this;
+ }
+
+ /**
* Returns whether null values are supported by the map.
*
* @return {@code true} if null values are supported; {@code false} otherwise
diff --git a/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/CompatibleValue.java b/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/CompatibleValue.java
new file mode 100644
index 0000000..73c4d12
--- /dev/null
+++ b/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/CompatibleValue.java
@@ -0,0 +1,59 @@
+/*
+ * Copyright 2018-present Open Networking Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.onosproject.store.primitives.impl;
+
+import org.onosproject.core.Version;
+
+import static com.google.common.base.MoreObjects.toStringHelper;
+
+/**
+ * Compatibility wrapper for primitive values.
+ */
+public class CompatibleValue<T> {
+ private final T value;
+ private final Version version;
+
+ public CompatibleValue(T value, Version version) {
+ this.value = value;
+ this.version = version;
+ }
+
+ /**
+ * Returns the wrapped value.
+ *
+ * @return the wrapped value
+ */
+ public T value() {
+ return value;
+ }
+
+ /**
+ * Returns the compatibilty version.
+ *
+ * @return the compatibility version
+ */
+ public Version version() {
+ return version;
+ }
+
+ @Override
+ public String toString() {
+ return toStringHelper(this)
+ .add("value", value)
+ .add("version", version)
+ .toString();
+ }
+}
diff --git a/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/CoordinationManager.java b/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/CoordinationManager.java
index fcb90f4..f2965d9 100644
--- a/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/CoordinationManager.java
+++ b/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/CoordinationManager.java
@@ -29,6 +29,7 @@
import org.onosproject.cluster.DefaultPartition;
import org.onosproject.cluster.NodeId;
import org.onosproject.cluster.PartitionId;
+import org.onosproject.core.VersionService;
import org.onosproject.persistence.PersistenceService;
import org.onosproject.store.cluster.messaging.ClusterCommunicationService;
import org.onosproject.store.primitives.DistributedPrimitiveCreator;
@@ -80,6 +81,9 @@
@Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
protected PersistenceService persistenceService;
+ @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
+ protected VersionService versionService;
+
private StoragePartition partition;
private DistributedPrimitiveCreator primitiveCreator;
@@ -135,7 +139,7 @@
@Override
public <K, V> ConsistentMapBuilder<K, V> consistentMapBuilder() {
checkPermission(STORAGE_WRITE);
- return new DefaultConsistentMapBuilder<>(primitiveCreator);
+ return new DefaultConsistentMapBuilder<>(primitiveCreator, versionService.version());
}
@Override
diff --git a/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/DefaultConsistentMapBuilder.java b/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/DefaultConsistentMapBuilder.java
index 2bc06dd..6d3d4dc 100644
--- a/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/DefaultConsistentMapBuilder.java
+++ b/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/DefaultConsistentMapBuilder.java
@@ -15,10 +15,13 @@
*/
package org.onosproject.store.primitives.impl;
+import org.onosproject.core.Version;
import org.onosproject.store.primitives.DistributedPrimitiveCreator;
+import org.onosproject.store.serializers.KryoNamespaces;
import org.onosproject.store.service.AsyncConsistentMap;
import org.onosproject.store.service.ConsistentMap;
import org.onosproject.store.service.ConsistentMapBuilder;
+import org.onosproject.store.service.Serializer;
/**
* Default {@link AsyncConsistentMap} builder.
@@ -29,9 +32,11 @@
public class DefaultConsistentMapBuilder<K, V> extends ConsistentMapBuilder<K, V> {
private final DistributedPrimitiveCreator primitiveCreator;
+ private final Version version;
- public DefaultConsistentMapBuilder(DistributedPrimitiveCreator primitiveCreator) {
+ public DefaultConsistentMapBuilder(DistributedPrimitiveCreator primitiveCreator, Version version) {
this.primitiveCreator = primitiveCreator;
+ this.version = version;
}
@Override
@@ -41,7 +46,34 @@
@Override
public AsyncConsistentMap<K, V> buildAsyncMap() {
- AsyncConsistentMap<K, V> map = primitiveCreator.newAsyncConsistentMap(name(), serializer());
+ AsyncConsistentMap<K, V> map;
+
+ // If a compatibility function is defined, we don't assume CompatibleValue and Version is registered in
+ // the user-provided serializer since it's an implementation detail. Instead, we use the user-provided
+ // serializer to convert the CompatibleValue value to a raw byte[] and use a separate serializer to encode
+ // the CompatibleValue to binary.
+ if (compatibilityFunction != null) {
+ Serializer serializer = serializer();
+
+ // Convert the byte[] value to CompatibleValue<byte[]>
+ AsyncConsistentMap<K, CompatibleValue<byte[]>> rawMap = primitiveCreator.newAsyncConsistentMap(
+ withSerializer(Serializer.using(KryoNamespaces.API, CompatibleValue.class)));
+
+ // Convert the CompatibleValue<byte[]> value to CompatibleValue<V> using the user-provided serializer.
+ AsyncConsistentMap<K, CompatibleValue<V>> compatibleMap =
+ DistributedPrimitives.newTranscodingMap(
+ rawMap,
+ key -> key,
+ key -> key,
+ value -> value == null ? null :
+ new CompatibleValue<byte[]>(serializer.encode(value.value()), value.version()),
+ value -> value == null ? null :
+ new CompatibleValue<V>(serializer.decode(value.value()), value.version()));
+ map = DistributedPrimitives.newCompatibleMap(compatibleMap, compatibilityFunction, version());
+ } else {
+ map = primitiveCreator.newAsyncConsistentMap(name(), serializer());
+ }
+
map = nullValues() ? map : DistributedPrimitives.newNotNullMap(map);
map = relaxedReadConsistency() ? DistributedPrimitives.newCachingMap(map) : map;
map = readOnly() ? DistributedPrimitives.newUnmodifiableMap(map) : map;
diff --git a/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/DistributedPrimitives.java b/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/DistributedPrimitives.java
index d116988..78c41e8 100644
--- a/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/DistributedPrimitives.java
+++ b/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/DistributedPrimitives.java
@@ -15,6 +15,7 @@
*/
package org.onosproject.store.primitives.impl;
+import org.onosproject.core.Version;
import org.onosproject.store.service.AsyncAtomicCounterMap;
import org.onosproject.store.service.AsyncConsistentMap;
import org.onosproject.store.service.AsyncConsistentMultimap;
@@ -22,6 +23,7 @@
import org.onosproject.store.service.AsyncDistributedSet;
import org.onosproject.store.service.AsyncDocumentTree;
+import java.util.function.BiFunction;
import java.util.function.Function;
/**
@@ -91,6 +93,30 @@
}
/**
+ * Creates an instance of {@code AsyncConsistentMap} that converts values from other versions.
+ *
+ * @param map backing map
+ * @param compatibilityFunction the compatibility function
+ * @param version local node version
+ * @param <K> map key type
+ * @param <V> map value type
+ * @return compatible map
+ */
+ public static <K, V> AsyncConsistentMap<K, V> newCompatibleMap(
+ AsyncConsistentMap<K, CompatibleValue<V>> map,
+ BiFunction<V, Version, V> compatibilityFunction,
+ Version version) {
+ Function<V, CompatibleValue<V>> encoder = value -> new CompatibleValue<>(value, version);
+ Function<CompatibleValue<V>, V> decoder = value -> {
+ if (!value.version().equals(version)) {
+ return compatibilityFunction.apply(value.value(), value.version());
+ }
+ return value.value();
+ };
+ return new TranscodingAsyncConsistentMap<>(map, k -> k, k -> k, encoder, decoder);
+ }
+
+ /**
* Creates an instance of {@code AsyncAtomicCounterMap} that transforms key types.
*
* @param map backing map
diff --git a/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/StorageManager.java b/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/StorageManager.java
index 693cf4a..a649ecf 100644
--- a/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/StorageManager.java
+++ b/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/StorageManager.java
@@ -35,6 +35,7 @@
import org.onosproject.cluster.MembershipService;
import org.onosproject.cluster.NodeId;
import org.onosproject.cluster.PartitionId;
+import org.onosproject.core.VersionService;
import org.onosproject.persistence.PersistenceService;
import org.onosproject.store.cluster.messaging.ClusterCommunicationService;
import org.onosproject.store.primitives.DistributedPrimitiveCreator;
@@ -103,6 +104,9 @@
@Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
protected MembershipService membershipService;
+ @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
+ protected VersionService versionService;
+
private final Supplier<TransactionId> transactionIdGenerator =
() -> TransactionId.from(UUID.randomUUID().toString());
private DistributedPrimitiveCreator federatedPrimitiveCreator;
@@ -173,7 +177,7 @@
@Override
public <K, V> ConsistentMapBuilder<K, V> consistentMapBuilder() {
checkPermission(STORAGE_WRITE);
- return new DefaultConsistentMapBuilder<>(federatedPrimitiveCreator);
+ return new DefaultConsistentMapBuilder<>(federatedPrimitiveCreator, versionService.version());
}
@Override
diff --git a/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/StoragePartitionClient.java b/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/StoragePartitionClient.java
index 0438fe5..5fa7383 100644
--- a/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/StoragePartitionClient.java
+++ b/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/StoragePartitionClient.java
@@ -158,8 +158,11 @@
.withMinTimeout(MIN_TIMEOUT)
.withMaxTimeout(MAX_TIMEOUT)
.withMaxRetries(MAX_RETRIES)
- .withRevision(options.revision())
- .withPropagationStrategy(PropagationStrategy.valueOf(options.revisionType().name()))
+ .withRevision(options.version() != null && options.revisionType() != null
+ ? options.version().toInt() : 1)
+ .withPropagationStrategy(options.revisionType() != null
+ ? PropagationStrategy.valueOf(options.revisionType().name())
+ : PropagationStrategy.NONE)
.build()
.open()
.join());
@@ -186,8 +189,11 @@
.withMinTimeout(MIN_TIMEOUT)
.withMaxTimeout(MAX_TIMEOUT)
.withMaxRetries(MAX_RETRIES)
- .withRevision(options.revision())
- .withPropagationStrategy(PropagationStrategy.valueOf(options.revisionType().name()))
+ .withRevision(options.version() != null && options.revisionType() != null
+ ? options.version().toInt() : 1)
+ .withPropagationStrategy(options.revisionType() != null
+ ? PropagationStrategy.valueOf(options.revisionType().name())
+ : PropagationStrategy.NONE)
.build()
.open()
.join());
@@ -213,8 +219,11 @@
.withMinTimeout(MIN_TIMEOUT)
.withMaxTimeout(MAX_TIMEOUT)
.withMaxRetries(MAX_RETRIES)
- .withRevision(options.revision())
- .withPropagationStrategy(PropagationStrategy.valueOf(options.revisionType().name()))
+ .withRevision(options.version() != null && options.revisionType() != null
+ ? options.version().toInt() : 1)
+ .withPropagationStrategy(options.revisionType() != null
+ ? PropagationStrategy.valueOf(options.revisionType().name())
+ : PropagationStrategy.NONE)
.build()
.open()
.join());
@@ -246,8 +255,11 @@
.withMinTimeout(MIN_TIMEOUT)
.withMaxTimeout(MAX_TIMEOUT)
.withMaxRetries(MAX_RETRIES)
- .withRevision(options.revision())
- .withPropagationStrategy(PropagationStrategy.valueOf(options.revisionType().name()))
+ .withRevision(options.version() != null && options.revisionType() != null
+ ? options.version().toInt() : 1)
+ .withPropagationStrategy(options.revisionType() != null
+ ? PropagationStrategy.valueOf(options.revisionType().name())
+ : PropagationStrategy.NONE)
.build()
.open()
.join());
@@ -271,8 +283,11 @@
.withMinTimeout(MIN_TIMEOUT)
.withMaxTimeout(MAX_TIMEOUT)
.withMaxRetries(MAX_RETRIES)
- .withRevision(options.revision())
- .withPropagationStrategy(PropagationStrategy.valueOf(options.revisionType().name()))
+ .withRevision(options.version() != null && options.revisionType() != null
+ ? options.version().toInt() : 1)
+ .withPropagationStrategy(options.revisionType() != null
+ ? PropagationStrategy.valueOf(options.revisionType().name())
+ : PropagationStrategy.NONE)
.build()
.open()
.join());
@@ -298,8 +313,11 @@
.withMinTimeout(MIN_TIMEOUT)
.withMaxTimeout(MAX_TIMEOUT)
.withMaxRetries(MAX_RETRIES)
- .withRevision(options.revision())
- .withPropagationStrategy(PropagationStrategy.valueOf(options.revisionType().name()))
+ .withRevision(options.version() != null && options.revisionType() != null
+ ? options.version().toInt() : 1)
+ .withPropagationStrategy(options.revisionType() != null
+ ? PropagationStrategy.valueOf(options.revisionType().name())
+ : PropagationStrategy.NONE)
.build()
.open()
.join());
@@ -317,8 +335,11 @@
.withMinTimeout(MIN_TIMEOUT)
.withMaxTimeout(MAX_TIMEOUT)
.withMaxRetries(MAX_RETRIES)
- .withRevision(options.revision())
- .withPropagationStrategy(PropagationStrategy.valueOf(options.revisionType().name()))
+ .withRevision(options.version() != null && options.revisionType() != null
+ ? options.version().toInt() : 1)
+ .withPropagationStrategy(options.revisionType() != null
+ ? PropagationStrategy.valueOf(options.revisionType().name())
+ : PropagationStrategy.NONE)
.build()
.open()
.join());
@@ -335,8 +356,11 @@
.withMinTimeout(MIN_TIMEOUT)
.withMaxTimeout(MIN_TIMEOUT)
.withMaxRetries(MAX_RETRIES)
- .withRevision(options.revision())
- .withPropagationStrategy(PropagationStrategy.valueOf(options.revisionType().name()))
+ .withRevision(options.version() != null && options.revisionType() != null
+ ? options.version().toInt() : 1)
+ .withPropagationStrategy(options.revisionType() != null
+ ? PropagationStrategy.valueOf(options.revisionType().name())
+ : PropagationStrategy.NONE)
.build()
.open()
.join());
@@ -352,8 +376,11 @@
.withMinTimeout(Duration.ofMillis(options.electionTimeoutMillis()))
.withMaxTimeout(MIN_TIMEOUT)
.withMaxRetries(MAX_RETRIES)
- .withRevision(options.revision())
- .withPropagationStrategy(PropagationStrategy.valueOf(options.revisionType().name()))
+ .withRevision(options.version() != null && options.revisionType() != null
+ ? options.version().toInt() : 1)
+ .withPropagationStrategy(options.revisionType() != null
+ ? PropagationStrategy.valueOf(options.revisionType().name())
+ : PropagationStrategy.NONE)
.build()
.open()
.join());