/*
 * Copyright 2016 Open Networking Laboratory
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.onosproject.store.primitives.resources.impl;

import java.util.LinkedList;
import java.util.Queue;
import java.util.concurrent.CompletableFuture;
import java.util.function.Consumer;

import org.junit.Test;

import static org.junit.Assert.*;

import org.onosproject.cluster.Leadership;
import org.onosproject.cluster.NodeId;
import org.onosproject.event.Change;

import io.atomix.Atomix;
import io.atomix.resource.ResourceType;

/**
 * Unit tests for {@link AtomixLeaderElector}.
 */
public class AtomixLeaderElectorTest extends AtomixTestBase {

    NodeId node1 = new NodeId("node1");
    NodeId node2 = new NodeId("node2");
    NodeId node3 = new NodeId("node3");

    @Override
    protected ResourceType resourceType() {
        return new ResourceType(AtomixLeaderElector.class);
    }

    @Test
    public void testRun() throws Throwable {
        leaderElectorRunTests(1);
        clearTests();
        leaderElectorRunTests(2);
        clearTests();
        leaderElectorRunTests(3);
        clearTests();
    }

    private void leaderElectorRunTests(int numServers) throws Throwable {
        createCopycatServers(numServers);
        Atomix client1 = createAtomixClient();
        AtomixLeaderElector elector1 = client1.get("test-elector", AtomixLeaderElector.class).join();
        elector1.run("foo", node1).thenAccept(result -> {
            assertEquals(node1, result.leaderNodeId());
            assertEquals(1, result.leader().term());
            assertEquals(1, result.candidates().size());
            assertEquals(node1, result.candidates().get(0));
        }).join();
        Atomix client2 = createAtomixClient();
        AtomixLeaderElector elector2 = client2.get("test-elector", AtomixLeaderElector.class).join();
        elector2.run("foo", node2).thenAccept(result -> {
            assertEquals(node1, result.leaderNodeId());
            assertEquals(1, result.leader().term());
            assertEquals(2, result.candidates().size());
            assertEquals(node1, result.candidates().get(0));
            assertEquals(node2, result.candidates().get(1));
        }).join();
    }

    @Test
    public void testWithdraw() throws Throwable {
        leaderElectorWithdrawTests(1);
        clearTests();
        leaderElectorWithdrawTests(2);
        clearTests();
        leaderElectorWithdrawTests(3);
        clearTests();
    }

    private void leaderElectorWithdrawTests(int numServers) throws Throwable {
        createCopycatServers(numServers);
        Atomix client1 = createAtomixClient();
        AtomixLeaderElector elector1 = client1.get("test-elector", AtomixLeaderElector.class).join();
        elector1.run("foo", node1).join();
        Atomix client2 = createAtomixClient();
        AtomixLeaderElector elector2 = client2.get("test-elector", AtomixLeaderElector.class).join();
        elector2.run("foo", node2).join();

        LeaderEventListener listener1 = new LeaderEventListener();
        elector1.addChangeListener(listener1).join();

        LeaderEventListener listener2 = new LeaderEventListener();
        elector2.addChangeListener(listener2).join();

        elector1.withdraw("foo").join();

        listener1.nextEvent().thenAccept(result -> {
            assertEquals(node2, result.newValue().leaderNodeId());
            assertEquals(2, result.newValue().leader().term());
            assertEquals(1, result.newValue().candidates().size());
            assertEquals(node2, result.newValue().candidates().get(0));
        }).join();

        listener2.nextEvent().thenAccept(result -> {
            assertEquals(node2, result.newValue().leaderNodeId());
            assertEquals(2, result.newValue().leader().term());
            assertEquals(1, result.newValue().candidates().size());
            assertEquals(node2, result.newValue().candidates().get(0));
        }).join();
    }

    @Test
    public void testAnoint() throws Throwable {
        leaderElectorAnointTests(1);
        clearTests();
        leaderElectorAnointTests(2);
        clearTests();
        leaderElectorAnointTests(3);
        clearTests();
    }

