ONOS-4396: Fix for EC Map synchronization failing silently due to serialization failures.
With this change we proactively fail map updates when serialization failures can occur and immediately notify the caller

Change-Id: I62a8a84731b9c2a6eeff7fa6f8336dc74234bf30
diff --git a/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/EventuallyConsistentMapImpl.java b/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/EventuallyConsistentMapImpl.java
index 2f5bc0a..b45e1b2 100644
--- a/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/EventuallyConsistentMapImpl.java
+++ b/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/EventuallyConsistentMapImpl.java
@@ -339,8 +339,11 @@
         checkNotNull(value, ERROR_NULL_VALUE);
 
         MapValue<V> newValue = new MapValue<>(value, timestampProvider.apply(key, value));
+        // Before mutating local map, ensure the update can be serialized without errors.
+        // This prevents replica divergence due to serialization failures.
+        UpdateEntry<K, V> update = serializer.copy(new UpdateEntry<K, V>(key, newValue));
         if (putInternal(key, newValue)) {
-            notifyPeers(new UpdateEntry<>(key, newValue), peerUpdateFunction.apply(key, value));
+            notifyPeers(update, peerUpdateFunction.apply(key, value));
             notifyListeners(new EventuallyConsistentMapEvent<>(mapName, PUT, key, value));
         }
     }
@@ -417,13 +420,15 @@
 
         AtomicBoolean updated = new AtomicBoolean(false);
         AtomicReference<MapValue<V>> previousValue = new AtomicReference<>();
-        MapValue<V> computedValue = items.compute(key, (k, mv) -> {
+        MapValue<V> computedValue = items.compute(serializer.copy(key), (k, mv) -> {
             previousValue.set(mv);
             V newRawValue = recomputeFunction.apply(key, mv == null ? null : mv.get());
             MapValue<V> newValue = new MapValue<>(newRawValue, timestampProvider.apply(key, newRawValue));
             if (mv == null || newValue.isNewerThan(mv)) {
                 updated.set(true);
-                return newValue;
+                // We return a copy to ensure updates to peers can be serialized.
+                // This prevents replica divergence due to serialization failures.
+                return serializer.copy(newValue);
             } else {
                 return mv;
             }
diff --git a/core/store/serializers/src/main/java/org/onosproject/store/serializers/KryoSerializer.java b/core/store/serializers/src/main/java/org/onosproject/store/serializers/KryoSerializer.java
index cdc7058..e621f97 100644
--- a/core/store/serializers/src/main/java/org/onosproject/store/serializers/KryoSerializer.java
+++ b/core/store/serializers/src/main/java/org/onosproject/store/serializers/KryoSerializer.java
@@ -78,6 +78,11 @@
     }
 
     @Override
+    public <T> T copy(T object) {
+        return decode(encode(object));
+    }
+
+    @Override
     public String toString() {
         return MoreObjects.toStringHelper(getClass())
                 .add("serializerPool", serializerPool)
diff --git a/core/store/serializers/src/main/java/org/onosproject/store/serializers/StoreSerializer.java b/core/store/serializers/src/main/java/org/onosproject/store/serializers/StoreSerializer.java
index 128a2ea..e605791 100644
--- a/core/store/serializers/src/main/java/org/onosproject/store/serializers/StoreSerializer.java
+++ b/core/store/serializers/src/main/java/org/onosproject/store/serializers/StoreSerializer.java
@@ -75,4 +75,13 @@
      * @param <T> decoded type
      */
     <T> T decode(final InputStream stream);
+
+    /**
+     * Returns a copy of the specfied object.
+     *
+     * @param object object to copy
+     * @return a copy of the object
+     * @param <T> object type
+     */
+    <T> T copy(final T object);
 }