Add HashMultiset Catalyst serializer - used in ConsistentMultimap.
Plus catch serialization exceptions during message receive, simplify
implementation of HashMultisetValueCollector, other minor cleanups.
Change-Id: Ia50b39205a8241f456ee90ef765a8e478da868ab
diff --git a/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/CatalystSerializers.java b/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/CatalystSerializers.java
index aea7aa7..645215c 100644
--- a/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/CatalystSerializers.java
+++ b/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/CatalystSerializers.java
@@ -15,14 +15,13 @@
*/
package org.onosproject.store.primitives.impl;
+import com.google.common.collect.HashMultiset;
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.Maps;
import io.atomix.catalyst.serializer.Serializer;
import io.atomix.catalyst.serializer.TypeSerializerFactory;
import io.atomix.manager.util.ResourceManagerTypeResolver;
import io.atomix.variables.internal.LongCommands;
-
-import java.util.Arrays;
-import java.util.Optional;
-
import org.onlab.util.Match;
import org.onosproject.cluster.Leader;
import org.onosproject.cluster.Leadership;
@@ -56,8 +55,8 @@
import org.onosproject.store.service.Versioned;
import org.onosproject.store.service.WorkQueueStats;
-import com.google.common.collect.ImmutableList;
-import com.google.common.collect.Maps;
+import java.util.Arrays;
+import java.util.Optional;
/**
* Serializer utility for Atomix Catalyst.
@@ -111,6 +110,7 @@
serializer.register(ImmutableList.of().getClass(), factory);
serializer.register(ImmutableList.of("a").getClass(), factory);
serializer.register(Arrays.asList().getClass(), factory);
+ serializer.register(HashMultiset.class, factory);
serializer.register(Optional.class, factory);
serializer.resolve(new LongCommands.TypeResolver());
diff --git a/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/CopycatTransportConnection.java b/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/CopycatTransportConnection.java
index 8d4b577..f2752cd 100644
--- a/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/CopycatTransportConnection.java
+++ b/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/CopycatTransportConnection.java
@@ -15,7 +15,9 @@
*/
package org.onosproject.store.primitives.impl;
-import static com.google.common.base.Preconditions.checkNotNull;
+import com.google.common.base.MoreObjects;
+import com.google.common.base.Throwables;
+import com.google.common.collect.Maps;
import io.atomix.catalyst.concurrent.Listener;
import io.atomix.catalyst.concurrent.Listeners;
import io.atomix.catalyst.concurrent.ThreadContext;
@@ -26,6 +28,11 @@
import io.atomix.catalyst.transport.TransportException;
import io.atomix.catalyst.util.Assert;
import io.atomix.catalyst.util.reference.ReferenceCounted;
+import org.apache.commons.io.IOUtils;
+import org.onlab.util.Tools;
+import org.onosproject.cluster.PartitionId;
+import org.onosproject.store.cluster.messaging.MessagingException;
+import org.onosproject.store.cluster.messaging.MessagingService;
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
@@ -38,15 +45,7 @@
import java.util.concurrent.CompletableFuture;
import java.util.function.Consumer;
-import org.apache.commons.io.IOUtils;
-import org.onlab.util.Tools;
-import org.onosproject.cluster.PartitionId;
-import org.onosproject.store.cluster.messaging.MessagingException;
-import org.onosproject.store.cluster.messaging.MessagingService;
-
-import com.google.common.base.MoreObjects;
-import com.google.common.base.Throwables;
-import com.google.common.collect.Maps;
+import static com.google.common.base.Preconditions.checkNotNull;
/**
* {@link Connection} implementation for CopycatTransport.
@@ -148,7 +147,13 @@
Throwable t = context.serializer().readObject(input);
context.execute(() -> future.completeExceptionally(t));
} else {
- context.execute(() -> future.complete(context.serializer().readObject(input)));
+ context.execute(() -> {
+ try {
+ future.complete(context.serializer().readObject(input));
+ } catch (SerializationException e) {
+ future.completeExceptionally(e);
+ }
+ });
}
} catch (IOException e) {
context.execute(() -> future.completeExceptionally(e));
diff --git a/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/StoragePartitionClient.java b/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/StoragePartitionClient.java
index 4218b6e..3e80753 100644
--- a/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/StoragePartitionClient.java
+++ b/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/StoragePartitionClient.java
@@ -192,16 +192,14 @@
return super.name();
}
};
- AsyncConsistentMultimap<K, V> trancodedMap =
- DistributedPrimitives.<K, V, String, byte[]>newTranscodingMultimap(
+ AsyncConsistentMultimap<K, V> transcodedMap =
+ DistributedPrimitives.newTranscodingMultimap(
rawMap,
key -> HexString.toHexString(serializer.encode(key)),
- string -> serializer.decode(
- HexString.fromHexString(string)),
- value -> value == null ? null :
- serializer.encode(value),
+ string -> serializer.decode(HexString.fromHexString(string)),
+ value -> serializer.encode(value),
bytes -> serializer.decode(bytes));
- return trancodedMap;
+ return transcodedMap;
}
diff --git a/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/TranscodingAsyncConsistentMultimap.java b/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/TranscodingAsyncConsistentMultimap.java
index e31653e..666ac1f 100644
--- a/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/TranscodingAsyncConsistentMultimap.java
+++ b/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/TranscodingAsyncConsistentMultimap.java
@@ -221,8 +221,8 @@
@Override
public CompletableFuture<Multiset<V1>> values() {
- return backingMap.values().thenApply(s -> s.stream().map(valueDecoder)
- .collect(new MultisetCollector<>()));
+ return backingMap.values().thenApply(s ->
+ s.stream().map(valueDecoder).collect(new MultisetCollector<>()));
}
@Override
diff --git a/core/store/primitives/src/main/java/org/onosproject/store/primitives/resources/impl/AtomixConsistentSetMultimap.java b/core/store/primitives/src/main/java/org/onosproject/store/primitives/resources/impl/AtomixConsistentSetMultimap.java
index ec8ebb5..e446a67 100644
--- a/core/store/primitives/src/main/java/org/onosproject/store/primitives/resources/impl/AtomixConsistentSetMultimap.java
+++ b/core/store/primitives/src/main/java/org/onosproject/store/primitives/resources/impl/AtomixConsistentSetMultimap.java
@@ -147,6 +147,7 @@
return client.submit(new Keys());
}
+ @Override
public CompletableFuture<Multiset<byte[]>> values() {
return client.submit(new Values());
}
diff --git a/core/store/primitives/src/main/java/org/onosproject/store/primitives/resources/impl/AtomixConsistentSetMultimapState.java b/core/store/primitives/src/main/java/org/onosproject/store/primitives/resources/impl/AtomixConsistentSetMultimapState.java
index 746ee00..5110187 100644
--- a/core/store/primitives/src/main/java/org/onosproject/store/primitives/resources/impl/AtomixConsistentSetMultimapState.java
+++ b/core/store/primitives/src/main/java/org/onosproject/store/primitives/resources/impl/AtomixConsistentSetMultimapState.java
@@ -598,16 +598,10 @@
Collector<MapEntryValue,
HashMultiset<byte[]>,
HashMultiset<byte[]>> {
- private HashMultiset<byte[]> multiset = null;
@Override
public Supplier<HashMultiset<byte[]>> supplier() {
- return () -> {
- if (multiset == null) {
- multiset = HashMultiset.create();
- }
- return multiset;
- };
+ return HashMultiset::create;
}
@Override
@@ -627,7 +621,7 @@
@Override
public Function<HashMultiset<byte[]>,
HashMultiset<byte[]>> finisher() {
- return (unused) -> multiset;
+ return Function.identity();
}
@Override
diff --git a/core/store/serializers/src/main/java/org/onosproject/store/serializers/KryoNamespaces.java b/core/store/serializers/src/main/java/org/onosproject/store/serializers/KryoNamespaces.java
index 4db7a11..c086cb8 100644
--- a/core/store/serializers/src/main/java/org/onosproject/store/serializers/KryoNamespaces.java
+++ b/core/store/serializers/src/main/java/org/onosproject/store/serializers/KryoNamespaces.java
@@ -15,6 +15,7 @@
*/
package org.onosproject.store.serializers;
+import com.google.common.collect.HashMultiset;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.ImmutableSet;
@@ -274,6 +275,7 @@
HashSet.class,
LinkedHashSet.class
)
+ .register(HashMultiset.class)
.register(Maps.immutableEntry("a", "b").getClass())
.register(new ArraysAsListSerializer(), Arrays.asList().getClass())
.register(Collections.singletonList(1).getClass())