Add compatibility functions to AtomicValue/Topic

Change-Id: I4a597cfa3effe0a62714ab12440a2fc41ac58aa9
diff --git a/core/api/src/main/java/org/onosproject/store/service/AtomicValueOptions.java b/core/api/src/main/java/org/onosproject/store/service/AtomicValueOptions.java
index b00a7af..b42e5f3 100644
--- a/core/api/src/main/java/org/onosproject/store/service/AtomicValueOptions.java
+++ b/core/api/src/main/java/org/onosproject/store/service/AtomicValueOptions.java
@@ -15,6 +15,8 @@
  */
 package org.onosproject.store.service;
 
+import java.util.function.BiFunction;
+
 import org.onosproject.store.primitives.DistributedPrimitiveOptions;
 
 /**
@@ -23,7 +25,22 @@
  * @param <V> atomic value type
  */
 public abstract class AtomicValueOptions<O extends AtomicValueOptions<O, V>, V> extends DistributedPrimitiveOptions<O> {
+    protected BiFunction<V, org.onosproject.core.Version, V> compatibilityFunction;
+
     public AtomicValueOptions() {
         super(DistributedPrimitive.Type.VALUE);
     }
+
+    /**
+     * Sets a compatibility function on the map.
+     *
+     * @param compatibilityFunction the compatibility function
+     * @return the consistent map builder
+     */
+    @SuppressWarnings("unchecked")
+    public O withCompatibilityFunction(
+        BiFunction<V, org.onosproject.core.Version, V> compatibilityFunction) {
+        this.compatibilityFunction = compatibilityFunction;
+        return (O) this;
+    }
 }
diff --git a/core/api/src/main/java/org/onosproject/store/service/PrimitiveService.java b/core/api/src/main/java/org/onosproject/store/service/PrimitiveService.java
index 7be0480..cc77393 100644
--- a/core/api/src/main/java/org/onosproject/store/service/PrimitiveService.java
+++ b/core/api/src/main/java/org/onosproject/store/service/PrimitiveService.java
@@ -114,6 +114,14 @@
     LeaderElectorBuilder leaderElectorBuilder();
 
     /**
+     * Creates a new TopicBuilder.
+     *
+     * @param <T> topic value type
+     * @return topic builder
+     */
+    <T> TopicBuilder<T> topicBuilder();
+
+    /**
      * Creates a new transaction context builder.
      *
      * @return a builder for a transaction context.
diff --git a/core/api/src/main/java/org/onosproject/store/service/StorageService.java b/core/api/src/main/java/org/onosproject/store/service/StorageService.java
index 2dd42fa..aa2e890 100644
--- a/core/api/src/main/java/org/onosproject/store/service/StorageService.java
+++ b/core/api/src/main/java/org/onosproject/store/service/StorageService.java
@@ -121,6 +121,14 @@
     LeaderElectorBuilder leaderElectorBuilder();
 
     /**
+     * Creates a new TopicBuilder.
+     *
+     * @param <T> topic value type
+     * @return topic builder
+     */
+    <T> TopicBuilder<T> topicBuilder();
+
+    /**
      * Creates a new transaction context builder.
      *
      * @return a builder for a transaction context.
diff --git a/core/api/src/main/java/org/onosproject/store/service/TopicBuilder.java b/core/api/src/main/java/org/onosproject/store/service/TopicBuilder.java
new file mode 100644
index 0000000..5943432
--- /dev/null
+++ b/core/api/src/main/java/org/onosproject/store/service/TopicBuilder.java
@@ -0,0 +1,28 @@
+/*
+ * Copyright 2018-present Open Networking Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.onosproject.store.service;
+
+import org.onosproject.store.primitives.DistributedPrimitiveBuilder;
+
+/**
+ * Builder for {@link Topic} instances.
+ *
+ * @param <T> type for topic value
+ */
+public abstract class TopicBuilder<T>
+    extends TopicOptions<TopicBuilder<T>, T>
+    implements DistributedPrimitiveBuilder<Topic<T>> {
+}
diff --git a/core/api/src/main/java/org/onosproject/store/service/TopicOptions.java b/core/api/src/main/java/org/onosproject/store/service/TopicOptions.java
new file mode 100644
index 0000000..7845b56
--- /dev/null
+++ b/core/api/src/main/java/org/onosproject/store/service/TopicOptions.java
@@ -0,0 +1,49 @@
+/*
+ * Copyright 2018-present Open Networking Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.onosproject.store.service;
+
+import java.util.function.BiFunction;
+
+import org.onosproject.store.primitives.DistributedPrimitiveOptions;
+
+/**
+ * Builder for {@link Topic} instances.
+ *
+ * @param <T> type for topic value
+ */
+public abstract class TopicOptions<O extends TopicOptions<O, T>, T>
+    extends DistributedPrimitiveOptions<O> {
+
+    protected BiFunction<T, org.onosproject.core.Version, T> compatibilityFunction;
+
+    public TopicOptions() {
+        super(DistributedPrimitive.Type.TOPIC);
+    }
+
+    /**
+     * Sets a compatibility function on the map.
+     *
+     * @param compatibilityFunction the compatibility function
+     * @return the consistent map builder
+     */
+    @SuppressWarnings("unchecked")
+    public O withCompatibilityFunction(
+        BiFunction<T, org.onosproject.core.Version, T> compatibilityFunction) {
+        this.compatibilityFunction = compatibilityFunction;
+        return (O) this;
+    }
+
+}
diff --git a/core/api/src/test/java/org/onosproject/store/service/CoordinationServiceAdapter.java b/core/api/src/test/java/org/onosproject/store/service/CoordinationServiceAdapter.java
index f05d9aa..68cb5fd 100644
--- a/core/api/src/test/java/org/onosproject/store/service/CoordinationServiceAdapter.java
+++ b/core/api/src/test/java/org/onosproject/store/service/CoordinationServiceAdapter.java
@@ -75,6 +75,11 @@
     }
 
     @Override
