Default ConsistentMap consistency level to SEQUENTIAL and reenable all Atomix unit tests
Change-Id: Ic04ff81fbaaa7c007f20077391a72fdfa9fd382a
diff --git a/core/store/primitives/src/main/java/org/onosproject/store/primitives/resources/impl/AtomixConsistentMap.java b/core/store/primitives/src/main/java/org/onosproject/store/primitives/resources/impl/AtomixConsistentMap.java
index 4c065a5..14c24a6 100644
--- a/core/store/primitives/src/main/java/org/onosproject/store/primitives/resources/impl/AtomixConsistentMap.java
+++ b/core/store/primitives/src/main/java/org/onosproject/store/primitives/resources/impl/AtomixConsistentMap.java
@@ -236,11 +236,8 @@
@Override
public synchronized CompletableFuture<Void> addListener(MapEventListener<String, byte[]> listener) {
if (!mapEventListeners.isEmpty()) {
- if (mapEventListeners.add(listener)) {
- return CompletableFuture.completedFuture(new ChangeListener(listener)).thenApply(v -> null);
- } else {
- return CompletableFuture.completedFuture(null);
- }
+ mapEventListeners.add(listener);
+ return CompletableFuture.completedFuture(null);
}
mapEventListeners.add(listener);
return submit(new AtomixConsistentMapCommands.Listen()).thenApply(v -> null);
diff --git a/core/store/primitives/src/main/java/org/onosproject/store/primitives/resources/impl/AtomixConsistentMapCommands.java b/core/store/primitives/src/main/java/org/onosproject/store/primitives/resources/impl/AtomixConsistentMapCommands.java
index 4f912da..dbc3157 100644
--- a/core/store/primitives/src/main/java/org/onosproject/store/primitives/resources/impl/AtomixConsistentMapCommands.java
+++ b/core/store/primitives/src/main/java/org/onosproject/store/primitives/resources/impl/AtomixConsistentMapCommands.java
@@ -52,7 +52,7 @@
@Override
public ConsistencyLevel consistency() {
- return ConsistencyLevel.LINEARIZABLE;
+ return ConsistencyLevel.SEQUENTIAL;
}
@Override
@@ -78,7 +78,7 @@
@Override
public ConsistencyLevel consistency() {
- return ConsistencyLevel.BOUNDED_LINEARIZABLE;
+ return ConsistencyLevel.SEQUENTIAL;
}
@Override
diff --git a/core/store/primitives/src/main/java/org/onosproject/store/primitives/resources/impl/AtomixConsistentMapState.java b/core/store/primitives/src/main/java/org/onosproject/store/primitives/resources/impl/AtomixConsistentMapState.java
index a6e6ca0..6395f69 100644
--- a/core/store/primitives/src/main/java/org/onosproject/store/primitives/resources/impl/AtomixConsistentMapState.java
+++ b/core/store/primitives/src/main/java/org/onosproject/store/primitives/resources/impl/AtomixConsistentMapState.java
@@ -337,10 +337,9 @@
*
* @param commit unlisten commit
*/
- protected void unlisten(
- Commit<? extends Unlisten> commit) {
+ protected void unlisten(Commit<? extends Unlisten> commit) {
try {
- Commit<? extends Listen> listener = listeners.remove(commit.session());
+ Commit<? extends Listen> listener = listeners.remove(commit.session().id());
if (listener != null) {
listener.close();
}
diff --git a/core/store/primitives/src/test/java/org/onosproject/store/primitives/resources/impl/AtomixConsistentMapTest.java b/core/store/primitives/src/test/java/org/onosproject/store/primitives/resources/impl/AtomixConsistentMapTest.java
index 26aa9a8..150b99d 100644
--- a/core/store/primitives/src/test/java/org/onosproject/store/primitives/resources/impl/AtomixConsistentMapTest.java
+++ b/core/store/primitives/src/test/java/org/onosproject/store/primitives/resources/impl/AtomixConsistentMapTest.java
@@ -21,10 +21,11 @@
import java.util.Arrays;
import java.util.ConcurrentModificationException;
import java.util.List;
+import java.util.concurrent.ArrayBlockingQueue;
+import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CompletionException;
import java.util.stream.Collectors;
-import org.junit.Ignore;
import org.junit.Test;
import org.onlab.util.Tools;
import org.onosproject.store.primitives.MapUpdate;
@@ -34,12 +35,12 @@
import org.onosproject.store.service.MapTransaction;
import org.onosproject.store.service.Versioned;
+import com.google.common.base.Throwables;
import com.google.common.collect.Sets;
/**
* Unit tests for {@link AtomixConsistentMap}.
*/
-@Ignore
public class AtomixConsistentMapTest extends AtomixTestBase {
@Override
@@ -285,55 +286,52 @@
TestMapEventListener listener = new TestMapEventListener();
// add listener; insert new value into map and verify an INSERT event is received.
- map.addListener(listener).join();
- map.put("foo", value1).join();
- assertNotNull(listener.event());
- assertEquals(MapEvent.Type.INSERT, listener.event().type());
- assertTrue(Arrays.equals(value1, listener.event().newValue().value()));
- listener.clearEvent();
+ map.addListener(listener).thenCompose(v -> map.put("foo", value1)).join();
+ MapEvent<String, byte[]> event = listener.event();
+ assertNotNull(event);
+ assertEquals(MapEvent.Type.INSERT, event.type());
+ assertTrue(Arrays.equals(value1, event.newValue().value()));
// remove listener and verify listener is not notified.
- map.removeListener(listener).join();
- map.put("foo", value2).join();
- assertNull(listener.event());
+ map.removeListener(listener).thenCompose(v -> map.put("foo", value2)).join();
+ assertFalse(listener.eventReceived());
// add the listener back and verify UPDATE events are received correctly
- map.addListener(listener).join();
- map.put("foo", value3).join();
- assertNotNull(listener.event());
- assertEquals(MapEvent.Type.UPDATE, listener.event().type());
- assertTrue(Arrays.equals(value3, listener.event().newValue().value()));
- listener.clearEvent();
+ map.addListener(listener).thenCompose(v -> map.put("foo", value3)).join();
+ event = listener.event();
+ assertNotNull(event);
+ assertEquals(MapEvent.Type.UPDATE, event.type());
+ assertTrue(Arrays.equals(value3, event.newValue().value()));
// perform a non-state changing operation and verify no events are received.
map.putIfAbsent("foo", value1).join();
- assertNull(listener.event());
+ assertFalse(listener.eventReceived());
// verify REMOVE events are received correctly.
map.remove("foo").join();
- assertNotNull(listener.event());
- assertEquals(MapEvent.Type.REMOVE, listener.event().type());
- assertTrue(Arrays.equals(value3, listener.event().oldValue().value()));
- listener.clearEvent();
+ event = listener.event();
+ assertNotNull(event);
+ assertEquals(MapEvent.Type.REMOVE, event.type());
+ assertTrue(Arrays.equals(value3, event.oldValue().value()));
// verify compute methods also generate events.
map.computeIf("foo", v -> v == null, (k, v) -> value1).join();
- assertNotNull(listener.event());
- assertEquals(MapEvent.Type.INSERT, listener.event().type());
- assertTrue(Arrays.equals(value1, listener.event().newValue().value()));
- listener.clearEvent();
+ event = listener.event();
+ assertNotNull(event);
+ assertEquals(MapEvent.Type.INSERT, event.type());
+ assertTrue(Arrays.equals(value1, event.newValue().value()));
map.compute("foo", (k, v) -> value2).join();
- assertNotNull(listener.event());
- assertEquals(MapEvent.Type.UPDATE, listener.event().type());
- assertTrue(Arrays.equals(value2, listener.event().newValue().value()));
- listener.clearEvent();
+ event = listener.event();
+ assertNotNull(event);
+ assertEquals(MapEvent.Type.UPDATE, event.type());
+ assertTrue(Arrays.equals(value2, event.newValue().value()));
map.computeIf("foo", v -> Arrays.equals(v, value2), (k, v) -> null).join();
- assertNotNull(listener.event());
- assertEquals(MapEvent.Type.REMOVE, listener.event().type());
- assertTrue(Arrays.equals(value2, listener.event().oldValue().value()));
- listener.clearEvent();
+ event = listener.event();
+ assertNotNull(event);
+ assertEquals(MapEvent.Type.REMOVE, event.type());
+ assertTrue(Arrays.equals(value2, event.oldValue().value()));
map.removeListener(listener).join();
}
@@ -359,7 +357,7 @@
map.prepare(tx).thenAccept(result -> {
assertEquals(true, result);
}).join();
- assertNull(listener.event());
+ assertFalse(listener.eventReceived());
map.size().thenAccept(result -> {
assertTrue(result == 0);
@@ -376,21 +374,21 @@
assertEquals(ConcurrentModificationException.class, e.getCause().getClass());
}
- assertNull(listener.event());
+ assertFalse(listener.eventReceived());
map.commit(tx.transactionId()).join();
- assertNotNull(listener.event());
- assertEquals(MapEvent.Type.INSERT, listener.event().type());
- assertTrue(Arrays.equals(value1, listener.event().newValue().value()));
- listener.clearEvent();
+ MapEvent<String, byte[]> event = listener.event();
+ assertNotNull(event);
+ assertEquals(MapEvent.Type.INSERT, event.type());
+ assertTrue(Arrays.equals(value1, event.newValue().value()));
map.put("foo", value2).thenAccept(result -> {
assertTrue(Arrays.equals(Versioned.valueOrElse(result, null), value1));
}).join();
- assertNotNull(listener.event());
- assertEquals(MapEvent.Type.UPDATE, listener.event().type());
- assertTrue(Arrays.equals(value2, listener.event().newValue().value()));
- listener.clearEvent();
+ event = listener.event();
+ assertNotNull(event);
+ assertEquals(MapEvent.Type.UPDATE, event.type());
+ assertTrue(Arrays.equals(value2, event.newValue().value()));
}
protected void transactionRollbackTests(int clusterSize) throws Throwable {
@@ -412,10 +410,10 @@
map.prepare(tx).thenAccept(result -> {
assertEquals(true, result);
}).join();
- assertNull(listener.event());
+ assertFalse(listener.eventReceived());
map.rollback(tx.transactionId()).join();
- assertNull(listener.event());
+ assertFalse(listener.eventReceived());
map.get("foo").thenAccept(result -> {
assertNull(result);
@@ -424,27 +422,31 @@
map.put("foo", value2).thenAccept(result -> {
assertNull(result);
}).join();
- assertNotNull(listener.event());
- assertEquals(MapEvent.Type.INSERT, listener.event().type());
- assertTrue(Arrays.equals(value2, listener.event().newValue().value()));
- listener.clearEvent();
+ MapEvent<String, byte[]> event = listener.event();
+ assertNotNull(event);
+ assertEquals(MapEvent.Type.INSERT, event.type());
+ assertTrue(Arrays.equals(value2, event.newValue().value()));
}
private static class TestMapEventListener implements MapEventListener<String, byte[]> {
- MapEvent<String, byte[]> event;
+ private final BlockingQueue<MapEvent<String, byte[]>> queue = new ArrayBlockingQueue<>(1);
@Override
public void event(MapEvent<String, byte[]> event) {
- this.event = event;
+ try {
+ queue.put(event);
+ } catch (InterruptedException e) {
+ Throwables.propagate(e);
+ }
}
- public MapEvent<String, byte[]> event() {
- return event;
+ public boolean eventReceived() {
+ return !queue.isEmpty();
}
- public void clearEvent() {
- event = null;
+ public MapEvent<String, byte[]> event() throws InterruptedException {
+ return queue.take();
}
}
}
diff --git a/core/store/primitives/src/test/java/org/onosproject/store/primitives/resources/impl/AtomixLeaderElectorTest.java b/core/store/primitives/src/test/java/org/onosproject/store/primitives/resources/impl/AtomixLeaderElectorTest.java
index 2b1f56b..64dbb40 100644
--- a/core/store/primitives/src/test/java/org/onosproject/store/primitives/resources/impl/AtomixLeaderElectorTest.java
+++ b/core/store/primitives/src/test/java/org/onosproject/store/primitives/resources/impl/AtomixLeaderElectorTest.java
@@ -20,7 +20,6 @@
import java.util.concurrent.CompletableFuture;
import java.util.function.Consumer;
-import org.junit.Ignore;
import org.junit.Test;
import static org.junit.Assert.*;
@@ -35,7 +34,6 @@
/**
* Unit tests for {@link AtomixLeaderElector}.
*/
-@Ignore
public class AtomixLeaderElectorTest extends AtomixTestBase {
NodeId node1 = new NodeId("node1");
diff --git a/core/store/primitives/src/test/java/org/onosproject/store/primitives/resources/impl/AtomixLongTest.java b/core/store/primitives/src/test/java/org/onosproject/store/primitives/resources/impl/AtomixLongTest.java
index 0d5a89a..d38400d 100644
--- a/core/store/primitives/src/test/java/org/onosproject/store/primitives/resources/impl/AtomixLongTest.java
+++ b/core/store/primitives/src/test/java/org/onosproject/store/primitives/resources/impl/AtomixLongTest.java
@@ -17,7 +17,6 @@
import static org.junit.Assert.*;
-import org.junit.Ignore;
import org.junit.Test;
import io.atomix.Atomix;
@@ -27,7 +26,6 @@
/**
* Unit tests for {@link AtomixCounter}.
*/
-@Ignore
public class AtomixLongTest extends AtomixTestBase {
@Override