/*
 * Copyright 2016-present Open Networking Laboratory
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.onosproject.store.cluster.impl;

import static org.slf4j.LoggerFactory.getLogger;

import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

import org.apache.felix.scr.annotations.Activate;
import org.apache.felix.scr.annotations.Component;
import org.apache.felix.scr.annotations.Deactivate;
import org.apache.felix.scr.annotations.Reference;
import org.apache.felix.scr.annotations.ReferenceCardinality;
import org.apache.felix.scr.annotations.Service;
import org.onosproject.cluster.ClusterService;
import org.onosproject.cluster.Leader;
import org.onosproject.cluster.Leadership;
import org.onosproject.cluster.LeadershipEvent;
import org.onosproject.cluster.LeadershipStore;
import org.onosproject.cluster.LeadershipStoreDelegate;
import org.onosproject.cluster.NodeId;
import org.onosproject.store.AbstractStore;
import org.onosproject.store.serializers.KryoNamespaces;
import org.onosproject.store.service.ConsistentMap;
import org.onosproject.store.service.MapEventListener;
import org.onosproject.store.service.Serializer;
import org.onosproject.store.service.StorageService;
import org.onosproject.store.service.Versioned;
import org.slf4j.Logger;

import com.google.common.base.MoreObjects;
import com.google.common.base.Objects;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.ImmutableSet;
import com.google.common.collect.Maps;
import com.google.common.collect.Sets;

/**
 * Implementation of {@code LeadershipStore} backed by {@link ConsistentMap}.
 */
