State machine implementations for various distributed primitives based on latest Copycat APIs

Change-Id: I622cc196aa1cdf072a5a0b100a5ffaaf71b07900
diff --git a/core/store/primitives/src/test/java/org/onosproject/store/primitives/resources/impl/AtomixConsistentMapTest.java b/core/store/primitives/src/test/java/org/onosproject/store/primitives/resources/impl/AtomixConsistentMapTest.java
new file mode 100644
index 0000000..445dda4
--- /dev/null
+++ b/core/store/primitives/src/test/java/org/onosproject/store/primitives/resources/impl/AtomixConsistentMapTest.java
@@ -0,0 +1,447 @@
+/*
+ * Copyright 2016 Open Networking Laboratory
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.onosproject.store.primitives.resources.impl;
+
+import io.atomix.resource.ResourceType;
+import static org.junit.Assert.*;
+
+import java.util.Arrays;
+import java.util.ConcurrentModificationException;
+import java.util.List;
+import java.util.concurrent.CompletionException;
+import java.util.stream.Collectors;
+
+import org.junit.Test;
+import org.onlab.util.Tools;
+import org.onosproject.store.service.MapEvent;
+import org.onosproject.store.service.MapEventListener;
+import org.onosproject.store.service.Versioned;
+
+import com.google.common.collect.Sets;
+
+/**
+ * Unit tests for {@link AtomixConsistentMap}.
+ */
+public class AtomixConsistentMapTest extends AtomixTestBase {
+
+    @Override
+    protected ResourceType resourceType() {
+        return new ResourceType(AtomixConsistentMap.class);
+    }
+
+    /**
+     * Tests various basic map operations on clusters of 1, 2 and 3 servers.
+     */
+    @Test
+    public void testBasicMapOperations() throws Throwable {
+        basicMapOperationTests(1);
+        clearTests();
+        basicMapOperationTests(2);
+        clearTests();
+        basicMapOperationTests(3);
+    }
+
+    /**
+     * Tests various map compute* operations on different cluster sizes.
+     */
+    @Test
+    public void testMapComputeOperations() throws Throwable {
+        mapComputeOperationTests(1);
+        clearTests();
+        mapComputeOperationTests(2);
+        clearTests();
+        mapComputeOperationTests(3);
+    }
+
+    /**
+     * Tests map event notifications on clusters of 1, 2 and 3 servers.
+     */
+    @Test
+    public void testMapListeners() throws Throwable {
+        mapListenerTests(1);
+        clearTests();
+        mapListenerTests(2);
+        clearTests();
+        mapListenerTests(3);
+    }
+
+    /**
+     * Tests map transaction prepare and commit on clusters of 1, 2 and 3 servers.
+     */
+    @Test
+    public void testTransactionCommit() throws Throwable {
+        transactionCommitTests(1);
+        clearTests();
+        transactionCommitTests(2);
+        clearTests();
+        transactionCommitTests(3);
+    }
+
+    /**
+     * Tests map transaction prepare and rollback on clusters of 1, 2 and 3 servers.
+     */
+    @Test
+    public void testTransactionRollback() throws Throwable {
+        transactionRollbackTests(1);
+        clearTests();
+        transactionRollbackTests(2);
+        clearTests();
+        transactionRollbackTests(3);
+    }
+
+    protected void basicMapOperationTests(int clusterSize) throws Throwable { // exercises the basic map API once
+        createCopycatServers(clusterSize);
+
+        final byte[] rawFooValue = Tools.getBytesUtf8("Hello foo!");
+        final byte[] rawBarValue = Tools.getBytesUtf8("Hello bar!");
+
+        AtomixConsistentMap map = createAtomixClient().get("test", AtomixConsistentMap.class).join();
+
+        map.isEmpty().thenAccept(result -> {
+            assertTrue(result);
+        }).join();
+
+        map.put("foo", rawFooValue).thenAccept(result -> {
+            assertNull(result);
+        }).join();
+
+        map.size().thenAccept(result -> {
+            assertTrue(result == 1);
+        }).join();
+
+        map.isEmpty().thenAccept(result -> {
+            assertFalse(result);
+        }).join();
+
+        map.putIfAbsent("foo", Tools.getBytesUtf8("Hello foo again!")).thenAccept(result -> {
+            assertNotNull(result);
+            assertTrue(Arrays.equals(Versioned.valueOrElse(result, null), rawFooValue));
+        }).join();
+
+        map.putIfAbsent("bar", rawBarValue).thenAccept(result -> {
+            assertNull(result);
+        }).join();
+
+        map.size().thenAccept(result -> {
+            assertTrue(result == 2);
+        }).join();
+
+        map.keySet().thenAccept(result -> {
+            assertTrue(result.size() == 2);
+            assertTrue(result.containsAll(Sets.newHashSet("foo", "bar")));
+        }).join();
+
+        map.values().thenAccept(result -> {
+            assertTrue(result.size() == 2);
+            List<String> rawValues =
+                    result.stream().map(v -> Tools.toStringUtf8(v.value())).collect(Collectors.toList());
+            assertTrue(rawValues.contains("Hello foo!"));
+            assertTrue(rawValues.contains("Hello bar!"));
+        }).join();
+
+        map.entrySet().thenAccept(result -> {
+            assertTrue(result.size() == 2);
+            // TODO: check entries
+        }).join();
+
+        map.get("foo").thenAccept(result -> {
+            assertTrue(Arrays.equals(Versioned.valueOrElse(result, null), rawFooValue));
+        }).join();
+
+        map.remove("foo").thenAccept(result -> {
+            assertTrue(Arrays.equals(Versioned.valueOrElse(result, null), rawFooValue));
+        }).join();
+
+        map.containsKey("foo").thenAccept(result -> {
+            assertFalse(result);
+        }).join();
+
+        map.get("foo").thenAccept(result -> {
+            assertNull(result);
+        }).join();
+
+        map.get("bar").thenAccept(result -> {
+            assertNotNull(result);
+            assertTrue(Arrays.equals(Versioned.valueOrElse(result, null), rawBarValue));
+        }).join();
+
+        map.containsKey("bar").thenAccept(result -> {
+            assertTrue(result);
+        }).join();
+
+        map.size().thenAccept(result -> {
+            assertTrue(result == 1);
+        }).join();
+
+        map.containsValue(rawBarValue).thenAccept(result -> {
+            assertTrue(result);
+        }).join();
+
+        map.containsValue(rawFooValue).thenAccept(result -> {
+            assertFalse(result);
+        }).join();
+
+        map.replace("bar", Tools.getBytesUtf8("Goodbye bar!")).thenAccept(result -> {
+            assertNotNull(result);
+            assertTrue(Arrays.equals(Versioned.valueOrElse(result, null), rawBarValue));
+        }).join();
+
+        map.replace("foo", Tools.getBytesUtf8("Goodbye foo!")).thenAccept(result -> {
+            assertNull(result);
+        }).join();
+
+        // try replace_if_value_match for a non-existent key
+        map.replace("foo", Tools.getBytesUtf8("Goodbye foo!"), rawFooValue).thenAccept(result -> {
+            assertFalse(result);
+        }).join();
+
+        map.replace("bar", Tools.getBytesUtf8("Goodbye bar!"), rawBarValue).thenAccept(result -> {
+            assertTrue(result);
+        }).join();
+
+        map.replace("bar", Tools.getBytesUtf8("Goodbye bar!"), rawBarValue).thenAccept(result -> {
+            assertFalse(result);
+        }).join();
+
+        Versioned<byte[]> barValue = map.get("bar").join();
+        map.replace("bar", barValue.version(), Tools.getBytesUtf8("Goodbye bar!")).thenAccept(result -> {
+            assertTrue(result);
+        }).join();
+
+        map.replace("bar", barValue.version(), rawBarValue).thenAccept(result -> {
+            assertFalse(result);
+        }).join();
+
+        map.clear().join();
+
+        map.size().thenAccept(result -> {
+            assertTrue(result == 0);
+        }).join();
+    }
+
+    public void mapComputeOperationTests(int clusterSize) throws Throwable { // exercises the compute* operations
+        createCopycatServers(clusterSize);
+        final byte[] value1 = Tools.getBytesUtf8("value1");
+        final byte[] value2 = Tools.getBytesUtf8("value2");
+        final byte[] value3 = Tools.getBytesUtf8("value3");
+
+        AtomixConsistentMap map = createAtomixClient().get("test", AtomixConsistentMap.class).join();
+
+        map.computeIfAbsent("foo", k -> value1).thenAccept(result -> {
+            assertTrue(Arrays.equals(Versioned.valueOrElse(result, null), value1));
+        }).join();
+
+        map.computeIfAbsent("foo", k -> value2).thenAccept(result -> {
+            assertTrue(Arrays.equals(Versioned.valueOrElse(result, null), value1));
+        }).join();
+
+        map.computeIfPresent("bar", (k, v) -> value2).thenAccept(result -> {
+            assertNull(result);
+        }).join(); // join so an assertion failure in the callback surfaces in the test
+
+        map.computeIfPresent("foo", (k, v) -> value3).thenAccept(result -> {
+            assertTrue(Arrays.equals(Versioned.valueOrElse(result, null), value3));
+        }).join();
+
+        map.computeIfPresent("foo", (k, v) -> null).thenAccept(result -> {
+            assertNull(result);
+        }).join();
+
+        map.computeIf("foo", v -> v == null, (k, v) -> value1).thenAccept(result -> {
+            assertTrue(Arrays.equals(Versioned.valueOrElse(result, null), value1));
+        }).join();
+
+        map.compute("foo", (k, v) -> value2).thenAccept(result -> {
+            assertTrue(Arrays.equals(Versioned.valueOrElse(result, null), value2));
+        }).join();
+    }
+
+
+    protected void mapListenerTests(int clusterSize) throws Throwable {
+        createCopycatServers(clusterSize);
+        final byte[] value1 = Tools.getBytesUtf8("value1");
+        final byte[] value2 = Tools.getBytesUtf8("value2");
+        final byte[] value3 = Tools.getBytesUtf8("value3");
+
+        AtomixConsistentMap map = createAtomixClient().get("test", AtomixConsistentMap.class).join();
+        TestMapEventListener listener = new TestMapEventListener();
+
+        // add listener; insert new value into map and verify an INSERT event is received.
+        map.addListener(listener).join();
+        map.put("foo", value1).join();
+        assertNotNull(listener.event());
+        assertEquals(MapEvent.Type.INSERT, listener.event().type());
+        assertTrue(Arrays.equals(value1, listener.event().newValue().value()));
+        listener.clearEvent();
+
+        // remove listener and verify listener is not notified.
+        map.removeListener(listener).join();
+        map.put("foo", value2).join();
+        assertNull(listener.event());
+
+        // add the listener back and verify UPDATE events are received correctly
+        map.addListener(listener).join();
+        map.put("foo", value3).join();
+        assertNotNull(listener.event());
+        assertEquals(MapEvent.Type.UPDATE, listener.event().type());
+        assertTrue(Arrays.equals(value3, listener.event().newValue().value()));
+        listener.clearEvent();
+
+        // perform a non-state changing operation and verify no events are received.
+        map.putIfAbsent("foo", value1).join();
+        assertNull(listener.event());
+
+        // verify REMOVE events are received correctly.
+        map.remove("foo").join();
+        assertNotNull(listener.event());
+        assertEquals(MapEvent.Type.REMOVE, listener.event().type());
+        assertTrue(Arrays.equals(value3, listener.event().oldValue().value()));
+        listener.clearEvent();
+
+        // verify compute methods also generate events.
+        map.computeIf("foo", v -> v == null, (k, v) -> value1).join();
+        assertNotNull(listener.event());
+        assertEquals(MapEvent.Type.INSERT, listener.event().type());
+        assertTrue(Arrays.equals(value1, listener.event().newValue().value()));
+        listener.clearEvent();
+
+        map.compute("foo", (k, v) -> value2).join();
+        assertNotNull(listener.event());
+        assertEquals(MapEvent.Type.UPDATE, listener.event().type());
+        assertTrue(Arrays.equals(value2, listener.event().newValue().value()));
+        listener.clearEvent();
+
+        map.computeIf("foo", v -> Arrays.equals(v, value2), (k, v) -> null).join();
+        assertNotNull(listener.event());
+        assertEquals(MapEvent.Type.REMOVE, listener.event().type());
+        assertTrue(Arrays.equals(value2, listener.event().oldValue().value()));
+        listener.clearEvent();
+
+        map.removeListener(listener).join();
+    }
+
+    protected void transactionCommitTests(int clusterSize) throws Throwable { // prepare -> blocked put -> commit
+        createCopycatServers(clusterSize);
+        final byte[] value1 = Tools.getBytesUtf8("value1");
+        final byte[] value2 = Tools.getBytesUtf8("value2");
+
+        AtomixConsistentMap map = createAtomixClient().get("test", AtomixConsistentMap.class).join();
+        TestMapEventListener listener = new TestMapEventListener();
+
+        map.addListener(listener).join();
+
+        MapUpdate<String, byte[]> update1 =
+                MapUpdate.<String, byte[]>newBuilder().withType(MapUpdate.Type.PUT_IF_ABSENT)
+                .withKey("foo")
+                .withValue(value1)
+                .build();
+
+        TransactionalMapUpdate<String, byte[]> txMapUpdate =
+                new TransactionalMapUpdate<>(TransactionId.from("tx1"), Arrays.asList(update1));
+
+        map.prepare(txMapUpdate).thenAccept(result -> {
+            assertEquals(PrepareResult.OK, result);
+        }).join();
+        assertNull(listener.event());
+
+        map.size().thenAccept(result -> {
+            assertTrue(result == 0);
+        }).join();
+
+        map.get("foo").thenAccept(result -> {
+            assertNull(result);
+        }).join();
+
+        try {
+            map.put("foo", value2).join();
+            fail("put on a key with a prepared, uncommitted transaction should have failed");
+        } catch (CompletionException e) {
+            assertEquals(ConcurrentModificationException.class, e.getCause().getClass());
+        }
+
+        assertNull(listener.event());
+
+        map.commit(txMapUpdate.transactionId()).join();
+        assertNotNull(listener.event());
+        assertEquals(MapEvent.Type.INSERT, listener.event().type());
+        assertTrue(Arrays.equals(value1, listener.event().newValue().value()));
+        listener.clearEvent();
+
+        map.put("foo", value2).thenAccept(result -> {
+            assertTrue(Arrays.equals(Versioned.valueOrElse(result, null), value1));
+        }).join();
+        assertNotNull(listener.event());
+        assertEquals(MapEvent.Type.UPDATE, listener.event().type());
+        assertTrue(Arrays.equals(value2, listener.event().newValue().value()));
+        listener.clearEvent();
+    }
+
+    protected void transactionRollbackTests(int clusterSize) throws Throwable {
+        createCopycatServers(clusterSize);
+        final byte[] value1 = Tools.getBytesUtf8("value1");
+        final byte[] value2 = Tools.getBytesUtf8("value2");
+
+        AtomixConsistentMap map = createAtomixClient().get("test", AtomixConsistentMap.class).join();
+        TestMapEventListener listener = new TestMapEventListener();
+
+        map.addListener(listener).join();
+
+        MapUpdate<String, byte[]> update1 =
+                MapUpdate.<String, byte[]>newBuilder().withType(MapUpdate.Type.PUT_IF_ABSENT)
+                .withKey("foo")
+                .withValue(value1)
+                .build();
+        TransactionalMapUpdate<String, byte[]> txMapUpdate =
+                new TransactionalMapUpdate<>(TransactionId.from("tx1"), Arrays.asList(update1));
+        map.prepare(txMapUpdate).thenAccept(result -> {
+            assertEquals(PrepareResult.OK, result);
+        }).join();
+        assertNull(listener.event());
+
+        map.rollback(txMapUpdate.transactionId()).join();
+        assertNull(listener.event());
+
+        map.get("foo").thenAccept(result -> {
+            assertNull(result);
+        }).join();
+
+        map.put("foo", value2).thenAccept(result -> {
+            assertNull(result);
+        }).join();
+        assertNotNull(listener.event());
+        assertEquals(MapEvent.Type.INSERT, listener.event().type());
+        assertTrue(Arrays.equals(value2, listener.event().newValue().value()));
+        listener.clearEvent();
+    }
+
+    private static class TestMapEventListener implements MapEventListener<String, byte[]> { // keeps the last event
+
+        private volatile MapEvent<String, byte[]> event; // volatile: callback may run off the test thread
+
+        @Override
+        public void event(MapEvent<String, byte[]> event) {
+            this.event = event;
+        }
+
+        public MapEvent<String, byte[]> event() { // most recent event, or null if none since the last clearEvent()
+            return event;
+        }
+
+        public void clearEvent() {
+            event = null;
+        }
+    }
+}
diff --git a/core/store/primitives/src/test/java/org/onosproject/store/primitives/resources/impl/AtomixLeaderElectorTest.java b/core/store/primitives/src/test/java/org/onosproject/store/primitives/resources/impl/AtomixLeaderElectorTest.java
new file mode 100644
index 0000000..7d77d03
--- /dev/null
+++ b/core/store/primitives/src/test/java/org/onosproject/store/primitives/resources/impl/AtomixLeaderElectorTest.java
@@ -0,0 +1,339 @@
+/*
+ * Copyright 2016 Open Networking Laboratory
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.onosproject.store.primitives.resources.impl;
+
+import java.util.LinkedList;
+import java.util.Queue;
+import java.util.concurrent.CompletableFuture;
+import java.util.function.Consumer;
+
+import org.junit.Test;
+
+import static org.junit.Assert.*;
+
+import org.onosproject.cluster.Leadership;
+import org.onosproject.cluster.NodeId;
+import org.onosproject.event.Change;
+
+import io.atomix.Atomix;
+import io.atomix.resource.ResourceType;
+
+/**
+ * Unit tests for {@link AtomixLeaderElector}.
+ */
+public class AtomixLeaderElectorTest extends AtomixTestBase {
+
+    NodeId node1 = new NodeId("node1");
+    NodeId node2 = new NodeId("node2");
+    NodeId node3 = new NodeId("node3");
+
+    @Override
+    protected ResourceType resourceType() {
+        return new ResourceType(AtomixLeaderElector.class);
+    }
+
+    @Test
+    public void testRun() throws Throwable {
+        leaderElectorRunTests(1);
+        clearTests(); // NOTE(review): the 2- and 3-server runs below are commented out; reason not recorded here
+//        leaderElectorRunTests(2);
+//        clearTests();
+//        leaderElectorRunTests(3);
+//        clearTests();
+    }
+
+    private void leaderElectorRunTests(int numServers) throws Throwable {
+        createCopycatServers(numServers);
+        Atomix client1 = createAtomixClient();
+        AtomixLeaderElector elector1 = client1.get("test-elector", AtomixLeaderElector.class).join();
+        elector1.run("foo", node1).thenAccept(result -> {
+            assertEquals(node1, result.leaderNodeId());
+            assertEquals(1, result.leader().term());
+            assertEquals(1, result.candidates().size());
+            assertEquals(node1, result.candidates().get(0));
+        }).join();
+        Atomix client2 = createAtomixClient();
+        AtomixLeaderElector elector2 = client2.get("test-elector", AtomixLeaderElector.class).join();
+        elector2.run("foo", node2).thenAccept(result -> {
+            assertEquals(node1, result.leaderNodeId());
+            assertEquals(1, result.leader().term());
+            assertEquals(2, result.candidates().size());
+            assertEquals(node1, result.candidates().get(0));
+            assertEquals(node2, result.candidates().get(1));
+        }).join();
+    }
+
+    @Test
+    public void testWithdraw() throws Throwable {
+        leaderElectorWithdrawTests(1);
+        clearTests();
+        leaderElectorWithdrawTests(2);
+        clearTests();
+        leaderElectorWithdrawTests(3);
+        clearTests();
+    }
+
+    private void leaderElectorWithdrawTests(int numServers) throws Throwable {
+        createCopycatServers(numServers);
+        Atomix client1 = createAtomixClient();
+        AtomixLeaderElector elector1 = client1.get("test-elector", AtomixLeaderElector.class).join();
+        elector1.run("foo", node1).join();
+        Atomix client2 = createAtomixClient();
+        AtomixLeaderElector elector2 = client2.get("test-elector", AtomixLeaderElector.class).join();
+        elector2.run("foo", node2).join();
+
+        LeaderEventListener listener1 = new LeaderEventListener();
+        elector1.addChangeListener(listener1).join();
+
+        LeaderEventListener listener2 = new LeaderEventListener();
+        elector2.addChangeListener(listener2).join();
+
+        elector1.withdraw("foo").join();
+
+        listener1.nextEvent().thenAccept(result -> {
+            assertEquals(node2, result.newValue().leaderNodeId());
+            assertEquals(2, result.newValue().leader().term());
+            assertEquals(1, result.newValue().candidates().size());
+            assertEquals(node2, result.newValue().candidates().get(0));
+        }).join();
+
+        listener2.nextEvent().thenAccept(result -> {
+            assertEquals(node2, result.newValue().leaderNodeId());
+            assertEquals(2, result.newValue().leader().term());
+            assertEquals(1, result.newValue().candidates().size());
+            assertEquals(node2, result.newValue().candidates().get(0));
+        }).join();
+    }
+
+    @Test
+    public void testAnoint() throws Throwable {
+        leaderElectorAnointTests(1);
+        clearTests();
+        leaderElectorAnointTests(2);
+        clearTests();
+        leaderElectorAnointTests(3);
+        clearTests();
+    }
+
+    private void leaderElectorAnointTests(int numServers) throws Throwable { // anoint non-candidate, then candidate
+        createCopycatServers(numServers);
+        Atomix client1 = createAtomixClient();
+        AtomixLeaderElector elector1 = client1.get("test-elector", AtomixLeaderElector.class).join();
+        Atomix client2 = createAtomixClient();
+        AtomixLeaderElector elector2 = client2.get("test-elector", AtomixLeaderElector.class).join();
+        Atomix client3 = createAtomixClient();
+        AtomixLeaderElector elector3 = client3.get("test-elector", AtomixLeaderElector.class).join();
+        elector1.run("foo", node1).join();
+        elector2.run("foo", node2).join();
+
+        LeaderEventListener listener1 = new LeaderEventListener();
+        elector1.addChangeListener(listener1).join();
+        LeaderEventListener listener2 = new LeaderEventListener();
+        elector2.addChangeListener(listener2).join(); // wait for registration before anoint, like listeners 1 and 3
+        LeaderEventListener listener3 = new LeaderEventListener();
+        elector3.addChangeListener(listener3).join();
+
+        elector3.anoint("foo", node3).thenAccept(result -> {
+            assertFalse(result);
+        }).join();
+        assertFalse(listener1.hasEvent());
+        assertFalse(listener2.hasEvent());
+        assertFalse(listener3.hasEvent());
+
+        elector3.anoint("foo", node2).thenAccept(result -> {
+            assertTrue(result);
+        }).join();
+        assertTrue(listener1.hasEvent());
+        assertTrue(listener2.hasEvent());
+        assertTrue(listener3.hasEvent());
+
+        listener1.nextEvent().thenAccept(result -> {
+            assertEquals(node2, result.newValue().leaderNodeId());
+            assertEquals(2, result.newValue().candidates().size());
+            assertEquals(node1, result.newValue().candidates().get(0));
+            assertEquals(node2, result.newValue().candidates().get(1));
+        }).join();
+        listener2.nextEvent().thenAccept(result -> {
+            assertEquals(node2, result.newValue().leaderNodeId());
+            assertEquals(2, result.newValue().candidates().size());
+            assertEquals(node1, result.newValue().candidates().get(0));
+            assertEquals(node2, result.newValue().candidates().get(1));
+        }).join();
+        listener3.nextEvent().thenAccept(result -> {
+            assertEquals(node2, result.newValue().leaderNodeId());
+            assertEquals(2, result.newValue().candidates().size());
+            assertEquals(node1, result.newValue().candidates().get(0));
+            assertEquals(node2, result.newValue().candidates().get(1));
+        }).join();
+    }
+
+    @Test
+    public void testLeaderSessionClose() throws Throwable {
+        leaderElectorLeaderSessionCloseTests(1);
+        clearTests();
+        leaderElectorLeaderSessionCloseTests(2);
+        clearTests();
+        leaderElectorLeaderSessionCloseTests(3);
+        clearTests();
+    }
+
+    private void leaderElectorLeaderSessionCloseTests(int numServers) throws Throwable {
+        createCopycatServers(numServers);
+        Atomix client1 = createAtomixClient();
+        AtomixLeaderElector elector1 = client1.get("test-elector", AtomixLeaderElector.class).join();
+        elector1.run("foo", node1).join();
+        Atomix client2 = createAtomixClient();
+        AtomixLeaderElector elector2 = client2.get("test-elector", AtomixLeaderElector.class).join();
+        LeaderEventListener listener = new LeaderEventListener();
+        elector2.run("foo", node2).join();
+        elector2.addChangeListener(listener).join();
+        client1.close(); // NOTE(review): not joined, unlike close().join() in the non-leader variant; confirm intent
+        listener.nextEvent().thenAccept(result -> {
+            assertEquals(node2, result.newValue().leaderNodeId());
+            assertEquals(1, result.newValue().candidates().size());
+            assertEquals(node2, result.newValue().candidates().get(0));
+        }).join();
+    }
+
+    @Test
+    public void testNonLeaderSessionClose() throws Throwable {
+        leaderElectorNonLeaderSessionCloseTests(1);
+        clearTests();
+        leaderElectorNonLeaderSessionCloseTests(2);
+        clearTests();
+        leaderElectorNonLeaderSessionCloseTests(3);
+        clearTests();
+    }
+
+    private void leaderElectorNonLeaderSessionCloseTests(int numServers) throws Throwable {
+        createCopycatServers(numServers);
+        Atomix client1 = createAtomixClient();
+        AtomixLeaderElector elector1 = client1.get("test-elector", AtomixLeaderElector.class).join();
+        elector1.run("foo", node1).join();
+        Atomix client2 = createAtomixClient();
+        AtomixLeaderElector elector2 = client2.get("test-elector", AtomixLeaderElector.class).join();
+        LeaderEventListener listener = new LeaderEventListener();
+        elector2.run("foo", node2).join();
+        elector1.addChangeListener(listener).join();
+        client2.close().join();
+        listener.nextEvent().thenAccept(result -> {
+            assertEquals(node1, result.newValue().leaderNodeId());
+            assertEquals(1, result.newValue().candidates().size());
+            assertEquals(node1, result.newValue().candidates().get(0));
+        }).join();
+    }
+
+    @Test
+    public void testQueries() throws Throwable {
+        leaderElectorQueryTests(1);
+        clearTests();
+        leaderElectorQueryTests(2);
+        clearTests();
+        leaderElectorQueryTests(3);
+        clearTests();
+    }
+
+    private void leaderElectorQueryTests(int numServers) throws Throwable {
+        createCopycatServers(numServers);
+        Atomix client1 = createAtomixClient();
+        Atomix client2 = createAtomixClient();
+        AtomixLeaderElector elector1 = client1.get("test-elector", AtomixLeaderElector.class).join();
+        AtomixLeaderElector elector2 = client2.get("test-elector", AtomixLeaderElector.class).join();
+        elector1.run("foo", node1).join();
+        elector2.run("foo", node2).join();
+        elector2.run("bar", node2).join();
+        elector1.getElectedTopics(node1).thenAccept(result -> {
+            assertEquals(1, result.size());
+            assertTrue(result.contains("foo"));
+        }).join();
+        elector2.getElectedTopics(node1).thenAccept(result -> {
+            assertEquals(1, result.size());
+            assertTrue(result.contains("foo"));
+        }).join();
+        elector1.getLeadership("foo").thenAccept(result -> {
+            assertEquals(node1, result.leaderNodeId());
+            assertEquals(node1, result.candidates().get(0));
+            assertEquals(node2, result.candidates().get(1));
+        }).join();
+        elector2.getLeadership("foo").thenAccept(result -> {
+            assertEquals(node1, result.leaderNodeId());
+            assertEquals(node1, result.candidates().get(0));
+            assertEquals(node2, result.candidates().get(1));
+        }).join();
+        elector1.getLeadership("bar").thenAccept(result -> {
+            assertEquals(node2, result.leaderNodeId());
+            assertEquals(node2, result.candidates().get(0));
+        }).join();
+        elector2.getLeadership("bar").thenAccept(result -> {
+            assertEquals(node2, result.leaderNodeId());
+            assertEquals(node2, result.candidates().get(0));
+        }).join();
+        elector1.getLeaderships().thenAccept(result -> {
+            assertEquals(2, result.size());
+            Leadership fooLeadership = result.get("foo");
+            assertEquals(node1, fooLeadership.leaderNodeId());
+            assertEquals(node1, fooLeadership.candidates().get(0));
+            assertEquals(node2, fooLeadership.candidates().get(1));
+            Leadership barLeadership = result.get("bar");
+            assertEquals(node2, barLeadership.leaderNodeId());
+            assertEquals(node2, barLeadership.candidates().get(0));
+        }).join();
+        elector2.getLeaderships().thenAccept(result -> {
+            assertEquals(2, result.size());
+            Leadership fooLeadership = result.get("foo");
+            assertEquals(node1, fooLeadership.leaderNodeId());
+            assertEquals(node1, fooLeadership.candidates().get(0));
+            assertEquals(node2, fooLeadership.candidates().get(1));
+            Leadership barLeadership = result.get("bar");
+            assertEquals(node2, barLeadership.leaderNodeId());
+            assertEquals(node2, barLeadership.candidates().get(0));
+        }).join();
+    }
+
+    private static class LeaderEventListener implements Consumer<Change<Leadership>> { // buffers leadership changes
+        private final Queue<Change<Leadership>> eventQueue = new LinkedList<>(); // guarded by this
+        private CompletableFuture<Change<Leadership>> pendingFuture; // guarded by this; set while a caller waits
+
+        @Override
+        public void accept(Change<Leadership> change) {
+            synchronized (this) {
+                if (pendingFuture != null) {
+                    pendingFuture.complete(change);
+                    pendingFuture = null;
+                } else {
+                    eventQueue.add(change);
+                }
+            }
+        }
+
+        public synchronized boolean hasEvent() { // same monitor as accept()/nextEvent(); LinkedList is not thread-safe
+            return !eventQueue.isEmpty();
+        }
+
+        public CompletableFuture<Change<Leadership>> nextEvent() { // queued event, else a future for the next one
+            synchronized (this) {
+                if (eventQueue.isEmpty()) {
+                    if (pendingFuture == null) {
+                        pendingFuture = new CompletableFuture<>();
+                    }
+                    return pendingFuture;
+                } else {
+                    return CompletableFuture.completedFuture(eventQueue.poll());
+                }
+            }
+        }
+    }
+}
diff --git a/core/store/primitives/src/test/java/org/onosproject/store/primitives/resources/impl/AtomixLongTest.java b/core/store/primitives/src/test/java/org/onosproject/store/primitives/resources/impl/AtomixLongTest.java
new file mode 100644
index 0000000..d38400d
--- /dev/null
+++ b/core/store/primitives/src/test/java/org/onosproject/store/primitives/resources/impl/AtomixLongTest.java
@@ -0,0 +1,62 @@
+/*
+ * Copyright 2016 Open Networking Laboratory
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.onosproject.store.primitives.resources.impl;
+
+import static org.junit.Assert.*;
+
+import org.junit.Test;
+
+import io.atomix.Atomix;
+import io.atomix.resource.ResourceType;
+import io.atomix.variables.DistributedLong;
+
+/**
+ * Unit tests for {@link AtomixCounter}.
+ */
+public class AtomixLongTest extends AtomixTestBase {
+
+    @Override
+    protected ResourceType resourceType() {
+        return new ResourceType(DistributedLong.class);
+    }
+
+    /** Runs the basic counter checks against clusters of one, two and three servers. */
+    @Test
+    public void testBasicOperations() throws Throwable {
+        for (int clusterSize = 1; clusterSize <= 3; clusterSize++) {
+            basicOperationsTest(clusterSize);
+            clearTests();
+        }
+    }
+
+    /** Exercises get, set, add, increment and compare-and-set on a new counter. */
+    protected void basicOperationsTest(int clusterSize) throws Throwable {
+        createCopycatServers(clusterSize);
+        Atomix client = createAtomixClient();
+        AtomixCounter counter = new AtomixCounter("test-long", client.getLong("test-long").join());
+        assertEquals(0, counter.get().join().longValue());
+        assertEquals(1, counter.incrementAndGet().join().longValue());
+        counter.set(100).join();
+        assertEquals(100, counter.get().join().longValue());
+        assertEquals(100, counter.getAndAdd(10).join().longValue());
+        assertEquals(110, counter.get().join().longValue());
+        assertFalse(counter.compareAndSet(109, 111).join());
+        assertTrue(counter.compareAndSet(110, 111).join());
+        assertEquals(100, counter.addAndGet(-11).join().longValue());
+        assertEquals(100, counter.getAndIncrement().join().longValue());
+        assertEquals(101, counter.get().join().longValue());
+    }
+}
diff --git a/core/store/primitives/src/test/java/org/onosproject/store/primitives/resources/impl/AtomixTestBase.java b/core/store/primitives/src/test/java/org/onosproject/store/primitives/resources/impl/AtomixTestBase.java
new file mode 100644
index 0000000..d655d52
--- /dev/null
+++ b/core/store/primitives/src/test/java/org/onosproject/store/primitives/resources/impl/AtomixTestBase.java
@@ -0,0 +1,182 @@
+/*
+ * Copyright 2016 Open Networking Laboratory
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.onosproject.store.primitives.resources.impl;
+
+import io.atomix.Atomix;
+import io.atomix.AtomixClient;
+import io.atomix.catalyst.serializer.Serializer;
+import io.atomix.catalyst.transport.Address;
+import io.atomix.catalyst.transport.LocalServerRegistry;
+import io.atomix.catalyst.transport.LocalTransport;
+import io.atomix.copycat.client.CopycatClient;
+import io.atomix.copycat.server.CopycatServer;
+import io.atomix.copycat.server.storage.Storage;
+import io.atomix.copycat.server.storage.StorageLevel;
+import io.atomix.manager.state.ResourceManagerState;
+import io.atomix.resource.ResourceRegistry;
+import io.atomix.resource.ResourceType;
+
+import java.io.File;
+import java.io.IOException;
+import java.nio.file.Files;
+import java.time.Duration;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.CountDownLatch;
+
+import org.junit.After;
+import org.junit.Before;
+import org.onosproject.store.primitives.impl.CatalystSerializers;
+
+import com.google.common.util.concurrent.Uninterruptibles;
+
+/**
+ * Base class for various Atomix* tests.
+ */
+public abstract class AtomixTestBase {
+    private static final File TEST_DIR = new File("target/test-logs");
+    protected LocalServerRegistry registry;
+    protected int port;
+    protected List<Address> members;
+    protected List<CopycatClient> copycatClients = new ArrayList<>();
+    protected List<CopycatServer> copycatServers = new ArrayList<>();
+    protected List<Atomix> atomixClients = new ArrayList<>();
+    protected List<CopycatServer> atomixServers = new ArrayList<>();
+    protected Serializer serializer = CatalystSerializers.getSerializer();
+
+    /**
+     * Returns the resource type under test.
+     *
+     * @return the resource type registered with every server and client created here
+     */
+    protected abstract ResourceType resourceType();
+
+    /**
+     * Allocates the next localhost address and records it in {@link #members}.
+     */
+    private Address nextAddress() {
+        Address address = new Address("localhost", port++);
+        members.add(address);
+        return address;
+    }
+
+    /**
+     * Creates and opens a set of Copycat servers, failing fast if any open fails.
+     */
+    protected List<CopycatServer> createCopycatServers(int nodes) throws Throwable {
+        CountDownLatch latch = new CountDownLatch(nodes);
+        List<CopycatServer> servers = new ArrayList<>();
+        List<CompletableFuture<?>> opened = new ArrayList<>();
+
+        // Allocate every address first: each server is built from the full member list.
+        List<Address> addresses = new ArrayList<>();
+        for (int i = 0; i < nodes; i++) {
+            addresses.add(nextAddress());
+        }
+        for (Address address : addresses) {
+            CopycatServer server = createCopycatServer(address);
+            // Count down on failure too, so a failed open cannot block the latch forever.
+            opened.add(server.open().whenComplete((r, e) -> latch.countDown()));
+            servers.add(server);
+        }
+
+        Uninterruptibles.awaitUninterruptibly(latch);
+        CompletableFuture.allOf(opened.toArray(new CompletableFuture[0])).join(); // rethrows an open failure
+        return servers;
+    }
+
+    /**
+     * Creates (without opening) a disk-backed Copycat server and tracks it for cleanup.
+     */
+    protected CopycatServer createCopycatServer(Address address) {
+        ResourceRegistry resourceRegistry = new ResourceRegistry();
+        resourceRegistry.register(resourceType());
+        CopycatServer server = CopycatServer.builder(address, members)
+                .withTransport(new LocalTransport(registry))
+                .withStorage(Storage.builder()
+                        .withStorageLevel(StorageLevel.DISK)
+                        .withDirectory(TEST_DIR + "/" + address.port())
+                        .withSerializer(serializer.clone())
+                        .build())
+                .withStateMachine(() -> new ResourceManagerState(resourceRegistry))
+                .withSerializer(serializer.clone())
+                .withHeartbeatInterval(Duration.ofMillis(25))
+                .withElectionTimeout(Duration.ofMillis(50))
+                .withSessionTimeout(Duration.ofMillis(100))
+                .build();
+        copycatServers.add(server);
+        return server;
+    }
+
+    /**
+     * Resets the fixture: closes all clients, then all servers, and deletes the logs.
+     */
+    @Before
+    @After
+    public void clearTests() throws Exception {
+        registry = new LocalServerRegistry();
+        members = new ArrayList<>();
+        port = 5000;
+
+        CompletableFuture<Void> closeClients =
+                CompletableFuture.allOf(atomixClients.stream()
+                                                     .map(Atomix::close)
+                                                     .toArray(CompletableFuture[]::new));
+        closeClients.thenCompose(v -> CompletableFuture.allOf(copycatServers.stream()
+                .map(CopycatServer::close)
+                .toArray(CompletableFuture[]::new))).join();
+        deleteDirectory(TEST_DIR);
+        atomixClients = new ArrayList<>();
+        copycatServers = new ArrayList<>();
+    }
+
+    /**
+     * Deletes a directory recursively.
+     */
+    private void deleteDirectory(File directory) throws IOException {
+        if (directory.exists()) {
+            File[] files = directory.listFiles();
+            if (files != null) {
+                for (File file : files) {
+                    if (file.isDirectory()) {
+                        deleteDirectory(file);
+                    } else {
+                        Files.delete(file.toPath());
+                    }
+                }
+            }
+            Files.delete(directory.toPath());
+        }
+    }
+
+    /**
+     * Creates and opens an Atomix client; the latch is released even if the open fails.
+     */
+    protected Atomix createAtomixClient() {
+        CountDownLatch latch = new CountDownLatch(1);
+        Atomix client = AtomixClient.builder(members)
+                .withTransport(new LocalTransport(registry))
+                .withSerializer(serializer.clone())
+                .withResourceResolver(r -> r.register(resourceType()))
+                .build();
+        CompletableFuture<?> opened = client.open().whenComplete((r, e) -> latch.countDown());
+        atomixClients.add(client);
+        Uninterruptibles.awaitUninterruptibly(latch);
+        opened.join(); // already complete; rethrows an open failure instead of hiding it
+        return client;
+    }
+}