/*
 * Copyright 2016-present Open Networking Foundation
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.onosproject.store.primitives.resources.impl;

import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executor;

import com.google.common.util.concurrent.MoreExecutors;
import io.atomix.protocols.raft.proxy.RaftProxy;
import org.onlab.util.KryoNamespace;
import org.onlab.util.Match;
import org.onlab.util.Tools;
import org.onosproject.store.primitives.NodeUpdate;
import org.onosproject.store.primitives.TransactionId;
import org.onosproject.store.primitives.resources.impl.AtomixDocumentTreeOperations.Get;
import org.onosproject.store.primitives.resources.impl.AtomixDocumentTreeOperations.GetChildren;
import org.onosproject.store.primitives.resources.impl.AtomixDocumentTreeOperations.Listen;
import org.onosproject.store.primitives.resources.impl.AtomixDocumentTreeOperations.Unlisten;
import org.onosproject.store.primitives.resources.impl.AtomixDocumentTreeOperations.Update;
import org.onosproject.store.primitives.resources.impl.AtomixDocumentTreeOperations.TransactionBegin;
import org.onosproject.store.primitives.resources.impl.AtomixDocumentTreeOperations.TransactionPrepare;
import org.onosproject.store.primitives.resources.impl.AtomixDocumentTreeOperations.TransactionPrepareAndCommit;
import org.onosproject.store.primitives.resources.impl.AtomixDocumentTreeOperations.TransactionCommit;
import org.onosproject.store.primitives.resources.impl.AtomixDocumentTreeOperations.TransactionRollback;
import org.onosproject.store.serializers.KryoNamespaces;
import org.onosproject.store.service.AsyncDocumentTree;
import org.onosproject.store.service.DocumentPath;
import org.onosproject.store.service.DocumentTreeEvent;
import org.onosproject.store.service.DocumentTreeListener;
import org.onosproject.store.service.IllegalDocumentModificationException;
import org.onosproject.store.service.NoSuchDocumentPathException;
import org.onosproject.store.service.Serializer;
import org.onosproject.store.service.TransactionLog;
import org.onosproject.store.service.Version;
import org.onosproject.store.service.Versioned;

import static com.google.common.base.Preconditions.checkNotNull;
import static org.onosproject.store.primitives.resources.impl.AtomixDocumentTreeEvents.CHANGE;
import static org.onosproject.store.primitives.resources.impl.AtomixDocumentTreeOperations.BEGIN;
import static org.onosproject.store.primitives.resources.impl.AtomixDocumentTreeOperations.PREPARE;
import static org.onosproject.store.primitives.resources.impl.AtomixDocumentTreeOperations.PREPARE_AND_COMMIT;
import static org.onosproject.store.primitives.resources.impl.AtomixDocumentTreeOperations.COMMIT;
import static org.onosproject.store.primitives.resources.impl.AtomixDocumentTreeOperations.ROLLBACK;
import static org.onosproject.store.primitives.resources.impl.AtomixDocumentTreeOperations.CLEAR;
import static org.onosproject.store.primitives.resources.impl.AtomixDocumentTreeOperations.GET;
import static org.onosproject.store.primitives.resources.impl.AtomixDocumentTreeOperations.GET_CHILDREN;
import static org.onosproject.store.primitives.resources.impl.AtomixDocumentTreeOperations.UPDATE;
import static org.onosproject.store.primitives.resources.impl.AtomixDocumentTreeOperations.ADD_LISTENER;
import static org.onosproject.store.primitives.resources.impl.AtomixDocumentTreeOperations.REMOVE_LISTENER;
import static org.onosproject.store.primitives.resources.impl.DocumentTreeResult.Status.ILLEGAL_MODIFICATION;
import static org.onosproject.store.primitives.resources.impl.DocumentTreeResult.Status.INVALID_PATH;
import static org.onosproject.store.primitives.resources.impl.DocumentTreeResult.Status.OK;

/**
 * Distributed resource providing the {@link AsyncDocumentTree} primitive.
 */
public class AtomixDocumentTree extends AbstractRaftPrimitive implements AsyncDocumentTree<byte[]> {
    private static final Serializer SERIALIZER = Serializer.using(KryoNamespace.newBuilder()
            .register(KryoNamespaces.BASIC)
            .register(AtomixDocumentTreeOperations.NAMESPACE)
            .register(AtomixDocumentTreeEvents.NAMESPACE)
            .build());

    private final Map<DocumentTreeListener<byte[]>, InternalListener> eventListeners = new ConcurrentHashMap<>();

    public AtomixDocumentTree(RaftProxy proxy) {
        super(proxy);
        proxy.addStateChangeListener(state -> {
            if (state == RaftProxy.State.CONNECTED && isListening()) {
                proxy.invoke(ADD_LISTENER, SERIALIZER::encode, new Listen());
            }
        });
        proxy.addEventListener(CHANGE, SERIALIZER::decode, this::processTreeUpdates);
    }

