[ONOS-6874] Implement nullable ConsistentMap

Change-Id: I82a232d376b230b06977b24f12364aa374c9c113
diff --git a/core/api/src/main/java/org/onosproject/store/service/ConsistentMapBuilder.java b/core/api/src/main/java/org/onosproject/store/service/ConsistentMapBuilder.java
index ae97660..d991072 100644
--- a/core/api/src/main/java/org/onosproject/store/service/ConsistentMapBuilder.java
+++ b/core/api/src/main/java/org/onosproject/store/service/ConsistentMapBuilder.java
@@ -26,6 +26,7 @@
 public abstract class ConsistentMapBuilder<K, V>
     extends DistributedPrimitiveBuilder<ConsistentMapBuilder<K, V>, ConsistentMap<K, V>> {
 
+    private boolean nullValues = false;
     private boolean purgeOnUninstall = false;
 
     public ConsistentMapBuilder() {
@@ -33,6 +34,16 @@
     }
 
     /**
+     * Enables null values in the map.
+     *
+     * @return this builder
+     */
+    public ConsistentMapBuilder<K, V> withNullValues() {
+        this.nullValues = true;
+        return this;
+    }
+
+    /**
      * Clears map contents when the owning application is uninstalled.
      *
      * @return this builder
@@ -43,6 +54,15 @@
     }
 
     /**
+     * Returns whether null values are supported by the map.
+     *
+     * @return {@code true} if null values are supported; {@code false} otherwise
+     */
+    public boolean nullValues() {
+        return this.nullValues;
+    }
+
+    /**
      * Returns if map entries need to be cleared when owning application is uninstalled.
      * @return {@code true} if yes; {@code false} otherwise.
      */
diff --git a/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/DefaultConsistentMapBuilder.java b/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/DefaultConsistentMapBuilder.java
index 32a7506..2bc06dd 100644
--- a/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/DefaultConsistentMapBuilder.java
+++ b/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/DefaultConsistentMapBuilder.java
@@ -42,6 +42,7 @@
     @Override
     public AsyncConsistentMap<K, V> buildAsyncMap() {
         AsyncConsistentMap<K, V> map = primitiveCreator.newAsyncConsistentMap(name(), serializer());
+        map = nullValues() ? map : DistributedPrimitives.newNotNullMap(map);
         map = relaxedReadConsistency() ? DistributedPrimitives.newCachingMap(map) : map;
         map = readOnly() ? DistributedPrimitives.newUnmodifiableMap(map) : map;
         return meteringEnabled() ? DistributedPrimitives.newMeteredMap(map) : map;
diff --git a/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/DistributedPrimitives.java b/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/DistributedPrimitives.java
index 25b6d00..33b72a4 100644
--- a/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/DistributedPrimitives.java
+++ b/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/DistributedPrimitives.java
@@ -78,6 +78,18 @@
     }
 
     /**
+     * Creates an instance of {@code AsyncConsistentMap} that disallows null values.
+     *
+     * @param map backing map
+     * @return not null map
+     * @param <K> map key type
+     * @param <V> map value type
+     */
+    public static <K, V> AsyncConsistentMap<K, V> newNotNullMap(AsyncConsistentMap<K, V> map) {
+        return new NotNullAsyncConsistentMap<K, V>(map);
+    }
+
+    /**
      * Creates an instance of {@code AsyncAtomicCounterMap} that transforms key types.
      *
      * @param map backing map
diff --git a/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/NotNullAsyncConsistentMap.java b/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/NotNullAsyncConsistentMap.java
new file mode 100644
index 0000000..9282cbf
--- /dev/null
+++ b/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/NotNullAsyncConsistentMap.java
@@ -0,0 +1,162 @@
+/*
+ * Copyright 2017-present Open Networking Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.onosproject.store.primitives.impl;
+
+import java.util.Collection;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Set;
+import java.util.concurrent.CompletableFuture;
+import java.util.stream.Collectors;
+
+import org.onosproject.store.service.AsyncConsistentMap;
+import org.onosproject.store.service.Versioned;
+
+/**
+ * {@link org.onosproject.store.service.AsyncConsistentMap} that doesn't allow null values.
+ * <p>
+ * A {@code null} value is treated as the absence of a mapping: writing {@code null} removes the
+ * key, and entries whose value is {@code null} are filtered out of the results of {@code get},
+ * {@code getOrDefault}, {@code values} and {@code entrySet}.
+ */
+public class NotNullAsyncConsistentMap<K, V> extends DelegatingAsyncConsistentMap<K, V> {
+
+    /**
+     * Creates a null-hiding view of the given map.
+     *
+     * @param delegateMap backing map
+     */
+    public NotNullAsyncConsistentMap(AsyncConsistentMap<K, V> delegateMap) {
+        super(delegateMap);
+    }
+
+    @Override
+    public CompletableFuture<Boolean> containsValue(V value) {
+        if (value == null) {
+            // Null values are never visible through this map.
+            return CompletableFuture.completedFuture(false);
+        }
+        return super.containsValue(value);
+    }
+
+    @Override
+    public CompletableFuture<Versioned<V>> get(K key) {
+        return super.get(key).thenApply(this::hideNullValue);
+    }
+
+    @Override
+    public CompletableFuture<Versioned<V>> getOrDefault(K key, V defaultValue) {
+        return super.getOrDefault(key, defaultValue).thenApply(this::hideNullValue);
+    }
+
+    @Override
+    public CompletableFuture<Versioned<V>> put(K key, V value) {
+        if (value == null) {
+            // Storing null is expressed as removing the key.
+            return super.remove(key);
+        }
+        return super.put(key, value);
+    }
+
+    @Override
+    public CompletableFuture<Versioned<V>> putAndGet(K key, V value) {
+        if (value == null) {
+            // The "new" value is null, so the key is removed and null is reported.
+            return super.remove(key).thenApply(v -> null);
+        }
+        return super.putAndGet(key, value);
+    }
+
+    @Override
+    public CompletableFuture<Collection<Versioned<V>>> values() {
+        return super.values().thenApply(value -> value.stream()
+            .filter(v -> v.value() != null)
+            .collect(Collectors.toList()));
+    }
+
+    @Override
+    public CompletableFuture<Set<Map.Entry<K, Versioned<V>>>> entrySet() {
+        return super.entrySet().thenApply(entries -> entries.stream()
+            .filter(e -> e.getValue().value() != null)
+            .collect(Collectors.toSet()));
+    }
+
+    @Override
+    public CompletableFuture<Versioned<V>> putIfAbsent(K key, V value) {
+        if (value == null) {
+            // There is nothing to store, and an existing mapping must not be disturbed:
+            // report the current value (if any) without modifying the map.
+            return get(key);
+        }
+        return super.putIfAbsent(key, value);
+    }
+
+    @Override
+    public CompletableFuture<Boolean> remove(K key, V value) {
+        if (value == null) {
+            // A key can never be visibly mapped to null, so there is nothing to remove.
+            return CompletableFuture.completedFuture(false);
+        }
+        return super.remove(key, value);
+    }
+
+    @Override
+    public CompletableFuture<Boolean> remove(K key, long version) {
+        return super.remove(key, version);
+    }
+
+    @Override
+    public CompletableFuture<Versioned<V>> replace(K key, V value) {
+        if (value == null) {
+            // Replacing with null is expressed as removing the key.
+            return super.remove(key);
+        }
+        return super.replace(key, value);
+    }
+
+    @Override
+    public CompletableFuture<Boolean> replace(K key, V oldValue, V newValue) {
+        if (oldValue == null) {
+            // Expecting "no mapping": succeeds only when the key is currently absent. Going
+            // through this class's putIfAbsent keeps a null newValue out of the backing map.
+            return putIfAbsent(key, newValue).thenApply(Objects::isNull);
+        } else if (newValue == null) {
+            // Conditional replace-with-null is a conditional remove.
+            return super.remove(key, oldValue);
+        }
+        return super.replace(key, oldValue, newValue);
+    }
+
+    @Override
+    public CompletableFuture<Boolean> replace(K key, long oldVersion, V newValue) {
+        if (newValue == null) {
+            // Version-conditional replace-with-null is a version-conditional remove.
+            return super.remove(key, oldVersion);
+        }
+        return super.replace(key, oldVersion, newValue);
+    }
+
+    /**
+     * Maps a versioned null value to a plain {@code null} result.
+     *
+     * @param versioned value read from the backing map, possibly {@code null}
+     * @return {@code null} if {@code versioned} is {@code null} or wraps a {@code null} value;
+     *         otherwise {@code versioned} itself
+     */
+    private Versioned<V> hideNullValue(Versioned<V> versioned) {
+        return versioned != null && versioned.value() == null ? null : versioned;
+    }
+}
diff --git a/core/store/primitives/src/main/java/org/onosproject/store/primitives/resources/impl/AtomixConsistentMapOperations.java b/core/store/primitives/src/main/java/org/onosproject/store/primitives/resources/impl/AtomixConsistentMapOperations.java
index 4804e71..31d5bf8 100644
--- a/core/store/primitives/src/main/java/org/onosproject/store/primitives/resources/impl/AtomixConsistentMapOperations.java
+++ b/core/store/primitives/src/main/java/org/onosproject/store/primitives/resources/impl/AtomixConsistentMapOperations.java
@@ -164,7 +164,7 @@
         }
 
         public ValueOperation(byte[] value) {
-            this.value = checkNotNull(value, "value cannot be null");
+            this.value = value; // null is now accepted here (nullable map values)
         }
 
         /**
diff --git a/core/store/primitives/src/test/java/org/onosproject/store/primitives/impl/NotNullConsistentMapTest.java b/core/store/primitives/src/test/java/org/onosproject/store/primitives/impl/NotNullConsistentMapTest.java
new file mode 100644
index 0000000..9c9e55d
--- /dev/null
+++ b/core/store/primitives/src/test/java/org/onosproject/store/primitives/impl/NotNullConsistentMapTest.java
@@ -0,0 +1,86 @@
+/*
+ * Copyright 2017-present Open Networking Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.onosproject.store.primitives.impl;
+
+import java.util.Arrays;
+
+import io.atomix.protocols.raft.proxy.RaftProxy;
+import io.atomix.protocols.raft.service.RaftService;
+import org.junit.Test;
+import org.onlab.util.Tools;
+import org.onosproject.store.primitives.resources.impl.AtomixConsistentMap;
+import org.onosproject.store.primitives.resources.impl.AtomixConsistentMapService;
+import org.onosproject.store.primitives.resources.impl.AtomixTestBase;
+import org.onosproject.store.service.AsyncConsistentMap;
+
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertTrue;
+
+/**
+ * Unit tests for {@link NotNullAsyncConsistentMap}.
+ */
+public class NotNullConsistentMapTest extends AtomixTestBase<AtomixConsistentMap> {
+
+    @Override
+    protected RaftService createService() {
+        return new AtomixConsistentMapService();
+    }
+
+    @Override
+    protected AtomixConsistentMap createPrimitive(RaftProxy proxy) {
+        return new AtomixConsistentMap(proxy);
+    }
+
+    /**
+     * Tests that the not-null wrapper treats a null value as an absent mapping.
+     */
+    @Test
+    public void testNotNullValues() throws Throwable {
+        final byte[] rawFooValue = Tools.getBytesUtf8("Hello foo!");
+        final byte[] rawBarValue = Tools.getBytesUtf8("Hello bar!");
+
+        AsyncConsistentMap<String, byte[]> map =
+                DistributedPrimitives.newNotNullMap(newPrimitive("testNotNullValues"));
+
+        map.get("foo") // nothing stored yet
+                .thenAccept(v -> assertNull(v)).join();
+        map.put("foo", null) // null put on an absent key: no previous value
+                .thenAccept(v -> assertNull(v)).join();
+        map.put("foo", rawFooValue).thenAccept(v -> assertNull(v)).join(); // still no previous value
+        map.get("foo").thenAccept(v -> {
+            assertNotNull(v);
+            assertTrue(Arrays.equals(v.value(), rawFooValue));
+        }).join();
+        map.put("foo", null).thenAccept(v -> { // null put acts as a remove and returns the old value
+            assertNotNull(v);
+            assertTrue(Arrays.equals(v.value(), rawFooValue));
+        }).join();
+        map.get("foo").thenAccept(v -> assertNull(v)).join(); // mapping is gone
+        map.replace("foo", rawFooValue, null) // key is absent, so the expected old value cannot match
+                .thenAccept(replaced -> assertFalse(replaced)).join();
+        map.replace("foo", null, rawBarValue) // null old value means "expect no mapping"
+                .thenAccept(replaced -> assertTrue(replaced)).join();
+        map.get("foo").thenAccept(v -> {
+            assertNotNull(v);
+            assertTrue(Arrays.equals(v.value(), rawBarValue));
+        }).join();
+        map.replace("foo", rawBarValue, null) // null new value removes the matching mapping
+                .thenAccept(replaced -> assertTrue(replaced)).join();
+        map.get("foo").thenAccept(v -> assertNull(v)).join(); // mapping is gone again
+    }
+}
diff --git a/core/store/primitives/src/test/java/org/onosproject/store/primitives/resources/impl/AtomixConsistentMapTest.java b/core/store/primitives/src/test/java/org/onosproject/store/primitives/resources/impl/AtomixConsistentMapTest.java
index 42c6dbc..d08495a 100644
--- a/core/store/primitives/src/test/java/org/onosproject/store/primitives/resources/impl/AtomixConsistentMapTest.java
+++ b/core/store/primitives/src/test/java/org/onosproject/store/primitives/resources/impl/AtomixConsistentMapTest.java
@@ -79,6 +79,44 @@
     }
 
     /**
+     * Tests null values.
+     */
+    @Test
+    public void testNullValues() throws Throwable {
+        final byte[] rawFooValue = Tools.getBytesUtf8("Hello foo!");
+        final byte[] rawBarValue = Tools.getBytesUtf8("Hello bar!");
+
+        AtomixConsistentMap map = newPrimitive("testNullValues");
+
+        map.get("foo") // nothing stored yet
+                .thenAccept(v -> assertNull(v)).join();
+        map.put("foo", null) // first write for the key: no previous entry to return
+                .thenAccept(v -> assertNull(v)).join();
+        map.put("foo", rawFooValue).thenAccept(v -> { // previous entry exists and holds null
+            assertNotNull(v);
+            assertNull(v.value());
+        }).join();
+        map.get("foo").thenAccept(v -> {
+            assertNotNull(v);
+            assertTrue(Arrays.equals(v.value(), rawFooValue));
+        }).join();
+        map.replace("foo", rawFooValue, null) // a matching value may be replaced by null
+                .thenAccept(replaced -> assertTrue(replaced)).join();
+        map.get("foo").thenAccept(v -> { // the entry is kept, with a null value
+            assertNotNull(v);
+            assertNull(v.value());
+        }).join();
+        map.replace("foo", rawFooValue, rawBarValue) // stale expected value: current value is null
+                .thenAccept(replaced -> assertFalse(replaced)).join();
+        map.replace("foo", null, rawBarValue) // null is accepted as the expected old value
+                .thenAccept(replaced -> assertTrue(replaced)).join();
+        map.get("foo").thenAccept(v -> {
+            assertNotNull(v);
+            assertTrue(Arrays.equals(v.value(), rawBarValue));
+        }).join();
+    }
+
+    /**
      * Tests map event notifications.
      */
     @Test
@@ -274,7 +312,6 @@
         }).join();
     }
 
-
     protected void mapListenerTests() throws Throwable {
         final byte[] value1 = Tools.getBytesUtf8("value1");
         final byte[] value2 = Tools.getBytesUtf8("value2");