ONOS-3472 Fixing ConsistentMap key equality

- ConsistentMap's key equality is based on serialized byte[].
  2 Problems fixed by this patch:

 (1) By caching Key -> String representation,
     Cache will use Key's Object#equals for look up,
     which can possibly have different equality than byte[] equality,
     leading to wrong String to be used as a key in backend Database.

     Fixed by reversing the mapping.

 (2) Similar issues with keySet(), entrySet()
     Set based on reference equality needs to be used to avoid
     deduplication based on Object#equals

     Fixed by replacing Set implementation with MappingSet.

Change-Id: I1b727abd2614a9b72b5b1d02ecca2de26493adcc
diff --git a/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/DefaultAsyncConsistentMap.java b/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/DefaultAsyncConsistentMap.java
index c6d300c..af2bb74 100644
--- a/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/DefaultAsyncConsistentMap.java
+++ b/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/DefaultAsyncConsistentMap.java
@@ -20,6 +20,7 @@
 import com.google.common.cache.CacheLoader;
 import com.google.common.cache.LoadingCache;
 import com.google.common.collect.Maps;
+
 import org.onlab.util.HexString;
 import org.onlab.util.SharedExecutors;
 import org.onlab.util.Tools;
@@ -33,6 +34,7 @@
 import org.slf4j.Logger;
 
 import java.util.Collection;
+import java.util.Collections;
 import java.util.Map;
 import java.util.Map.Entry;
 import java.util.Objects;
@@ -92,18 +94,25 @@
     private static final String ERROR_NULL_KEY = "Key cannot be null";
     private static final String ERROR_NULL_VALUE = "Null values are not allowed";
 
-    private final LoadingCache<K, String> keyCache = CacheBuilder.newBuilder()
+    // String representation of serialized byte[] -> original key Object
+    private final LoadingCache<String, K> keyCache = CacheBuilder.newBuilder()
             .softValues()
