[ONOS-7547] Implement support for backwards/forward compatibility of ConsistentMap values on read

Change-Id: Ifffb6a883ec4ee6aa5587da58a51c1e90694e5ea
diff --git a/core/api/src/main/java/org/onosproject/store/primitives/DistributedPrimitiveOptions.java b/core/api/src/main/java/org/onosproject/store/primitives/DistributedPrimitiveOptions.java
index c576375..1691086 100644
--- a/core/api/src/main/java/org/onosproject/store/primitives/DistributedPrimitiveOptions.java
+++ b/core/api/src/main/java/org/onosproject/store/primitives/DistributedPrimitiveOptions.java
@@ -38,8 +38,8 @@
     private boolean meteringDisabled = false;
     private boolean readOnly = false;
     private boolean relaxedReadConsistency = false;
-    private int revisionNumber = 1;
-    private RevisionType revisionType = RevisionType.NONE;
+    private Version version;
+    private RevisionType revisionType;
 
     public DistributedPrimitiveOptions(DistributedPrimitive.Type type) {
         this.type = type;
@@ -85,17 +85,7 @@
      * @return this builder
      */
     public O withVersion(Version version) {
-        return withRevisionNumber(version.toInt());
-    }
-
-    /**
-     * Sets the primitive revision.
-     *
-     * @param revision the primitive revision
-     * @return this builder
-     */
-    public O withRevisionNumber(int revision) {
-        this.revisionNumber = revision;
+        this.version = version;
         return (O) this;
     }
 
@@ -212,12 +202,12 @@
     }
 
     /**
-     * Returns the primitive revision number.
+     * Returns the primitive version.
      *
-     * @return the primitive revision number
+     * @return the primitive version
      */
