Some improvements around EventuallyConsistentMapBuilder serializer handling.
- Allow directly passing KryoNamespace
- Add a registration ID gap before ECMap's internal registrations
- Some improvements to ease investigation of registration issues
-- Add friendly name to ECMap's internal KryoNamespace
-- Add backtrace information
Change-Id: I7c87b3aefbaea4b2ed12b38c3e0813e9d195c7a9
diff --git a/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/EventuallyConsistentMapBuilderImpl.java b/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/EventuallyConsistentMapBuilderImpl.java
index 51b8f6d..ea7b5d1 100644
--- a/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/EventuallyConsistentMapBuilderImpl.java
+++ b/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/EventuallyConsistentMapBuilderImpl.java
@@ -42,6 +42,7 @@
private final ClusterCommunicationService clusterCommunicator;
private String name;
+ private KryoNamespace serializer;
private KryoNamespace.Builder serializerBuilder;
private ExecutorService eventExecutor;
private ExecutorService communicationExecutor;
@@ -85,6 +86,12 @@
}
@Override
+ public EventuallyConsistentMapBuilder<K, V> withSerializer(KryoNamespace serializer) {
+ this.serializer = checkNotNull(serializer);
+ return this;
+ }
+
+ @Override
public EventuallyConsistentMapBuilder<K, V> withTimestampProvider(
BiFunction<K, V, Timestamp> timestampProvider) {
this.timestampProvider = checkNotNull(timestampProvider);
@@ -147,13 +154,16 @@
@Override
public EventuallyConsistentMap<K, V> build() {
checkNotNull(name, "name is a mandatory parameter");
- checkNotNull(serializerBuilder, "serializerBuilder is a mandatory parameter");
checkNotNull(timestampProvider, "timestampProvider is a mandatory parameter");
+ if (serializer == null && serializerBuilder != null) {
+ serializer = serializerBuilder.build(name);
+ }
+ checkNotNull(serializer, "serializer is a mandatory parameter");
return new EventuallyConsistentMapImpl<>(name,
clusterService,
clusterCommunicator,
- serializerBuilder,
+ serializer,
timestampProvider,
peerUpdateFunction,
eventExecutor,
diff --git a/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/EventuallyConsistentMapImpl.java b/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/EventuallyConsistentMapImpl.java
index 2374691..7a9e655 100644
--- a/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/EventuallyConsistentMapImpl.java
+++ b/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/EventuallyConsistentMapImpl.java
@@ -58,7 +58,6 @@
import org.onosproject.store.service.EventuallyConsistentMap;
import org.onosproject.store.service.EventuallyConsistentMapEvent;
import org.onosproject.store.service.EventuallyConsistentMapListener;
-import org.onosproject.store.service.Serializer;
import org.onosproject.store.service.WallClockTimestamp;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -138,7 +137,7 @@
* @param mapName a String identifier for the map.
* @param clusterService the cluster service
* @param clusterCommunicator the cluster communications service
- * @param serializerBuilder a Kryo namespace builder that can serialize
+ * @param ns a Kryo namespace that can serialize
* both K and V
* @param timestampProvider provider of timestamps for K and V
* @param peerUpdateFunction function that provides a set of nodes to immediately
@@ -159,7 +158,7 @@
EventuallyConsistentMapImpl(String mapName,
ClusterService clusterService,
ClusterCommunicationService clusterCommunicator,
- KryoNamespace.Builder serializerBuilder,
+ KryoNamespace ns,
BiFunction<K, V, Timestamp> timestampProvider,
BiFunction<K, V, Collection<NodeId>> peerUpdateFunction,
ExecutorService eventExecutor,
@@ -172,25 +171,14 @@
boolean persistent,
PersistenceService persistenceService) {
this.mapName = mapName;
- this.serializer = createSerializer(serializerBuilder);
+ this.serializer = createSerializer(ns);
this.persistenceService = persistenceService;
this.persistent =
persistent;
if (persistent) {
items = this.persistenceService.<K, MapValue<V>>persistentMapBuilder()
.withName(PERSISTENT_LOCAL_MAP_NAME)
- .withSerializer(new Serializer() {
-
- @Override
- public <T> byte[] encode(T object) {
- return EventuallyConsistentMapImpl.this.serializer.encode(object);
- }
-
- @Override
- public <T> T decode(byte[] bytes) {
- return EventuallyConsistentMapImpl.this.serializer.decode(bytes);
- }
- })
+ .withSerializer(this.serializer)
.build();
} else {
items = Maps.newConcurrentMap();
@@ -268,18 +256,21 @@
this.lightweightAntiEntropy = !convergeFaster;
}
- private StoreSerializer createSerializer(KryoNamespace.Builder builder) {
- return StoreSerializer.using(builder
- .register(KryoNamespaces.BASIC)
- .nextId(KryoNamespaces.BEGIN_USER_CUSTOM_ID)
- .register(LogicalTimestamp.class)
- .register(WallClockTimestamp.class)
- .register(AntiEntropyAdvertisement.class)
- .register(AntiEntropyResponse.class)
- .register(UpdateEntry.class)
- .register(MapValue.class)
- .register(MapValue.Digest.class)
- .build(name()));
+ private StoreSerializer createSerializer(KryoNamespace ns) {
+ return StoreSerializer.using(KryoNamespace.newBuilder()
+ .register(ns)
+ // Best-effort (not robust) way to avoid ID collisions with
+ // user-supplied registrations: skip ahead before registering internals
+ .nextId(KryoNamespaces.BEGIN_USER_CUSTOM_ID + 100)
+ .register(KryoNamespaces.BASIC)
+ .register(LogicalTimestamp.class)
+ .register(WallClockTimestamp.class)
+ .register(AntiEntropyAdvertisement.class)
+ .register(AntiEntropyResponse.class)
+ .register(UpdateEntry.class)
+ .register(MapValue.class)
+ .register(MapValue.Digest.class)
+ .build(name() + "-ecmap"));
}
@Override