Fix for ONOS-4315

- Additional log on error
- Allow count=0 using CountDownCompleter
- test case to detect the issue (@Ignored by default right now)
- other bug fixes found along the way

Based on patch by Madan@China

Change-Id: I7d6cb8c214052859900ef7ee0337a7e1c8a9d295
diff --git a/core/store/primitives/src/main/java/org/onosproject/store/primitives/resources/impl/AtomixConsistentMapState.java b/core/store/primitives/src/main/java/org/onosproject/store/primitives/resources/impl/AtomixConsistentMapState.java
index 4823902..48fd4a2 100644
--- a/core/store/primitives/src/main/java/org/onosproject/store/primitives/resources/impl/AtomixConsistentMapState.java
+++ b/core/store/primitives/src/main/java/org/onosproject/store/primitives/resources/impl/AtomixConsistentMapState.java
@@ -18,6 +18,7 @@
 import static org.onosproject.store.service.MapEvent.Type.INSERT;
 import static org.onosproject.store.service.MapEvent.Type.REMOVE;
 import static org.onosproject.store.service.MapEvent.Type.UPDATE;
+import static org.slf4j.LoggerFactory.getLogger;
 import io.atomix.copycat.server.session.ServerSession;
 import io.atomix.copycat.server.Commit;
 import io.atomix.copycat.server.Snapshottable;
@@ -60,7 +61,9 @@
 import org.onosproject.store.service.MapEvent;
 import org.onosproject.store.service.MapTransaction;
 import org.onosproject.store.service.Versioned;
+import org.slf4j.Logger;
 
+import com.google.common.base.Throwables;
 import com.google.common.collect.Lists;
 import com.google.common.collect.Maps;
 import com.google.common.collect.Sets;
