/*
 * Copyright 2018-present Open Networking Foundation
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.onosproject.store.primitives.resources.impl;

import java.time.Duration;
import java.util.ArrayDeque;
import java.util.HashMap;
import java.util.Map;
import java.util.Queue;

import io.atomix.protocols.raft.service.AbstractRaftService;
import io.atomix.protocols.raft.service.Commit;
import io.atomix.protocols.raft.service.RaftServiceExecutor;
import io.atomix.protocols.raft.session.RaftSession;
import io.atomix.protocols.raft.storage.snapshot.SnapshotReader;
import io.atomix.protocols.raft.storage.snapshot.SnapshotWriter;
import io.atomix.utils.concurrent.Scheduled;
import org.onlab.util.KryoNamespace;
import org.onosproject.store.serializers.KryoNamespaces;
import org.onosproject.store.service.Serializer;

import static com.google.common.base.MoreObjects.toStringHelper;
import static org.onosproject.store.primitives.resources.impl.AtomixDistributedLockEvents.FAIL;
import static org.onosproject.store.primitives.resources.impl.AtomixDistributedLockOperations.LOCK;
import static org.onosproject.store.primitives.resources.impl.AtomixDistributedLockOperations.Lock;
import static org.onosproject.store.primitives.resources.impl.AtomixDistributedLockOperations.UNLOCK;
import static org.onosproject.store.primitives.resources.impl.AtomixDistributedLockOperations.Unlock;

/**
 * Raft distributed lock service.
 * <p>
 * Tracks the current lock holder and a FIFO queue of pending lock requests. A request made
 * while the lock is held either fails immediately (timeout {@code 0}), waits in the queue until
 * a deadline (timeout {@code > 0}), or waits in the queue with no deadline (timeout {@code < 0}).
 * When the holder unlocks, or its session expires or closes, the lock is handed to the first
 * queued request whose session is still usable.
 */
public class AtomixDistributedLockService extends AbstractRaftService {
    private static final Serializer SERIALIZER = Serializer.using(KryoNamespace.newBuilder()
        .register(KryoNamespaces.BASIC)
        .register(AtomixDistributedLockOperations.NAMESPACE)
        .register(AtomixDistributedLockEvents.NAMESPACE)
        .register(LockHolder.class)
        .register(ArrayDeque.class)
        .build());

    /** Current lock holder, or {@code null} when the lock is free. */
    private LockHolder lock;

    /** Pending lock requests in arrival order. */
    private Queue<LockHolder> queue = new ArrayDeque<>();

    /** Expiration timers of queued requests, keyed by the index of the request's commit. */
    private final Map<Long, Scheduled> timers = new HashMap<>();

    @Override
    protected void configure(RaftServiceExecutor executor) {
        executor.register(LOCK, SERIALIZER::decode, this::lock);
        executor.register(UNLOCK, SERIALIZER::decode, this::unlock);
    }

    /**
     * Writes the current holder followed by the pending queue. Timers are not written; they are
     * rebuilt from the queued holders' deadlines in {@link #install(SnapshotReader)}.
     *
     * @param writer the snapshot writer
     */
    @Override
    public void snapshot(SnapshotWriter writer) {
        writer.writeObject(lock, SERIALIZER::encode);
        writer.writeObject(queue, SERIALIZER::encode);
    }

    /**
     * Restores the holder and queue, then re-creates an expiration timer for every queued
     * request that carries a deadline.
     *
     * @param reader the snapshot reader
     */
    @Override
    public void install(SnapshotReader reader) {
        lock = reader.readObject(SERIALIZER::decode);
        queue = reader.readObject(SERIALIZER::decode);

        // Discard timers belonging to the state that has just been replaced.
        timers.values().forEach(Scheduled::cancel);
        timers.clear();

        for (LockHolder holder : queue) {
            if (holder.expire > 0) {
                // A deadline that has already passed is clamped to zero instead of handing the
                // scheduler a negative delay.
                long remaining = Math.max(0L, holder.expire - wallClock().getTime().unixTimestamp());
                timers.put(holder.index, scheduler().schedule(Duration.ofMillis(remaining), () -> {
                    timers.remove(holder.index);
                    queue.remove(holder);
                    RaftSession session = sessions().getSession(holder.session);
                    if (session != null && session.getState().active()) {
                        session.publish(FAIL, SERIALIZER::encode, new LockEvent(holder.id, holder.index));
                    }
                }));
            }
        }
    }

    @Override
    public void onExpire(RaftSession session) {
        releaseSession(session);
    }

    @Override
    public void onClose(RaftSession session) {
        releaseSession(session);
    }

