Add compatibility functions to AtomicValue/Topic

Change-Id: I4a597cfa3effe0a62714ab12440a2fc41ac58aa9
diff --git a/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/CoordinationManager.java b/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/CoordinationManager.java
index f2965d9..1f6310f 100644
--- a/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/CoordinationManager.java
+++ b/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/CoordinationManager.java
@@ -29,7 +29,6 @@
 import org.onosproject.cluster.DefaultPartition;
 import org.onosproject.cluster.NodeId;
 import org.onosproject.cluster.PartitionId;
-import org.onosproject.core.VersionService;
 import org.onosproject.persistence.PersistenceService;
 import org.onosproject.store.cluster.messaging.ClusterCommunicationService;
 import org.onosproject.store.primitives.DistributedPrimitiveCreator;
@@ -52,6 +51,7 @@
 import org.onosproject.store.service.LeaderElectorBuilder;
 import org.onosproject.store.service.Serializer;
 import org.onosproject.store.service.Topic;
+import org.onosproject.store.service.TopicBuilder;
 import org.onosproject.store.service.TransactionContextBuilder;
 import org.onosproject.store.service.WorkQueue;
 import org.slf4j.Logger;
@@ -81,9 +81,6 @@
     @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
     protected PersistenceService persistenceService;
 
-    @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
-    protected VersionService versionService;
-
     private StoragePartition partition;
     private DistributedPrimitiveCreator primitiveCreator;
 
@@ -139,7 +136,7 @@
     @Override
     public <K, V> ConsistentMapBuilder<K, V> consistentMapBuilder() {
         checkPermission(STORAGE_WRITE);
-        return new DefaultConsistentMapBuilder<>(primitiveCreator, versionService.version());
+        return new DefaultConsistentMapBuilder<>(primitiveCreator);
     }
 
     @Override
@@ -205,6 +202,12 @@
     }
 
     @Override
+    public <T> TopicBuilder<T> topicBuilder() {
+        checkPermission(STORAGE_WRITE);
+        return new DefaultDistributedTopicBuilder<>(atomicValueBuilder());
+    }
+
+    @Override
     public <E> WorkQueue<E> getWorkQueue(String name, Serializer serializer) {
         checkPermission(STORAGE_WRITE);
         return primitiveCreator.newWorkQueue(name, serializer);
diff --git a/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/DefaultAtomicValueBuilder.java b/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/DefaultAtomicValueBuilder.java
index fad3993..9058189 100644
--- a/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/DefaultAtomicValueBuilder.java
+++ b/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/DefaultAtomicValueBuilder.java
@@ -17,9 +17,11 @@
 
 import java.util.function.Supplier;
 
+import org.onosproject.store.serializers.KryoNamespaces;
 import org.onosproject.store.service.AsyncAtomicValue;
 import org.onosproject.store.service.AtomicValueBuilder;
 import org.onosproject.store.service.ConsistentMapBuilder;
+import org.onosproject.store.service.Serializer;
 
 import static com.google.common.base.Preconditions.checkNotNull;
 
@@ -38,8 +40,24 @@
 
     @Override
     public AsyncAtomicValue<V> build() {
-        return new DefaultAsyncAtomicValue<>(checkNotNull(name()),
-                                             checkNotNull(serializer()),
-                                             mapBuilder.buildAsyncMap());
+        if (compatibilityFunction != null) {
+            Serializer serializer = Serializer.using(KryoNamespaces.API, CompatibleValue.class);
+
+            AsyncAtomicValue<CompatibleValue<byte[]>> rawValue = new DefaultAsyncAtomicValue<>(
+                checkNotNull(name()), serializer, mapBuilder.buildAsyncMap());
+
+            AsyncAtomicValue<CompatibleValue<V>> compatibleValue =
+                DistributedPrimitives.newTranscodingAtomicValue(
+                    rawValue,
+                    value -> value == null ? null :
+                        new CompatibleValue<byte[]>(serializer().encode(value.value()), value.version()),
+                    value -> value == null ? null :
+                        new CompatibleValue<V>(serializer().decode(value.value()), value.version()));
+            return DistributedPrimitives.newCompatibleAtomicValue(compatibleValue, compatibilityFunction, version());
+        }
+        return new DefaultAsyncAtomicValue<>(
+            checkNotNull(name()),
+            checkNotNull(serializer()),
+            mapBuilder.buildAsyncMap());
     }
 }
