[ONOS-7547] Implement support for backward/forward compatibility of ConsistentMap values on read

Change-Id: Ifffb6a883ec4ee6aa5587da58a51c1e90694e5ea
diff --git a/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/DefaultConsistentMapBuilder.java b/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/DefaultConsistentMapBuilder.java
index 2bc06dd..6d3d4dc 100644
--- a/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/DefaultConsistentMapBuilder.java
+++ b/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/DefaultConsistentMapBuilder.java
@@ -15,10 +15,13 @@
  */
 package org.onosproject.store.primitives.impl;
 
+import org.onosproject.core.Version;
 import org.onosproject.store.primitives.DistributedPrimitiveCreator;
+import org.onosproject.store.serializers.KryoNamespaces;
 import org.onosproject.store.service.AsyncConsistentMap;
 import org.onosproject.store.service.ConsistentMap;
 import org.onosproject.store.service.ConsistentMapBuilder;
+import org.onosproject.store.service.Serializer;
 
 /**
  * Default {@link AsyncConsistentMap} builder.
@@ -29,9 +32,11 @@
 public class DefaultConsistentMapBuilder<K, V> extends ConsistentMapBuilder<K, V> {
 
     private final DistributedPrimitiveCreator primitiveCreator;
+    private final Version version;
 
-    public DefaultConsistentMapBuilder(DistributedPrimitiveCreator primitiveCreator) {
+    public DefaultConsistentMapBuilder(DistributedPrimitiveCreator primitiveCreator, Version version) {
         this.primitiveCreator = primitiveCreator;
+        this.version = version;
     }
 
     @Override
@@ -41,7 +46,34 @@
 
     @Override
     public AsyncConsistentMap<K, V> buildAsyncMap() {
-        AsyncConsistentMap<K, V> map = primitiveCreator.newAsyncConsistentMap(name(), serializer());
+        AsyncConsistentMap<K, V> map;
+
+        // If a compatibility function is defined, we don't assume CompatibleValue and Version are registered in
+        // the user-provided serializer since they're implementation details. Instead, we use the user-provided
+        // serializer to convert the wrapped value to a raw byte[] and use a separate serializer to encode
+        // the CompatibleValue to binary.
+        if (compatibilityFunction != null) {
+            Serializer serializer = serializer();
+
+            // Convert the byte[] value to CompatibleValue<byte[]>
+            AsyncConsistentMap<K, CompatibleValue<byte[]>> rawMap = primitiveCreator.newAsyncConsistentMap(
+                withSerializer(Serializer.using(KryoNamespaces.API, CompatibleValue.class)));
+
+            // Convert the CompatibleValue<byte[]> value to CompatibleValue<V> using the user-provided serializer.
+            AsyncConsistentMap<K, CompatibleValue<V>> compatibleMap =
+                DistributedPrimitives.newTranscodingMap(
+                    rawMap,
+                    key -> key,
+                    key -> key,
+                    value -> value == null ? null :
+                        new CompatibleValue<byte[]>(serializer.encode(value.value()), value.version()),
+                    value -> value == null ? null :
+                        new CompatibleValue<V>(serializer.decode(value.value()), value.version()));
+            map = DistributedPrimitives.newCompatibleMap(compatibleMap, compatibilityFunction, version());
+        } else {
+            map = primitiveCreator.newAsyncConsistentMap(name(), serializer());
+        }
+
         map = nullValues() ? map : DistributedPrimitives.newNotNullMap(map);
         map = relaxedReadConsistency() ? DistributedPrimitives.newCachingMap(map) : map;
         map = readOnly() ? DistributedPrimitives.newUnmodifiableMap(map) : map;