/*
 * Copyright 2016-present Open Networking Laboratory
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.onosproject.store.primitives.resources.impl;

import com.google.common.util.concurrent.Uninterruptibles;
import io.atomix.AtomixClient;
import io.atomix.catalyst.serializer.Serializer;
import io.atomix.catalyst.transport.Address;
import io.atomix.catalyst.transport.local.LocalServerRegistry;
import io.atomix.catalyst.transport.netty.NettyTransport;
import io.atomix.copycat.client.CopycatClient;
import io.atomix.copycat.server.CopycatServer;
import io.atomix.copycat.server.storage.Storage;
import io.atomix.copycat.server.storage.StorageLevel;
import io.atomix.manager.internal.ResourceManagerState;
import io.atomix.resource.ResourceType;
import org.onlab.junit.TestTools;
import org.onosproject.store.primitives.impl.CatalystSerializers;

import java.io.File;
import java.io.IOException;
import java.nio.file.Files;
import java.time.Duration;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicInteger;

/**
 * Base class for various Atomix tests.
 */
public abstract class AtomixTestBase {
    protected static File testDir;
    protected static LocalServerRegistry registry = new LocalServerRegistry();
    protected static List<Address> members = new ArrayList<>();
    protected static List<CopycatClient> copycatClients = new ArrayList<>();
    protected static List<CopycatServer> copycatServers = new ArrayList<>();
    protected static List<AtomixClient> atomixClients = new ArrayList<>();
    protected static List<CopycatServer> atomixServers = new ArrayList<>();
    protected static Serializer serializer = CatalystSerializers.getSerializer();
    protected static AtomicInteger port = new AtomicInteger(49200);

    /**
     * Creates a new resource state machine.
     *
     * @return A new resource state machine.
     */
    protected abstract ResourceType resourceType();

    /**
     * Returns the next server address.
     *
     * @return The next server address.
     */
    private static Address nextAddress() {
        Address address = new Address("127.0.0.1",
                          TestTools.findAvailablePort(port.getAndIncrement()));
        members.add(address);
        return address;
    }

    /**
     * Creates a set of Copycat servers.
     */
    protected static List<CopycatServer> createCopycatServers(int nodes)
            throws Throwable {
        CountDownLatch latch = new CountDownLatch(nodes);
        List<CopycatServer> servers = new ArrayList<>();

        List<Address> members = new ArrayList<>();

        for (int i = 0; i < nodes; i++) {
            Address address = nextAddress();
            members.add(address);
            CopycatServer server = createCopycatServer(address);
            if (members.size() <= 1) {
                server.bootstrap().thenRun(latch::countDown).join();
            } else {
                server.join(members).join();
            }
            servers.add(server);
        }

        return servers;
    }

    /**
     * Creates a Copycat server.
     */
    protected static CopycatServer createCopycatServer(Address address) {
        CopycatServer server = CopycatServer.builder(address)
                .withTransport(NettyTransport.builder().withThreads(1).build())
                .withStorage(Storage.builder()
                             .withStorageLevel(StorageLevel.MEMORY)
                             .build())
                .withStateMachine(ResourceManagerState::new)
                .withSerializer(serializer.clone())
                .withHeartbeatInterval(Duration.ofMillis(25))
                .withElectionTimeout(Duration.ofMillis(50))
                .withSessionTimeout(Duration.ofMillis(100))
                .build();
        copycatServers.add(server);
        return server;
    }

    public static void clearTests() throws Exception {
        registry = new LocalServerRegistry();
        members = new ArrayList<>();

        CompletableFuture<Void> closeClients =
                CompletableFuture.allOf(atomixClients.stream()
                                         .map(AtomixClient::close)
                                         .toArray(CompletableFuture[]::new));

        closeClients
                .thenCompose(v -> CompletableFuture
                        .allOf(copycatServers.stream()
                .map(CopycatServer::shutdown)
                .toArray(CompletableFuture[]::new))).join();

        atomixClients = new ArrayList<>();

        copycatServers = new ArrayList<>();
    }

    /**
     * Deletes a directory recursively.
     */
    private void deleteDirectory(File directory) throws IOException {
        if (directory.exists()) {
            File[] files = directory.listFiles();
            if (files != null) {
                for (File file : files) {
                    if (file.isDirectory()) {
                        deleteDirectory(file);
                    } else {
                        Files.delete(file.toPath());
                    }
                }
            }
            Files.delete(directory.toPath());
        }
    }

    /**
     * Creates a Atomix client.
     */
    protected AtomixClient createAtomixClient() {
        CountDownLatch latch = new CountDownLatch(1);
        AtomixClient client = AtomixClient.builder()
                .withTransport(NettyTransport.builder().withThreads(1).build())
                .withSerializer(serializer.clone())
                .build();
        client.connect(members).thenRun(latch::countDown);
        atomixClients.add(client);
        Uninterruptibles.awaitUninterruptibly(latch);
        return client;
    }
}