diff --git a/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/DefaultConsistentMapBuilder.java b/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/DefaultConsistentMapBuilder.java
index 6d3d4dc..d6b9441 100644
--- a/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/DefaultConsistentMapBuilder.java
+++ b/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/DefaultConsistentMapBuilder.java
@@ -15,7 +15,6 @@
  */
 package org.onosproject.store.primitives.impl;
 
-import org.onosproject.core.Version;
 import org.onosproject.store.primitives.DistributedPrimitiveCreator;
 import org.onosproject.store.serializers.KryoNamespaces;
 import org.onosproject.store.service.AsyncConsistentMap;
@@ -32,11 +31,9 @@
 public class DefaultConsistentMapBuilder<K, V> extends ConsistentMapBuilder<K, V> {
 
     private final DistributedPrimitiveCreator primitiveCreator;
-    private final Version version;
 
-    public DefaultConsistentMapBuilder(DistributedPrimitiveCreator primitiveCreator, Version version) {
+    public DefaultConsistentMapBuilder(DistributedPrimitiveCreator primitiveCreator) {
         this.primitiveCreator = primitiveCreator;
-        this.version = version;
     }
 
     @Override
diff --git a/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/DefaultDistributedTopicBuilder.java b/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/DefaultDistributedTopicBuilder.java
new file mode 100644
index 0000000..21ca7a1
--- /dev/null
+++ b/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/DefaultDistributedTopicBuilder.java
@@ -0,0 +1,71 @@
+/*
+ * Copyright 2018-present Open Networking Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.onosproject.store.primitives.impl;
+
+import java.util.function.BiFunction;
+
+import org.onosproject.core.Version;
+import org.onosproject.store.service.AtomicValueBuilder;
+import org.onosproject.store.service.RevisionType;
+import org.onosproject.store.service.Serializer;
+import org.onosproject.store.service.Topic;
+import org.onosproject.store.service.TopicBuilder;
+
+/**
+ * Default topic builder.
+ */
+public class DefaultDistributedTopicBuilder<T> extends TopicBuilder<T> {
+    private final AtomicValueBuilder<T> valueBuilder;
+
+    public DefaultDistributedTopicBuilder(AtomicValueBuilder<T> valueBuilder) {
+        this.valueBuilder = valueBuilder;
+    }
+
+    @Override
+    public TopicBuilder<T> withName(String name) {
+        valueBuilder.withName(name);
+        return this;
+    }
+
+    @Override
+    public TopicBuilder<T> withSerializer(Serializer serializer) {
+        valueBuilder.withSerializer(serializer);
+        return this;
+    }
+
+    @Override
+    public TopicBuilder<T> withVersion(Version version) {
+        valueBuilder.withVersion(version);
+        return this;
+    }
+
+    @Override
+    public TopicBuilder<T> withRevisionType(RevisionType revisionType) {
+        valueBuilder.withRevisionType(revisionType);
+        return this;
+    }
+
+    @Override
+    public TopicBuilder<T> withCompatibilityFunction(BiFunction<T, Version, T> compatibilityFunction) {
+        valueBuilder.withCompatibilityFunction(compatibilityFunction);
+        return this;
+    }
+
+    @Override
+    public Topic<T> build() {
+        return new DefaultDistributedTopic<>(valueBuilder.build());
+    }
+}
diff --git a/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/DistributedPrimitives.java b/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/DistributedPrimitives.java
index 78c41e8..9b209ba 100644
--- a/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/DistributedPrimitives.java
+++ b/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/DistributedPrimitives.java
@@ -15,17 +15,18 @@
  */
 package org.onosproject.store.primitives.impl;
 
+import java.util.function.BiFunction;
+import java.util.function.Function;
+
 import org.onosproject.core.Version;
 import org.onosproject.store.service.AsyncAtomicCounterMap;
+import org.onosproject.store.service.AsyncAtomicValue;
 import org.onosproject.store.service.AsyncConsistentMap;
 import org.onosproject.store.service.AsyncConsistentMultimap;
 import org.onosproject.store.service.AsyncConsistentTreeMap;
 import org.onosproject.store.service.AsyncDistributedSet;
 import org.onosproject.store.service.AsyncDocumentTree;
 
-import java.util.function.BiFunction;
-import java.util.function.Function;
-
 /**
  * Misc utilities for working with {@code DistributedPrimitive}s.
  */
@@ -117,6 +118,45 @@
     }
 
     /**
+     * Creates an instance of {@code AsyncAtomicValue} that transforms value types.
+     *
+     * @param value backing value
+     * @param valueEncoder transformer for value type of returned value to value type of input value
+     * @param valueDecoder transformer for value type of input value to value type of returned value
+     * @param <V1> returned value type
+     * @param <V2> input value type
+     * @return new counter map
+     */
+    public static <V1, V2> AsyncAtomicValue<V1> newTranscodingAtomicValue(AsyncAtomicValue<V2> value,
+        Function<V1, V2> valueEncoder,
+        Function<V2, V1> valueDecoder) {
+        return new TranscodingAsyncAtomicValue<>(value, valueEncoder, valueDecoder);
+    }
+
+    /**
+     * Creates an instance of {@code AsyncAtomicValue} that converts values from other versions.
+     *
+     * @param atomicValue backing value
+     * @param compatibilityFunction the compatibility function
+     * @param version local node version
+     * @param <V> value type
+     * @return compatible map
+     */
+    public static <V> AsyncAtomicValue<V> newCompatibleAtomicValue(
+        AsyncAtomicValue<CompatibleValue<V>> atomicValue,
+        BiFunction<V, Version, V> compatibilityFunction,
+        Version version) {
+        Function<V, CompatibleValue<V>> encoder = value -> new CompatibleValue<>(value, version);
+        Function<CompatibleValue<V>, V> decoder = value -> {
+            if (!value.version().equals(version)) {
+                return compatibilityFunction.apply(value.value(), value.version());
+            }
+            return value.value();
+        };
+        return new TranscodingAsyncAtomicValue<>(atomicValue, encoder, decoder);
+    }
+
+    /**
      * Creates an instance of {@code AsyncAtomicCounterMap} that transforms key types.
      *
      * @param map backing map
@@ -127,8 +167,8 @@
      * @return new counter map
      */
     public static <K1, K2> AsyncAtomicCounterMap<K1> newTranscodingAtomicCounterMap(AsyncAtomicCounterMap<K2> map,
-            Function<K1, K2> keyEncoder,
-            Function<K2, K1> keyDecoder) {
+        Function<K1, K2> keyEncoder,
+        Function<K2, K1> keyDecoder) {
         return new TranscodingAsyncAtomicCounterMap<>(map, keyEncoder, keyDecoder);
     }
 
diff --git a/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/StorageManager.java b/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/StorageManager.java
index a649ecf..719a96e 100644
--- a/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/StorageManager.java
+++ b/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/StorageManager.java
@@ -35,7 +35,6 @@
 import org.onosproject.cluster.MembershipService;
 import org.onosproject.cluster.NodeId;
 import org.onosproject.cluster.PartitionId;
-import org.onosproject.core.VersionService;
 import org.onosproject.persistence.PersistenceService;
 import org.onosproject.store.cluster.messaging.ClusterCommunicationService;
 import org.onosproject.store.primitives.DistributedPrimitiveCreator;
@@ -66,6 +65,7 @@
 import org.onosproject.store.service.StorageAdminService;
 import org.onosproject.store.service.StorageService;
 import org.onosproject.store.service.Topic;
+import org.onosproject.store.service.TopicBuilder;
 import org.onosproject.store.service.TransactionContextBuilder;
 import org.onosproject.store.service.WorkQueue;
 import org.onosproject.store.service.WorkQueueStats;
@@ -104,9 +104,6 @@
     @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
     protected MembershipService membershipService;
 
-    @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
-    protected VersionService versionService;
-
     private final Supplier<TransactionId> transactionIdGenerator =
             () -> TransactionId.from(UUID.randomUUID().toString());
     private DistributedPrimitiveCreator federatedPrimitiveCreator;
@@ -177,7 +174,7 @@
     @Override
     public <K, V> ConsistentMapBuilder<K, V> consistentMapBuilder() {
         checkPermission(STORAGE_WRITE);
-        return new DefaultConsistentMapBuilder<>(federatedPrimitiveCreator, versionService.version());
+        return new DefaultConsistentMapBuilder<>(federatedPrimitiveCreator);
     }
 
     @Override
@@ -252,6 +249,12 @@
     }
 
     @Override
