Default ConsistentMap consistency level to SEQUENTIAL and reenable all Atomix unit tests

Change-Id: Ic04ff81fbaaa7c007f20077391a72fdfa9fd382a
diff --git a/core/store/primitives/src/main/java/org/onosproject/store/primitives/resources/impl/AtomixConsistentMap.java b/core/store/primitives/src/main/java/org/onosproject/store/primitives/resources/impl/AtomixConsistentMap.java
index 4c065a5..14c24a6 100644
--- a/core/store/primitives/src/main/java/org/onosproject/store/primitives/resources/impl/AtomixConsistentMap.java
+++ b/core/store/primitives/src/main/java/org/onosproject/store/primitives/resources/impl/AtomixConsistentMap.java
@@ -236,11 +236,8 @@
     @Override
     public synchronized CompletableFuture<Void> addListener(MapEventListener<String, byte[]> listener) {
         if (!mapEventListeners.isEmpty()) {
-            if (mapEventListeners.add(listener)) {
-                return CompletableFuture.completedFuture(new ChangeListener(listener)).thenApply(v -> null);
-            } else {
-                return CompletableFuture.completedFuture(null);
-            }
+            mapEventListeners.add(listener);
+            return CompletableFuture.completedFuture(null);
         }
         mapEventListeners.add(listener);
         return submit(new AtomixConsistentMapCommands.Listen()).thenApply(v -> null);
diff --git a/core/store/primitives/src/main/java/org/onosproject/store/primitives/resources/impl/AtomixConsistentMapCommands.java b/core/store/primitives/src/main/java/org/onosproject/store/primitives/resources/impl/AtomixConsistentMapCommands.java
index 4f912da..dbc3157 100644
--- a/core/store/primitives/src/main/java/org/onosproject/store/primitives/resources/impl/AtomixConsistentMapCommands.java
+++ b/core/store/primitives/src/main/java/org/onosproject/store/primitives/resources/impl/AtomixConsistentMapCommands.java
@@ -52,7 +52,7 @@
 
         @Override
         public ConsistencyLevel consistency() {
-          return ConsistencyLevel.LINEARIZABLE;
+          return ConsistencyLevel.SEQUENTIAL;
         }
 
         @Override
@@ -78,7 +78,7 @@
 
         @Override
         public ConsistencyLevel consistency() {
-          return ConsistencyLevel.BOUNDED_LINEARIZABLE;
+          return ConsistencyLevel.SEQUENTIAL;
         }
 
         @Override
diff --git a/core/store/primitives/src/main/java/org/onosproject/store/primitives/resources/impl/AtomixConsistentMapState.java b/core/store/primitives/src/main/java/org/onosproject/store/primitives/resources/impl/AtomixConsistentMapState.java
index a6e6ca0..6395f69 100644
--- a/core/store/primitives/src/main/java/org/onosproject/store/primitives/resources/impl/AtomixConsistentMapState.java
+++ b/core/store/primitives/src/main/java/org/onosproject/store/primitives/resources/impl/AtomixConsistentMapState.java
@@ -337,10 +337,9 @@
      *
      * @param commit unlisten commit
      */
-    protected void unlisten(
-            Commit<? extends Unlisten> commit) {
+    protected void unlisten(Commit<? extends Unlisten> commit) {
         try {
-            Commit<? extends Listen> listener = listeners.remove(commit.session());
+            Commit<? extends Listen> listener = listeners.remove(commit.session().id());
             if (listener != null) {
                 listener.close();
             }
diff --git a/core/store/primitives/src/test/java/org/onosproject/store/primitives/resources/impl/AtomixConsistentMapTest.java b/core/store/primitives/src/test/java/org/onosproject/store/primitives/resources/impl/AtomixConsistentMapTest.java
index 26aa9a8..150b99d 100644
--- a/core/store/primitives/src/test/java/org/onosproject/store/primitives/resources/impl/AtomixConsistentMapTest.java
+++ b/core/store/primitives/src/test/java/org/onosproject/store/primitives/resources/impl/AtomixConsistentMapTest.java
@@ -21,10 +21,11 @@
 import java.util.Arrays;
 import java.util.ConcurrentModificationException;
 import java.util.List;
+import java.util.concurrent.ArrayBlockingQueue;
+import java.util.concurrent.BlockingQueue;
 import java.util.concurrent.CompletionException;
 import java.util.stream.Collectors;
 
-import org.junit.Ignore;
 import org.junit.Test;
 import org.onlab.util.Tools;
 import org.onosproject.store.primitives.MapUpdate;
@@ -34,12 +35,12 @@
 import org.onosproject.store.service.MapTransaction;
 import org.onosproject.store.service.Versioned;
 
+import com.google.common.base.Throwables;
 import com.google.common.collect.Sets;
 
 /**
  * Unit tests for {@link AtomixConsistentMap}.
  */
-@Ignore
 public class AtomixConsistentMapTest extends AtomixTestBase {
 
     @Override
@@ -285,55 +286,52 @@
         TestMapEventListener listener = new TestMapEventListener();
 
         // add listener; insert new value into map and verify an INSERT event is received.
-        map.addListener(listener).join();
-        map.put("foo", value1).join();
-        assertNotNull(listener.event());
-        assertEquals(MapEvent.Type.INSERT, listener.event().type());
-        assertTrue(Arrays.equals(value1, listener.event().newValue().value()));
-        listener.clearEvent();
+        map.addListener(listener).thenCompose(v -> map.put("foo", value1)).join();
+        MapEvent<String, byte[]> event = listener.event();
+        assertNotNull(event);
+        assertEquals(MapEvent.Type.INSERT, event.type());
+        assertTrue(Arrays.equals(value1, event.newValue().value()));
 
         // remove listener and verify listener is not notified.
-        map.removeListener(listener).join();
-        map.put("foo", value2).join();
-        assertNull(listener.event());
+        map.removeListener(listener).thenCompose(v -> map.put("foo", value2)).join();
+        assertFalse(listener.eventReceived());
 
         // add the listener back and verify UPDATE events are received correctly
-        map.addListener(listener).join();
-        map.put("foo", value3).join();
-        assertNotNull(listener.event());
-        assertEquals(MapEvent.Type.UPDATE, listener.event().type());
-        assertTrue(Arrays.equals(value3, listener.event().newValue().value()));
-        listener.clearEvent();
+        map.addListener(listener).thenCompose(v -> map.put("foo", value3)).join();
+        event = listener.event();
+        assertNotNull(event);
+        assertEquals(MapEvent.Type.UPDATE, event.type());
+        assertTrue(Arrays.equals(value3, event.newValue().value()));
 
         // perform a non-state changing operation and verify no events are received.
         map.putIfAbsent("foo", value1).join();
-        assertNull(listener.event());
+        assertFalse(listener.eventReceived());
 
         // verify REMOVE events are received correctly.
         map.remove("foo").join();
-        assertNotNull(listener.event());
-        assertEquals(MapEvent.Type.REMOVE, listener.event().type());
-        assertTrue(Arrays.equals(value3, listener.event().oldValue().value()));
-        listener.clearEvent();
+        event = listener.event();
+        assertNotNull(event);
+        assertEquals(MapEvent.Type.REMOVE, event.type());
+        assertTrue(Arrays.equals(value3, event.oldValue().value()));
 
         // verify compute methods also generate events.
         map.computeIf("foo", v -> v == null, (k, v) -> value1).join();
-        assertNotNull(listener.event());
-        assertEquals(MapEvent.Type.INSERT, listener.event().type());
-        assertTrue(Arrays.equals(value1, listener.event().newValue().value()));
-        listener.clearEvent();
+        event = listener.event();
+        assertNotNull(event);
+        assertEquals(MapEvent.Type.INSERT, event.type());
+        assertTrue(Arrays.equals(value1, event.newValue().value()));
 
         map.compute("foo", (k, v) -> value2).join();
-        assertNotNull(listener.event());
-        assertEquals(MapEvent.Type.UPDATE, listener.event().type());
-        assertTrue(Arrays.equals(value2, listener.event().newValue().value()));
-        listener.clearEvent();
+        event = listener.event();
+        assertNotNull(event);
+        assertEquals(MapEvent.Type.UPDATE, event.type());
+        assertTrue(Arrays.equals(value2, event.newValue().value()));
 
         map.computeIf("foo", v -> Arrays.equals(v, value2), (k, v) -> null).join();
-        assertNotNull(listener.event());
-        assertEquals(MapEvent.Type.REMOVE, listener.event().type());
-        assertTrue(Arrays.equals(value2, listener.event().oldValue().value()));
-        listener.clearEvent();
+        event = listener.event();
+        assertNotNull(event);
+        assertEquals(MapEvent.Type.REMOVE, event.type());
+        assertTrue(Arrays.equals(value2, event.oldValue().value()));
 
         map.removeListener(listener).join();
     }
@@ -359,7 +357,7 @@
         map.prepare(tx).thenAccept(result -> {
             assertEquals(true, result);
         }).join();
-        assertNull(listener.event());
+        assertFalse(listener.eventReceived());
 
         map.size().thenAccept(result -> {
             assertTrue(result == 0);
@@ -376,21 +374,21 @@
             assertEquals(ConcurrentModificationException.class, e.getCause().getClass());
         }
 
-        assertNull(listener.event());
+        assertFalse(listener.eventReceived());
 
         map.commit(tx.transactionId()).join();
-        assertNotNull(listener.event());
-        assertEquals(MapEvent.Type.INSERT, listener.event().type());
-        assertTrue(Arrays.equals(value1, listener.event().newValue().value()));
-        listener.clearEvent();
+        MapEvent<String, byte[]> event = listener.event();
+        assertNotNull(event);
+        assertEquals(MapEvent.Type.INSERT, event.type());
+        assertTrue(Arrays.equals(value1, event.newValue().value()));
 
         map.put("foo", value2).thenAccept(result -> {
             assertTrue(Arrays.equals(Versioned.valueOrElse(result, null), value1));
         }).join();
-        assertNotNull(listener.event());
-        assertEquals(MapEvent.Type.UPDATE, listener.event().type());
-        assertTrue(Arrays.equals(value2, listener.event().newValue().value()));
-        listener.clearEvent();
+        event = listener.event();
+        assertNotNull(event);
+        assertEquals(MapEvent.Type.UPDATE, event.type());
+        assertTrue(Arrays.equals(value2, event.newValue().value()));
     }
 
     protected void transactionRollbackTests(int clusterSize) throws Throwable {
@@ -412,10 +410,10 @@
         map.prepare(tx).thenAccept(result -> {
             assertEquals(true, result);
         }).join();
-        assertNull(listener.event());
+        assertFalse(listener.eventReceived());
 
         map.rollback(tx.transactionId()).join();
-        assertNull(listener.event());
+        assertFalse(listener.eventReceived());
 
         map.get("foo").thenAccept(result -> {
             assertNull(result);
@@ -424,27 +422,31 @@
         map.put("foo", value2).thenAccept(result -> {
             assertNull(result);
         }).join();
-        assertNotNull(listener.event());
-        assertEquals(MapEvent.Type.INSERT, listener.event().type());
-        assertTrue(Arrays.equals(value2, listener.event().newValue().value()));
-        listener.clearEvent();
+        MapEvent<String, byte[]> event = listener.event();
+        assertNotNull(event);
+        assertEquals(MapEvent.Type.INSERT, event.type());
+        assertTrue(Arrays.equals(value2, event.newValue().value()));
     }
 
     private static class TestMapEventListener implements MapEventListener<String, byte[]> {
 
-        MapEvent<String, byte[]> event;
+        private final BlockingQueue<MapEvent<String, byte[]>> queue = new ArrayBlockingQueue<>(1);
 
         @Override
         public void event(MapEvent<String, byte[]> event) {
-            this.event = event;
+            try {
+                queue.put(event);
+            } catch (InterruptedException e) {
+                Throwables.propagate(e);
+            }
         }
 
-        public MapEvent<String, byte[]> event() {
-            return event;
+        public boolean eventReceived() {
+            return !queue.isEmpty();
         }
 
-        public void clearEvent() {
-            event = null;
+        public MapEvent<String, byte[]> event() throws InterruptedException {
+            return queue.take();
         }
     }
 }
diff --git a/core/store/primitives/src/test/java/org/onosproject/store/primitives/resources/impl/AtomixLeaderElectorTest.java b/core/store/primitives/src/test/java/org/onosproject/store/primitives/resources/impl/AtomixLeaderElectorTest.java
index 2b1f56b..64dbb40 100644
--- a/core/store/primitives/src/test/java/org/onosproject/store/primitives/resources/impl/AtomixLeaderElectorTest.java
+++ b/core/store/primitives/src/test/java/org/onosproject/store/primitives/resources/impl/AtomixLeaderElectorTest.java
@@ -20,7 +20,6 @@
 import java.util.concurrent.CompletableFuture;
 import java.util.function.Consumer;
 
-import org.junit.Ignore;
 import org.junit.Test;
 
 import static org.junit.Assert.*;
@@ -35,7 +34,6 @@
 /**
  * Unit tests for {@link AtomixLeaderElector}.
  */
-@Ignore
 public class AtomixLeaderElectorTest extends AtomixTestBase {
 
     NodeId node1 = new NodeId("node1");
diff --git a/core/store/primitives/src/test/java/org/onosproject/store/primitives/resources/impl/AtomixLongTest.java b/core/store/primitives/src/test/java/org/onosproject/store/primitives/resources/impl/AtomixLongTest.java
index 0d5a89a..d38400d 100644
--- a/core/store/primitives/src/test/java/org/onosproject/store/primitives/resources/impl/AtomixLongTest.java
+++ b/core/store/primitives/src/test/java/org/onosproject/store/primitives/resources/impl/AtomixLongTest.java
@@ -17,7 +17,6 @@
 
 import static org.junit.Assert.*;
 
-import org.junit.Ignore;
 import org.junit.Test;
 
 import io.atomix.Atomix;
@@ -27,7 +26,6 @@
 /**
  * Unit tests for {@link AtomixCounter}.
  */
-@Ignore
 public class AtomixLongTest extends AtomixTestBase {
 
     @Override