@Service
@Component(immediate = true, enabled = false)
public class DistributedLeadershipStore
    extends AbstractStore<LeadershipEvent, LeadershipStoreDelegate>
    implements LeadershipStore {

    private static final Logger log = getLogger(DistributedLeadershipStore.class);

    @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
    protected ClusterService clusterService;

    @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
    protected StorageService storageService;

    protected NodeId localNodeId;
    protected ConsistentMap<String, InternalLeadership> leadershipMap;
    protected Map<String, Versioned<InternalLeadership>> leadershipCache = Maps.newConcurrentMap();

    private final MapEventListener<String, InternalLeadership> leadershipChangeListener =
            event -> {
                Leadership oldValue = InternalLeadership.toLeadership(Versioned.valueOrNull(event.oldValue()));
                Leadership newValue = InternalLeadership.toLeadership(Versioned.valueOrNull(event.newValue()));
                boolean leaderChanged =
                        !Objects.equal(oldValue == null ? null : oldValue.leader(), newValue.leader());
                boolean candidatesChanged =
                        !Sets.symmetricDifference(Sets.newHashSet(oldValue == null ?
                                                    ImmutableSet.<NodeId>of() : oldValue.candidates()),
                                                  Sets.newHashSet(newValue.candidates())).isEmpty();
                LeadershipEvent.Type eventType = null;
                if (leaderChanged && candidatesChanged) {
                    eventType = LeadershipEvent.Type.LEADER_AND_CANDIDATES_CHANGED;
                }
                if (leaderChanged && !candidatesChanged) {
                    eventType = LeadershipEvent.Type.LEADER_CHANGED;
                }
                if (!leaderChanged && candidatesChanged) {
                    eventType = LeadershipEvent.Type.CANDIDATES_CHANGED;
                }
                leadershipCache.compute(event.key(), (k, v) -> {
                    if (v == null || v.version() < event.newValue().version()) {
                        return event.newValue();
                    }
                    return v;
                });
                notifyDelegate(new LeadershipEvent(eventType, newValue));
            };

    @Activate
    public void activate() {
        localNodeId = clusterService.getLocalNode().id();
        leadershipMap = storageService.<String, InternalLeadership>consistentMapBuilder()
                                      .withName("onos-leadership")
                                      .withPartitionsDisabled()
                                      .withRelaxedReadConsistency()
                                      .withSerializer(Serializer.using(KryoNamespaces.API, InternalLeadership.class))
                                      .build();
        leadershipMap.entrySet().forEach(e -> leadershipCache.put(e.getKey(), e.getValue()));
        leadershipMap.addListener(leadershipChangeListener);
        log.info("Started");
    }

    @Deactivate
    public void deactivate() {
        leadershipMap.removeListener(leadershipChangeListener);
        log.info("Stopped");
    }

    @Override
    public Leadership addRegistration(String topic) {
        Versioned<InternalLeadership> internalLeadership = leadershipMap.computeIf(topic,
                v -> v == null || !v.candidates().contains(localNodeId),
                (k, v) -> {
                    if (v == null || v.candidates().isEmpty()) {
                        return new InternalLeadership(topic,
                                localNodeId,
                                v == null ? 1 : v.term() + 1,
                                System.currentTimeMillis(),
                                ImmutableList.of(localNodeId));
                    }
                    List<NodeId> newCandidates = new ArrayList<>(v.candidates());
                    newCandidates.add(localNodeId);
                    return new InternalLeadership(topic, v.leader(), v.term(), v.termStartTime(), newCandidates);
                });
        return InternalLeadership.toLeadership(Versioned.valueOrNull(internalLeadership));
    }

    @Override
    public void removeRegistration(String topic) {
        removeRegistration(topic, localNodeId);
    }

    private void removeRegistration(String topic, NodeId nodeId) {
        leadershipMap.computeIf(topic,
                v -> v != null && v.candidates().contains(nodeId),
                (k, v) -> {
                    List<NodeId> newCandidates = v.candidates()
                            .stream()
                            .filter(id -> !nodeId.equals(id))
                            .collect(Collectors.toList());
                    NodeId newLeader = nodeId.equals(v.leader()) ?
                            newCandidates.size() > 0 ? newCandidates.get(0) : null : v.leader();
                    long newTerm = newLeader == null || Objects.equal(newLeader, v.leader()) ?
                            v.term() : v.term() + 1;
                    long newTermStartTime = newLeader == null || Objects.equal(newLeader, v.leader()) ?
                            v.termStartTime() : System.currentTimeMillis();
                    return new InternalLeadership(topic, newLeader, newTerm, newTermStartTime, newCandidates);
                });
    }

    @Override
    public void removeRegistration(NodeId nodeId) {
        leadershipMap.entrySet()
                                  .stream()
                                  .filter(e -> e.getValue().value().candidates().contains(nodeId))
                                  .map(e -> e.getKey())
                                  .forEach(topic -> this.removeRegistration(topic, nodeId));
    }

    @Override
    public boolean moveLeadership(String topic, NodeId toNodeId) {
        Versioned<InternalLeadership> internalLeadership = leadershipMap.computeIf(topic,
                v -> v != null &&
                    v.candidates().contains(toNodeId) &&
                    !Objects.equal(v.leader(), toNodeId),
                (k, v) -> {
                    List<NodeId> newCandidates = new ArrayList<>();
                    newCandidates.add(toNodeId);
                    newCandidates.addAll(v.candidates()
                            .stream()
                            .filter(id -> !toNodeId.equals(id))
                            .collect(Collectors.toList()));
                    return new InternalLeadership(topic,
                            toNodeId,
                            v.term() + 1,
                            System.currentTimeMillis(),
                            newCandidates);
                });
        return Objects.equal(toNodeId, Versioned.valueOrNull(internalLeadership).leader());
    }

    @Override
    public boolean makeTopCandidate(String topic, NodeId nodeId) {
        Versioned<InternalLeadership> internalLeadership = leadershipMap.computeIf(topic,
                v -> v != null &&
                v.candidates().contains(nodeId) &&
                !v.candidates().get(0).equals(nodeId),
                (k, v) -> {
                    List<NodeId> newCandidates = new ArrayList<>();
                    newCandidates.add(nodeId);
                    newCandidates.addAll(v.candidates()
                            .stream()
                            .filter(id -> !nodeId.equals(id))
                            .collect(Collectors.toList()));
                    return new InternalLeadership(topic,
                            v.leader(),
                            v.term(),
                            System.currentTimeMillis(),
                            newCandidates);
                });
        return internalLeadership != null && nodeId.equals(internalLeadership.value().candidates().get(0));
    }

    @Override
    public Leadership getLeadership(String topic) {
        InternalLeadership internalLeadership = Versioned.valueOrNull(leadershipMap.get(topic));
        return internalLeadership == null ? null : internalLeadership.asLeadership();
    }

    @Override
    public Map<String, Leadership> getLeaderships() {
        return ImmutableMap.copyOf(Maps.transformValues(leadershipCache, v -> v.value().asLeadership()));
    }

    private static class InternalLeadership {
        private final String topic;
        private final NodeId leader;
        private final long term;
        private final long termStartTime;
        private final List<NodeId> candidates;

        public InternalLeadership(String topic,
                NodeId leader,
                long term,
                long termStartTime,
                List<NodeId> candidates) {
            this.topic = topic;
            this.leader = leader;
            this.term = term;
            this.termStartTime = termStartTime;
            this.candidates = ImmutableList.copyOf(candidates);
        }

        public NodeId leader() {
            return this.leader;
        }

        public long term() {
            return term;
        }

        public long termStartTime() {
            return termStartTime;
        }

        public List<NodeId> candidates() {
            return candidates;
        }

        public Leadership asLeadership() {
            return new Leadership(topic, leader == null ?
                    null : new Leader(leader, term, termStartTime), candidates);
        }

        public static Leadership toLeadership(InternalLeadership internalLeadership) {
            return internalLeadership == null ? null : internalLeadership.asLeadership();
        }

        @Override
        public String toString() {
            return MoreObjects.toStringHelper(getClass())
                    .add("leader", leader)
                    .add("term", term)
                    .add("termStartTime", termStartTime)
                    .add("candidates", candidates)
                    .toString();
        }
    }
}