+    public <T> TopicBuilder<T> topicBuilder() {
+        checkPermission(STORAGE_WRITE);
+        return new DefaultDistributedTopicBuilder<>(atomicValueBuilder());
+    }
+
+    @Override
     public <E> WorkQueue<E> getWorkQueue(String name, Serializer serializer) {
         checkPermission(STORAGE_WRITE);
         return federatedPrimitiveCreator.newWorkQueue(name, serializer);
diff --git a/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/TranscodingAsyncAtomicValue.java b/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/TranscodingAsyncAtomicValue.java
new file mode 100644
index 0000000..24dd35d
--- /dev/null
+++ b/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/TranscodingAsyncAtomicValue.java
@@ -0,0 +1,122 @@
+/*
+ * Copyright 2018-present Open Networking Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.onosproject.store.primitives.impl;
+
+import java.util.Collection;
+import java.util.Map;
+import java.util.concurrent.CompletableFuture;
+import java.util.function.Consumer;
+import java.util.function.Function;
+
+import com.google.common.collect.Maps;
+import org.onosproject.store.service.AsyncAtomicValue;
+import org.onosproject.store.service.AtomicValueEvent;
+import org.onosproject.store.service.AtomicValueEventListener;
+
+/**
+ * An {@code AsyncAtomicValue} that transcodes values.
+ */
+public class TranscodingAsyncAtomicValue<V1, V2> implements AsyncAtomicValue<V1> {
+    private final AsyncAtomicValue<V2> backingValue;
+    private final Function<V1, V2> valueEncoder;
+    private final Function<V2, V1> valueDecoder;
+    private final Map<AtomicValueEventListener<V1>, InternalValueEventListener> listeners = Maps.newIdentityHashMap();
+
+    public TranscodingAsyncAtomicValue(
+        AsyncAtomicValue<V2> backingValue, Function<V1, V2> valueEncoder, Function<V2, V1> valueDecoder) {
+        this.backingValue = backingValue;
+        this.valueEncoder = k -> k == null ? null : valueEncoder.apply(k);
+        this.valueDecoder = k -> k == null ? null : valueDecoder.apply(k);
+    }
+
+    @Override
+    public String name() {
+        return backingValue.name();
+    }
+
+    @Override
+    public CompletableFuture<Boolean> compareAndSet(V1 expect, V1 update) {
+        return backingValue.compareAndSet(valueEncoder.apply(expect), valueEncoder.apply(update));
+    }
+
+    @Override
+    public CompletableFuture<V1> get() {
+        return backingValue.get().thenApply(valueDecoder);
+    }
+
+    @Override
+    public CompletableFuture<V1> getAndSet(V1 value) {
+        return backingValue.getAndSet(valueEncoder.apply(value)).thenApply(valueDecoder);
+    }
+
+    @Override
+    public CompletableFuture<Void> set(V1 value) {
+        return backingValue.set(valueEncoder.apply(value));
+    }
+
+    @Override
+    public CompletableFuture<Void> addListener(AtomicValueEventListener<V1> listener) {
+        synchronized (listeners) {
+            InternalValueEventListener backingMapListener =
+                listeners.computeIfAbsent(listener, k -> new InternalValueEventListener(listener));
+            return backingValue.addListener(backingMapListener);
+        }
+    }
+
+    @Override
+    public CompletableFuture<Void> removeListener(AtomicValueEventListener<V1> listener) {
+        synchronized (listeners) {
+            InternalValueEventListener backingMapListener = listeners.remove(listener);
+            if (backingMapListener != null) {
+                return backingValue.removeListener(backingMapListener);
+            } else {
+                return CompletableFuture.completedFuture(null);
+            }
+        }
+    }
+
+    @Override
+    public void addStatusChangeListener(Consumer<Status> listener) {
+        backingValue.addStatusChangeListener(listener);
+    }
+
+    @Override
+    public void removeStatusChangeListener(Consumer<Status> listener) {
+        backingValue.removeStatusChangeListener(listener);
+    }
+
+    @Override
+    public Collection<Consumer<Status>> statusChangeListeners() {
+        return backingValue.statusChangeListeners();
+    }
+
+    private class InternalValueEventListener implements AtomicValueEventListener<V2> {
+        private final AtomicValueEventListener<V1> listener;
+
+        InternalValueEventListener(AtomicValueEventListener<V1> listener) {
+            this.listener = listener;
+        }
+
+        @Override
+        public void event(AtomicValueEvent<V2> event) {
+            listener.event(new AtomicValueEvent<>(
+                event.name(),
+                event.newValue() != null ? valueDecoder.apply(event.newValue()) : null,
+                event.oldValue() != null ? valueDecoder.apply(event.oldValue()) : null));
+        }
+    }
+
+}