+    public <T> TopicBuilder<T> topicBuilder() {
+        return null;
+    }
+
+    @Override
     public TransactionContextBuilder transactionContextBuilder() {
         return null;
     }
diff --git a/core/api/src/test/java/org/onosproject/store/service/StorageServiceAdapter.java b/core/api/src/test/java/org/onosproject/store/service/StorageServiceAdapter.java
index 4d528e2..ae65500 100644
--- a/core/api/src/test/java/org/onosproject/store/service/StorageServiceAdapter.java
+++ b/core/api/src/test/java/org/onosproject/store/service/StorageServiceAdapter.java
@@ -70,6 +70,11 @@
     }
 
     @Override
+    public <T> TopicBuilder<T> topicBuilder() {
+        return null;
+    }
+
+    @Override
     public <E> WorkQueue<E> getWorkQueue(String name, Serializer serializer) {
         return null;
     }
diff --git a/core/store/dist/src/main/java/org/onosproject/store/app/DistributedApplicationStore.java b/core/store/dist/src/main/java/org/onosproject/store/app/DistributedApplicationStore.java
index e03b645..5976003 100644
--- a/core/store/dist/src/main/java/org/onosproject/store/app/DistributedApplicationStore.java
+++ b/core/store/dist/src/main/java/org/onosproject/store/app/DistributedApplicationStore.java
@@ -177,8 +177,13 @@
                 .withCompatibilityFunction(this::convertApplication)
                 .build();
 
-        appActivationTopic = storageService.getTopic("onos-apps-activation-topic",
-                Serializer.using(KryoNamespaces.API));
+        appActivationTopic = storageService.<Application>topicBuilder()
+            .withName("onos-apps-activation-topic")
+            .withSerializer(Serializer.using(KryoNamespaces.API))
+            .withVersion(versionService.version())
+            .withRevisionType(RevisionType.PROPAGATE)
+            .withCompatibilityFunction(this::convertApplication)
+            .build();
 
         activationExecutor = newSingleThreadExecutor(groupedThreads("onos/store/app",
                 "app-activation", log));