    @Override
    public Type primitiveType() {
        return Type.DOCUMENT_TREE;
    }

    @Override
    public CompletableFuture<Void> destroy() {
        return proxy.invoke(CLEAR);
    }

    @Override
    public DocumentPath root() {
        return DocumentPath.ROOT;
    }

    @Override
    public CompletableFuture<Map<String, Versioned<byte[]>>> getChildren(DocumentPath path) {
        return proxy.<GetChildren, DocumentTreeResult<Map<String, Versioned<byte[]>>>>invoke(
                GET_CHILDREN,
                SERIALIZER::encode,
                new GetChildren(checkNotNull(path)),
                SERIALIZER::decode)
                .thenCompose(result -> {
                    if (result.status() == INVALID_PATH) {
                        return Tools.exceptionalFuture(new NoSuchDocumentPathException());
                    } else if (result.status() == ILLEGAL_MODIFICATION) {
                        return Tools.exceptionalFuture(new IllegalDocumentModificationException());
                    } else {
                        return CompletableFuture.completedFuture(result);
                    }
                }).thenApply(result -> result.result());
    }

    @Override
    public CompletableFuture<Versioned<byte[]>> get(DocumentPath path) {
        return proxy.invoke(GET, SERIALIZER::encode, new Get(checkNotNull(path)), SERIALIZER::decode);
    }

    @Override
    public CompletableFuture<Versioned<byte[]>> set(DocumentPath path, byte[] value) {
        return proxy.<Update, DocumentTreeResult<Versioned<byte[]>>>invoke(UPDATE,
                SERIALIZER::encode,
                new Update(checkNotNull(path), Optional.ofNullable(value), Match.any(), Match.any()),
                SERIALIZER::decode)
                .thenCompose(result -> {
                    if (result.status() == INVALID_PATH) {
                        return Tools.exceptionalFuture(new NoSuchDocumentPathException());
                    } else if (result.status() == ILLEGAL_MODIFICATION) {
                        return Tools.exceptionalFuture(new IllegalDocumentModificationException());
                    } else {
                        return CompletableFuture.completedFuture(result);
                    }
                }).thenApply(result -> result.result());
    }

    @Override
    public CompletableFuture<Boolean> create(DocumentPath path, byte[] value) {
        return createInternal(path, value)
                .thenCompose(status -> {
                    if (status == ILLEGAL_MODIFICATION) {
                        return Tools.exceptionalFuture(new IllegalDocumentModificationException());
                    }
                    return CompletableFuture.completedFuture(true);
                });
    }

    @Override
    public CompletableFuture<Boolean> createRecursive(DocumentPath path, byte[] value) {
        return createInternal(path, value)
                .thenCompose(status -> {
                    if (status == ILLEGAL_MODIFICATION) {
                        return createRecursive(path.parent(), null)
                                .thenCompose(r -> createInternal(path, value).thenApply(v -> true));
                    }
                    return CompletableFuture.completedFuture(status == OK);
                });
    }

    @Override
    public CompletableFuture<Boolean> replace(DocumentPath path, byte[] newValue, long version) {
        return proxy.<Update, DocumentTreeResult<byte[]>>invoke(UPDATE,
                SERIALIZER::encode,
                new Update(checkNotNull(path),
                        Optional.ofNullable(newValue),
                        Match.any(),
                        Match.ifValue(version)), SERIALIZER::decode)
                .thenApply(result -> result.updated());
    }

    @Override
    public CompletableFuture<Boolean> replace(DocumentPath path, byte[] newValue, byte[] currentValue) {
        return proxy.<Update, DocumentTreeResult<byte[]>>invoke(UPDATE,
                SERIALIZER::encode,
                new Update(checkNotNull(path),
                        Optional.ofNullable(newValue),
                        Match.ifValue(currentValue),
                        Match.any()),
                SERIALIZER::decode)
                .thenCompose(result -> {
                    if (result.status() == INVALID_PATH) {
                        return Tools.exceptionalFuture(new NoSuchDocumentPathException());
                    } else if (result.status() == ILLEGAL_MODIFICATION) {
                        return Tools.exceptionalFuture(new IllegalDocumentModificationException());
                    } else {
                        return CompletableFuture.completedFuture(result);
                    }
                }).thenApply(result -> result.updated());
    }

