Add HashMultiset Catalyst serializer - used in ConsistentMultimap.

Plus catch serialization exceptions during message receive, simplify
implementation of HashMultisetValueCollector, other minor cleanups.

Change-Id: Ia50b39205a8241f456ee90ef765a8e478da868ab
diff --git a/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/CatalystSerializers.java b/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/CatalystSerializers.java
index aea7aa7..645215c 100644
--- a/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/CatalystSerializers.java
+++ b/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/CatalystSerializers.java
@@ -15,14 +15,13 @@
  */
 package org.onosproject.store.primitives.impl;
 
+import com.google.common.collect.HashMultiset;
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.Maps;
 import io.atomix.catalyst.serializer.Serializer;
 import io.atomix.catalyst.serializer.TypeSerializerFactory;
 import io.atomix.manager.util.ResourceManagerTypeResolver;
 import io.atomix.variables.internal.LongCommands;
-
-import java.util.Arrays;
-import java.util.Optional;
-
 import org.onlab.util.Match;
 import org.onosproject.cluster.Leader;
 import org.onosproject.cluster.Leadership;
@@ -56,8 +55,8 @@
 import org.onosproject.store.service.Versioned;
 import org.onosproject.store.service.WorkQueueStats;
 
-import com.google.common.collect.ImmutableList;
-import com.google.common.collect.Maps;
+import java.util.Arrays;
+import java.util.Optional;
 
 /**
  * Serializer utility for Atomix Catalyst.
@@ -111,6 +110,7 @@
         serializer.register(ImmutableList.of().getClass(), factory);
         serializer.register(ImmutableList.of("a").getClass(), factory);
         serializer.register(Arrays.asList().getClass(), factory);
+        serializer.register(HashMultiset.class, factory);
         serializer.register(Optional.class, factory);
 
         serializer.resolve(new LongCommands.TypeResolver());
diff --git a/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/CopycatTransportConnection.java b/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/CopycatTransportConnection.java
index 8d4b577..f2752cd 100644
--- a/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/CopycatTransportConnection.java
+++ b/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/CopycatTransportConnection.java
@@ -15,7 +15,9 @@
  */
 package org.onosproject.store.primitives.impl;
 
-import static com.google.common.base.Preconditions.checkNotNull;
+import com.google.common.base.MoreObjects;
+import com.google.common.base.Throwables;
+import com.google.common.collect.Maps;
 import io.atomix.catalyst.concurrent.Listener;
 import io.atomix.catalyst.concurrent.Listeners;
 import io.atomix.catalyst.concurrent.ThreadContext;
@@ -26,6 +28,11 @@
 import io.atomix.catalyst.transport.TransportException;
 import io.atomix.catalyst.util.Assert;
 import io.atomix.catalyst.util.reference.ReferenceCounted;
+import org.apache.commons.io.IOUtils;
+import org.onlab.util.Tools;
+import org.onosproject.cluster.PartitionId;
+import org.onosproject.store.cluster.messaging.MessagingException;
+import org.onosproject.store.cluster.messaging.MessagingService;
 
 import java.io.ByteArrayInputStream;
 import java.io.ByteArrayOutputStream;
@@ -38,15 +45,7 @@
 import java.util.concurrent.CompletableFuture;
 import java.util.function.Consumer;
 