@@ -206,8 +211,8 @@
         // version, update the stored application with the new version.
         ApplicationDescription appDesc = getApplicationDescription(appHolder.app.id().name());
         if (!appDesc.version().equals(appHolder.app().version())) {
-            Application newApplication = DefaultApplication.builder(appHolder.app())
-                .withVersion(appDesc.version())
+            Application newApplication = DefaultApplication.builder(appDesc)
+                .withAppId(appHolder.app.id())
                 .build();
             return new InternalApplicationHolder(
                 newApplication, appHolder.state, appHolder.permissions);
@@ -216,6 +221,21 @@
     }
 
     /**
+     * Converts the versions of stored applications propagated from the prior version to the local application versions.
+     */
+    private Application convertApplication(Application app, Version version) {
+        // Load the application description from disk. If the version doesn't match the persisted
+        // version, update the stored application with the new version.
+        ApplicationDescription appDesc = getApplicationDescription(app.id().name());
+        if (!appDesc.version().equals(app.version())) {
+            return DefaultApplication.builder(appDesc)
+                .withAppId(app.id())
+                .build();
+        }
+        return app;
+    }
+
+    /**
      * Activates applications that should be activated according to the distributed store.
      */
     private void activateExistingApplications() {
diff --git a/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/CoordinationManager.java b/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/CoordinationManager.java
index f2965d9..1f6310f 100644
--- a/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/CoordinationManager.java
+++ b/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/CoordinationManager.java
@@ -29,7 +29,6 @@
 import org.onosproject.cluster.DefaultPartition;
 import org.onosproject.cluster.NodeId;
 import org.onosproject.cluster.PartitionId;
-import org.onosproject.core.VersionService;
 import org.onosproject.persistence.PersistenceService;
 import org.onosproject.store.cluster.messaging.ClusterCommunicationService;
 import org.onosproject.store.primitives.DistributedPrimitiveCreator;
@@ -52,6 +51,7 @@
 import org.onosproject.store.service.LeaderElectorBuilder;
 import org.onosproject.store.service.Serializer;
 import org.onosproject.store.service.Topic;
+import org.onosproject.store.service.TopicBuilder;
 import org.onosproject.store.service.TransactionContextBuilder;
 import org.onosproject.store.service.WorkQueue;
 import org.slf4j.Logger;
@@ -81,9 +81,6 @@
     @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
     protected PersistenceService persistenceService;
 
-    @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
-    protected VersionService versionService;
-
     private StoragePartition partition;
     private DistributedPrimitiveCreator primitiveCreator;
 
@@ -139,7 +136,7 @@
     @Override
     public <K, V> ConsistentMapBuilder<K, V> consistentMapBuilder() {
         checkPermission(STORAGE_WRITE);
-        return new DefaultConsistentMapBuilder<>(primitiveCreator, versionService.version());
+        return new DefaultConsistentMapBuilder<>(primitiveCreator);
     }
 
     @Override
@@ -205,6 +202,12 @@
     }
 
     @Override