-    public final int revision() {
-        return revisionNumber;
+    public final Version version() {
+        return version;
     }
 
     /**
diff --git a/core/api/src/main/java/org/onosproject/store/service/ConsistentMapOptions.java b/core/api/src/main/java/org/onosproject/store/service/ConsistentMapOptions.java
index 587789d..4dacab8 100644
--- a/core/api/src/main/java/org/onosproject/store/service/ConsistentMapOptions.java
+++ b/core/api/src/main/java/org/onosproject/store/service/ConsistentMapOptions.java
@@ -15,6 +15,8 @@
  */
 package org.onosproject.store.service;
 
+import java.util.function.BiFunction;
+
 import org.onosproject.store.primitives.DistributedPrimitiveOptions;
 
 /**
@@ -28,6 +30,7 @@
 
     private boolean nullValues = false;
     private boolean purgeOnUninstall = false;
+    protected BiFunction<V, org.onosproject.core.Version, V> compatibilityFunction;
 
     public ConsistentMapOptions() {
         super(DistributedPrimitive.Type.CONSISTENT_MAP);
@@ -54,6 +57,19 @@
     }
 
     /**
+     * Sets a compatibility function on the map.
+     *
+     * @param compatibilityFunction the compatibility function
+     * @return the consistent map builder
+     */
+    @SuppressWarnings("unchecked")
+    public O withCompatibilityFunction(
+        BiFunction<V, org.onosproject.core.Version, V> compatibilityFunction) {
+        this.compatibilityFunction = compatibilityFunction;
+        return (O) this;
+    }
+
+    /**
      * Returns whether null values are supported by the map.
      *
      * @return {@code true} if null values are supported; {@code false} otherwise
diff --git a/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/CompatibleValue.java b/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/CompatibleValue.java
new file mode 100644
index 0000000..73c4d12
--- /dev/null
+++ b/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/CompatibleValue.java
@@ -0,0 +1,59 @@
+/*
+ * Copyright 2018-present Open Networking Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.onosproject.store.primitives.impl;
+
+import org.onosproject.core.Version;
+
+import static com.google.common.base.MoreObjects.toStringHelper;
+
+/**
+ * Compatibility wrapper for primitive values.
+ */
+public class CompatibleValue<T> {
+    private final T value;
+    private final Version version;
+
+    public CompatibleValue(T value, Version version) {
+        this.value = value;
+        this.version = version;
+    }
+
+    /**
+     * Returns the wrapped value.
+     *
+     * @return the wrapped value
+     */
+    public T value() {
+        return value;
+    }
+
+    /**
+     * Returns the compatibilty version.
+     *
+     * @return the compatibility version
+     */
+    public Version version() {
+        return version;
+    }
+
+    @Override
+    public String toString() {
+        return toStringHelper(this)
+            .add("value", value)
+            .add("version", version)
+            .toString();
+    }
+}
diff --git a/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/CoordinationManager.java b/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/CoordinationManager.java
index fcb90f4..f2965d9 100644
--- a/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/CoordinationManager.java
+++ b/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/CoordinationManager.java
@@ -29,6 +29,7 @@
 import org.onosproject.cluster.DefaultPartition;
 import org.onosproject.cluster.NodeId;
 import org.onosproject.cluster.PartitionId;
+import org.onosproject.core.VersionService;
 import org.onosproject.persistence.PersistenceService;
 import org.onosproject.store.cluster.messaging.ClusterCommunicationService;
 import org.onosproject.store.primitives.DistributedPrimitiveCreator;
@@ -80,6 +81,9 @@
     @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
     protected PersistenceService persistenceService;
 
+    @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
+    protected VersionService versionService;
+
     private StoragePartition partition;
     private DistributedPrimitiveCreator primitiveCreator;
 
@@ -135,7 +139,7 @@
     @Override
     public <K, V> ConsistentMapBuilder<K, V> consistentMapBuilder() {
         checkPermission(STORAGE_WRITE);
-        return new DefaultConsistentMapBuilder<>(primitiveCreator);
+        return new DefaultConsistentMapBuilder<>(primitiveCreator, versionService.version());
     }
 
     @Override
diff --git a/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/DefaultConsistentMapBuilder.java b/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/DefaultConsistentMapBuilder.java
index 2bc06dd..6d3d4dc 100644
--- a/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/DefaultConsistentMapBuilder.java
+++ b/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/DefaultConsistentMapBuilder.java
@@ -15,10 +15,13 @@
  */
 package org.onosproject.store.primitives.impl;
 
+import org.onosproject.core.Version;
 import org.onosproject.store.primitives.DistributedPrimitiveCreator;
+import org.onosproject.store.serializers.KryoNamespaces;
 import org.onosproject.store.service.AsyncConsistentMap;
 import org.onosproject.store.service.ConsistentMap;
 import org.onosproject.store.service.ConsistentMapBuilder;
+import org.onosproject.store.service.Serializer;
 
 /**
  * Default {@link AsyncConsistentMap} builder.
@@ -29,9 +32,11 @@
 public class DefaultConsistentMapBuilder<K, V> extends ConsistentMapBuilder<K, V> {
 
     private final DistributedPrimitiveCreator primitiveCreator;
+    private final Version version;
 
-    public DefaultConsistentMapBuilder(DistributedPrimitiveCreator primitiveCreator) {
+    public DefaultConsistentMapBuilder(DistributedPrimitiveCreator primitiveCreator, Version version) {
         this.primitiveCreator = primitiveCreator;
+        this.version = version;
     }
 
     @Override
@@ -41,7 +46,34 @@
 
     @Override
     public AsyncConsistentMap<K, V> buildAsyncMap() {
-        AsyncConsistentMap<K, V> map = primitiveCreator.newAsyncConsistentMap(name(), serializer());
+        AsyncConsistentMap<K, V> map;
+
+        // If a compatibility function is defined, we don't assume CompatibleValue and Version is registered in
+        // the user-provided serializer since it's an implementation detail. Instead, we use the user-provided
+        // serializer to convert the CompatibleValue value to a raw byte[] and use a separate serializer to encode
+        // the CompatibleValue to binary.
+        if (compatibilityFunction != null) {
+            Serializer serializer = serializer();
+
+            // Convert the byte[] value to CompatibleValue<byte[]>
+            AsyncConsistentMap<K, CompatibleValue<byte[]>> rawMap = primitiveCreator.newAsyncConsistentMap(
+                withSerializer(Serializer.using(KryoNamespaces.API, CompatibleValue.class)));
+
+            // Convert the CompatibleValue<byte[]> value to CompatibleValue<V> using the user-provided serializer.
+            AsyncConsistentMap<K, CompatibleValue<V>> compatibleMap =
+                DistributedPrimitives.newTranscodingMap(
+                    rawMap,
+                    key -> key,
+                    key -> key,
+                    value -> value == null ? null :
+                        new CompatibleValue<byte[]>(serializer.encode(value.value()), value.version()),
+                    value -> value == null ? null :
+                        new CompatibleValue<V>(serializer.decode(value.value()), value.version()));
+            map = DistributedPrimitives.newCompatibleMap(compatibleMap, compatibilityFunction, version());
+        } else {
+            map = primitiveCreator.newAsyncConsistentMap(name(), serializer());
+        }
+
         map = nullValues() ? map : DistributedPrimitives.newNotNullMap(map);
         map = relaxedReadConsistency() ? DistributedPrimitives.newCachingMap(map) : map;
         map = readOnly() ? DistributedPrimitives.newUnmodifiableMap(map) : map;
diff --git a/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/DistributedPrimitives.java b/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/DistributedPrimitives.java
index d116988..78c41e8 100644
--- a/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/DistributedPrimitives.java
+++ b/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/DistributedPrimitives.java
@@ -15,6 +15,7 @@
  */
 package org.onosproject.store.primitives.impl;
 
+import org.onosproject.core.Version;
 import org.onosproject.store.service.AsyncAtomicCounterMap;
 import org.onosproject.store.service.AsyncConsistentMap;
 import org.onosproject.store.service.AsyncConsistentMultimap;
@@ -22,6 +23,7 @@
 import org.onosproject.store.service.AsyncDistributedSet;
 import org.onosproject.store.service.AsyncDocumentTree;
 
+import java.util.function.BiFunction;
 import java.util.function.Function;
 
 /**
@@ -91,6 +93,30 @@
     }
 
     /**
+     * Creates an instance of {@code AsyncConsistentMap} that converts values from other versions.
+     *
+     * @param map backing map
+     * @param compatibilityFunction the compatibility function
+     * @param version local node version
+     * @param <K> map key type
+     * @param <V> map value type
+     * @return compatible map
+     */
+    public static <K, V> AsyncConsistentMap<K, V> newCompatibleMap(
+        AsyncConsistentMap<K, CompatibleValue<V>> map,
+        BiFunction<V, Version, V> compatibilityFunction,
+        Version version) {
+        Function<V, CompatibleValue<V>> encoder = value -> new CompatibleValue<>(value, version);
+        Function<CompatibleValue<V>, V> decoder = value -> {
+            if (!value.version().equals(version)) {
+                return compatibilityFunction.apply(value.value(), value.version());
+            }
+            return value.value();
+        };
+        return new TranscodingAsyncConsistentMap<>(map, k -> k, k -> k, encoder, decoder);
+    }
+
+    /**
      * Creates an instance of {@code AsyncAtomicCounterMap} that transforms key types.
      *
      * @param map backing map
diff --git a/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/StorageManager.java b/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/StorageManager.java
index 693cf4a..a649ecf 100644
--- a/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/StorageManager.java
+++ b/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/StorageManager.java
@@ -35,6 +35,7 @@
 import org.onosproject.cluster.MembershipService;
 import org.onosproject.cluster.NodeId;
 import org.onosproject.cluster.PartitionId;
+import org.onosproject.core.VersionService;
 import org.onosproject.persistence.PersistenceService;
 import org.onosproject.store.cluster.messaging.ClusterCommunicationService;
 import org.onosproject.store.primitives.DistributedPrimitiveCreator;
@@ -103,6 +104,9 @@
     @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
     protected MembershipService membershipService;
 
+    @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
+    protected VersionService versionService;
+
     private final Supplier<TransactionId> transactionIdGenerator =
             () -> TransactionId.from(UUID.randomUUID().toString());
     private DistributedPrimitiveCreator federatedPrimitiveCreator;
@@ -173,7 +177,7 @@
     @Override
     public <K, V> ConsistentMapBuilder<K, V> consistentMapBuilder() {
         checkPermission(STORAGE_WRITE);
-        return new DefaultConsistentMapBuilder<>(federatedPrimitiveCreator);
+        return new DefaultConsistentMapBuilder<>(federatedPrimitiveCreator, versionService.version());
     }
 
     @Override
diff --git a/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/StoragePartitionClient.java b/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/StoragePartitionClient.java
index 0438fe5..5fa7383 100644
--- a/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/StoragePartitionClient.java
+++ b/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/StoragePartitionClient.java
@@ -158,8 +158,11 @@
                         .withMinTimeout(MIN_TIMEOUT)
                         .withMaxTimeout(MAX_TIMEOUT)
                         .withMaxRetries(MAX_RETRIES)
-                        .withRevision(options.revision())
-                        .withPropagationStrategy(PropagationStrategy.valueOf(options.revisionType().name()))
+                        .withRevision(options.version() != null && options.revisionType() != null
+                            ? options.version().toInt() : 1)
+                        .withPropagationStrategy(options.revisionType() != null
+                            ? PropagationStrategy.valueOf(options.revisionType().name())
+                            : PropagationStrategy.NONE)
                         .build()
                         .open()
                         .join());
@@ -186,8 +189,11 @@
                         .withMinTimeout(MIN_TIMEOUT)
                         .withMaxTimeout(MAX_TIMEOUT)
                         .withMaxRetries(MAX_RETRIES)
-                        .withRevision(options.revision())
-                        .withPropagationStrategy(PropagationStrategy.valueOf(options.revisionType().name()))
+                        .withRevision(options.version() != null && options.revisionType() != null
+                            ? options.version().toInt() : 1)
+                        .withPropagationStrategy(options.revisionType() != null
+                            ? PropagationStrategy.valueOf(options.revisionType().name())
+                            : PropagationStrategy.NONE)
                         .build()
                         .open()
                         .join());
@@ -213,8 +219,11 @@
                         .withMinTimeout(MIN_TIMEOUT)
                         .withMaxTimeout(MAX_TIMEOUT)
                         .withMaxRetries(MAX_RETRIES)
-                        .withRevision(options.revision())
-                        .withPropagationStrategy(PropagationStrategy.valueOf(options.revisionType().name()))
+                        .withRevision(options.version() != null && options.revisionType() != null
+                            ? options.version().toInt() : 1)
+                        .withPropagationStrategy(options.revisionType() != null
+                            ? PropagationStrategy.valueOf(options.revisionType().name())
+                            : PropagationStrategy.NONE)
                         .build()
                         .open()
                         .join());
@@ -246,8 +255,11 @@
                 .withMinTimeout(MIN_TIMEOUT)
                 .withMaxTimeout(MAX_TIMEOUT)
                 .withMaxRetries(MAX_RETRIES)
-                .withRevision(options.revision())
-                .withPropagationStrategy(PropagationStrategy.valueOf(options.revisionType().name()))
+                .withRevision(options.version() != null && options.revisionType() != null
+                    ? options.version().toInt() : 1)
+                .withPropagationStrategy(options.revisionType() != null
+                    ? PropagationStrategy.valueOf(options.revisionType().name())
+                    : PropagationStrategy.NONE)
                 .build()
                 .open()
                 .join());
@@ -271,8 +283,11 @@
                 .withMinTimeout(MIN_TIMEOUT)
                 .withMaxTimeout(MAX_TIMEOUT)
                 .withMaxRetries(MAX_RETRIES)
-                .withRevision(options.revision())
-                .withPropagationStrategy(PropagationStrategy.valueOf(options.revisionType().name()))
+                .withRevision(options.version() != null && options.revisionType() != null
+                    ? options.version().toInt() : 1)
+                .withPropagationStrategy(options.revisionType() != null
+                    ? PropagationStrategy.valueOf(options.revisionType().name())
+                    : PropagationStrategy.NONE)
                 .build()
                 .open()
                 .join());
@@ -298,8 +313,11 @@
                 .withMinTimeout(MIN_TIMEOUT)
                 .withMaxTimeout(MAX_TIMEOUT)
                 .withMaxRetries(MAX_RETRIES)
-                .withRevision(options.revision())
-                .withPropagationStrategy(PropagationStrategy.valueOf(options.revisionType().name()))
+                .withRevision(options.version() != null && options.revisionType() != null
+                    ? options.version().toInt() : 1)
+                .withPropagationStrategy(options.revisionType() != null
+                    ? PropagationStrategy.valueOf(options.revisionType().name())
+                    : PropagationStrategy.NONE)
                 .build()
                 .open()
                 .join());
@@ -317,8 +335,11 @@
                 .withMinTimeout(MIN_TIMEOUT)
                 .withMaxTimeout(MAX_TIMEOUT)
                 .withMaxRetries(MAX_RETRIES)
-                .withRevision(options.revision())
-                .withPropagationStrategy(PropagationStrategy.valueOf(options.revisionType().name()))
+                .withRevision(options.version() != null && options.revisionType() != null
+                    ? options.version().toInt() : 1)
+                .withPropagationStrategy(options.revisionType() != null
+                    ? PropagationStrategy.valueOf(options.revisionType().name())
+                    : PropagationStrategy.NONE)
                 .build()
                 .open()
                 .join());
@@ -335,8 +356,11 @@
                 .withMinTimeout(MIN_TIMEOUT)
                 .withMaxTimeout(MIN_TIMEOUT)
                 .withMaxRetries(MAX_RETRIES)
-                .withRevision(options.revision())
-                .withPropagationStrategy(PropagationStrategy.valueOf(options.revisionType().name()))
+                .withRevision(options.version() != null && options.revisionType() != null
+                    ? options.version().toInt() : 1)
+                .withPropagationStrategy(options.revisionType() != null
+                    ? PropagationStrategy.valueOf(options.revisionType().name())
+                    : PropagationStrategy.NONE)
                 .build()
                 .open()
                 .join());
@@ -352,8 +376,11 @@
                 .withMinTimeout(Duration.ofMillis(options.electionTimeoutMillis()))
                 .withMaxTimeout(MIN_TIMEOUT)
                 .withMaxRetries(MAX_RETRIES)
-                .withRevision(options.revision())
-                .withPropagationStrategy(PropagationStrategy.valueOf(options.revisionType().name()))
+                .withRevision(options.version() != null && options.revisionType() != null
+                    ? options.version().toInt() : 1)
+                .withPropagationStrategy(options.revisionType() != null
+                    ? PropagationStrategy.valueOf(options.revisionType().name())
+                    : PropagationStrategy.NONE)
                 .build()
                 .open()
                 .join());