-import org.apache.commons.io.IOUtils;
-import org.onlab.util.Tools;
-import org.onosproject.cluster.PartitionId;
-import org.onosproject.store.cluster.messaging.MessagingException;
-import org.onosproject.store.cluster.messaging.MessagingService;
-
-import com.google.common.base.MoreObjects;
-import com.google.common.base.Throwables;
-import com.google.common.collect.Maps;
+import static com.google.common.base.Preconditions.checkNotNull;
 
 /**
  * {@link Connection} implementation for CopycatTransport.
@@ -148,7 +147,13 @@
                 Throwable t = context.serializer().readObject(input);
                 context.execute(() -> future.completeExceptionally(t));
             } else {
-                context.execute(() -> future.complete(context.serializer().readObject(input)));
+                context.execute(() -> {
+                    try {
+                        future.complete(context.serializer().readObject(input));
+                    } catch (SerializationException e) {
+                        future.completeExceptionally(e);
+                    }
+                });
             }
         } catch (IOException e) {
             context.execute(() -> future.completeExceptionally(e));
diff --git a/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/StoragePartitionClient.java b/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/StoragePartitionClient.java
index 4218b6e..3e80753 100644
--- a/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/StoragePartitionClient.java
+++ b/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/StoragePartitionClient.java
@@ -192,16 +192,14 @@
                         return super.name();
                     }
                 };
-        AsyncConsistentMultimap<K, V> trancodedMap =
-                DistributedPrimitives.<K, V, String, byte[]>newTranscodingMultimap(
+        AsyncConsistentMultimap<K, V> transcodedMap =
+                DistributedPrimitives.newTranscodingMultimap(
                         rawMap,
                         key -> HexString.toHexString(serializer.encode(key)),
-                        string -> serializer.decode(
-                                HexString.fromHexString(string)),
-                        value -> value == null ? null :
-                                serializer.encode(value),
+                        string -> serializer.decode(HexString.fromHexString(string)),
+                        value -> serializer.encode(value),
                         bytes -> serializer.decode(bytes));
-        return trancodedMap;
+        return transcodedMap;
 
     }
 
diff --git a/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/TranscodingAsyncConsistentMultimap.java b/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/TranscodingAsyncConsistentMultimap.java
index e31653e..666ac1f 100644
--- a/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/TranscodingAsyncConsistentMultimap.java
+++ b/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/TranscodingAsyncConsistentMultimap.java
@@ -221,8 +221,8 @@
 
     @Override
     public CompletableFuture<Multiset<V1>> values() {
-        return backingMap.values().thenApply(s -> s.stream().map(valueDecoder)
-                .collect(new MultisetCollector<>()));
+        return backingMap.values().thenApply(s ->
+            s.stream().map(valueDecoder).collect(new MultisetCollector<>()));
     }
 
     @Override
diff --git a/core/store/primitives/src/main/java/org/onosproject/store/primitives/resources/impl/AtomixConsistentSetMultimap.java b/core/store/primitives/src/main/java/org/onosproject/store/primitives/resources/impl/AtomixConsistentSetMultimap.java
index ec8ebb5..e446a67 100644
--- a/core/store/primitives/src/main/java/org/onosproject/store/primitives/resources/impl/AtomixConsistentSetMultimap.java
+++ b/core/store/primitives/src/main/java/org/onosproject/store/primitives/resources/impl/AtomixConsistentSetMultimap.java
@@ -147,6 +147,7 @@
         return client.submit(new Keys());
     }
 
+    @Override
     public CompletableFuture<Multiset<byte[]>> values() {
         return client.submit(new Values());
     }
diff --git a/core/store/primitives/src/main/java/org/onosproject/store/primitives/resources/impl/AtomixConsistentSetMultimapState.java b/core/store/primitives/src/main/java/org/onosproject/store/primitives/resources/impl/AtomixConsistentSetMultimapState.java
index 746ee00..5110187 100644
--- a/core/store/primitives/src/main/java/org/onosproject/store/primitives/resources/impl/AtomixConsistentSetMultimapState.java
+++ b/core/store/primitives/src/main/java/org/onosproject/store/primitives/resources/impl/AtomixConsistentSetMultimapState.java
@@ -598,16 +598,10 @@
             Collector<MapEntryValue,
                     HashMultiset<byte[]>,
                     HashMultiset<byte[]>> {
-        private HashMultiset<byte[]> multiset = null;
 
         @Override
         public Supplier<HashMultiset<byte[]>> supplier() {
-            return () -> {
-                if (multiset == null) {
-                    multiset = HashMultiset.create();
-                }
-                return multiset;
-            };
+            return HashMultiset::create;
         }
 
         @Override
@@ -627,7 +621,7 @@
         @Override
         public Function<HashMultiset<byte[]>,
                 HashMultiset<byte[]>> finisher() {
-            return (unused) -> multiset;
+            return Function.identity();
         }
 
         @Override
diff --git a/core/store/serializers/src/main/java/org/onosproject/store/serializers/KryoNamespaces.java b/core/store/serializers/src/main/java/org/onosproject/store/serializers/KryoNamespaces.java
index 4db7a11..c086cb8 100644
--- a/core/store/serializers/src/main/java/org/onosproject/store/serializers/KryoNamespaces.java
+++ b/core/store/serializers/src/main/java/org/onosproject/store/serializers/KryoNamespaces.java
@@ -15,6 +15,7 @@
  */
 package org.onosproject.store.serializers;
 
+import com.google.common.collect.HashMultiset;
 import com.google.common.collect.ImmutableList;
 import com.google.common.collect.ImmutableMap;
 import com.google.common.collect.ImmutableSet;
@@ -274,6 +275,7 @@
                       HashSet.class,
                       LinkedHashSet.class
             )
+            .register(HashMultiset.class)
             .register(Maps.immutableEntry("a", "b").getClass())
             .register(new ArraysAsListSerializer(), Arrays.asList().getClass())
             .register(Collections.singletonList(1).getClass())