+    public <T> TopicBuilder<T> topicBuilder() {
+        checkPermission(STORAGE_WRITE);
+        return new DefaultDistributedTopicBuilder<>(atomicValueBuilder());
+    }
+
+    @Override
     public <E> WorkQueue<E> getWorkQueue(String name, Serializer serializer) {
         checkPermission(STORAGE_WRITE);
         return primitiveCreator.newWorkQueue(name, serializer);
diff --git a/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/DefaultAtomicValueBuilder.java b/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/DefaultAtomicValueBuilder.java
index fad3993..9058189 100644
--- a/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/DefaultAtomicValueBuilder.java
+++ b/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/DefaultAtomicValueBuilder.java
@@ -17,9 +17,11 @@
 
 import java.util.function.Supplier;
 
+import org.onosproject.store.serializers.KryoNamespaces;
 import org.onosproject.store.service.AsyncAtomicValue;
 import org.onosproject.store.service.AtomicValueBuilder;
 import org.onosproject.store.service.ConsistentMapBuilder;
+import org.onosproject.store.service.Serializer;
 
 import static com.google.common.base.Preconditions.checkNotNull;
 
@@ -38,8 +40,24 @@
 
     @Override
     public AsyncAtomicValue<V> build() {
-        return new DefaultAsyncAtomicValue<>(checkNotNull(name()),
-                                             checkNotNull(serializer()),
-                                             mapBuilder.buildAsyncMap());
+        if (compatibilityFunction != null) {
+            Serializer serializer = Serializer.using(KryoNamespaces.API, CompatibleValue.class);
+
+            AsyncAtomicValue<CompatibleValue<byte[]>> rawValue = new DefaultAsyncAtomicValue<>(
+                checkNotNull(name()), serializer, mapBuilder.buildAsyncMap());
+
+            AsyncAtomicValue<CompatibleValue<V>> compatibleValue =
+                DistributedPrimitives.newTranscodingAtomicValue(
+                    rawValue,
+                    value -> value == null ? null :
+                        new CompatibleValue<byte[]>(serializer().encode(value.value()), value.version()),
+                    value -> value == null ? null :
+                        new CompatibleValue<V>(serializer().decode(value.value()), value.version()));
+            return DistributedPrimitives.newCompatibleAtomicValue(compatibleValue, compatibilityFunction, version());
+        }
+        return new DefaultAsyncAtomicValue<>(
+            checkNotNull(name()),
+            checkNotNull(serializer()),
+            mapBuilder.buildAsyncMap());
     }
 }
diff --git a/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/DefaultConsistentMapBuilder.java b/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/DefaultConsistentMapBuilder.java
index 6d3d4dc..d6b9441 100644
--- a/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/DefaultConsistentMapBuilder.java
+++ b/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/DefaultConsistentMapBuilder.java
@@ -15,7 +15,6 @@
  */
 package org.onosproject.store.primitives.impl;
 
-import org.onosproject.core.Version;
 import org.onosproject.store.primitives.DistributedPrimitiveCreator;
 import org.onosproject.store.serializers.KryoNamespaces;
 import org.onosproject.store.service.AsyncConsistentMap;
