ONOS-2495: Workaround for the fact that kryo deserialization is not thread safe

Change-Id: Id1bd3334f9a5c122984d08f97dbbbf622b27cf33
diff --git a/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/DefaultAsyncConsistentMap.java b/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/DefaultAsyncConsistentMap.java
index 9082ba6..a661cc6 100644
--- a/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/DefaultAsyncConsistentMap.java
+++ b/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/DefaultAsyncConsistentMap.java
@@ -32,7 +32,6 @@
 import org.onosproject.store.service.Versioned;
 import org.slf4j.Logger;
 
-
 import java.util.Collection;
 import java.util.Map;
 import java.util.Map.Entry;
@@ -122,22 +121,35 @@
         this.purgeOnUninstall = purgeOnUninstall;
         this.database.registerConsumer(update -> {
             SharedExecutors.getSingleThreadExecutor().execute(() -> {
-                if (update.target() == MAP_UPDATE) {
-                    Result<UpdateResult<String, byte[]>> result = update.output();
-                    if (result.success() && result.value().mapName().equals(name)) {
-                        MapEvent<K, V> mapEvent = result.value().<K, V>map(this::dK, serializer::decode).toMapEvent();
-                        notifyListeners(mapEvent);
+                if (listeners.isEmpty()) {
+                    return;
+                }
+                try {
+                    if (update.target() == MAP_UPDATE) {
+                        Result<UpdateResult<String, byte[]>> result = update.output();
+                        if (result.success() && result.value().mapName().equals(name)) {
+                            MapEvent<K, V> mapEvent = result.value()
+                                                            .<K, V>map(this::dK,
+                                                                       v -> serializer.decode(Tools.copyOf(v)))
+                                                            .toMapEvent();
+                            notifyListeners(mapEvent);
+                        }
+                    } else if (update.target() == TX_COMMIT) {
+                        CommitResponse response = update.output();
+                        if (response.success()) {
+                            response.updates().forEach(u -> {
+                                if (u.mapName().equals(name)) {
+                                    MapEvent<K, V> mapEvent =
+                                            u.<K, V>map(this::dK,
+                                                        v -> serializer.decode(Tools.copyOf(v)))
+                                             .toMapEvent();
+                                    notifyListeners(mapEvent);
+                                }
+                            });
+                        }
                     }
-                } else if (update.target() == TX_COMMIT) {
-                    CommitResponse response = update.output();
-                    if (response.success()) {
-                        response.updates().forEach(u -> {
-                            if (u.mapName().equals(name)) {
-                                MapEvent<K, V> mapEvent = u.<K, V>map(this::dK, serializer::decode).toMapEvent();
-                                notifyListeners(mapEvent);
-                            }
-                        });
-                    }
+                } catch (Exception e) {
+                    log.warn("Error notifying listeners", e);
                 }
             });
         });
diff --git a/utils/misc/src/main/java/org/onlab/util/Tools.java b/utils/misc/src/main/java/org/onlab/util/Tools.java
index ec6b6da..301f411 100644
--- a/utils/misc/src/main/java/org/onlab/util/Tools.java
+++ b/utils/misc/src/main/java/org/onlab/util/Tools.java
@@ -190,6 +190,16 @@
     }
 
     /**
+     * Returns a copy of the input byte array.
+     *
+     * @param original input
+     * @return copy of original
+     */
+    public static byte[] copyOf(byte[] original) {
+        return Arrays.copyOf(original, original.length);
+    }
+
+    /**
      * Get property as a string value.
      *
      * @param properties   properties to be looked up