Add compatibility functions to AtomicValue/Topic
Change-Id: I4a597cfa3effe0a62714ab12440a2fc41ac58aa9
diff --git a/core/api/src/main/java/org/onosproject/store/service/AtomicValueOptions.java b/core/api/src/main/java/org/onosproject/store/service/AtomicValueOptions.java
index b00a7af..b42e5f3 100644
--- a/core/api/src/main/java/org/onosproject/store/service/AtomicValueOptions.java
+++ b/core/api/src/main/java/org/onosproject/store/service/AtomicValueOptions.java
@@ -15,6 +15,8 @@
*/
package org.onosproject.store.service;
+import java.util.function.BiFunction;
+
import org.onosproject.store.primitives.DistributedPrimitiveOptions;
/**
@@ -23,7 +25,22 @@
* @param <V> atomic value type
*/
public abstract class AtomicValueOptions<O extends AtomicValueOptions<O, V>, V> extends DistributedPrimitiveOptions<O> {
+ protected BiFunction<V, org.onosproject.core.Version, V> compatibilityFunction;
+
public AtomicValueOptions() {
super(DistributedPrimitive.Type.VALUE);
}
+
+ /**
+ * Sets a compatibility function on the map.
+ *
+ * @param compatibilityFunction the compatibility function
+ * @return the consistent map builder
+ */
+ @SuppressWarnings("unchecked")
+ public O withCompatibilityFunction(
+ BiFunction<V, org.onosproject.core.Version, V> compatibilityFunction) {
+ this.compatibilityFunction = compatibilityFunction;
+ return (O) this;
+ }
}
diff --git a/core/api/src/main/java/org/onosproject/store/service/PrimitiveService.java b/core/api/src/main/java/org/onosproject/store/service/PrimitiveService.java
index 7be0480..cc77393 100644
--- a/core/api/src/main/java/org/onosproject/store/service/PrimitiveService.java
+++ b/core/api/src/main/java/org/onosproject/store/service/PrimitiveService.java
@@ -114,6 +114,14 @@
LeaderElectorBuilder leaderElectorBuilder();
/**
+ * Creates a new TopicBuilder.
+ *
+ * @param <T> topic value type
+ * @return topic builder
+ */
+ <T> TopicBuilder<T> topicBuilder();
+
+ /**
* Creates a new transaction context builder.
*
* @return a builder for a transaction context.
diff --git a/core/api/src/main/java/org/onosproject/store/service/StorageService.java b/core/api/src/main/java/org/onosproject/store/service/StorageService.java
index 2dd42fa..aa2e890 100644
--- a/core/api/src/main/java/org/onosproject/store/service/StorageService.java
+++ b/core/api/src/main/java/org/onosproject/store/service/StorageService.java
@@ -121,6 +121,14 @@
LeaderElectorBuilder leaderElectorBuilder();
/**
+ * Creates a new TopicBuilder.
+ *
+ * @param <T> topic value type
+ * @return topic builder
+ */
+ <T> TopicBuilder<T> topicBuilder();
+
+ /**
* Creates a new transaction context builder.
*
* @return a builder for a transaction context.
diff --git a/core/api/src/main/java/org/onosproject/store/service/TopicBuilder.java b/core/api/src/main/java/org/onosproject/store/service/TopicBuilder.java
new file mode 100644
index 0000000..5943432
--- /dev/null
+++ b/core/api/src/main/java/org/onosproject/store/service/TopicBuilder.java
@@ -0,0 +1,28 @@
+/*
+ * Copyright 2018-present Open Networking Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.onosproject.store.service;
+
+import org.onosproject.store.primitives.DistributedPrimitiveBuilder;
+
+/**
+ * Builder for {@link Topic} instances.
+ *
+ * @param <T> type for topic value
+ */
+public abstract class TopicBuilder<T>
+ extends TopicOptions<TopicBuilder<T>, T>
+ implements DistributedPrimitiveBuilder<Topic<T>> {
+}
diff --git a/core/api/src/main/java/org/onosproject/store/service/TopicOptions.java b/core/api/src/main/java/org/onosproject/store/service/TopicOptions.java
new file mode 100644
index 0000000..7845b56
--- /dev/null
+++ b/core/api/src/main/java/org/onosproject/store/service/TopicOptions.java
@@ -0,0 +1,49 @@
+/*
+ * Copyright 2018-present Open Networking Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.onosproject.store.service;
+
+import java.util.function.BiFunction;
+
+import org.onosproject.store.primitives.DistributedPrimitiveOptions;
+
+/**
+ * Builder for {@link Topic} instances.
+ *
+ * @param <T> type for topic value
+ */
+public abstract class TopicOptions<O extends TopicOptions<O, T>, T>
+ extends DistributedPrimitiveOptions<O> {
+
+ protected BiFunction<T, org.onosproject.core.Version, T> compatibilityFunction;
+
+ public TopicOptions() {
+ super(DistributedPrimitive.Type.TOPIC);
+ }
+
+ /**
+ * Sets a compatibility function on the map.
+ *
+ * @param compatibilityFunction the compatibility function
+ * @return the consistent map builder
+ */
+ @SuppressWarnings("unchecked")
+ public O withCompatibilityFunction(
+ BiFunction<T, org.onosproject.core.Version, T> compatibilityFunction) {
+ this.compatibilityFunction = compatibilityFunction;
+ return (O) this;
+ }
+
+}
diff --git a/core/api/src/test/java/org/onosproject/store/service/CoordinationServiceAdapter.java b/core/api/src/test/java/org/onosproject/store/service/CoordinationServiceAdapter.java
index f05d9aa..68cb5fd 100644
--- a/core/api/src/test/java/org/onosproject/store/service/CoordinationServiceAdapter.java
+++ b/core/api/src/test/java/org/onosproject/store/service/CoordinationServiceAdapter.java
@@ -75,6 +75,11 @@
}
@Override
+ public <T> TopicBuilder<T> topicBuilder() {
+ return null;
+ }
+
+ @Override
public TransactionContextBuilder transactionContextBuilder() {
return null;
}
diff --git a/core/api/src/test/java/org/onosproject/store/service/StorageServiceAdapter.java b/core/api/src/test/java/org/onosproject/store/service/StorageServiceAdapter.java
index 4d528e2..ae65500 100644
--- a/core/api/src/test/java/org/onosproject/store/service/StorageServiceAdapter.java
+++ b/core/api/src/test/java/org/onosproject/store/service/StorageServiceAdapter.java
@@ -70,6 +70,11 @@
}
@Override
+ public <T> TopicBuilder<T> topicBuilder() {
+ return null;
+ }
+
+ @Override
public <E> WorkQueue<E> getWorkQueue(String name, Serializer serializer) {
return null;
}
diff --git a/core/store/dist/src/main/java/org/onosproject/store/app/DistributedApplicationStore.java b/core/store/dist/src/main/java/org/onosproject/store/app/DistributedApplicationStore.java
index e03b645..5976003 100644
--- a/core/store/dist/src/main/java/org/onosproject/store/app/DistributedApplicationStore.java
+++ b/core/store/dist/src/main/java/org/onosproject/store/app/DistributedApplicationStore.java
@@ -177,8 +177,13 @@
.withCompatibilityFunction(this::convertApplication)
.build();
- appActivationTopic = storageService.getTopic("onos-apps-activation-topic",
- Serializer.using(KryoNamespaces.API));
+ appActivationTopic = storageService.<Application>topicBuilder()
+ .withName("onos-apps-activation-topic")
+ .withSerializer(Serializer.using(KryoNamespaces.API))
+ .withVersion(versionService.version())
+ .withRevisionType(RevisionType.PROPAGATE)
+ .withCompatibilityFunction(this::convertApplication)
+ .build();
activationExecutor = newSingleThreadExecutor(groupedThreads("onos/store/app",
"app-activation", log));
@@ -206,8 +211,8 @@
// version, update the stored application with the new version.
ApplicationDescription appDesc = getApplicationDescription(appHolder.app.id().name());
if (!appDesc.version().equals(appHolder.app().version())) {
- Application newApplication = DefaultApplication.builder(appHolder.app())
- .withVersion(appDesc.version())
+ Application newApplication = DefaultApplication.builder(appDesc)
+ .withAppId(appHolder.app.id())
.build();
return new InternalApplicationHolder(
newApplication, appHolder.state, appHolder.permissions);
@@ -216,6 +221,21 @@
}
/**
+ * Converts the versions of stored applications propagated from the prior version to the local application versions.
+ */
+ private Application convertApplication(Application app, Version version) {
+ // Load the application description from disk. If the version doesn't match the persisted
+ // version, update the stored application with the new version.
+ ApplicationDescription appDesc = getApplicationDescription(app.id().name());
+ if (!appDesc.version().equals(app.version())) {
+ return DefaultApplication.builder(appDesc)
+ .withAppId(app.id())
+ .build();
+ }
+ return app;
+ }
+
+ /**
* Activates applications that should be activated according to the distributed store.
*/
private void activateExistingApplications() {
diff --git a/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/CoordinationManager.java b/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/CoordinationManager.java
index f2965d9..1f6310f 100644
--- a/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/CoordinationManager.java
+++ b/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/CoordinationManager.java
@@ -29,7 +29,6 @@
import org.onosproject.cluster.DefaultPartition;
import org.onosproject.cluster.NodeId;
import org.onosproject.cluster.PartitionId;
-import org.onosproject.core.VersionService;
import org.onosproject.persistence.PersistenceService;
import org.onosproject.store.cluster.messaging.ClusterCommunicationService;
import org.onosproject.store.primitives.DistributedPrimitiveCreator;
@@ -52,6 +51,7 @@
import org.onosproject.store.service.LeaderElectorBuilder;
import org.onosproject.store.service.Serializer;
import org.onosproject.store.service.Topic;
+import org.onosproject.store.service.TopicBuilder;
import org.onosproject.store.service.TransactionContextBuilder;
import org.onosproject.store.service.WorkQueue;
import org.slf4j.Logger;
@@ -81,9 +81,6 @@
@Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
protected PersistenceService persistenceService;
- @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
- protected VersionService versionService;
-
private StoragePartition partition;
private DistributedPrimitiveCreator primitiveCreator;
@@ -139,7 +136,7 @@
@Override
public <K, V> ConsistentMapBuilder<K, V> consistentMapBuilder() {
checkPermission(STORAGE_WRITE);
- return new DefaultConsistentMapBuilder<>(primitiveCreator, versionService.version());
+ return new DefaultConsistentMapBuilder<>(primitiveCreator);
}
@Override
@@ -205,6 +202,12 @@
}
@Override
+ public <T> TopicBuilder<T> topicBuilder() {
+ checkPermission(STORAGE_WRITE);
+ return new DefaultDistributedTopicBuilder<>(atomicValueBuilder());
+ }
+
+ @Override
public <E> WorkQueue<E> getWorkQueue(String name, Serializer serializer) {
checkPermission(STORAGE_WRITE);
return primitiveCreator.newWorkQueue(name, serializer);
diff --git a/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/DefaultAtomicValueBuilder.java b/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/DefaultAtomicValueBuilder.java
index fad3993..9058189 100644
--- a/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/DefaultAtomicValueBuilder.java
+++ b/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/DefaultAtomicValueBuilder.java
@@ -17,9 +17,11 @@
import java.util.function.Supplier;
+import org.onosproject.store.serializers.KryoNamespaces;
import org.onosproject.store.service.AsyncAtomicValue;
import org.onosproject.store.service.AtomicValueBuilder;
import org.onosproject.store.service.ConsistentMapBuilder;
+import org.onosproject.store.service.Serializer;
import static com.google.common.base.Preconditions.checkNotNull;
@@ -38,8 +40,24 @@
@Override
public AsyncAtomicValue<V> build() {
- return new DefaultAsyncAtomicValue<>(checkNotNull(name()),
- checkNotNull(serializer()),
- mapBuilder.buildAsyncMap());
+ if (compatibilityFunction != null) {
+ Serializer serializer = Serializer.using(KryoNamespaces.API, CompatibleValue.class);
+
+ AsyncAtomicValue<CompatibleValue<byte[]>> rawValue = new DefaultAsyncAtomicValue<>(
+ checkNotNull(name()), serializer, mapBuilder.buildAsyncMap());
+
+ AsyncAtomicValue<CompatibleValue<V>> compatibleValue =
+ DistributedPrimitives.newTranscodingAtomicValue(
+ rawValue,
+ value -> value == null ? null :
+ new CompatibleValue<byte[]>(serializer().encode(value.value()), value.version()),
+ value -> value == null ? null :
+ new CompatibleValue<V>(serializer().decode(value.value()), value.version()));
+ return DistributedPrimitives.newCompatibleAtomicValue(compatibleValue, compatibilityFunction, version());
+ }
+ return new DefaultAsyncAtomicValue<>(
+ checkNotNull(name()),
+ checkNotNull(serializer()),
+ mapBuilder.buildAsyncMap());
}
}
diff --git a/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/DefaultConsistentMapBuilder.java b/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/DefaultConsistentMapBuilder.java
index 6d3d4dc..d6b9441 100644
--- a/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/DefaultConsistentMapBuilder.java
+++ b/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/DefaultConsistentMapBuilder.java
@@ -15,7 +15,6 @@
*/
package org.onosproject.store.primitives.impl;
-import org.onosproject.core.Version;
import org.onosproject.store.primitives.DistributedPrimitiveCreator;
import org.onosproject.store.serializers.KryoNamespaces;
import org.onosproject.store.service.AsyncConsistentMap;
@@ -32,11 +31,9 @@
public class DefaultConsistentMapBuilder<K, V> extends ConsistentMapBuilder<K, V> {
private final DistributedPrimitiveCreator primitiveCreator;
- private final Version version;
- public DefaultConsistentMapBuilder(DistributedPrimitiveCreator primitiveCreator, Version version) {
+ public DefaultConsistentMapBuilder(DistributedPrimitiveCreator primitiveCreator) {
this.primitiveCreator = primitiveCreator;
- this.version = version;
}
@Override
diff --git a/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/DefaultDistributedTopicBuilder.java b/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/DefaultDistributedTopicBuilder.java
new file mode 100644
index 0000000..21ca7a1
--- /dev/null
+++ b/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/DefaultDistributedTopicBuilder.java
@@ -0,0 +1,71 @@
+/*
+ * Copyright 2018-present Open Networking Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.onosproject.store.primitives.impl;
+
+import java.util.function.BiFunction;
+
+import org.onosproject.core.Version;
+import org.onosproject.store.service.AtomicValueBuilder;
+import org.onosproject.store.service.RevisionType;
+import org.onosproject.store.service.Serializer;
+import org.onosproject.store.service.Topic;
+import org.onosproject.store.service.TopicBuilder;
+
+/**
+ * Default topic builder.
+ */
+public class DefaultDistributedTopicBuilder<T> extends TopicBuilder<T> {
+ private final AtomicValueBuilder<T> valueBuilder;
+
+ public DefaultDistributedTopicBuilder(AtomicValueBuilder<T> valueBuilder) {
+ this.valueBuilder = valueBuilder;
+ }
+
+ @Override
+ public TopicBuilder<T> withName(String name) {
+ valueBuilder.withName(name);
+ return this;
+ }
+
+ @Override
+ public TopicBuilder<T> withSerializer(Serializer serializer) {
+ valueBuilder.withSerializer(serializer);
+ return this;
+ }
+
+ @Override
+ public TopicBuilder<T> withVersion(Version version) {
+ valueBuilder.withVersion(version);
+ return this;
+ }
+
+ @Override
+ public TopicBuilder<T> withRevisionType(RevisionType revisionType) {
+ valueBuilder.withRevisionType(revisionType);
+ return this;
+ }
+
+ @Override
+ public TopicBuilder<T> withCompatibilityFunction(BiFunction<T, Version, T> compatibilityFunction) {
+ valueBuilder.withCompatibilityFunction(compatibilityFunction);
+ return this;
+ }
+
+ @Override
+ public Topic<T> build() {
+ return new DefaultDistributedTopic<>(valueBuilder.build());
+ }
+}
diff --git a/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/DistributedPrimitives.java b/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/DistributedPrimitives.java
index 78c41e8..9b209ba 100644
--- a/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/DistributedPrimitives.java
+++ b/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/DistributedPrimitives.java
@@ -15,17 +15,18 @@
*/
package org.onosproject.store.primitives.impl;
+import java.util.function.BiFunction;
+import java.util.function.Function;
+
import org.onosproject.core.Version;
import org.onosproject.store.service.AsyncAtomicCounterMap;
+import org.onosproject.store.service.AsyncAtomicValue;
import org.onosproject.store.service.AsyncConsistentMap;
import org.onosproject.store.service.AsyncConsistentMultimap;
import org.onosproject.store.service.AsyncConsistentTreeMap;
import org.onosproject.store.service.AsyncDistributedSet;
import org.onosproject.store.service.AsyncDocumentTree;
-import java.util.function.BiFunction;
-import java.util.function.Function;
-
/**
* Misc utilities for working with {@code DistributedPrimitive}s.
*/
@@ -117,6 +118,45 @@
}
/**
+ * Creates an instance of {@code AsyncAtomicValue} that transforms value types.
+ *
+ * @param value backing value
+ * @param valueEncoder transformer for value type of returned value to value type of input value
+ * @param valueDecoder transformer for value type of input value to value type of returned value
+ * @param <V1> returned value type
+ * @param <V2> input value type
+ * @return new counter map
+ */
+ public static <V1, V2> AsyncAtomicValue<V1> newTranscodingAtomicValue(AsyncAtomicValue<V2> value,
+ Function<V1, V2> valueEncoder,
+ Function<V2, V1> valueDecoder) {
+ return new TranscodingAsyncAtomicValue<>(value, valueEncoder, valueDecoder);
+ }
+
+ /**
+ * Creates an instance of {@code AsyncAtomicValue} that converts values from other versions.
+ *
+ * @param atomicValue backing value
+ * @param compatibilityFunction the compatibility function
+ * @param version local node version
+ * @param <V> value type
+ * @return compatible map
+ */
+ public static <V> AsyncAtomicValue<V> newCompatibleAtomicValue(
+ AsyncAtomicValue<CompatibleValue<V>> atomicValue,
+ BiFunction<V, Version, V> compatibilityFunction,
+ Version version) {
+ Function<V, CompatibleValue<V>> encoder = value -> new CompatibleValue<>(value, version);
+ Function<CompatibleValue<V>, V> decoder = value -> {
+ if (!value.version().equals(version)) {
+ return compatibilityFunction.apply(value.value(), value.version());
+ }
+ return value.value();
+ };
+ return new TranscodingAsyncAtomicValue<>(atomicValue, encoder, decoder);
+ }
+
+ /**
* Creates an instance of {@code AsyncAtomicCounterMap} that transforms key types.
*
* @param map backing map
@@ -127,8 +167,8 @@
* @return new counter map
*/
public static <K1, K2> AsyncAtomicCounterMap<K1> newTranscodingAtomicCounterMap(AsyncAtomicCounterMap<K2> map,
- Function<K1, K2> keyEncoder,
- Function<K2, K1> keyDecoder) {
+ Function<K1, K2> keyEncoder,
+ Function<K2, K1> keyDecoder) {
return new TranscodingAsyncAtomicCounterMap<>(map, keyEncoder, keyDecoder);
}
diff --git a/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/StorageManager.java b/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/StorageManager.java
index a649ecf..719a96e 100644
--- a/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/StorageManager.java
+++ b/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/StorageManager.java
@@ -35,7 +35,6 @@
import org.onosproject.cluster.MembershipService;
import org.onosproject.cluster.NodeId;
import org.onosproject.cluster.PartitionId;
-import org.onosproject.core.VersionService;
import org.onosproject.persistence.PersistenceService;
import org.onosproject.store.cluster.messaging.ClusterCommunicationService;
import org.onosproject.store.primitives.DistributedPrimitiveCreator;
@@ -66,6 +65,7 @@
import org.onosproject.store.service.StorageAdminService;
import org.onosproject.store.service.StorageService;
import org.onosproject.store.service.Topic;
+import org.onosproject.store.service.TopicBuilder;
import org.onosproject.store.service.TransactionContextBuilder;
import org.onosproject.store.service.WorkQueue;
import org.onosproject.store.service.WorkQueueStats;
@@ -104,9 +104,6 @@
@Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
protected MembershipService membershipService;
- @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
- protected VersionService versionService;
-
private final Supplier<TransactionId> transactionIdGenerator =
() -> TransactionId.from(UUID.randomUUID().toString());
private DistributedPrimitiveCreator federatedPrimitiveCreator;
@@ -177,7 +174,7 @@
@Override
public <K, V> ConsistentMapBuilder<K, V> consistentMapBuilder() {
checkPermission(STORAGE_WRITE);
- return new DefaultConsistentMapBuilder<>(federatedPrimitiveCreator, versionService.version());
+ return new DefaultConsistentMapBuilder<>(federatedPrimitiveCreator);
}
@Override
@@ -252,6 +249,12 @@
}
@Override
+ public <T> TopicBuilder<T> topicBuilder() {
+ checkPermission(STORAGE_WRITE);
+ return new DefaultDistributedTopicBuilder<>(atomicValueBuilder());
+ }
+
+ @Override
public <E> WorkQueue<E> getWorkQueue(String name, Serializer serializer) {
checkPermission(STORAGE_WRITE);
return federatedPrimitiveCreator.newWorkQueue(name, serializer);
diff --git a/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/TranscodingAsyncAtomicValue.java b/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/TranscodingAsyncAtomicValue.java
new file mode 100644
index 0000000..24dd35d
--- /dev/null
+++ b/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/TranscodingAsyncAtomicValue.java
@@ -0,0 +1,122 @@
+/*
+ * Copyright 2018-present Open Networking Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.onosproject.store.primitives.impl;
+
+import java.util.Collection;
+import java.util.Map;
+import java.util.concurrent.CompletableFuture;
+import java.util.function.Consumer;
+import java.util.function.Function;
+
+import com.google.common.collect.Maps;
+import org.onosproject.store.service.AsyncAtomicValue;
+import org.onosproject.store.service.AtomicValueEvent;
+import org.onosproject.store.service.AtomicValueEventListener;
+
+/**
+ * An {@code AsyncAtomicValue} that transcodes values.
+ */
+public class TranscodingAsyncAtomicValue<V1, V2> implements AsyncAtomicValue<V1> {
+ private final AsyncAtomicValue<V2> backingValue;
+ private final Function<V1, V2> valueEncoder;
+ private final Function<V2, V1> valueDecoder;
+ private final Map<AtomicValueEventListener<V1>, InternalValueEventListener> listeners = Maps.newIdentityHashMap();
+
+ public TranscodingAsyncAtomicValue(
+ AsyncAtomicValue<V2> backingValue, Function<V1, V2> valueEncoder, Function<V2, V1> valueDecoder) {
+ this.backingValue = backingValue;
+ this.valueEncoder = k -> k == null ? null : valueEncoder.apply(k);
+ this.valueDecoder = k -> k == null ? null : valueDecoder.apply(k);
+ }
+
+ @Override
+ public String name() {
+ return backingValue.name();
+ }
+
+ @Override
+ public CompletableFuture<Boolean> compareAndSet(V1 expect, V1 update) {
+ return backingValue.compareAndSet(valueEncoder.apply(expect), valueEncoder.apply(update));
+ }
+
+ @Override
+ public CompletableFuture<V1> get() {
+ return backingValue.get().thenApply(valueDecoder);
+ }
+
+ @Override
+ public CompletableFuture<V1> getAndSet(V1 value) {
+ return backingValue.getAndSet(valueEncoder.apply(value)).thenApply(valueDecoder);
+ }
+
+ @Override
+ public CompletableFuture<Void> set(V1 value) {
+ return backingValue.set(valueEncoder.apply(value));
+ }
+
+ @Override
+ public CompletableFuture<Void> addListener(AtomicValueEventListener<V1> listener) {
+ synchronized (listeners) {
+ InternalValueEventListener backingMapListener =
+ listeners.computeIfAbsent(listener, k -> new InternalValueEventListener(listener));
+ return backingValue.addListener(backingMapListener);
+ }
+ }
+
+ @Override
+ public CompletableFuture<Void> removeListener(AtomicValueEventListener<V1> listener) {
+ synchronized (listeners) {
+ InternalValueEventListener backingMapListener = listeners.remove(listener);
+ if (backingMapListener != null) {
+ return backingValue.removeListener(backingMapListener);
+ } else {
+ return CompletableFuture.completedFuture(null);
+ }
+ }
+ }
+
+ @Override
+ public void addStatusChangeListener(Consumer<Status> listener) {
+ backingValue.addStatusChangeListener(listener);
+ }
+
+ @Override
+ public void removeStatusChangeListener(Consumer<Status> listener) {
+ backingValue.removeStatusChangeListener(listener);
+ }
+
+ @Override
+ public Collection<Consumer<Status>> statusChangeListeners() {
+ return backingValue.statusChangeListeners();
+ }
+
+ private class InternalValueEventListener implements AtomicValueEventListener<V2> {
+ private final AtomicValueEventListener<V1> listener;
+
+ InternalValueEventListener(AtomicValueEventListener<V1> listener) {
+ this.listener = listener;
+ }
+
+ @Override
+ public void event(AtomicValueEvent<V2> event) {
+ listener.event(new AtomicValueEvent<>(
+ event.name(),
+ event.newValue() != null ? valueDecoder.apply(event.newValue()) : null,
+ event.oldValue() != null ? valueDecoder.apply(event.oldValue()) : null));
+ }
+ }
+
+}
diff --git a/protocols/pcep/server/ctl/src/test/java/org/onosproject/pcelabelstore/util/StorageServiceAdapter.java b/protocols/pcep/server/ctl/src/test/java/org/onosproject/pcelabelstore/util/StorageServiceAdapter.java
index 4ba8a2c..fed4cab 100644
--- a/protocols/pcep/server/ctl/src/test/java/org/onosproject/pcelabelstore/util/StorageServiceAdapter.java
+++ b/protocols/pcep/server/ctl/src/test/java/org/onosproject/pcelabelstore/util/StorageServiceAdapter.java
@@ -33,6 +33,7 @@
import org.onosproject.store.service.Serializer;
import org.onosproject.store.service.StorageService;
import org.onosproject.store.service.Topic;
+import org.onosproject.store.service.TopicBuilder;
import org.onosproject.store.service.TransactionContextBuilder;
import org.onosproject.store.service.WorkQueue;
@@ -91,6 +92,11 @@
}
@Override
+ public <T> TopicBuilder<T> topicBuilder() {
+ return null;
+ }
+
+ @Override
public <E> WorkQueue<E> getWorkQueue(String name, Serializer serializer) {
return null;
}