@@ -32,11 +31,9 @@
 public class DefaultConsistentMapBuilder<K, V> extends ConsistentMapBuilder<K, V> {
 
     private final DistributedPrimitiveCreator primitiveCreator;
-    private final Version version;
 
-    public DefaultConsistentMapBuilder(DistributedPrimitiveCreator primitiveCreator, Version version) {
+    public DefaultConsistentMapBuilder(DistributedPrimitiveCreator primitiveCreator) {
         this.primitiveCreator = primitiveCreator;
-        this.version = version;
     }
 
     @Override
diff --git a/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/DefaultDistributedTopicBuilder.java b/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/DefaultDistributedTopicBuilder.java
new file mode 100644
index 0000000..21ca7a1
--- /dev/null
+++ b/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/DefaultDistributedTopicBuilder.java
@@ -0,0 +1,71 @@
+/*
+ * Copyright 2018-present Open Networking Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.onosproject.store.primitives.impl;
+
+import java.util.function.BiFunction;
+
+import org.onosproject.core.Version;
+import org.onosproject.store.service.AtomicValueBuilder;
+import org.onosproject.store.service.RevisionType;
+import org.onosproject.store.service.Serializer;
+import org.onosproject.store.service.Topic;
+import org.onosproject.store.service.TopicBuilder;
+
+/**
+ * Default topic builder.
+ */
+public class DefaultDistributedTopicBuilder<T> extends TopicBuilder<T> {
+    private final AtomicValueBuilder<T> valueBuilder;
+
+    public DefaultDistributedTopicBuilder(AtomicValueBuilder<T> valueBuilder) {
+        this.valueBuilder = valueBuilder;
+    }
+
+    @Override
+    public TopicBuilder<T> withName(String name) {
+        valueBuilder.withName(name);
+        return this;
+    }
+
+    @Override
+    public TopicBuilder<T> withSerializer(Serializer serializer) {
+        valueBuilder.withSerializer(serializer);
+        return this;
+    }
+
+    @Override
+    public TopicBuilder<T> withVersion(Version version) {
+        valueBuilder.withVersion(version);
+        return this;
+    }
+
+    @Override
+    public TopicBuilder<T> withRevisionType(RevisionType revisionType) {
+        valueBuilder.withRevisionType(revisionType);
+        return this;
+    }
+
+    @Override
+    public TopicBuilder<T> withCompatibilityFunction(BiFunction<T, Version, T> compatibilityFunction) {
+        valueBuilder.withCompatibilityFunction(compatibilityFunction);
+        return this;
+    }
+
+    @Override
+    public Topic<T> build() {
+        return new DefaultDistributedTopic<>(valueBuilder.build());
+    }
+}
diff --git a/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/DistributedPrimitives.java b/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/DistributedPrimitives.java
index 78c41e8..9b209ba 100644
--- a/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/DistributedPrimitives.java
+++ b/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/DistributedPrimitives.java
@@ -15,17 +15,18 @@
  */
 package org.onosproject.store.primitives.impl;
 
+import java.util.function.BiFunction;
+import java.util.function.Function;
+
 import org.onosproject.core.Version;
 import org.onosproject.store.service.AsyncAtomicCounterMap;
+import org.onosproject.store.service.AsyncAtomicValue;
 import org.onosproject.store.service.AsyncConsistentMap;
 import org.onosproject.store.service.AsyncConsistentMultimap;
 import org.onosproject.store.service.AsyncConsistentTreeMap;
 import org.onosproject.store.service.AsyncDistributedSet;
 import org.onosproject.store.service.AsyncDocumentTree;
 
-import java.util.function.BiFunction;
-import java.util.function.Function;
-
 /**
  * Misc utilities for working with {@code DistributedPrimitive}s.
  */
@@ -117,6 +118,45 @@
     }
 
     /**
+     * Creates an instance of {@code AsyncAtomicValue} that transforms value types.
+     *
+     * @param value backing value
+     * @param valueEncoder transformer for value type of returned value to value type of input value
+     * @param valueDecoder transformer for value type of input value to value type of returned value
+     * @param <V1> returned value type
+     * @param <V2> input value type
+     * @return new counter map
+     */
+    public static <V1, V2> AsyncAtomicValue<V1> newTranscodingAtomicValue(AsyncAtomicValue<V2> value,
+        Function<V1, V2> valueEncoder,
+        Function<V2, V1> valueDecoder) {
+        return new TranscodingAsyncAtomicValue<>(value, valueEncoder, valueDecoder);
+    }
+
+    /**
+     * Creates an instance of {@code AsyncAtomicValue} that converts values from other versions.
+     *
+     * @param atomicValue backing value
+     * @param compatibilityFunction the compatibility function
+     * @param version local node version
+     * @param <V> value type
+     * @return compatible map
+     */
+    public static <V> AsyncAtomicValue<V> newCompatibleAtomicValue(
+        AsyncAtomicValue<CompatibleValue<V>> atomicValue,
+        BiFunction<V, Version, V> compatibilityFunction,
+        Version version) {
+        Function<V, CompatibleValue<V>> encoder = value -> new CompatibleValue<>(value, version);
+        Function<CompatibleValue<V>, V> decoder = value -> {
+            if (!value.version().equals(version)) {
+                return compatibilityFunction.apply(value.value(), value.version());
+            }
+            return value.value();
+        };
+        return new TranscodingAsyncAtomicValue<>(atomicValue, encoder, decoder);
+    }
+
+    /**
      * Creates an instance of {@code AsyncAtomicCounterMap} that transforms key types.
      *
      * @param map backing map
@@ -127,8 +167,8 @@
      * @return new counter map
      */
     public static <K1, K2> AsyncAtomicCounterMap<K1> newTranscodingAtomicCounterMap(AsyncAtomicCounterMap<K2> map,
-            Function<K1, K2> keyEncoder,
-            Function<K2, K1> keyDecoder) {
+        Function<K1, K2> keyEncoder,
+        Function<K2, K1> keyDecoder) {
         return new TranscodingAsyncAtomicCounterMap<>(map, keyEncoder, keyDecoder);
     }
 
diff --git a/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/StorageManager.java b/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/StorageManager.java
index a649ecf..719a96e 100644
--- a/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/StorageManager.java
+++ b/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/StorageManager.java
@@ -35,7 +35,6 @@
 import org.onosproject.cluster.MembershipService;
 import org.onosproject.cluster.NodeId;
 import org.onosproject.cluster.PartitionId;
-import org.onosproject.core.VersionService;
 import org.onosproject.persistence.PersistenceService;
 import org.onosproject.store.cluster.messaging.ClusterCommunicationService;
 import org.onosproject.store.primitives.DistributedPrimitiveCreator;
@@ -66,6 +65,7 @@
 import org.onosproject.store.service.StorageAdminService;
 import org.onosproject.store.service.StorageService;
 import org.onosproject.store.service.Topic;
+import org.onosproject.store.service.TopicBuilder;
 import org.onosproject.store.service.TransactionContextBuilder;
 import org.onosproject.store.service.WorkQueue;
 import org.onosproject.store.service.WorkQueueStats;
@@ -104,9 +104,6 @@
     @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
     protected MembershipService membershipService;
 
-    @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
-    protected VersionService versionService;
-
     private final Supplier<TransactionId> transactionIdGenerator =
             () -> TransactionId.from(UUID.randomUUID().toString());
     private DistributedPrimitiveCreator federatedPrimitiveCreator;
@@ -177,7 +174,7 @@
     @Override
     public <K, V> ConsistentMapBuilder<K, V> consistentMapBuilder() {
         checkPermission(STORAGE_WRITE);
-        return new DefaultConsistentMapBuilder<>(federatedPrimitiveCreator, versionService.version());
+        return new DefaultConsistentMapBuilder<>(federatedPrimitiveCreator);
     }
 
     @Override
@@ -252,6 +249,12 @@
     }
 
     @Override