@@ -72,6 +75,7 @@
  */
 public class AtomixConsistentMapState extends ResourceStateMachine implements SessionListener, Snapshottable {
 
+    private final Logger log = getLogger(getClass());
     private final Map<Long, Commit<? extends AtomixConsistentMapCommands.Listen>> listeners = new HashMap<>();
     private final Map<String, MapEntryValue> mapEntries = new HashMap<>();
     private final Set<String> preparedKeys = Sets.newHashSet();
@@ -384,7 +388,7 @@
                 }
                 MapEntryValue existingValue = mapEntries.get(key);
                 if (existingValue == null) {
-                    if (update.currentValue() != null) {
+                    if (update.type() != MapUpdate.Type.PUT_IF_ABSENT) {
                         return PrepareResult.OPTIMISTIC_LOCK_FAILURE;
                     }
                 } else {
@@ -399,6 +403,9 @@
             transaction.updates().forEach(u -> preparedKeys.add(u.key()));
             ok = true;
             return PrepareResult.OK;
+        } catch (Exception e) {
+            log.warn("Failure applying {}", commit, e);
+            throw Throwables.propagate(e);
         } finally {
             if (!ok) {
                 commit.close();
@@ -416,6 +423,9 @@
         TransactionId transactionId = commit.operation().transactionId();
         try {
             return commitInternal(transactionId);
+        } catch (Exception e) {
+            log.warn("Failure applying {}", commit, e);
+            throw Throwables.propagate(e);
         } finally {
             commit.close();
         }
@@ -438,12 +448,11 @@
         List<MapEvent<String, byte[]>> eventsToPublish = Lists.newArrayList();
         for (MapUpdate<String, byte[]> update : transaction.updates()) {
             String key = update.key();
+            checkState(preparedKeys.remove(key), "key is not prepared");
             MapEntryValue previousValue = mapEntries.remove(key);
             MapEntryValue newValue = null;
-            checkState(preparedKeys.remove(key), "key is not prepared");
             if (update.type() != MapUpdate.Type.REMOVE_IF_VERSION_MATCH) {
-                newValue = new TransactionalCommit(key,
-                        versionCounter.incrementAndGet(), completer);
+                newValue = new TransactionalCommit(key, versionCounter.incrementAndGet(), completer);
             }
             eventsToPublish.add(new MapEvent<>("", key, toVersioned(newValue), toVersioned(previousValue)));
             if (newValue != null) {
diff --git a/core/store/primitives/src/test/java/org/onosproject/store/primitives/resources/impl/AtomixConsistentMapTest.java b/core/store/primitives/src/test/java/org/onosproject/store/primitives/resources/impl/AtomixConsistentMapTest.java
index f7c44c2..a5fdb49 100644
--- a/core/store/primitives/src/test/java/org/onosproject/store/primitives/resources/impl/AtomixConsistentMapTest.java
+++ b/core/store/primitives/src/test/java/org/onosproject/store/primitives/resources/impl/AtomixConsistentMapTest.java
@@ -16,6 +16,8 @@
 package org.onosproject.store.primitives.resources.impl;
 
 import io.atomix.resource.ResourceType;
+
+import static org.hamcrest.Matchers.*;
 import static org.junit.Assert.*;
 
 import java.util.Arrays;
@@ -348,6 +350,7 @@
 
         map.addListener(listener).join();
 
+        // PUT_IF_ABSENT
         MapUpdate<String, byte[]> update1 =
                 MapUpdate.<String, byte[]>newBuilder().withType(MapUpdate.Type.PUT_IF_ABSENT)
                 .withKey("foo")
@@ -359,6 +362,7 @@
         map.prepare(tx).thenAccept(result -> {
             assertEquals(true, result);
         }).join();
+        // verify changes in Tx is not visible yet until commit
         assertFalse(listener.eventReceived());
 
         map.size().thenAccept(result -> {
@@ -371,7 +375,7 @@
 
         try {
             map.put("foo", value2).join();
-            assertTrue(false);
+            fail("update to map entry in open tx should fail with Exception");
         } catch (CompletionException e) {
             assertEquals(ConcurrentModificationException.class, e.getCause().getClass());
         }
@@ -384,6 +388,7 @@
         assertEquals(MapEvent.Type.INSERT, event.type());
         assertTrue(Arrays.equals(value1, event.newValue().value()));
 
+        // map should be update-able after commit
         map.put("foo", value2).thenAccept(result -> {
             assertTrue(Arrays.equals(Versioned.valueOrElse(result, null), value1));
         }).join();
@@ -391,6 +396,43 @@
         assertNotNull(event);
         assertEquals(MapEvent.Type.UPDATE, event.type());
         assertTrue(Arrays.equals(value2, event.newValue().value()));
+
+
+        // REMOVE_IF_VERSION_MATCH
+        byte[] currFoo = map.get("foo").get().value();
+        long currFooVersion = map.get("foo").get().version();
+        MapUpdate<String, byte[]> remove1 =
+                MapUpdate.<String, byte[]>newBuilder().withType(MapUpdate.Type.REMOVE_IF_VERSION_MATCH)
+                .withKey("foo")
+                .withCurrentVersion(currFooVersion)
+                .build();
+
+        tx = new MapTransaction<>(TransactionId.from("tx2"), Arrays.asList(remove1));
+
+        map.prepare(tx).thenAccept(result -> {
+            assertTrue("prepare should succeed", result);
+        }).join();
+        // verify changes in Tx is not visible yet until commit
+        assertFalse(listener.eventReceived());
+
+        map.size().thenAccept(size -> {
+            assertThat(size, is(1));
+        }).join();
+
+        map.get("foo").thenAccept(result -> {
+            assertThat(result.value(), is(currFoo));
+        }).join();
+
+        map.commit(tx.transactionId()).join();
+        event = listener.event();
+        assertNotNull(event);
+        assertEquals(MapEvent.Type.REMOVE, event.type());
+        assertArrayEquals(currFoo, event.oldValue().value());
+
+        map.size().thenAccept(size -> {
+            assertThat(size, is(0));
+        }).join();
+
     }
 
     protected void transactionRollbackTests(int clusterSize) throws Throwable {
diff --git a/utils/misc/src/main/java/org/onlab/util/CountDownCompleter.java b/utils/misc/src/main/java/org/onlab/util/CountDownCompleter.java
index e3ef9be..ef17bc4 100644
--- a/utils/misc/src/main/java/org/onlab/util/CountDownCompleter.java
+++ b/utils/misc/src/main/java/org/onlab/util/CountDownCompleter.java
@@ -45,10 +45,13 @@
      * @param onCompleteCallback callback to invoke when completer is completed
      */
     public CountDownCompleter(T object, long count, Consumer<T> onCompleteCallback) {
-        checkState(count > 0, "count must be positive");
+        checkState(count >= 0, "count must be non-negative");
         this.counter = new AtomicLong(count);
         this.object = checkNotNull(object);
         this.onCompleteCallback = checkNotNull(onCompleteCallback);
+        if (count == 0) {
+            onCompleteCallback.accept(object);
+        }
     }
 
     /**