    private void leaderElectorAnointTests(int numServers) throws Throwable {
        createCopycatServers(numServers);
        Atomix client1 = createAtomixClient();
        AtomixLeaderElector elector1 = client1.get("test-elector", AtomixLeaderElector.class).join();
        Atomix client2 = createAtomixClient();
        AtomixLeaderElector elector2 = client2.get("test-elector", AtomixLeaderElector.class).join();
        Atomix client3 = createAtomixClient();
        AtomixLeaderElector elector3 = client3.get("test-elector", AtomixLeaderElector.class).join();
        elector1.run("foo", node1).join();
        elector2.run("foo", node2).join();

        LeaderEventListener listener1 = new LeaderEventListener();
        elector1.addChangeListener(listener1).join();
        LeaderEventListener listener2 = new LeaderEventListener();
        elector2.addChangeListener(listener2);
        LeaderEventListener listener3 = new LeaderEventListener();
        elector3.addChangeListener(listener3).join();

        elector3.anoint("foo", node3).thenAccept(result -> {
            assertFalse(result);
        }).join();
        assertFalse(listener1.hasEvent());
        assertFalse(listener2.hasEvent());
        assertFalse(listener3.hasEvent());

        elector3.anoint("foo", node2).thenAccept(result -> {
            assertTrue(result);
        }).join();
        assertTrue(listener1.hasEvent());
        assertTrue(listener2.hasEvent());
        assertTrue(listener3.hasEvent());

        listener1.nextEvent().thenAccept(result -> {
            assertEquals(node2, result.newValue().leaderNodeId());
            assertEquals(2, result.newValue().candidates().size());
            assertEquals(node1, result.newValue().candidates().get(0));
            assertEquals(node2, result.newValue().candidates().get(1));
        }).join();
        listener2.nextEvent().thenAccept(result -> {
            assertEquals(node2, result.newValue().leaderNodeId());
            assertEquals(2, result.newValue().candidates().size());
            assertEquals(node1, result.newValue().candidates().get(0));
            assertEquals(node2, result.newValue().candidates().get(1));
        }).join();
        listener3.nextEvent().thenAccept(result -> {
            assertEquals(node2, result.newValue().leaderNodeId());
            assertEquals(2, result.newValue().candidates().size());
            assertEquals(node1, result.newValue().candidates().get(0));
            assertEquals(node2, result.newValue().candidates().get(1));
        }).join();
    }

    @Test
    public void testPromote() throws Throwable {
        leaderElectorPromoteTests(1);
        clearTests();
        leaderElectorPromoteTests(2);
        clearTests();
        leaderElectorPromoteTests(3);
        clearTests();
    }

    private void leaderElectorPromoteTests(int numServers) throws Throwable {
        createCopycatServers(numServers);
        Atomix client1 = createAtomixClient();
        AtomixLeaderElector elector1 = client1.get("test-elector", AtomixLeaderElector.class).join();
        Atomix client2 = createAtomixClient();
        AtomixLeaderElector elector2 = client2.get("test-elector", AtomixLeaderElector.class).join();
        Atomix client3 = createAtomixClient();
        AtomixLeaderElector elector3 = client3.get("test-elector", AtomixLeaderElector.class).join();
        elector1.run("foo", node1).join();
        elector2.run("foo", node2).join();

        LeaderEventListener listener1 = new LeaderEventListener();
        elector1.addChangeListener(listener1).join();
        LeaderEventListener listener2 = new LeaderEventListener();
        elector2.addChangeListener(listener2).join();
        LeaderEventListener listener3 = new LeaderEventListener();
        elector3.addChangeListener(listener3).join();

        elector3.promote("foo", node3).thenAccept(result -> {
            assertFalse(result);
        }).join();

        assertFalse(listener1.hasEvent());
        assertFalse(listener2.hasEvent());
        assertFalse(listener3.hasEvent());

        elector3.run("foo", node3).join();

        listener1.clearEvents();
        listener2.clearEvents();
        listener3.clearEvents();

        elector3.promote("foo", node3).thenAccept(result -> {
            assertTrue(result);
        }).join();

        listener1.nextEvent().thenAccept(result -> {
            assertEquals(node3, result.newValue().candidates().get(0));
        }).join();
        listener2.nextEvent().thenAccept(result -> {
            assertEquals(node3, result.newValue().candidates().get(0));
        }).join();
        listener3.nextEvent().thenAccept(result -> {
            assertEquals(node3, result.newValue().candidates().get(0));
        }).join();
    }

    @Test
    public void testLeaderSessionClose() throws Throwable {
        leaderElectorLeaderSessionCloseTests(1);
        clearTests();
        leaderElectorLeaderSessionCloseTests(2);
        clearTests();
        leaderElectorLeaderSessionCloseTests(3);
        clearTests();
    }

    private void leaderElectorLeaderSessionCloseTests(int numServers) throws Throwable {
        createCopycatServers(numServers);
        Atomix client1 = createAtomixClient();
        AtomixLeaderElector elector1 = client1.get("test-elector", AtomixLeaderElector.class).join();
        elector1.run("foo", node1).join();
        Atomix client2 = createAtomixClient();
        AtomixLeaderElector elector2 = client2.get("test-elector", AtomixLeaderElector.class).join();
        LeaderEventListener listener = new LeaderEventListener();
        elector2.run("foo", node2).join();
        elector2.addChangeListener(listener).join();
        client1.close();
        listener.nextEvent().thenAccept(result -> {
            assertEquals(node2, result.newValue().leaderNodeId());
            assertEquals(1, result.newValue().candidates().size());
            assertEquals(node2, result.newValue().candidates().get(0));
        }).join();
    }

    @Test
    public void testNonLeaderSessionClose() throws Throwable {
        leaderElectorNonLeaderSessionCloseTests(1);
        clearTests();
        leaderElectorNonLeaderSessionCloseTests(2);
        clearTests();
        leaderElectorNonLeaderSessionCloseTests(3);
        clearTests();
    }