+    public <T> TopicBuilder<T> topicBuilder() {
+        checkPermission(STORAGE_WRITE);
+        return new DefaultDistributedTopicBuilder<>(atomicValueBuilder());
+    }
+
+    @Override
     public <E> WorkQueue<E> getWorkQueue(String name, Serializer serializer) {
         checkPermission(STORAGE_WRITE);
         return federatedPrimitiveCreator.newWorkQueue(name, serializer);
diff --git a/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/TranscodingAsyncAtomicValue.java b/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/TranscodingAsyncAtomicValue.java
new file mode 100644
index 0000000..24dd35d
--- /dev/null
+++ b/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/TranscodingAsyncAtomicValue.java
@@ -0,0 +1,122 @@
+/*
+ * Copyright 2018-present Open Networking Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.onosproject.store.primitives.impl;
+
+import java.util.Collection;
+import java.util.Map;
+import java.util.concurrent.CompletableFuture;
+import java.util.function.Consumer;
+import java.util.function.Function;
+
+import com.google.common.collect.Maps;
+import org.onosproject.store.service.AsyncAtomicValue;
+import org.onosproject.store.service.AtomicValueEvent;
+import org.onosproject.store.service.AtomicValueEventListener;
+
+/**
+ * An {@code AsyncAtomicValue} that transcodes values.
+ */
+public class TranscodingAsyncAtomicValue<V1, V2> implements AsyncAtomicValue<V1> {
+    private final AsyncAtomicValue<V2> backingValue;
+    private final Function<V1, V2> valueEncoder;
+    private final Function<V2, V1> valueDecoder;
+    private final Map<AtomicValueEventListener<V1>, InternalValueEventListener> listeners = Maps.newIdentityHashMap();
+
+    public TranscodingAsyncAtomicValue(
+        AsyncAtomicValue<V2> backingValue, Function<V1, V2> valueEncoder, Function<V2, V1> valueDecoder) {
+        this.backingValue = backingValue;
+        this.valueEncoder = k -> k == null ? null : valueEncoder.apply(k);
+        this.valueDecoder = k -> k == null ? null : valueDecoder.apply(k);
+    }
+
+    @Override
+    public String name() {
+        return backingValue.name();
+    }
+
+    @Override
+    public CompletableFuture<Boolean> compareAndSet(V1 expect, V1 update) {
+        return backingValue.compareAndSet(valueEncoder.apply(expect), valueEncoder.apply(update));
+    }
+
+    @Override
+    public CompletableFuture<V1> get() {
+        return backingValue.get().thenApply(valueDecoder);
+    }
+
+    @Override
+    public CompletableFuture<V1> getAndSet(V1 value) {
+        return backingValue.getAndSet(valueEncoder.apply(value)).thenApply(valueDecoder);
+    }
+
+    @Override
+    public CompletableFuture<Void> set(V1 value) {
+        return backingValue.set(valueEncoder.apply(value));
+    }
+
+    @Override
+    public CompletableFuture<Void> addListener(AtomicValueEventListener<V1> listener) {
+        synchronized (listeners) {
+            InternalValueEventListener backingMapListener =
+                listeners.computeIfAbsent(listener, k -> new InternalValueEventListener(listener));
+            return backingValue.addListener(backingMapListener);
+        }
+    }
+
+    @Override
+    public CompletableFuture<Void> removeListener(AtomicValueEventListener<V1> listener) {
+        synchronized (listeners) {
+            InternalValueEventListener backingMapListener = listeners.remove(listener);
+            if (backingMapListener != null) {
+                return backingValue.removeListener(backingMapListener);
+            } else {
+                return CompletableFuture.completedFuture(null);
+            }
+        }
+    }
+
+    @Override
+    public void addStatusChangeListener(Consumer<Status> listener) {
+        backingValue.addStatusChangeListener(listener);
+    }
+
+    @Override
+    public void removeStatusChangeListener(Consumer<Status> listener) {
+        backingValue.removeStatusChangeListener(listener);
+    }
+
+    @Override
+    public Collection<Consumer<Status>> statusChangeListeners() {
+        return backingValue.statusChangeListeners();
+    }
+
+    private class InternalValueEventListener implements AtomicValueEventListener<V2> {
+        private final AtomicValueEventListener<V1> listener;
+
+        InternalValueEventListener(AtomicValueEventListener<V1> listener) {
+            this.listener = listener;
+        }
+
+        @Override
+        public void event(AtomicValueEvent<V2> event) {
+            listener.event(new AtomicValueEvent<>(
+                event.name(),
+                event.newValue() != null ? valueDecoder.apply(event.newValue()) : null,
+                event.oldValue() != null ? valueDecoder.apply(event.oldValue()) : null));
+        }
+    }
+
+}
diff --git a/protocols/pcep/server/ctl/src/test/java/org/onosproject/pcelabelstore/util/StorageServiceAdapter.java b/protocols/pcep/server/ctl/src/test/java/org/onosproject/pcelabelstore/util/StorageServiceAdapter.java
index 4ba8a2c..fed4cab 100644
--- a/protocols/pcep/server/ctl/src/test/java/org/onosproject/pcelabelstore/util/StorageServiceAdapter.java
+++ b/protocols/pcep/server/ctl/src/test/java/org/onosproject/pcelabelstore/util/StorageServiceAdapter.java
@@ -33,6 +33,7 @@
 import org.onosproject.store.service.Serializer;
 import org.onosproject.store.service.StorageService;
 import org.onosproject.store.service.Topic;
+import org.onosproject.store.service.TopicBuilder;
 import org.onosproject.store.service.TransactionContextBuilder;
 import org.onosproject.store.service.WorkQueue;
 
@@ -91,6 +92,11 @@
     }
 
     @Override
+    public <T> TopicBuilder<T> topicBuilder() {
+        return null;
+    }
+
+    @Override
     public <E> WorkQueue<E> getWorkQueue(String name, Serializer serializer) {
         return null;
     }