/*
 * Copyright 2016 Open Networking Laboratory
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.onosproject.store.primitives.impl;

import io.atomix.catalyst.serializer.Serializer;
import io.atomix.catalyst.transport.Address;
import io.atomix.resource.ResourceType;
import io.atomix.variables.DistributedLong;

import java.io.File;
import java.util.Collection;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.stream.Collectors;

import org.onosproject.cluster.ClusterService;
import org.onosproject.cluster.ControllerNode;
import org.onosproject.cluster.NodeId;
import org.onosproject.cluster.Partition;
import org.onosproject.cluster.PartitionId;
import org.onosproject.store.cluster.messaging.MessagingService;
import org.onosproject.store.primitives.resources.impl.AtomixConsistentMap;
import org.onosproject.store.primitives.resources.impl.AtomixLeaderElector;
import org.onosproject.store.service.PartitionInfo;

import com.google.common.collect.Collections2;
import com.google.common.collect.ImmutableSet;

/**
 * Storage partition.
 */
public class StoragePartition implements Managed<StoragePartition> {

    private final AtomicBoolean isOpened = new AtomicBoolean(false);
    private final AtomicBoolean isClosed = new AtomicBoolean(false);
    private final Serializer serializer;
    private final MessagingService messagingService;
    private final ClusterService clusterService;
    private final File logFolder;
    private Partition partition;
    private static final Collection<ResourceType> RESOURCE_TYPES = ImmutableSet.of(
                                                        new ResourceType(DistributedLong.class),
                                                        new ResourceType(AtomixLeaderElector.class),
                                                        new ResourceType(AtomixConsistentMap.class));

    private NodeId localNodeId;
    private StoragePartitionServer server;
    private StoragePartitionClient client;

    public StoragePartition(Partition partition,
            MessagingService messagingService,
            ClusterService clusterService,
            Serializer serializer,
            File logFolder) {
        this.partition = partition;
        this.messagingService = messagingService;
        this.clusterService = clusterService;
        this.localNodeId = clusterService.getLocalNode().id();
        this.serializer = serializer;
        this.logFolder = logFolder;
    }

    /**
     * Returns the partition client instance.
     * @return client
     */
    public StoragePartitionClient client() {
        return client;
    }

    @Override
    public CompletableFuture<Void> open() {
        if (partition.getMembers().contains(localNodeId)) {
            openServer();
        }
        return openClient().thenAccept(v -> isOpened.set(true))
                           .thenApply(v -> null);
    }

    @Override
    public CompletableFuture<Void> close() {
        // We do not explicitly close the server and instead let the cluster
        // deal with this as an unclean exit.
        return closeClient().thenAccept(v -> isClosed.set(true))
                            .thenApply(v -> null);
    }

    /**
     * Returns the identifier of the {@link Partition partition} associated with this instance.
     * @return partition identifier
     */
    public PartitionId getId() {
        return partition.getId();
    }

    /**
     * Returns the identifiers of partition members.
     * @return partition member instance ids
     */
    public Collection<NodeId> getMembers() {
        return partition.getMembers();
    }

    /**
     * Returns the {@link Address addresses} of partition members.
     * @return partition member addresses
     */
    public Collection<Address> getMemberAddresses() {
        return Collections2.transform(partition.getMembers(), this::toAddress);
    }

    /**
     * Attempts to rejoin the partition.
     * @return future that is completed after the operation is complete
     */
    private CompletableFuture<Void> openServer() {
        if (!partition.getMembers().contains(localNodeId) || server != null) {
            return CompletableFuture.completedFuture(null);
        }
        StoragePartitionServer server = new StoragePartitionServer(toAddress(localNodeId),
                this,
                serializer,
                () -> new CopycatTransport(CopycatTransport.Mode.SERVER,
                                     partition.getId(),
                                     messagingService),
                RESOURCE_TYPES,
                logFolder);
        return server.open().thenRun(() -> this.server = server);
    }

    /**
     * Attempts to join the partition as a new member.
     * @return future that is completed after the operation is complete
     */
    private CompletableFuture<Void> joinCluster() {
        Set<NodeId> otherMembers = partition.getMembers()
                 .stream()
                 .filter(nodeId -> !nodeId.equals(localNodeId))
                 .collect(Collectors.toSet());
        StoragePartitionServer server = new StoragePartitionServer(toAddress(localNodeId),
                this,
                serializer,
                () -> new CopycatTransport(CopycatTransport.Mode.SERVER,
                                     partition.getId(),
                                     messagingService),
                RESOURCE_TYPES,
                logFolder);
        return server.join(Collections2.transform(otherMembers, this::toAddress)).thenRun(() -> this.server = server);
    }

    private CompletableFuture<StoragePartitionClient> openClient() {
        client = new StoragePartitionClient(this,
                serializer,
                new CopycatTransport(CopycatTransport.Mode.CLIENT,
                                     partition.getId(),
                                     messagingService),
                RESOURCE_TYPES);
        return client.open().thenApply(v -> client);
    }

    /**
     * Closes the partition server if it was previously opened.
     * @return future that is completed when the operation completes
     */
    public CompletableFuture<Void> leaveCluster() {
        return server != null ? server.closeAndExit() : CompletableFuture.completedFuture(null);
    }

    @Override
    public boolean isOpen() {
        return isOpened.get() && !isClosed.get();
    }

    @Override
    public boolean isClosed() {
        return isClosed.get();
    }

    private CompletableFuture<Void> closeClient() {
        if (client != null) {
            return client.close();
        }
        return CompletableFuture.completedFuture(null);
    }

    private Address toAddress(NodeId nodeId) {
        ControllerNode node = clusterService.getNode(nodeId);
        return new Address(node.ip().toString(), node.tcpPort());
    }

    /**
     * Returns the partition information if this partition is locally managed i.e.
     * this node is a active member of the partition.
     * @return partition info
     */
    public Optional<PartitionInfo> info() {
        return server != null && !server.isClosed() ? Optional.of(server.info()) : Optional.empty();
    }

    public void onUpdate(Partition newValue) {
        if (partition.getMembers().contains(localNodeId) && newValue.getMembers().contains(localNodeId)) {
            return;
        }
        if (!partition.getMembers().contains(localNodeId) && !newValue.getMembers().contains(localNodeId)) {
            return;
        }
        this.partition = newValue;
        if (partition.getMembers().contains(localNodeId)) {
            joinCluster();
        } else if (!partition.getMembers().contains(localNodeId)) {
            leaveCluster();
        }
    }
}