    private void leaderElectorNonLeaderSessionCloseTests(int numServers) throws Throwable {
        createCopycatServers(numServers);
        Atomix client1 = createAtomixClient();
        AtomixLeaderElector elector1 = client1.get("test-elector", AtomixLeaderElector.class).join();
        elector1.run("foo", node1).join();
        Atomix client2 = createAtomixClient();
        AtomixLeaderElector elector2 = client2.get("test-elector", AtomixLeaderElector.class).join();
        LeaderEventListener listener = new LeaderEventListener();
        elector2.run("foo", node2).join();
        elector1.addChangeListener(listener).join();
        client2.close().join();
        listener.nextEvent().thenAccept(result -> {
            assertEquals(node1, result.newValue().leaderNodeId());
            assertEquals(1, result.newValue().candidates().size());
            assertEquals(node1, result.newValue().candidates().get(0));
        }).join();
    }

    @Test
    public void testQueries() throws Throwable {
        leaderElectorQueryTests(1);
        clearTests();
        leaderElectorQueryTests(2);
        clearTests();
        leaderElectorQueryTests(3);
        clearTests();
    }

    private void leaderElectorQueryTests(int numServers) throws Throwable {
        createCopycatServers(numServers);
        Atomix client1 = createAtomixClient();
        Atomix client2 = createAtomixClient();
        AtomixLeaderElector elector1 = client1.get("test-elector", AtomixLeaderElector.class).join();
        AtomixLeaderElector elector2 = client2.get("test-elector", AtomixLeaderElector.class).join();
        elector1.run("foo", node1).join();
        elector2.run("foo", node2).join();
        elector2.run("bar", node2).join();
        elector1.getElectedTopics(node1).thenAccept(result -> {
            assertEquals(1, result.size());
            assertTrue(result.contains("foo"));
        }).join();
        elector2.getElectedTopics(node1).thenAccept(result -> {
            assertEquals(1, result.size());
            assertTrue(result.contains("foo"));
        }).join();
        elector1.getLeadership("foo").thenAccept(result -> {
            assertEquals(node1, result.leaderNodeId());
            assertEquals(node1, result.candidates().get(0));
            assertEquals(node2, result.candidates().get(1));
        }).join();
        elector2.getLeadership("foo").thenAccept(result -> {
            assertEquals(node1, result.leaderNodeId());
            assertEquals(node1, result.candidates().get(0));
            assertEquals(node2, result.candidates().get(1));
        }).join();
        elector1.getLeadership("bar").thenAccept(result -> {
            assertEquals(node2, result.leaderNodeId());
            assertEquals(node2, result.candidates().get(0));
        }).join();
        elector2.getLeadership("bar").thenAccept(result -> {
            assertEquals(node2, result.leaderNodeId());
            assertEquals(node2, result.candidates().get(0));
        }).join();
        elector1.getLeaderships().thenAccept(result -> {
            assertEquals(2, result.size());
            Leadership fooLeadership = result.get("foo");
            assertEquals(node1, fooLeadership.leaderNodeId());
            assertEquals(node1, fooLeadership.candidates().get(0));
            assertEquals(node2, fooLeadership.candidates().get(1));
            Leadership barLeadership = result.get("bar");
            assertEquals(node2, barLeadership.leaderNodeId());
            assertEquals(node2, barLeadership.candidates().get(0));
        }).join();
        elector2.getLeaderships().thenAccept(result -> {
            assertEquals(2, result.size());
            Leadership fooLeadership = result.get("foo");
            assertEquals(node1, fooLeadership.leaderNodeId());
            assertEquals(node1, fooLeadership.candidates().get(0));
            assertEquals(node2, fooLeadership.candidates().get(1));
            Leadership barLeadership = result.get("bar");
            assertEquals(node2, barLeadership.leaderNodeId());
            assertEquals(node2, barLeadership.candidates().get(0));
        }).join();
    }

    private static class LeaderEventListener implements Consumer<Change<Leadership>> {
        Queue<Change<Leadership>> eventQueue = new LinkedList<>();
        CompletableFuture<Change<Leadership>> pendingFuture;

        @Override
        public void accept(Change<Leadership> change) {
            synchronized (this) {
                if (pendingFuture != null) {
                    pendingFuture.complete(change);
                    pendingFuture = null;
                } else {
                    eventQueue.add(change);
                }
            }
        }

        public boolean hasEvent() {
            return !eventQueue.isEmpty();
        }

        public void clearEvents() {
            eventQueue.clear();
        }

        public CompletableFuture<Change<Leadership>> nextEvent() {
            synchronized (this) {
                if (eventQueue.isEmpty()) {
                    if (pendingFuture == null) {
                        pendingFuture = new CompletableFuture<>();
                    }
                    return pendingFuture;
                } else {
                    return CompletableFuture.completedFuture(eventQueue.poll());
                }
            }
        }
    }
}
