State machine implementations for various distributed primitives based on latest Copycat APIs

Change-Id: I622cc196aa1cdf072a5a0b100a5ffaaf71b07900
diff --git a/core/store/primitives/src/test/java/org/onosproject/store/primitives/resources/impl/AtomixLeaderElectorTest.java b/core/store/primitives/src/test/java/org/onosproject/store/primitives/resources/impl/AtomixLeaderElectorTest.java
new file mode 100644
index 0000000..7d77d03
--- /dev/null
+++ b/core/store/primitives/src/test/java/org/onosproject/store/primitives/resources/impl/AtomixLeaderElectorTest.java
@@ -0,0 +1,339 @@
+/*
+ * Copyright 2016 Open Networking Laboratory
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.onosproject.store.primitives.resources.impl;
+
+import java.util.LinkedList;
+import java.util.Queue;
+import java.util.concurrent.CompletableFuture;
+import java.util.function.Consumer;
+
+import org.junit.Test;
+
+import static org.junit.Assert.*;
+
+import org.onosproject.cluster.Leadership;
+import org.onosproject.cluster.NodeId;
+import org.onosproject.event.Change;
+
+import io.atomix.Atomix;
+import io.atomix.resource.ResourceType;
+
+/**
+ * Unit tests for {@link AtomixLeaderElector}.
+ */
+public class AtomixLeaderElectorTest extends AtomixTestBase {
+
+    private final NodeId node1 = new NodeId("node1"); // first node to run for a topic in every scenario
+    private final NodeId node2 = new NodeId("node2"); // second candidate
+    private final NodeId node3 = new NodeId("node3"); // never runs; only used as an anoint target
+
+    @Override
+    protected ResourceType resourceType() {
+        return new ResourceType(AtomixLeaderElector.class); // resource type built from the class under test
+    }
+
+    @Test
+    public void testRun() throws Throwable {
+        leaderElectorRunTests(1);
+        clearTests();
+        // NOTE(review): the 2- and 3-server runs are disabled here, whereas every
+        // other test in this class exercises 1, 2 and 3 servers. TODO: confirm why,
+        // then re-enable: leaderElectorRunTests(2); clearTests();
+        //                 leaderElectorRunTests(3); clearTests();
+    }
+
+    private void leaderElectorRunTests(int numServers) throws Throwable {
+        createCopycatServers(numServers);
+        // The first node to run for "foo" is expected to be leader in term 1.
+        AtomixLeaderElector first = createAtomixClient().get("test-elector", AtomixLeaderElector.class).join();
+        first.run("foo", node1).thenAccept(leadership -> {
+            assertEquals(node1, leadership.leaderNodeId());
+            assertEquals(1, leadership.leader().term());
+            assertEquals(1, leadership.candidates().size());
+            assertEquals(node1, leadership.candidates().get(0));
+        }).join();
+        // A later candidate is appended to the candidate list; leader and term are unchanged.
+        AtomixLeaderElector second = createAtomixClient().get("test-elector", AtomixLeaderElector.class).join();
+        second.run("foo", node2).thenAccept(leadership -> {
+            assertEquals(node1, leadership.leaderNodeId());
+            assertEquals(1, leadership.leader().term());
+            assertEquals(2, leadership.candidates().size());
+            assertEquals(node1, leadership.candidates().get(0));
+            assertEquals(node2, leadership.candidates().get(1));
+        }).join();
+    }
+
+    @Test
+    public void testWithdraw() throws Throwable {
+        // Run the withdraw scenario against 1, 2 and 3 servers in turn,
+        // calling clearTests() after each run.
+        for (int numServers = 1; numServers <= 3; numServers++) {
+            leaderElectorWithdrawTests(numServers);
+            clearTests();
+        }
+    }
+
+    private void leaderElectorWithdrawTests(int numServers) throws Throwable {
+        createCopycatServers(numServers);
+        // node1 runs for "foo" first, then node2 runs for the same topic.
+        AtomixLeaderElector first = createAtomixClient().get("test-elector", AtomixLeaderElector.class).join();
+        first.run("foo", node1).join();
+        AtomixLeaderElector second = createAtomixClient().get("test-elector", AtomixLeaderElector.class).join();
+        second.run("foo", node2).join();
+
+        LeaderEventListener firstListener = new LeaderEventListener();
+        first.addChangeListener(firstListener).join();
+
+        LeaderEventListener secondListener = new LeaderEventListener();
+        second.addChangeListener(secondListener).join();
+
+        // After node1 withdraws, both listeners should see node2 leading in term 2.
+        first.withdraw("foo").join();
+
+        firstListener.nextEvent().thenAccept(change -> {
+            assertEquals(node2, change.newValue().leaderNodeId());
+            assertEquals(2, change.newValue().leader().term());
+            assertEquals(1, change.newValue().candidates().size());
+            assertEquals(node2, change.newValue().candidates().get(0));
+        }).join();
+
+        secondListener.nextEvent().thenAccept(change -> {
+            assertEquals(node2, change.newValue().leaderNodeId());
+            assertEquals(2, change.newValue().leader().term());
+            assertEquals(1, change.newValue().candidates().size());
+            assertEquals(node2, change.newValue().candidates().get(0));
+        }).join();
+    }
+
+    @Test
+    public void testAnoint() throws Throwable {
+        // Repeat the anoint scenario for 1, 2 and 3 servers,
+        // calling clearTests() between runs.
+        for (int numServers = 1; numServers <= 3; numServers++) {
+            leaderElectorAnointTests(numServers);
+            clearTests();
+        }
+    }
+
+    private void leaderElectorAnointTests(int numServers) throws Throwable {
+        createCopycatServers(numServers);
+        Atomix client1 = createAtomixClient();
+        AtomixLeaderElector elector1 = client1.get("test-elector", AtomixLeaderElector.class).join();
+        Atomix client2 = createAtomixClient();
+        AtomixLeaderElector elector2 = client2.get("test-elector", AtomixLeaderElector.class).join();
+        Atomix client3 = createAtomixClient();
+        AtomixLeaderElector elector3 = client3.get("test-elector", AtomixLeaderElector.class).join();
+        elector1.run("foo", node1).join();
+        elector2.run("foo", node2).join();
+
+        // Join every registration: an un-joined addChangeListener may still be in flight
+        // when anoint() runs below, which would make the hasEvent() checks racy.
+        LeaderEventListener listener1 = new LeaderEventListener();
+        elector1.addChangeListener(listener1).join();
+        LeaderEventListener listener2 = new LeaderEventListener();
+        elector2.addChangeListener(listener2).join();
+        LeaderEventListener listener3 = new LeaderEventListener();
+        elector3.addChangeListener(listener3).join();
+
+        // node3 never ran for "foo": anointing it must return false and notify no listener.
+        elector3.anoint("foo", node3).thenAccept(result -> assertFalse(result)).join();
+        assertFalse(listener1.hasEvent());
+        assertFalse(listener2.hasEvent());
+        assertFalse(listener3.hasEvent());
+
+        // node2 already ran for "foo": anointing it must return true and notify every listener.
+        elector3.anoint("foo", node2).thenAccept(result -> assertTrue(result)).join();
+        assertTrue(listener1.hasEvent());
+        assertTrue(listener2.hasEvent());
+        assertTrue(listener3.hasEvent());
+
+        listener1.nextEvent().thenAccept(result -> {
+            assertEquals(node2, result.newValue().leaderNodeId());
+            assertEquals(2, result.newValue().candidates().size());
+            assertEquals(node1, result.newValue().candidates().get(0));
+            assertEquals(node2, result.newValue().candidates().get(1));
+        }).join();
+        listener2.nextEvent().thenAccept(result -> {
+            assertEquals(node2, result.newValue().leaderNodeId());
+            assertEquals(2, result.newValue().candidates().size());
+            assertEquals(node1, result.newValue().candidates().get(0));
+            assertEquals(node2, result.newValue().candidates().get(1));
+        }).join();
+        listener3.nextEvent().thenAccept(result -> {
+            assertEquals(node2, result.newValue().leaderNodeId());
+            assertEquals(2, result.newValue().candidates().size());
+            assertEquals(node1, result.newValue().candidates().get(0));
+            assertEquals(node2, result.newValue().candidates().get(1));
+        }).join();
+    }
+
+    @Test
+    public void testLeaderSessionClose() throws Throwable {
+        // Exercise the leader-close scenario with 1, 2 and 3 servers,
+        // calling clearTests() after each pass.
+        for (int numServers = 1; numServers <= 3; numServers++) {
+            leaderElectorLeaderSessionCloseTests(numServers);
+            clearTests();
+        }
+    }
+
+    private void leaderElectorLeaderSessionCloseTests(int numServers) throws Throwable {
+        createCopycatServers(numServers);
+        Atomix leaderClient = createAtomixClient();
+        leaderClient.get("test-elector", AtomixLeaderElector.class).join().run("foo", node1).join();
+        AtomixLeaderElector follower = createAtomixClient().get("test-elector", AtomixLeaderElector.class).join();
+        follower.run("foo", node2).join();
+        LeaderEventListener followerListener = new LeaderEventListener();
+        follower.addChangeListener(followerListener).join();
+        // Closing the client that ran node1 should leave node2 as leader and sole candidate.
+        // NOTE(review): close() is not joined here, unlike the non-leader-close test; confirm intent.
+        leaderClient.close();
+        followerListener.nextEvent().thenAccept(change -> {
+            assertEquals(node2, change.newValue().leaderNodeId());
+            assertEquals(1, change.newValue().candidates().size());
+            assertEquals(node2, change.newValue().candidates().get(0));
+        }).join();
+    }
+
+    @Test
+    public void testNonLeaderSessionClose() throws Throwable {
+        // Exercise the non-leader-close scenario with 1, 2 and 3 servers,
+        // calling clearTests() after each pass.
+        for (int numServers = 1; numServers <= 3; numServers++) {
+            leaderElectorNonLeaderSessionCloseTests(numServers);
+            clearTests();
+        }
+    }
+
+    private void leaderElectorNonLeaderSessionCloseTests(int numServers) throws Throwable {
+        createCopycatServers(numServers);
+        AtomixLeaderElector leader = createAtomixClient().get("test-elector", AtomixLeaderElector.class).join();
+        leader.run("foo", node1).join();
+        Atomix candidateClient = createAtomixClient();
+        candidateClient.get("test-elector", AtomixLeaderElector.class).join().run("foo", node2).join();
+        LeaderEventListener leaderListener = new LeaderEventListener();
+        leader.addChangeListener(leaderListener).join();
+        // Closing the client that ran node2 should drop it from the candidates
+        // while node1 stays leader.
+        candidateClient.close().join();
+        leaderListener.nextEvent().thenAccept(change -> {
+            assertEquals(node1, change.newValue().leaderNodeId());
+            assertEquals(1, change.newValue().candidates().size());
+            assertEquals(node1, change.newValue().candidates().get(0));
+        }).join();
+    }
+
+    @Test
+    public void testQueries() throws Throwable {
+        // Run the query scenario once per cluster size (1, 2 and 3 servers),
+        // calling clearTests() after each run.
+        for (int numServers = 1; numServers <= 3; numServers++) {
+            leaderElectorQueryTests(numServers);
+            clearTests();
+        }
+    }
+
+    private void leaderElectorQueryTests(int numServers) throws Throwable {
+        createCopycatServers(numServers);
+        // Setup: node1 then node2 run for "foo"; only node2 runs for "bar".
+        AtomixLeaderElector first = createAtomixClient().get("test-elector", AtomixLeaderElector.class).join();
+        AtomixLeaderElector second = createAtomixClient().get("test-elector", AtomixLeaderElector.class).join();
+        first.run("foo", node1).join();
+        second.run("foo", node2).join();
+        second.run("bar", node2).join();
+        // Each query is issued through both electors and must give the same answer.
+        first.getElectedTopics(node1).thenAccept(topics -> {
+            assertEquals(1, topics.size());
+            assertTrue(topics.contains("foo"));
+        }).join();
+        second.getElectedTopics(node1).thenAccept(topics -> {
+            assertEquals(1, topics.size());
+            assertTrue(topics.contains("foo"));
+        }).join();
+        first.getLeadership("foo").thenAccept(leadership -> {
+            assertEquals(node1, leadership.leaderNodeId());
+            assertEquals(node1, leadership.candidates().get(0));
+            assertEquals(node2, leadership.candidates().get(1));
+        }).join();
+        second.getLeadership("foo").thenAccept(leadership -> {
+            assertEquals(node1, leadership.leaderNodeId());
+            assertEquals(node1, leadership.candidates().get(0));
+            assertEquals(node2, leadership.candidates().get(1));
+        }).join();
+        first.getLeadership("bar").thenAccept(leadership -> {
+            assertEquals(node2, leadership.leaderNodeId());
+            assertEquals(node2, leadership.candidates().get(0));
+        }).join();
+        second.getLeadership("bar").thenAccept(leadership -> {
+            assertEquals(node2, leadership.leaderNodeId());
+            assertEquals(node2, leadership.candidates().get(0));
+        }).join();
+        first.getLeaderships().thenAccept(byTopic -> {
+            assertEquals(2, byTopic.size());
+            Leadership foo = byTopic.get("foo");
+            assertEquals(node1, foo.leaderNodeId());
+            assertEquals(node1, foo.candidates().get(0));
+            assertEquals(node2, foo.candidates().get(1));
+            Leadership bar = byTopic.get("bar");
+            assertEquals(node2, bar.leaderNodeId());
+            assertEquals(node2, bar.candidates().get(0));
+        }).join();
+        second.getLeaderships().thenAccept(byTopic -> {
+            assertEquals(2, byTopic.size());
+            Leadership foo = byTopic.get("foo");
+            assertEquals(node1, foo.leaderNodeId());
+            assertEquals(node1, foo.candidates().get(0));
+            assertEquals(node2, foo.candidates().get(1));
+            Leadership bar = byTopic.get("bar");
+            assertEquals(node2, bar.leaderNodeId());
+            assertEquals(node2, bar.candidates().get(0));
+        }).join();
+    }
+
+    /** Test listener that queues leadership changes so a test can await them one at a time. */
+    private static class LeaderEventListener implements Consumer<Change<Leadership>> {
+        private final Queue<Change<Leadership>> eventQueue = new LinkedList<>();
+        private CompletableFuture<Change<Leadership>> pendingFuture;
+
+        /** Hands the change to a pending nextEvent() future if one exists, otherwise queues it. */
+        @Override
+        public synchronized void accept(Change<Leadership> change) {
+            if (pendingFuture != null) {
+                pendingFuture.complete(change);
+                pendingFuture = null;
+            } else {
+                eventQueue.add(change);
+            }
+        }
+
+        /** Returns true if at least one change is queued and not yet consumed. */
+        public synchronized boolean hasEvent() {
+            // Same monitor as accept()/nextEvent(): LinkedList is not safe for unsynchronized access.
+            return !eventQueue.isEmpty();
+        }
+
+        /** Returns a future for the oldest queued change, or one completed by the next accept(). */
+        public synchronized CompletableFuture<Change<Leadership>> nextEvent() {
+            if (!eventQueue.isEmpty()) {
+                return CompletableFuture.completedFuture(eventQueue.poll());
+            }
+            if (pendingFuture == null) {
+                pendingFuture = new CompletableFuture<>();
+            }
+            return pendingFuture;
+        }
+    }
+}