ONOS-3472 Fixing ConsistentMap key equality

- ConsistentMap's key equality is based on serialized byte[].
  2 Problems fixed by this patch:

 (1) By caching Key -> String representation,
     Cache will use Key's Object#equals for look up,
     which can possibly have different equality than byte[] equality,
     leading to wrong String to be used as a key in backend Database.

     Fixed by reversing the mapping.

 (2) Similar issues with keySet(), entrySet()
     Set based on reference equality needs to be used to avoid
     deduplication based on Object#equals

     Fixed by replacing Set implementation with MappingSet.

Change-Id: I1b727abd2614a9b72b5b1d02ecca2de26493adcc
diff --git a/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/DefaultAsyncConsistentMap.java b/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/DefaultAsyncConsistentMap.java
index c6d300c..af2bb74 100644
--- a/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/DefaultAsyncConsistentMap.java
+++ b/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/DefaultAsyncConsistentMap.java
@@ -20,6 +20,7 @@
 import com.google.common.cache.CacheLoader;
 import com.google.common.cache.LoadingCache;
 import com.google.common.collect.Maps;
+
 import org.onlab.util.HexString;
 import org.onlab.util.SharedExecutors;
 import org.onlab.util.Tools;
@@ -33,6 +34,7 @@
 import org.slf4j.Logger;
 
 import java.util.Collection;
+import java.util.Collections;
 import java.util.Map;
 import java.util.Map.Entry;
 import java.util.Objects;
@@ -92,18 +94,25 @@
     private static final String ERROR_NULL_KEY = "Key cannot be null";
     private static final String ERROR_NULL_VALUE = "Null values are not allowed";
 
-    private final LoadingCache<K, String> keyCache = CacheBuilder.newBuilder()
+    // String representation of serialized byte[] -> original key Object
+    private final LoadingCache<String, K> keyCache = CacheBuilder.newBuilder()
             .softValues()
-            .build(new CacheLoader<K, String>() {
+            .build(new CacheLoader<String, K>() {
 
                 @Override
-                public String load(K key) {
-                    return HexString.toHexString(serializer.encode(key));
+                public K load(String key) {
+                    return serializer.decode(HexString.fromHexString(key));
                 }
             });
 
+    protected String sK(K key) {
+        String s = HexString.toHexString(serializer.encode(key));
+        keyCache.put(s, key);
+        return s;
+    }
+
     protected K dK(String key) {
-        return serializer.decode(HexString.fromHexString(key));
+        return keyCache.getUnchecked(key);
     }
 
     public DefaultAsyncConsistentMap(String name,
@@ -207,7 +216,7 @@
     public CompletableFuture<Boolean> containsKey(K key) {
         checkNotNull(key, ERROR_NULL_KEY);
         final MeteringAgent.Context timer = monitor.startTimer(CONTAINS_KEY);
-        return database.mapContainsKey(name, keyCache.getUnchecked(key))
+        return database.mapContainsKey(name, sK(key))
                 .whenComplete((r, e) -> timer.stop(e));
     }
 
@@ -223,7 +232,7 @@
     public CompletableFuture<Versioned<V>> get(K key) {
         checkNotNull(key, ERROR_NULL_KEY);
         final MeteringAgent.Context timer = monitor.startTimer(GET);
-        return database.mapGet(name, keyCache.getUnchecked(key))
+        return database.mapGet(name, sK(key))
                 .whenComplete((r, e) -> timer.stop(e))
                 .thenApply(v -> v != null ? v.map(serializer::decode) : null);
     }
@@ -328,10 +337,7 @@
     public CompletableFuture<Set<K>> keySet() {
         final MeteringAgent.Context timer = monitor.startTimer(KEY_SET);
         return database.mapKeySet(name)
-                .thenApply(s -> s
-                        .stream()
-                        .map(this::dK)
-                        .collect(Collectors.toSet()))
+                .thenApply(s -> newMappingKeySet(s))
                 .whenComplete((r, e) -> timer.stop(e));
     }
 
@@ -351,10 +357,7 @@
         final MeteringAgent.Context timer = monitor.startTimer(ENTRY_SET);
         return database.mapEntrySet(name)
                 .whenComplete((r, e) -> timer.stop(e))
-                .thenApply(s -> s
-                        .stream()
-                        .map(this::mapRawEntry)
-                        .collect(Collectors.toSet()));
+                .thenApply(s -> newMappingEntrySet(s));
     }
 
     @Override
@@ -413,17 +416,31 @@
         checkIfUnmodifiable();
     }
 
+    private Set<K> newMappingKeySet(Set<String> s) {
+        return new MappingSet<>(s, Collections::unmodifiableSet,
+                                this::sK, this::dK);
+    }
+
+    private Set<Entry<K, Versioned<V>>> newMappingEntrySet(Set<Entry<String, Versioned<byte[]>>> s) {
+        return new MappingSet<>(s, Collections::unmodifiableSet,
+                                this::reverseMapRawEntry, this::mapRawEntry);
+    }
+
     private Map.Entry<K, Versioned<V>> mapRawEntry(Map.Entry<String, Versioned<byte[]>> e) {
         return Maps.immutableEntry(dK(e.getKey()), e.getValue().<V>map(serializer::decode));
     }
 
+    private Map.Entry<String, Versioned<byte[]>> reverseMapRawEntry(Map.Entry<K, Versioned<V>> e) {
+        return Maps.immutableEntry(sK(e.getKey()), e.getValue().map(serializer::encode));
+    }
+
     private CompletableFuture<UpdateResult<K, V>> updateAndGet(K key,
                                                                Match<V> oldValueMatch,
                                                                Match<Long> oldVersionMatch,
                                                                V value) {
         beforeUpdate(key);
         return database.mapUpdate(name,
-                keyCache.getUnchecked(key),
+                sK(key),
                 oldValueMatch.map(serializer::encode),
                 oldVersionMatch,
                 value == null ? null : serializer.encode(value))