-            .build(new CacheLoader<K, String>() {
+            .build(new CacheLoader<String, K>() {
 
                 @Override
-                public String load(K key) {
-                    return HexString.toHexString(serializer.encode(key));
+                public K load(String key) {
+                    return serializer.decode(HexString.fromHexString(key));
                 }
             });
 
+    protected String sK(K key) {
+        String s = HexString.toHexString(serializer.encode(key));
+        keyCache.put(s, key);
+        return s;
+    }
+
     protected K dK(String key) {
-        return serializer.decode(HexString.fromHexString(key));
+        return keyCache.getUnchecked(key);
     }
 
     public DefaultAsyncConsistentMap(String name,
@@ -207,7 +216,7 @@
     public CompletableFuture<Boolean> containsKey(K key) {
         checkNotNull(key, ERROR_NULL_KEY);
         final MeteringAgent.Context timer = monitor.startTimer(CONTAINS_KEY);
-        return database.mapContainsKey(name, keyCache.getUnchecked(key))
+        return database.mapContainsKey(name, sK(key))
                 .whenComplete((r, e) -> timer.stop(e));
     }
 
@@ -223,7 +232,7 @@
     public CompletableFuture<Versioned<V>> get(K key) {
         checkNotNull(key, ERROR_NULL_KEY);
         final MeteringAgent.Context timer = monitor.startTimer(GET);
-        return database.mapGet(name, keyCache.getUnchecked(key))
+        return database.mapGet(name, sK(key))
                 .whenComplete((r, e) -> timer.stop(e))
                 .thenApply(v -> v != null ? v.map(serializer::decode) : null);
     }
@@ -328,10 +337,7 @@
     public CompletableFuture<Set<K>> keySet() {
         final MeteringAgent.Context timer = monitor.startTimer(KEY_SET);
         return database.mapKeySet(name)
-                .thenApply(s -> s
-                        .stream()
-                        .map(this::dK)
-                        .collect(Collectors.toSet()))
+                .thenApply(s -> newMappingKeySet(s))
                 .whenComplete((r, e) -> timer.stop(e));
     }
 
@@ -351,10 +357,7 @@
         final MeteringAgent.Context timer = monitor.startTimer(ENTRY_SET);
         return database.mapEntrySet(name)
                 .whenComplete((r, e) -> timer.stop(e))
-                .thenApply(s -> s
-                        .stream()
-                        .map(this::mapRawEntry)
-                        .collect(Collectors.toSet()));
+                .thenApply(s -> newMappingEntrySet(s));
     }
 
     @Override
@@ -413,17 +416,31 @@
         checkIfUnmodifiable();
     }
 
+    private Set<K> newMappingKeySet(Set<String> s) {
+        return new MappingSet<>(s, Collections::unmodifiableSet,
+                                this::sK, this::dK);
+    }
+
+    private Set<Entry<K, Versioned<V>>> newMappingEntrySet(Set<Entry<String, Versioned<byte[]>>> s) {
+        return new MappingSet<>(s, Collections::unmodifiableSet,
+                                this::reverseMapRawEntry, this::mapRawEntry);
+    }
+
     private Map.Entry<K, Versioned<V>> mapRawEntry(Map.Entry<String, Versioned<byte[]>> e) {
         return Maps.immutableEntry(dK(e.getKey()), e.getValue().<V>map(serializer::decode));
     }
 
+    private Map.Entry<String, Versioned<byte[]>> reverseMapRawEntry(Map.Entry<K, Versioned<V>> e) {
+        return Maps.immutableEntry(sK(e.getKey()), e.getValue().map(serializer::encode));
+    }
+
     private CompletableFuture<UpdateResult<K, V>> updateAndGet(K key,
                                                                Match<V> oldValueMatch,
                                                                Match<Long> oldVersionMatch,
                                                                V value) {
         beforeUpdate(key);
         return database.mapUpdate(name,
-                keyCache.getUnchecked(key),
+                sK(key),
                 oldValueMatch.map(serializer::encode),
                 oldVersionMatch,
                 value == null ? null : serializer.encode(value))
diff --git a/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/MappingSet.java b/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/MappingSet.java
new file mode 100644
index 0000000..9bf80a7
--- /dev/null
+++ b/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/MappingSet.java
@@ -0,0 +1,131 @@
+/*
+ * Copyright 2015 Open Networking Laboratory
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.onosproject.store.consistent.impl;
+
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Iterator;
+import java.util.Set;
+import java.util.function.Function;
+import java.util.stream.Collectors;
+
+import com.google.common.collect.Iterators;
+
+/**
+ * Set view backed by Set with element type {@code <BACK>} but returns
+ * element as {@code <OUT>} for convenience.
+ *
+ * @param <BACK> Backing {@link Set} element type.
+ *        MappingSet will follow this type's equality behavior.
+ * @param <OUT> external facing element type.
+ *        MappingSet will ignores equality defined by this type.
+ */
+class MappingSet<BACK, OUT> implements Set<OUT> {
+
+    private final Set<BACK> backedSet;
+    private final Function<OUT, BACK> toBack;
+    private final Function<BACK, OUT> toOut;
+
+    public MappingSet(Set<BACK> backedSet,
+                      Function<Set<BACK>, Set<BACK>> supplier,
+                      Function<OUT, BACK> toBack, Function<BACK, OUT> toOut) {
+        this.backedSet = supplier.apply(backedSet);
+        this.toBack = toBack;
+        this.toOut = toOut;
+    }
+
+    @Override
+    public int size() {
+        return backedSet.size();
+    }
+
+    @Override
+    public boolean isEmpty() {
+        return backedSet.isEmpty();
+    }
+
+    @Override
+    public boolean contains(Object o) {
+        return backedSet.contains(toBack.apply((OUT) o));
+    }
+
+    @Override
+    public Iterator<OUT> iterator() {
+        return Iterators.transform(backedSet.iterator(), toOut::apply);
+    }
+
+    @Override
+    public Object[] toArray() {
+        return backedSet.stream()
+                .map(toOut)
+                .toArray();
+    }
+
+    @Override
+    public <T> T[] toArray(T[] a) {
+        return backedSet.stream()
+                .map(toOut)
+                .toArray(size -> {
+                    if (size < a.length) {
+                        return (T[]) new Object[size];
+                    } else {
+                        Arrays.fill(a, null);
+                        return a;
+                    }
+                });
+    }
+
+    @Override
+    public boolean add(OUT e) {
+        return backedSet.add(toBack.apply(e));
+    }
+
+    @Override
+    public boolean remove(Object o) {
+        return backedSet.remove(toBack.apply((OUT) o));
+    }
+
+    @Override
+    public boolean containsAll(Collection<?> c) {
+        return c.stream()
+            .map(e -> toBack.apply((OUT) e))
+            .allMatch(backedSet::contains);
+    }
+
+    @Override
+    public boolean addAll(Collection<? extends OUT> c) {
+        return backedSet.addAll(c.stream().map(toBack).collect(Collectors.toList()));
+    }
+
+    @Override
+    public boolean retainAll(Collection<?> c) {
+        return backedSet.retainAll(c.stream()
+                .map(x -> toBack.apply((OUT) x))
+                .collect(Collectors.toList()));
+    }
+
+    @Override
+    public boolean removeAll(Collection<?> c) {
+        return backedSet.removeAll(c.stream()
+                .map(x -> toBack.apply((OUT) x))
+                .collect(Collectors.toList()));
+    }
+
+    @Override
+    public void clear() {
+        backedSet.clear();
+    }
+}
diff --git a/core/store/dist/src/test/java/org/onosproject/store/consistent/impl/DefaultAsyncConsistentMapTest.java b/core/store/dist/src/test/java/org/onosproject/store/consistent/impl/DefaultAsyncConsistentMapTest.java
new file mode 100644
index 0000000..3f6402c
--- /dev/null
+++ b/core/store/dist/src/test/java/org/onosproject/store/consistent/impl/DefaultAsyncConsistentMapTest.java
@@ -0,0 +1,369 @@
+/*
+ * Copyright 2015 Open Networking Laboratory
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.onosproject.store.consistent.impl;
+
+import static java.util.Collections.unmodifiableCollection;
+import static java.util.Collections.unmodifiableSet;
+import static org.hamcrest.Matchers.is;
+import static org.junit.Assert.*;
+
+import java.util.Collection;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.Objects;
+import java.util.Set;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.function.Consumer;
+
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+import org.onosproject.core.ApplicationId;
+import org.onosproject.core.DefaultApplicationId;
+import org.onosproject.store.service.Serializer;
+import org.onosproject.store.service.Transaction;
+import org.onosproject.store.service.Versioned;
+
+import com.google.common.base.MoreObjects;
+import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.ImmutableSet;
+import net.kuujo.copycat.Task;
+import net.kuujo.copycat.cluster.Cluster;
+import net.kuujo.copycat.resource.ResourceState;
+
+/**
+ *
+ */
+public class DefaultAsyncConsistentMapTest {
+
+    private static final ApplicationId APP_ID = new DefaultApplicationId(42, "what");
+
+    private static final TestData KEY1A = new TestData("One", "a");
+    private static final TestData KEY1B = new TestData("One", "b");
+
+    private static final TestData VALUE2A = new TestData("Two", "a");
+    private static final TestData VALUE2B = new TestData("Two", "b");
+
+    @Before
+    public void setUp() throws Exception {
+    }
+
+    @After
+    public void tearDown() throws Exception {
+    }
+
+    @Test
+    public void testKeySet() throws Exception {
+        DefaultAsyncConsistentMap<TestData, TestData> map;
+        String name = "map_name";
+        Database database = new TestDatabase();
+        Serializer serializer = Serializer.forTypes(TestData.class);
+
+        map = new DefaultAsyncConsistentMap<>(name, APP_ID, database, serializer,
+                                            false, false, false);
+        map.put(KEY1A, VALUE2A);
+        map.put(KEY1B, VALUE2A);
+
+        Set<TestData> set = map.keySet().get();
+        assertEquals("Should contain 2 keys",
+                2, set.size());
+        assertThat(set.contains(KEY1A), is(true));
+        assertThat(set.contains(KEY1B), is(true));
+        assertThat(set.contains(new TestData("One", "a")), is(true));
+    }
+
+    @Test
+    public void testEntrySet() throws Exception {
+        DefaultAsyncConsistentMap<TestData, TestData> map;
+        String name = "map_name";
+        Database database = new TestDatabase();
+        Serializer serializer = Serializer.forTypes(TestData.class);
+
+        map = new DefaultAsyncConsistentMap<>(name, APP_ID, database, serializer,
+                                            false, false, false);
+        map.put(KEY1A, VALUE2A);
+        map.put(KEY1B, VALUE2A);
+
+        assertEquals("Should contain 2 entry",
+                     2,
+                     map.entrySet().get().size());
+    }
+
+    /**
+     * Object to be used as a test data.
+     *
+     * {@link Object#equals(Object)} use only part of it's fields.
+     *
+     * As a result there can be 2 instances which the
+     * serialized bytes are not-equal but
+     * {@link Object#equals(Object)}-wise they are equal.
+     */
+    public static class TestData {
+
+        private final String theKey;
+
+        @SuppressWarnings("unused")
+        private final String notUsedForEquals;
+
+        public TestData(String theKey, String notUsedForEquals) {
+            this.theKey = theKey;
+            this.notUsedForEquals = notUsedForEquals;
+        }
+
+        @Override
+        public int hashCode() {
+            return Objects.hashCode(theKey);
+        }
+
+        @Override
+        public boolean equals(Object obj) {
+            if (obj instanceof TestData) {
+                TestData that = (TestData) obj;
+                return Objects.equals(this.theKey, that.theKey);
+            }
+            return false;
+        }
+
+        @Override
+        public String toString() {
+            return MoreObjects.toStringHelper(this)
+                    .add("theKey", theKey)
+                    .add("notUsedForEquals", notUsedForEquals)
+                    .toString();
+        }
+    }
+
+    /**
+     * {@link Database} implementation for testing.
+     *
+     * There is only 1 backing Map, {@code mapName} will be ignored.
+     */
+    public class TestDatabase implements Database {
+
+        Map<String, Versioned<byte[]>> map = new ConcurrentHashMap<>();
+
+        @Override
+        public CompletableFuture<Set<String>> maps() {
+            return CompletableFuture.completedFuture(ImmutableSet.of());
+        }
+
+        @Override
+        public CompletableFuture<Map<String, Long>> counters() {
+            return CompletableFuture.completedFuture(ImmutableMap.of());
+        }
+
+        @Override
+        public CompletableFuture<Integer> mapSize(String mapName) {
+            return CompletableFuture.completedFuture(map.size());
+        }
+
+        @Override
+        public CompletableFuture<Boolean> mapIsEmpty(String mapName) {
+            return CompletableFuture.completedFuture(map.isEmpty());
+        }
+
+        @Override
+        public CompletableFuture<Boolean> mapContainsKey(String mapName,
+                                                         String key) {
+            return CompletableFuture.completedFuture(map.containsKey(key));
+        }
+
+        @Override
+        public CompletableFuture<Boolean> mapContainsValue(String mapName,
+                                                           byte[] value) {
+            return CompletableFuture.completedFuture(map.containsValue(value));
+        }
+
+        @Override
+        public CompletableFuture<Versioned<byte[]>> mapGet(String mapName,
+                                                           String key) {
+            return CompletableFuture.completedFuture(map.get(key));
+        }
+
+        @Override
+        public synchronized CompletableFuture<Result<UpdateResult<String, byte[]>>> mapUpdate(String mapName,
+                                                                                 String key,
+                                                                                 Match<byte[]> valueMatch,
+                                                                                 Match<Long> versionMatch,
+                                                                                 byte[] value) {
+
+            boolean updated = false;
+            final Versioned<byte[]> oldValue;
+            final Versioned<byte[]> newValue;
+
+            Versioned<byte[]> old = map.getOrDefault(key, new Versioned<byte[]>(null, 0));
+            if (valueMatch.matches(old.value()) && versionMatch.matches(old.version())) {
+                updated = true;
+                oldValue = old;
+                newValue = new Versioned<>(value, old.version() + 1);
+                map.put(key, newValue);
+            } else {
+                updated = false;
+                oldValue = old;
+                newValue = old;
+            }
+            return CompletableFuture.completedFuture(
+                             Result.ok(new UpdateResult<String, byte[]>(updated,
+                                            mapName, key, oldValue, newValue)));
+        }
+
+        @Override
+        public CompletableFuture<Result<Void>> mapClear(String mapName) {
+            throw new UnsupportedOperationException();
+        }
+
+        @Override
+        public CompletableFuture<Set<String>> mapKeySet(String mapName) {
+            return CompletableFuture.completedFuture(unmodifiableSet(map.keySet()));
+        }
+
+        @Override
+        public CompletableFuture<Collection<Versioned<byte[]>>> mapValues(String mapName) {
+            return CompletableFuture.completedFuture(unmodifiableCollection(map.values()));
+        }
+
+        @Override
+        public CompletableFuture<Set<Entry<String, Versioned<byte[]>>>> mapEntrySet(String mapName) {
+            return CompletableFuture.completedFuture(unmodifiableSet(map.entrySet()));
+        }
+
+        @Override
+        public CompletableFuture<Long> counterAddAndGet(String counterName,
+                                                        long delta) {
+            throw new UnsupportedOperationException();
+        }
+
+        @Override
+        public CompletableFuture<Long> counterGetAndAdd(String counterName,
+                                                        long delta) {
+            throw new UnsupportedOperationException();
+        }
+
+        @Override
+        public CompletableFuture<Void> counterSet(String counterName,
+                                                  long value) {
+            throw new UnsupportedOperationException();
+        }
+
+        @Override
+        public CompletableFuture<Boolean> counterCompareAndSet(String counterName,
+                                                               long expectedValue,
+                                                               long update) {
+            throw new UnsupportedOperationException();
+        }
+
+        @Override
+        public CompletableFuture<Long> counterGet(String counterName) {
+            throw new UnsupportedOperationException();
+        }
+
+        @Override
+        public CompletableFuture<Long> queueSize(String queueName) {
+            throw new UnsupportedOperationException();
+        }
+
+        @Override
+        public CompletableFuture<Void> queuePush(String queueName,
+                                                 byte[] entry) {
+            throw new UnsupportedOperationException();
+        }
+
+        @Override
+        public CompletableFuture<byte[]> queuePop(String queueName) {
+            throw new UnsupportedOperationException();
+        }
+
+        @Override
+        public CompletableFuture<byte[]> queuePeek(String queueName) {
+            throw new UnsupportedOperationException();
+        }
+
+        @Override
+        public CompletableFuture<CommitResponse> prepareAndCommit(Transaction transaction) {
+            throw new UnsupportedOperationException();
+        }
+
+        @Override
+        public CompletableFuture<Boolean> prepare(Transaction transaction) {
+            throw new UnsupportedOperationException();
+        }
+
+        @Override
+        public CompletableFuture<CommitResponse> commit(Transaction transaction) {
+            throw new UnsupportedOperationException();
+        }
+
+        @Override
+        public CompletableFuture<Boolean> rollback(Transaction transaction) {
+            throw new UnsupportedOperationException();
+        }
+
+        @Override
+        public String name() {
+            return "name";
+        }
+
+        @Override
+        public ResourceState state() {
+            return ResourceState.HEALTHY;
+        }
+
+        @Override
+        public Cluster cluster() {
+            throw new UnsupportedOperationException();
+        }
+
+        @Override
+        public Database addStartupTask(Task<CompletableFuture<Void>> task) {
+            throw new UnsupportedOperationException();
+        }
+
+        @Override
+        public Database addShutdownTask(Task<CompletableFuture<Void>> task) {
+            throw new UnsupportedOperationException();
+        }
+
+        @Override
+        public CompletableFuture<Database> open() {
+            return CompletableFuture.completedFuture(this);
+        }
+
+        @Override
+        public boolean isOpen() {
+            return true;
+        }
+
+        @Override
+        public CompletableFuture<Void> close() {
+            return CompletableFuture.completedFuture(null);
+        }
+
+        @Override
+        public boolean isClosed() {
+            return false;
+        }
+
+        @Override
+        public void registerConsumer(Consumer<StateMachineUpdate> consumer) {
+        }
+
+        @Override
+        public void unregisterConsumer(Consumer<StateMachineUpdate> consumer) {
+        }
+    }
+
+}