State machine implementations for various distributed primitives based on latest Copycat APIs
Change-Id: I622cc196aa1cdf072a5a0b100a5ffaaf71b07900
diff --git a/core/store/primitives/src/test/java/org/onosproject/store/primitives/resources/impl/AtomixConsistentMapTest.java b/core/store/primitives/src/test/java/org/onosproject/store/primitives/resources/impl/AtomixConsistentMapTest.java
new file mode 100644
index 0000000..445dda4
--- /dev/null
+++ b/core/store/primitives/src/test/java/org/onosproject/store/primitives/resources/impl/AtomixConsistentMapTest.java
@@ -0,0 +1,447 @@
+/*
+ * Copyright 2016 Open Networking Laboratory
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.onosproject.store.primitives.resources.impl;
+
+import io.atomix.resource.ResourceType;
+import static org.junit.Assert.*;
+
+import java.util.Arrays;
+import java.util.ConcurrentModificationException;
+import java.util.List;
+import java.util.concurrent.CompletionException;
+import java.util.stream.Collectors;
+
+import org.junit.Test;
+import org.onlab.util.Tools;
+import org.onosproject.store.service.MapEvent;
+import org.onosproject.store.service.MapEventListener;
+import org.onosproject.store.service.Versioned;
+
+import com.google.common.collect.Sets;
+
+/**
+ * Unit tests for {@link AtomixConsistentMap}.
+ */
+public class AtomixConsistentMapTest extends AtomixTestBase {
+
+ @Override
+ protected ResourceType resourceType() {
+ return new ResourceType(AtomixConsistentMap.class);
+ }
+
+ /**
+ * Tests various basic map operations.
+ */
+ @Test
+ public void testBasicMapOperations() throws Throwable {
+ basicMapOperationTests(1);
+ clearTests();
+ basicMapOperationTests(2);
+ clearTests();
+ basicMapOperationTests(3);
+ }
+
+ /**
+ * Tests various map compute* operations on different cluster sizes.
+ */
+ @Test
+ public void testMapComputeOperations() throws Throwable {
+ mapComputeOperationTests(1);
+ clearTests();
+ mapComputeOperationTests(2);
+ clearTests();
+ mapComputeOperationTests(3);
+ }
+
+ /**
+ * Tests map event notifications.
+ */
+ @Test
+ public void testMapListeners() throws Throwable {
+ mapListenerTests(1);
+ clearTests();
+ mapListenerTests(2);
+ clearTests();
+ mapListenerTests(3);
+ }
+
+ /**
+ * Tests map transaction commit.
+ */
+ @Test
+ public void testTransactionCommit() throws Throwable {
+ transactionCommitTests(1);
+ clearTests();
+ transactionCommitTests(2);
+ clearTests();
+ transactionCommitTests(3);
+ }
+
+ /**
+ * Tests map transaction rollback.
+ */
+ @Test
+ public void testTransactionRollback() throws Throwable {
+ transactionRollbackTests(1);
+ clearTests();
+ transactionRollbackTests(2);
+ clearTests();
+ transactionRollbackTests(3);
+ }
+
+ protected void basicMapOperationTests(int clusterSize) throws Throwable {
+ createCopycatServers(clusterSize);
+
+ final byte[] rawFooValue = Tools.getBytesUtf8("Hello foo!");
+ final byte[] rawBarValue = Tools.getBytesUtf8("Hello bar!");
+
+ AtomixConsistentMap map = createAtomixClient().get("test", AtomixConsistentMap.class).join();
+
+ map.isEmpty().thenAccept(result -> {
+ assertTrue(result);
+ }).join();
+
+ map.put("foo", rawFooValue).thenAccept(result -> {
+ assertNull(result);
+ }).join();
+
+ map.size().thenAccept(result -> {
+ assertTrue(result == 1);
+ }).join();
+
+ map.isEmpty().thenAccept(result -> {
+ assertFalse(result);
+ }).join();
+
+ map.putIfAbsent("foo", "Hello foo again!".getBytes()).thenAccept(result -> {
+ assertNotNull(result);
+ assertTrue(Arrays.equals(Versioned.valueOrElse(result, null), rawFooValue));
+ }).join();
+
+ map.putIfAbsent("bar", rawBarValue).thenAccept(result -> {
+ assertNull(result);
+ }).join();
+
+ map.size().thenAccept(result -> {
+ assertTrue(result == 2);
+ }).join();
+
+ map.keySet().thenAccept(result -> {
+ assertTrue(result.size() == 2);
+ assertTrue(result.containsAll(Sets.newHashSet("foo", "bar")));
+ }).join();
+
+ map.values().thenAccept(result -> {
+ assertTrue(result.size() == 2);
+ List<String> rawValues =
+ result.stream().map(v -> Tools.toStringUtf8(v.value())).collect(Collectors.toList());
+ assertTrue(rawValues.contains("Hello foo!"));
+ assertTrue(rawValues.contains("Hello bar!"));
+ }).join();
+
+ map.entrySet().thenAccept(result -> {
+ assertTrue(result.size() == 2);
+ // TODO: check entries
+ }).join();
+
+ map.get("foo").thenAccept(result -> {
+ assertTrue(Arrays.equals(Versioned.valueOrElse(result, null), rawFooValue));
+ }).join();
+
+ map.remove("foo").thenAccept(result -> {
+ assertTrue(Arrays.equals(Versioned.valueOrElse(result, null), rawFooValue));
+ }).join();
+
+ map.containsKey("foo").thenAccept(result -> {
+ assertFalse(result);
+ }).join();
+
+ map.get("foo").thenAccept(result -> {
+ assertNull(result);
+ }).join();
+
+ map.get("bar").thenAccept(result -> {
+ assertNotNull(result);
+ assertTrue(Arrays.equals(Versioned.valueOrElse(result, null), rawBarValue));
+ }).join();
+
+ map.containsKey("bar").thenAccept(result -> {
+ assertTrue(result);
+ }).join();
+
+ map.size().thenAccept(result -> {
+ assertTrue(result == 1);
+ }).join();
+
+ map.containsValue(rawBarValue).thenAccept(result -> {
+ assertTrue(result);
+ }).join();
+
+ map.containsValue(rawFooValue).thenAccept(result -> {
+ assertFalse(result);
+ }).join();
+
+ map.replace("bar", "Goodbye bar!".getBytes()).thenAccept(result -> {
+ assertNotNull(result);
+ assertTrue(Arrays.equals(Versioned.valueOrElse(result, null), rawBarValue));
+ }).join();
+
+ map.replace("foo", "Goodbye foo!".getBytes()).thenAccept(result -> {
+ assertNull(result);
+ }).join();
+
+ // try replace_if_value_match for a non-existent key
+ map.replace("foo", "Goodbye foo!".getBytes(), rawFooValue).thenAccept(result -> {
+ assertFalse(result);
+ }).join();
+
+ map.replace("bar", "Goodbye bar!".getBytes(), rawBarValue).thenAccept(result -> {
+ assertTrue(result);
+ }).join();
+
+ map.replace("bar", "Goodbye bar!".getBytes(), rawBarValue).thenAccept(result -> {
+ assertFalse(result);
+ }).join();
+
+ Versioned<byte[]> barValue = map.get("bar").join();
+ map.replace("bar", barValue.version(), "Goodbye bar!".getBytes()).thenAccept(result -> {
+ assertTrue(result);
+ }).join();
+
+ map.replace("bar", barValue.version(), rawBarValue).thenAccept(result -> {
+ assertFalse(result);
+ }).join();
+
+ map.clear().join();
+
+ map.size().thenAccept(result -> {
+ assertTrue(result == 0);
+ }).join();
+ }
+
+ public void mapComputeOperationTests(int clusterSize) throws Throwable {
+ createCopycatServers(clusterSize);
+ final byte[] value1 = Tools.getBytesUtf8("value1");
+ final byte[] value2 = Tools.getBytesUtf8("value2");
+ final byte[] value3 = Tools.getBytesUtf8("value3");
+
+ AtomixConsistentMap map = createAtomixClient().get("test", AtomixConsistentMap.class).join();
+
+ map.computeIfAbsent("foo", k -> value1).thenAccept(result -> {
+ assertTrue(Arrays.equals(Versioned.valueOrElse(result, null), value1));
+ }).join();
+
+ map.computeIfAbsent("foo", k -> value2).thenAccept(result -> {
+ assertTrue(Arrays.equals(Versioned.valueOrElse(result, null), value1));
+ }).join();
+
+ map.computeIfPresent("bar", (k, v) -> value2).thenAccept(result -> {
+ assertNull(result);
+ });
+
+ map.computeIfPresent("foo", (k, v) -> value3).thenAccept(result -> {
+ assertTrue(Arrays.equals(Versioned.valueOrElse(result, null), value3));
+ }).join();
+
+ map.computeIfPresent("foo", (k, v) -> null).thenAccept(result -> {
+ assertNull(result);
+ }).join();
+
+ map.computeIf("foo", v -> v == null, (k, v) -> value1).thenAccept(result -> {
+ assertTrue(Arrays.equals(Versioned.valueOrElse(result, null), value1));
+ }).join();
+
+ map.compute("foo", (k, v) -> value2).thenAccept(result -> {
+ assertTrue(Arrays.equals(Versioned.valueOrElse(result, null), value2));
+ }).join();
+ }
+
+
+ protected void mapListenerTests(int clusterSize) throws Throwable {
+ createCopycatServers(clusterSize);
+ final byte[] value1 = Tools.getBytesUtf8("value1");
+ final byte[] value2 = Tools.getBytesUtf8("value2");
+ final byte[] value3 = Tools.getBytesUtf8("value3");
+
+ AtomixConsistentMap map = createAtomixClient().get("test", AtomixConsistentMap.class).join();
+ TestMapEventListener listener = new TestMapEventListener();
+
+ // add listener; insert new value into map and verify an INSERT event is received.
+ map.addListener(listener).join();
+ map.put("foo", value1).join();
+ assertNotNull(listener.event());
+ assertEquals(MapEvent.Type.INSERT, listener.event().type());
+ assertTrue(Arrays.equals(value1, listener.event().newValue().value()));
+ listener.clearEvent();
+
+ // remove listener and verify listener is not notified.
+ map.removeListener(listener).join();
+ map.put("foo", value2).join();
+ assertNull(listener.event());
+
+ // add the listener back and verify UPDATE events are received correctly
+ map.addListener(listener).join();
+ map.put("foo", value3).join();
+ assertNotNull(listener.event());
+ assertEquals(MapEvent.Type.UPDATE, listener.event().type());
+ assertTrue(Arrays.equals(value3, listener.event().newValue().value()));
+ listener.clearEvent();
+
+ // perform a non-state changing operation and verify no events are received.
+ map.putIfAbsent("foo", value1).join();
+ assertNull(listener.event());
+
+ // verify REMOVE events are received correctly.
+ map.remove("foo").join();
+ assertNotNull(listener.event());
+ assertEquals(MapEvent.Type.REMOVE, listener.event().type());
+ assertTrue(Arrays.equals(value3, listener.event().oldValue().value()));
+ listener.clearEvent();
+
+ // verify compute methods also generate events.
+ map.computeIf("foo", v -> v == null, (k, v) -> value1).join();
+ assertNotNull(listener.event());
+ assertEquals(MapEvent.Type.INSERT, listener.event().type());
+ assertTrue(Arrays.equals(value1, listener.event().newValue().value()));
+ listener.clearEvent();
+
+ map.compute("foo", (k, v) -> value2).join();
+ assertNotNull(listener.event());
+ assertEquals(MapEvent.Type.UPDATE, listener.event().type());
+ assertTrue(Arrays.equals(value2, listener.event().newValue().value()));
+ listener.clearEvent();
+
+ map.computeIf("foo", v -> Arrays.equals(v, value2), (k, v) -> null).join();
+ assertNotNull(listener.event());
+ assertEquals(MapEvent.Type.REMOVE, listener.event().type());
+ assertTrue(Arrays.equals(value2, listener.event().oldValue().value()));
+ listener.clearEvent();
+
+ map.removeListener(listener).join();
+ }
+
+ protected void transactionCommitTests(int clusterSize) throws Throwable {
+ createCopycatServers(clusterSize);
+ final byte[] value1 = Tools.getBytesUtf8("value1");
+ final byte[] value2 = Tools.getBytesUtf8("value2");
+
+ AtomixConsistentMap map = createAtomixClient().get("test", AtomixConsistentMap.class).join();
+ TestMapEventListener listener = new TestMapEventListener();
+
+ map.addListener(listener).join();
+
+ MapUpdate<String, byte[]> update1 =
+ MapUpdate.<String, byte[]>newBuilder().withType(MapUpdate.Type.PUT_IF_ABSENT)
+ .withKey("foo")
+ .withValue(value1)
+ .build();
+
+ TransactionalMapUpdate<String, byte[]> txMapUpdate =
+ new TransactionalMapUpdate<>(TransactionId.from("tx1"), Arrays.asList(update1));
+
+ map.prepare(txMapUpdate).thenAccept(result -> {
+ assertEquals(PrepareResult.OK, result);
+ }).join();
+ assertNull(listener.event());
+
+ map.size().thenAccept(result -> {
+ assertTrue(result == 0);
+ }).join();
+
+ map.get("foo").thenAccept(result -> {
+ assertNull(result);
+ }).join();
+
+ try {
+ map.put("foo", value2).join();
+ assertTrue(false);
+ } catch (CompletionException e) {
+ assertEquals(ConcurrentModificationException.class, e.getCause().getClass());
+ }
+
+ assertNull(listener.event());
+
+ map.commit(txMapUpdate.transactionId()).join();
+ assertNotNull(listener.event());
+ assertEquals(MapEvent.Type.INSERT, listener.event().type());
+ assertTrue(Arrays.equals(value1, listener.event().newValue().value()));
+ listener.clearEvent();
+
+ map.put("foo", value2).thenAccept(result -> {
+ assertTrue(Arrays.equals(Versioned.valueOrElse(result, null), value1));
+ }).join();
+ assertNotNull(listener.event());
+ assertEquals(MapEvent.Type.UPDATE, listener.event().type());
+ assertTrue(Arrays.equals(value2, listener.event().newValue().value()));
+ listener.clearEvent();
+ }
+
+ protected void transactionRollbackTests(int clusterSize) throws Throwable {
+ createCopycatServers(clusterSize);
+ final byte[] value1 = Tools.getBytesUtf8("value1");
+ final byte[] value2 = Tools.getBytesUtf8("value2");
+
+ AtomixConsistentMap map = createAtomixClient().get("test", AtomixConsistentMap.class).join();
+ TestMapEventListener listener = new TestMapEventListener();
+
+ map.addListener(listener).join();
+
+ MapUpdate<String, byte[]> update1 =
+ MapUpdate.<String, byte[]>newBuilder().withType(MapUpdate.Type.PUT_IF_ABSENT)
+ .withKey("foo")
+ .withValue(value1)
+ .build();
+ TransactionalMapUpdate<String, byte[]> txMapUpdate =
+ new TransactionalMapUpdate<>(TransactionId.from("tx1"), Arrays.asList(update1));
+ map.prepare(txMapUpdate).thenAccept(result -> {
+ assertEquals(PrepareResult.OK, result);
+ }).join();
+ assertNull(listener.event());
+
+ map.rollback(txMapUpdate.transactionId()).join();
+ assertNull(listener.event());
+
+ map.get("foo").thenAccept(result -> {
+ assertNull(result);
+ }).join();
+
+ map.put("foo", value2).thenAccept(result -> {
+ assertNull(result);
+ }).join();
+ assertNotNull(listener.event());
+ assertEquals(MapEvent.Type.INSERT, listener.event().type());
+ assertTrue(Arrays.equals(value2, listener.event().newValue().value()));
+ listener.clearEvent();
+ }
+
+ private static class TestMapEventListener implements MapEventListener<String, byte[]> {
+
+ MapEvent<String, byte[]> event;
+
+ @Override
+ public void event(MapEvent<String, byte[]> event) {
+ this.event = event;
+ }
+
+ public MapEvent<String, byte[]> event() {
+ return event;
+ }
+
+ public void clearEvent() {
+ event = null;
+ }
+ }
+}
diff --git a/core/store/primitives/src/test/java/org/onosproject/store/primitives/resources/impl/AtomixLeaderElectorTest.java b/core/store/primitives/src/test/java/org/onosproject/store/primitives/resources/impl/AtomixLeaderElectorTest.java
new file mode 100644
index 0000000..7d77d03
--- /dev/null
+++ b/core/store/primitives/src/test/java/org/onosproject/store/primitives/resources/impl/AtomixLeaderElectorTest.java
@@ -0,0 +1,339 @@
+/*
+ * Copyright 2016 Open Networking Laboratory
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.onosproject.store.primitives.resources.impl;
+
+import java.util.LinkedList;
+import java.util.Queue;
+import java.util.concurrent.CompletableFuture;
+import java.util.function.Consumer;
+
+import org.junit.Test;
+
+import static org.junit.Assert.*;
+
+import org.onosproject.cluster.Leadership;
+import org.onosproject.cluster.NodeId;
+import org.onosproject.event.Change;
+
+import io.atomix.Atomix;
+import io.atomix.resource.ResourceType;
+
+/**
+ * Unit tests for {@link AtomixLeaderElector}.
+ */
+public class AtomixLeaderElectorTest extends AtomixTestBase {
+
+ NodeId node1 = new NodeId("node1");
+ NodeId node2 = new NodeId("node2");
+ NodeId node3 = new NodeId("node3");
+
+ @Override
+ protected ResourceType resourceType() {
+ return new ResourceType(AtomixLeaderElector.class);
+ }
+
+ @Test
+ public void testRun() throws Throwable {
+ leaderElectorRunTests(1);
+ clearTests();
+// leaderElectorRunTests(2);
+// clearTests();
+// leaderElectorRunTests(3);
+// clearTests();
+ }
+
+ private void leaderElectorRunTests(int numServers) throws Throwable {
+ createCopycatServers(numServers);
+ Atomix client1 = createAtomixClient();
+ AtomixLeaderElector elector1 = client1.get("test-elector", AtomixLeaderElector.class).join();
+ elector1.run("foo", node1).thenAccept(result -> {
+ assertEquals(node1, result.leaderNodeId());
+ assertEquals(1, result.leader().term());
+ assertEquals(1, result.candidates().size());
+ assertEquals(node1, result.candidates().get(0));
+ }).join();
+ Atomix client2 = createAtomixClient();
+ AtomixLeaderElector elector2 = client2.get("test-elector", AtomixLeaderElector.class).join();
+ elector2.run("foo", node2).thenAccept(result -> {
+ assertEquals(node1, result.leaderNodeId());
+ assertEquals(1, result.leader().term());
+ assertEquals(2, result.candidates().size());
+ assertEquals(node1, result.candidates().get(0));
+ assertEquals(node2, result.candidates().get(1));
+ }).join();
+ }
+
+ @Test
+ public void testWithdraw() throws Throwable {
+ leaderElectorWithdrawTests(1);
+ clearTests();
+ leaderElectorWithdrawTests(2);
+ clearTests();
+ leaderElectorWithdrawTests(3);
+ clearTests();
+ }
+
+ private void leaderElectorWithdrawTests(int numServers) throws Throwable {
+ createCopycatServers(numServers);
+ Atomix client1 = createAtomixClient();
+ AtomixLeaderElector elector1 = client1.get("test-elector", AtomixLeaderElector.class).join();
+ elector1.run("foo", node1).join();
+ Atomix client2 = createAtomixClient();
+ AtomixLeaderElector elector2 = client2.get("test-elector", AtomixLeaderElector.class).join();
+ elector2.run("foo", node2).join();
+
+ LeaderEventListener listener1 = new LeaderEventListener();
+ elector1.addChangeListener(listener1).join();
+
+ LeaderEventListener listener2 = new LeaderEventListener();
+ elector2.addChangeListener(listener2).join();
+
+ elector1.withdraw("foo").join();
+
+ listener1.nextEvent().thenAccept(result -> {
+ assertEquals(node2, result.newValue().leaderNodeId());
+ assertEquals(2, result.newValue().leader().term());
+ assertEquals(1, result.newValue().candidates().size());
+ assertEquals(node2, result.newValue().candidates().get(0));
+ }).join();
+
+ listener2.nextEvent().thenAccept(result -> {
+ assertEquals(node2, result.newValue().leaderNodeId());
+ assertEquals(2, result.newValue().leader().term());
+ assertEquals(1, result.newValue().candidates().size());
+ assertEquals(node2, result.newValue().candidates().get(0));
+ }).join();
+ }
+
+ @Test
+ public void testAnoint() throws Throwable {
+ leaderElectorAnointTests(1);
+ clearTests();
+ leaderElectorAnointTests(2);
+ clearTests();
+ leaderElectorAnointTests(3);
+ clearTests();
+ }
+
+ private void leaderElectorAnointTests(int numServers) throws Throwable {
+ createCopycatServers(numServers);
+ Atomix client1 = createAtomixClient();
+ AtomixLeaderElector elector1 = client1.get("test-elector", AtomixLeaderElector.class).join();
+ Atomix client2 = createAtomixClient();
+ AtomixLeaderElector elector2 = client2.get("test-elector", AtomixLeaderElector.class).join();
+ Atomix client3 = createAtomixClient();
+ AtomixLeaderElector elector3 = client3.get("test-elector", AtomixLeaderElector.class).join();
+ elector1.run("foo", node1).join();
+ elector2.run("foo", node2).join();
+
+ LeaderEventListener listener1 = new LeaderEventListener();
+ elector1.addChangeListener(listener1).join();
+ LeaderEventListener listener2 = new LeaderEventListener();
+ elector2.addChangeListener(listener2);
+ LeaderEventListener listener3 = new LeaderEventListener();
+ elector3.addChangeListener(listener3).join();
+
+ elector3.anoint("foo", node3).thenAccept(result -> {
+ assertFalse(result);
+ }).join();
+ assertFalse(listener1.hasEvent());
+ assertFalse(listener2.hasEvent());
+ assertFalse(listener3.hasEvent());
+
+ elector3.anoint("foo", node2).thenAccept(result -> {
+ assertTrue(result);
+ }).join();
+ assertTrue(listener1.hasEvent());
+ assertTrue(listener2.hasEvent());
+ assertTrue(listener3.hasEvent());
+
+ listener1.nextEvent().thenAccept(result -> {
+ assertEquals(node2, result.newValue().leaderNodeId());
+ assertEquals(2, result.newValue().candidates().size());
+ assertEquals(node1, result.newValue().candidates().get(0));
+ assertEquals(node2, result.newValue().candidates().get(1));
+ }).join();
+ listener2.nextEvent().thenAccept(result -> {
+ assertEquals(node2, result.newValue().leaderNodeId());
+ assertEquals(2, result.newValue().candidates().size());
+ assertEquals(node1, result.newValue().candidates().get(0));
+ assertEquals(node2, result.newValue().candidates().get(1));
+ }).join();
+ listener3.nextEvent().thenAccept(result -> {
+ assertEquals(node2, result.newValue().leaderNodeId());
+ assertEquals(2, result.newValue().candidates().size());
+ assertEquals(node1, result.newValue().candidates().get(0));
+ assertEquals(node2, result.newValue().candidates().get(1));
+ }).join();
+ }
+
+ @Test
+ public void testLeaderSessionClose() throws Throwable {
+ leaderElectorLeaderSessionCloseTests(1);
+ clearTests();
+ leaderElectorLeaderSessionCloseTests(2);
+ clearTests();
+ leaderElectorLeaderSessionCloseTests(3);
+ clearTests();
+ }
+
+ private void leaderElectorLeaderSessionCloseTests(int numServers) throws Throwable {
+ createCopycatServers(numServers);
+ Atomix client1 = createAtomixClient();
+ AtomixLeaderElector elector1 = client1.get("test-elector", AtomixLeaderElector.class).join();
+ elector1.run("foo", node1).join();
+ Atomix client2 = createAtomixClient();
+ AtomixLeaderElector elector2 = client2.get("test-elector", AtomixLeaderElector.class).join();
+ LeaderEventListener listener = new LeaderEventListener();
+ elector2.run("foo", node2).join();
+ elector2.addChangeListener(listener).join();
+ client1.close();
+ listener.nextEvent().thenAccept(result -> {
+ assertEquals(node2, result.newValue().leaderNodeId());
+ assertEquals(1, result.newValue().candidates().size());
+ assertEquals(node2, result.newValue().candidates().get(0));
+ }).join();
+ }
+
+ @Test
+ public void testNonLeaderSessionClose() throws Throwable {
+ leaderElectorNonLeaderSessionCloseTests(1);
+ clearTests();
+ leaderElectorNonLeaderSessionCloseTests(2);
+ clearTests();
+ leaderElectorNonLeaderSessionCloseTests(3);
+ clearTests();
+ }
+
+ private void leaderElectorNonLeaderSessionCloseTests(int numServers) throws Throwable {
+ createCopycatServers(numServers);
+ Atomix client1 = createAtomixClient();
+ AtomixLeaderElector elector1 = client1.get("test-elector", AtomixLeaderElector.class).join();
+ elector1.run("foo", node1).join();
+ Atomix client2 = createAtomixClient();
+ AtomixLeaderElector elector2 = client2.get("test-elector", AtomixLeaderElector.class).join();
+ LeaderEventListener listener = new LeaderEventListener();
+ elector2.run("foo", node2).join();
+ elector1.addChangeListener(listener).join();
+ client2.close().join();
+ listener.nextEvent().thenAccept(result -> {
+ assertEquals(node1, result.newValue().leaderNodeId());
+ assertEquals(1, result.newValue().candidates().size());
+ assertEquals(node1, result.newValue().candidates().get(0));
+ }).join();
+ }
+
+ @Test
+ public void testQueries() throws Throwable {
+ leaderElectorQueryTests(1);
+ clearTests();
+ leaderElectorQueryTests(2);
+ clearTests();
+ leaderElectorQueryTests(3);
+ clearTests();
+ }
+
+ private void leaderElectorQueryTests(int numServers) throws Throwable {
+ createCopycatServers(numServers);
+ Atomix client1 = createAtomixClient();
+ Atomix client2 = createAtomixClient();
+ AtomixLeaderElector elector1 = client1.get("test-elector", AtomixLeaderElector.class).join();
+ AtomixLeaderElector elector2 = client2.get("test-elector", AtomixLeaderElector.class).join();
+ elector1.run("foo", node1).join();
+ elector2.run("foo", node2).join();
+ elector2.run("bar", node2).join();
+ elector1.getElectedTopics(node1).thenAccept(result -> {
+ assertEquals(1, result.size());
+ assertTrue(result.contains("foo"));
+ }).join();
+ elector2.getElectedTopics(node1).thenAccept(result -> {
+ assertEquals(1, result.size());
+ assertTrue(result.contains("foo"));
+ }).join();
+ elector1.getLeadership("foo").thenAccept(result -> {
+ assertEquals(node1, result.leaderNodeId());
+ assertEquals(node1, result.candidates().get(0));
+ assertEquals(node2, result.candidates().get(1));
+ }).join();
+ elector2.getLeadership("foo").thenAccept(result -> {
+ assertEquals(node1, result.leaderNodeId());
+ assertEquals(node1, result.candidates().get(0));
+ assertEquals(node2, result.candidates().get(1));
+ }).join();
+ elector1.getLeadership("bar").thenAccept(result -> {
+ assertEquals(node2, result.leaderNodeId());
+ assertEquals(node2, result.candidates().get(0));
+ }).join();
+ elector2.getLeadership("bar").thenAccept(result -> {
+ assertEquals(node2, result.leaderNodeId());
+ assertEquals(node2, result.candidates().get(0));
+ }).join();
+ elector1.getLeaderships().thenAccept(result -> {
+ assertEquals(2, result.size());
+ Leadership fooLeadership = result.get("foo");
+ assertEquals(node1, fooLeadership.leaderNodeId());
+ assertEquals(node1, fooLeadership.candidates().get(0));
+ assertEquals(node2, fooLeadership.candidates().get(1));
+ Leadership barLeadership = result.get("bar");
+ assertEquals(node2, barLeadership.leaderNodeId());
+ assertEquals(node2, barLeadership.candidates().get(0));
+ }).join();
+ elector2.getLeaderships().thenAccept(result -> {
+ assertEquals(2, result.size());
+ Leadership fooLeadership = result.get("foo");
+ assertEquals(node1, fooLeadership.leaderNodeId());
+ assertEquals(node1, fooLeadership.candidates().get(0));
+ assertEquals(node2, fooLeadership.candidates().get(1));
+ Leadership barLeadership = result.get("bar");
+ assertEquals(node2, barLeadership.leaderNodeId());
+ assertEquals(node2, barLeadership.candidates().get(0));
+ }).join();
+ }
+
+ private static class LeaderEventListener implements Consumer<Change<Leadership>> {
+ Queue<Change<Leadership>> eventQueue = new LinkedList<>();
+ CompletableFuture<Change<Leadership>> pendingFuture;
+
+ @Override
+ public void accept(Change<Leadership> change) {
+ synchronized (this) {
+ if (pendingFuture != null) {
+ pendingFuture.complete(change);
+ pendingFuture = null;
+ } else {
+ eventQueue.add(change);
+ }
+ }
+ }
+
+ public boolean hasEvent() {
+ return !eventQueue.isEmpty();
+ }
+
+ public CompletableFuture<Change<Leadership>> nextEvent() {
+ synchronized (this) {
+ if (eventQueue.isEmpty()) {
+ if (pendingFuture == null) {
+ pendingFuture = new CompletableFuture<>();
+ }
+ return pendingFuture;
+ } else {
+ return CompletableFuture.completedFuture(eventQueue.poll());
+ }
+ }
+ }
+ }
+}
diff --git a/core/store/primitives/src/test/java/org/onosproject/store/primitives/resources/impl/AtomixLongTest.java b/core/store/primitives/src/test/java/org/onosproject/store/primitives/resources/impl/AtomixLongTest.java
new file mode 100644
index 0000000..d38400d
--- /dev/null
+++ b/core/store/primitives/src/test/java/org/onosproject/store/primitives/resources/impl/AtomixLongTest.java
@@ -0,0 +1,62 @@
+/*
+ * Copyright 2016 Open Networking Laboratory
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.onosproject.store.primitives.resources.impl;
+
+import static org.junit.Assert.*;
+
+import org.junit.Test;
+
+import io.atomix.Atomix;
+import io.atomix.resource.ResourceType;
+import io.atomix.variables.DistributedLong;
+
+/**
+ * Unit tests for {@link AtomixCounter}.
+ */
+public class AtomixLongTest extends AtomixTestBase {
+
+ @Override
+ protected ResourceType resourceType() {
+ return new ResourceType(DistributedLong.class);
+ }
+
+ @Test
+ public void testBasicOperations() throws Throwable {
+ basicOperationsTest(1);
+ clearTests();
+ basicOperationsTest(2);
+ clearTests();
+ basicOperationsTest(3);
+ clearTests();
+ }
+
+ protected void basicOperationsTest(int clusterSize) throws Throwable {
+ createCopycatServers(clusterSize);
+ Atomix atomix = createAtomixClient();
+ AtomixCounter along = new AtomixCounter("test-long", atomix.getLong("test-long").join());
+ assertEquals(0, along.get().join().longValue());
+ assertEquals(1, along.incrementAndGet().join().longValue());
+ along.set(100).join();
+ assertEquals(100, along.get().join().longValue());
+ assertEquals(100, along.getAndAdd(10).join().longValue());
+ assertEquals(110, along.get().join().longValue());
+ assertFalse(along.compareAndSet(109, 111).join());
+ assertTrue(along.compareAndSet(110, 111).join());
+ assertEquals(100, along.addAndGet(-11).join().longValue());
+ assertEquals(100, along.getAndIncrement().join().longValue());
+ assertEquals(101, along.get().join().longValue());
+ }
+}
diff --git a/core/store/primitives/src/test/java/org/onosproject/store/primitives/resources/impl/AtomixTestBase.java b/core/store/primitives/src/test/java/org/onosproject/store/primitives/resources/impl/AtomixTestBase.java
new file mode 100644
index 0000000..d655d52
--- /dev/null
+++ b/core/store/primitives/src/test/java/org/onosproject/store/primitives/resources/impl/AtomixTestBase.java
@@ -0,0 +1,182 @@
+/*
+ * Copyright 2016 Open Networking Laboratory
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.onosproject.store.primitives.resources.impl;
+
+import io.atomix.Atomix;
+import io.atomix.AtomixClient;
+import io.atomix.catalyst.serializer.Serializer;
+import io.atomix.catalyst.transport.Address;
+import io.atomix.catalyst.transport.LocalServerRegistry;
+import io.atomix.catalyst.transport.LocalTransport;
+import io.atomix.copycat.client.CopycatClient;
+import io.atomix.copycat.server.CopycatServer;
+import io.atomix.copycat.server.storage.Storage;
+import io.atomix.copycat.server.storage.StorageLevel;
+import io.atomix.manager.state.ResourceManagerState;
+import io.atomix.resource.ResourceRegistry;
+import io.atomix.resource.ResourceType;
+
+import java.io.File;
+import java.io.IOException;
+import java.nio.file.Files;
+import java.time.Duration;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.CountDownLatch;
+
+import org.junit.After;
+import org.junit.Before;
+import org.onosproject.store.primitives.impl.CatalystSerializers;
+
+import com.google.common.util.concurrent.Uninterruptibles;
+
+/**
+ * Base class for various Atomix* tests.
+ */
+public abstract class AtomixTestBase {
+ private static final File TEST_DIR = new File("target/test-logs");
+ protected LocalServerRegistry registry;
+ protected int port;
+ protected List<Address> members;
+ protected List<CopycatClient> copycatClients = new ArrayList<>();
+ protected List<CopycatServer> copycatServers = new ArrayList<>();
+ protected List<Atomix> atomixClients = new ArrayList<>();
+ protected List<CopycatServer> atomixServers = new ArrayList<>();
+ protected Serializer serializer = CatalystSerializers.getSerializer();
+
+ /**
+ * Creates a new resource state machine.
+ *
+ * @return A new resource state machine.
+ */
+ protected abstract ResourceType resourceType();
+
+ /**
+ * Returns the next server address.
+ *
+ * @return The next server address.
+ */
+ private Address nextAddress() {
+ Address address = new Address("localhost", port++);
+ members.add(address);
+ return address;
+ }
+
+ /**
+ * Creates a set of Copycat servers.
+ */
+ protected List<CopycatServer> createCopycatServers(int nodes) throws Throwable {
+ CountDownLatch latch = new CountDownLatch(nodes);
+ List<CopycatServer> servers = new ArrayList<>();
+
+ List<Address> members = new ArrayList<>();
+ for (int i = 0; i < nodes; i++) {
+ members.add(nextAddress());
+ }
+
+ for (int i = 0; i < nodes; i++) {
+ CopycatServer server = createCopycatServer(members.get(i));
+ server.open().thenRun(latch::countDown);
+ servers.add(server);
+ }
+
+ Uninterruptibles.awaitUninterruptibly(latch);
+
+ return servers;
+ }
+
+ /**
+ * Creates a Copycat server.
+ */
+ protected CopycatServer createCopycatServer(Address address) {
+ ResourceRegistry resourceRegistry = new ResourceRegistry();
+ resourceRegistry.register(resourceType());
+ CopycatServer server = CopycatServer.builder(address, members)
+ .withTransport(new LocalTransport(registry))
+ .withStorage(Storage.builder()
+ .withStorageLevel(StorageLevel.DISK)
+ .withDirectory(TEST_DIR + "/" + address.port())
+ .withSerializer(serializer.clone())
+ .build())
+ .withStateMachine(() -> new ResourceManagerState(resourceRegistry))
+ .withSerializer(serializer.clone())
+ .withHeartbeatInterval(Duration.ofMillis(25))
+ .withElectionTimeout(Duration.ofMillis(50))
+ .withSessionTimeout(Duration.ofMillis(100))
+ .build();
+ copycatServers.add(server);
+ return server;
+ }
+
+ @Before
+ @After
+ public void clearTests() throws Exception {
+ registry = new LocalServerRegistry();
+ members = new ArrayList<>();
+ port = 5000;
+
+ CompletableFuture<Void> closeClients =
+ CompletableFuture.allOf(atomixClients.stream()
+ .map(Atomix::close)
+ .toArray(CompletableFuture[]::new));
+
+ closeClients.thenCompose(v -> CompletableFuture.allOf(copycatServers.stream()
+ .map(CopycatServer::close)
+ .toArray(CompletableFuture[]::new))).join();
+
+ deleteDirectory(TEST_DIR);
+
+ atomixClients = new ArrayList<>();
+
+ copycatServers = new ArrayList<>();
+ }
+
+ /**
+ * Deletes a directory recursively.
+ */
+ private void deleteDirectory(File directory) throws IOException {
+ if (directory.exists()) {
+ File[] files = directory.listFiles();
+ if (files != null) {
+ for (File file : files) {
+ if (file.isDirectory()) {
+ deleteDirectory(file);
+ } else {
+ Files.delete(file.toPath());
+ }
+ }
+ }
+ Files.delete(directory.toPath());
+ }
+ }
+
+ /**
+ * Creates a Atomix client.
+ */
+ protected Atomix createAtomixClient() {
+ CountDownLatch latch = new CountDownLatch(1);
+ Atomix client = AtomixClient.builder(members)
+ .withTransport(new LocalTransport(registry))
+ .withSerializer(serializer.clone())
+ .withResourceResolver(r -> r.register(resourceType()))
+ .build();
+ client.open().thenRun(latch::countDown);
+ atomixClients.add(client);
+ Uninterruptibles.awaitUninterruptibly(latch);
+ return client;
+ }
+}