    @Override
    public CompletableFuture<Versioned<byte[]>> removeNode(DocumentPath path) {
        if (path.equals(DocumentPath.from("root"))) {
            return Tools.exceptionalFuture(new IllegalDocumentModificationException());
        }
        return proxy.<Update, DocumentTreeResult<Versioned<byte[]>>>invoke(UPDATE,
                SERIALIZER::encode,
                new Update(checkNotNull(path), null, Match.any(), Match.ifNotNull()),
                SERIALIZER::decode)
                .thenCompose(result -> {
                    if (result.status() == INVALID_PATH) {
                        return Tools.exceptionalFuture(new NoSuchDocumentPathException());
                    } else if (result.status() == ILLEGAL_MODIFICATION) {
                        return Tools.exceptionalFuture(new IllegalDocumentModificationException());
                    } else {
                        return CompletableFuture.completedFuture(result);
                    }
                }).thenApply(result -> result.result());
    }

    @Override
    public CompletableFuture<Void> addListener(DocumentPath path, DocumentTreeListener<byte[]> listener) {
        checkNotNull(path);
        checkNotNull(listener);
        InternalListener internalListener = new InternalListener(path, listener, MoreExecutors.directExecutor());
        // TODO: Support API that takes an executor
        if (!eventListeners.containsKey(listener)) {
            return proxy.invoke(ADD_LISTENER, SERIALIZER::encode, new Listen(path))
                    .thenRun(() -> eventListeners.put(listener, internalListener));
        }
        return CompletableFuture.completedFuture(null);
    }

    @Override
    public CompletableFuture<Void> removeListener(DocumentTreeListener<byte[]> listener) {
        checkNotNull(listener);
        InternalListener internalListener = eventListeners.remove(listener);
        if  (internalListener != null && eventListeners.isEmpty()) {
            return proxy.invoke(REMOVE_LISTENER, SERIALIZER::encode, new Unlisten(internalListener.path))
                    .thenApply(v -> null);
        }
        return CompletableFuture.completedFuture(null);
    }

    private CompletableFuture<DocumentTreeResult.Status> createInternal(DocumentPath path, byte[] value) {
        return proxy.<Update, DocumentTreeResult<byte[]>>invoke(UPDATE,
                SERIALIZER::encode,
                new Update(checkNotNull(path), Optional.ofNullable(value), Match.any(), Match.ifNull()),
                SERIALIZER::decode)
                .thenApply(result -> result.status());
    }

    private boolean isListening() {
        return !eventListeners.isEmpty();
    }

    private void processTreeUpdates(List<DocumentTreeEvent<byte[]>> events) {
        events.forEach(event -> eventListeners.values().forEach(listener -> listener.event(event)));
    }

    @Override
    public CompletableFuture<Version> begin(TransactionId transactionId) {
        return proxy.<TransactionBegin, Long>invoke(
                BEGIN,
                SERIALIZER::encode,
                new TransactionBegin(transactionId),
                SERIALIZER::decode)
                .thenApply(Version::new);
    }

    @Override
    public CompletableFuture<Boolean> prepare(TransactionLog<NodeUpdate<byte[]>> transactionLog) {
        return proxy.<TransactionPrepare, PrepareResult>invoke(
                PREPARE,
                SERIALIZER::encode,
                new TransactionPrepare(transactionLog),
                SERIALIZER::decode)
                .thenApply(v -> v == PrepareResult.OK);
    }

    @Override
    public CompletableFuture<Boolean> prepareAndCommit(TransactionLog<NodeUpdate<byte[]>> transactionLog) {
        return proxy.<TransactionPrepareAndCommit, PrepareResult>invoke(
                PREPARE_AND_COMMIT,
                SERIALIZER::encode,
                new TransactionPrepareAndCommit(transactionLog),
                SERIALIZER::decode)
                .thenApply(v -> v == PrepareResult.OK);
    }

    @Override
    public CompletableFuture<Void> commit(TransactionId transactionId) {
        return proxy.<TransactionCommit, CommitResult>invoke(
                COMMIT,
                SERIALIZER::encode,
                new TransactionCommit(transactionId),
                SERIALIZER::decode)
                .thenApply(v -> null);
    }

    @Override
    public CompletableFuture<Void> rollback(TransactionId transactionId) {
        return proxy.invoke(
                ROLLBACK,
                SERIALIZER::encode,
                new TransactionRollback(transactionId),
                SERIALIZER::decode)
                .thenApply(v -> null);
    }

    private class InternalListener implements DocumentTreeListener<byte[]> {

        private final DocumentPath path;
        private final DocumentTreeListener<byte[]> listener;
        private final Executor executor;

        public InternalListener(DocumentPath path, DocumentTreeListener<byte[]> listener, Executor executor) {
            this.path = path;
            this.listener = listener;
            this.executor = executor;
        }

        @Override
        public void event(DocumentTreeEvent<byte[]> event) {
            if (event.path().isDescendentOf(path)) {
                executor.execute(() -> listener.event(event));
            }
        }
    }
}