package org.onlab.onos.store.service.impl;

import static com.google.common.base.Verify.verifyNotNull;
import static org.onlab.onos.store.service.impl.ClusterMessagingProtocol.DB_SERIALIZER;
import static org.onlab.util.Tools.namedThreads;
import static org.slf4j.LoggerFactory.getLogger;
import static java.util.concurrent.Executors.newCachedThreadPool;

import java.io.IOException;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicBoolean;

import net.kuujo.copycat.cluster.TcpMember;
import net.kuujo.copycat.protocol.PingRequest;
import net.kuujo.copycat.protocol.PingResponse;
import net.kuujo.copycat.protocol.PollRequest;
import net.kuujo.copycat.protocol.PollResponse;
import net.kuujo.copycat.protocol.SubmitRequest;
import net.kuujo.copycat.protocol.SubmitResponse;
import net.kuujo.copycat.protocol.SyncRequest;
import net.kuujo.copycat.protocol.SyncResponse;
import net.kuujo.copycat.spi.protocol.ProtocolClient;

import org.onlab.onos.cluster.ClusterService;
import org.onlab.onos.cluster.ControllerNode;
import org.onlab.onos.store.cluster.messaging.ClusterCommunicationService;
import org.onlab.onos.store.cluster.messaging.ClusterMessage;
import org.onlab.onos.store.cluster.messaging.MessageSubject;
import org.slf4j.Logger;

/**
 * ONOS Cluster messaging based Copycat protocol client.
 */
public class ClusterMessagingProtocolClient implements ProtocolClient {

    private final Logger log = getLogger(getClass());

    public static final long RETRY_INTERVAL_MILLIS = 2000;

    private final ClusterService clusterService;
    private final ClusterCommunicationService clusterCommunicator;
    private final ControllerNode localNode;
    private final TcpMember remoteMember;

    private ControllerNode remoteNode;
    private final AtomicBoolean connectionOK = new AtomicBoolean(true);

    private ExecutorService pool;

    public ClusterMessagingProtocolClient(
            ClusterService clusterService,
            ClusterCommunicationService clusterCommunicator,
            ControllerNode localNode,
            TcpMember remoteMember) {

        this.clusterService = clusterService;
        this.clusterCommunicator = clusterCommunicator;
        this.localNode = localNode;
        this.remoteMember = remoteMember;
    }

    @Override
    public CompletableFuture<PingResponse> ping(PingRequest request) {
        return requestReply(request);
    }

    @Override
    public CompletableFuture<SyncResponse> sync(SyncRequest request) {
        return requestReply(request);
    }

    @Override
    public CompletableFuture<PollResponse> poll(PollRequest request) {
        return requestReply(request);
    }

    @Override
    public CompletableFuture<SubmitResponse> submit(SubmitRequest request) {
        return requestReply(request);
    }

    @Override
    public synchronized CompletableFuture<Void> connect() {
        if (pool == null || pool.isShutdown()) {
            // TODO include remote name?
            pool = newCachedThreadPool(namedThreads("copycat-netty-messaging-client-%d"));
        }
        return CompletableFuture.completedFuture(null);
    }

    @Override
    public synchronized CompletableFuture<Void> close() {
        if (pool != null) {
            pool.shutdownNow();
            pool = null;
        }
        return CompletableFuture.completedFuture(null);
    }

    private <I> MessageSubject messageType(I input) {
        Class<?> clazz = input.getClass();
        if (clazz.equals(PollRequest.class)) {
            return ClusterMessagingProtocol.COPYCAT_POLL;
        } else if (clazz.equals(SyncRequest.class)) {
            return ClusterMessagingProtocol.COPYCAT_SYNC;
        } else if (clazz.equals(SubmitRequest.class)) {
            return ClusterMessagingProtocol.COPYCAT_SUBMIT;
        } else if (clazz.equals(PingRequest.class)) {
            return ClusterMessagingProtocol.COPYCAT_PING;
        } else {
            throw new IllegalArgumentException("Unknown class " + clazz.getName());
        }
    }

    private <I, O> CompletableFuture<O> requestReply(I request) {
        CompletableFuture<O> future = new CompletableFuture<>();
        if (pool == null) {
            log.info("Attempted to use closed client, connecting now. {}", request);
            connect();
        }
        pool.submit(new RPCTask<I, O>(request, future));
        return future;
    }

    private ControllerNode getControllerNode(TcpMember remoteMember) {
        final String host = remoteMember.host();
        final int port = remoteMember.port();
        for (ControllerNode node : clusterService.getNodes()) {
            if (node.ip().toString().equals(host) && node.tcpPort() == port) {
                return node;
            }
        }
        return null;
    }

    private class RPCTask<I, O> implements Runnable {

        private final I request;
        private final ClusterMessage message;
        private final CompletableFuture<O> future;

        public RPCTask(I request, CompletableFuture<O> future) {
            this.request = request;
            this.message =
                    new ClusterMessage(
                            localNode.id(),
                            messageType(request),
                            verifyNotNull(DB_SERIALIZER.encode(request)));
            this.future = future;
        }

        @Override
        public void run() {
            try {
                if (remoteNode == null) {
                    remoteNode = getControllerNode(remoteMember);
                    if (remoteNode == null) {
                        throw new IOException("Remote node is offline!");
                    }
                }
                byte[] response = clusterCommunicator
                    .sendAndReceive(message, remoteNode.id())
                    .get(RETRY_INTERVAL_MILLIS, TimeUnit.MILLISECONDS);
                if (!connectionOK.getAndSet(true)) {
                    log.info("Connectivity to {} restored", remoteNode);
                }
                future.complete(verifyNotNull(DB_SERIALIZER.decode(response)));

            } catch (IOException | TimeoutException e) {
                if (connectionOK.getAndSet(false)) {
                    log.warn("Detected connectivity issues with {}. Reason: {}", remoteNode, e.getMessage());
                }
                log.debug("RPCTask for {} failed.", request, e);
                future.completeExceptionally(e);
            } catch (ExecutionException e) {
                log.warn("RPCTask execution for {} failed: {}", request, e.getMessage());
                log.debug("RPCTask execution for {} failed.", request, e);
                future.completeExceptionally(e);
            } catch (InterruptedException e) {
                log.warn("RPCTask for {} was interrupted: {}", request, e.getMessage());
                log.debug("RPCTask for {} was interrupted.", request, e);
                future.completeExceptionally(e);
                Thread.currentThread().interrupt();
            } catch (Exception e) {
                log.warn("RPCTask for {} terribly failed.", request, e);
                future.completeExceptionally(e);
            }
        }
    }
}