    /**
     * Applies a lock commit.
     * <p>
     * If the lock is free it is granted immediately and a LOCK event is published to the caller.
     * Otherwise the request's timeout decides: {@code 0} publishes FAIL right away, a positive
     * value queues the request and schedules a FAIL after that many milliseconds, and a negative
     * value queues the request with no deadline.
     *
     * @param commit the lock commit
     */
    protected void lock(Commit<Lock> commit) {
        if (lock == null) {
            lock = new LockHolder(
                commit.value().id(),
                commit.index(),
                commit.session().sessionId().id(),
                0);
            commit.session().publish(
                AtomixDistributedLockEvents.LOCK,
                SERIALIZER::encode,
                new LockEvent(commit.value().id(), commit.index()));
        } else if (commit.value().timeout() == 0) {
            commit.session().publish(FAIL, SERIALIZER::encode, new LockEvent(commit.value().id(), commit.index()));
        } else if (commit.value().timeout() > 0) {
            LockHolder holder = new LockHolder(
                commit.value().id(),
                commit.index(),
                commit.session().sessionId().id(),
                wallClock().getTime().unixTimestamp() + commit.value().timeout());
            queue.add(holder);
            timers.put(commit.index(), scheduler().schedule(Duration.ofMillis(commit.value().timeout()), () -> {
                // Deadline reached while still queued: drop the request and notify the caller.
                timers.remove(commit.index());
                queue.remove(holder);
                if (commit.session().getState().active()) {
                    commit.session().publish(
                        FAIL,
                        SERIALIZER::encode,
                        new LockEvent(commit.value().id(), commit.index()));
                }
            }));
        } else {
            // Negative timeout: wait in the queue without an expiration timer.
            LockHolder holder = new LockHolder(
                commit.value().id(),
                commit.index(),
                commit.session().sessionId().id(),
                0);
            queue.add(holder);
        }
    }

    /**
     * Applies an unlock commit.
     * <p>
     * The commit is ignored unless it comes from the session that currently holds the lock.
     * Otherwise the lock passes to the first queued request whose session can still hold it;
     * requests from missing, expired or closed sessions are discarded along the way.
     *
     * @param commit the unlock commit
     */
    protected void unlock(Commit<Unlock> commit) {
        if (lock == null || lock.session != commit.session().sessionId().id()) {
            return;
        }

        lock = queue.poll();
        while (lock != null) {
            cancelTimer(lock.index);

            RaftSession session = sessions().getSession(lock.session);
            if (canHoldLock(session)) {
                // NOTE(review): the event carries the unlock commit's index, whereas lock() and
                // releaseSession() publish the holder's own index - confirm which is intended.
                session.publish(
                    AtomixDistributedLockEvents.LOCK,
                    SERIALIZER::encode,
                    new LockEvent(lock.id, commit.index()));
                break;
            }
            lock = queue.poll();
        }
    }

    /**
     * Releases the lock if it is held by the given session and hands it to the next eligible
     * queued request. Queued requests from the released session, or from sessions that are
     * missing, expired or closed, are discarded and their timers cancelled.
     *
     * @param session the session that expired or was closed
     */
    private void releaseSession(RaftSession session) {
        long sessionId = session.sessionId().id();
        if (lock == null || lock.session != sessionId) {
            return;
        }

        lock = queue.poll();
        while (lock != null) {
            // Cancel the timer of every polled request, including those of the released session,
            // so that no stale timer is left behind in the map.
            cancelTimer(lock.index);

            RaftSession lockSession = lock.session == sessionId ? null : sessions().getSession(lock.session);
            if (canHoldLock(lockSession)) {
                lockSession.publish(
                    AtomixDistributedLockEvents.LOCK,
                    SERIALIZER::encode,
                    new LockEvent(lock.id, lock.index));
                break;
            }
            lock = queue.poll();
        }
    }

    /**
     * Cancels and forgets the expiration timer registered under the given commit index, if any.
     *
     * @param index the commit index of the queued lock request
     */
    private void cancelTimer(long index) {
        Scheduled timer = timers.remove(index);
        if (timer != null) {
            timer.cancel();
        }
    }

    /**
     * Returns whether the lock may be granted to the given session.
     *
     * @param session the candidate session, possibly {@code null}
     * @return {@code true} if the session exists and is neither expired nor closed
     */
    private boolean canHoldLock(RaftSession session) {
        if (session == null) {
            return false;
        }
        RaftSession.State state = session.getState();
        return state != RaftSession.State.EXPIRED && state != RaftSession.State.CLOSED;
    }

    /**
     * A granted or pending lock request.
     * <p>
     * Declared static because it never uses the enclosing service instance.
     */
    private static class LockHolder {
        /** Lock identifier supplied by the client in the lock operation. */
        private final int id;
        /** Index of the commit that created this request. */
        private final long index;
        /** Identifier of the session that issued the request. */
        private final long session;
        /** Wall-clock deadline in milliseconds, or {@code 0} when the request has no deadline. */
        private final long expire;

        public LockHolder(int id, long index, long session, long expire) {
            this.id = id;
            this.index = index;
            this.session = session;
            this.expire = expire;
        }

        @Override
        public String toString() {
            return toStringHelper(this)
                .add("id", id)
                .add("index", index)
                .add("session", session)
                .add("expire", expire)
                .toString();
        }
    }
}