/*
 * Copyright 2016-present Open Networking Laboratory
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.onosproject.store.primitives.resources.impl;

import static java.util.concurrent.Executors.newSingleThreadExecutor;
import static org.onlab.util.Tools.groupedThreads;
import static org.slf4j.LoggerFactory.getLogger;

import java.util.Collection;
import java.util.List;
import java.util.Properties;
import java.util.Timer;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Consumer;

import org.onlab.util.AbstractAccumulator;
import org.onlab.util.Accumulator;
import org.onosproject.store.primitives.resources.impl.AtomixWorkQueueCommands.Add;
import org.onosproject.store.primitives.resources.impl.AtomixWorkQueueCommands.Complete;
import org.onosproject.store.primitives.resources.impl.AtomixWorkQueueCommands.Register;
import org.onosproject.store.primitives.resources.impl.AtomixWorkQueueCommands.Stats;
import org.onosproject.store.primitives.resources.impl.AtomixWorkQueueCommands.Take;
import org.onosproject.store.primitives.resources.impl.AtomixWorkQueueCommands.Unregister;
import org.onosproject.store.service.WorkQueue;
import org.onosproject.store.service.Task;
import org.onosproject.store.service.WorkQueueStats;
import org.slf4j.Logger;

import com.google.common.collect.ImmutableList;

import io.atomix.copycat.client.CopycatClient;
import io.atomix.resource.AbstractResource;
import io.atomix.resource.ResourceTypeInfo;

/**
 * Distributed resource providing the {@link WorkQueue} primitive.
 * <p>
 * Every queue operation is submitted as a command through the underlying
 * {@link CopycatClient}. When a task processor is registered, this resource
 * takes batches of tasks, hands each payload to the supplied callback on the
 * supplied executor, and reports finished task ids back in batches.
 */
@ResourceTypeInfo(id = -154, factory = AtomixWorkQueueFactory.class)
public class AtomixWorkQueue extends AbstractResource<AtomixWorkQueue>
    implements WorkQueue<byte[]> {

    private final Logger log = getLogger(getClass());

    /** Name of the client event that triggers another attempt to take tasks. */
    public static final String TASK_AVAILABLE = "task-available";

    // Thread on which batches fetched by resumeWork() are handed to the active processor.
    // NOTE(review): nothing in this class shuts this executor down or cancels the timer
    // below - confirm whether the owner of this resource is expected to release them.
    private final ExecutorService executor = newSingleThreadExecutor(groupedThreads("AtomixWorkQueue", "%d", log));

    // Currently active processor; null when no processor is active.
    private final AtomicReference<TaskProcessor> taskProcessor = new AtomicReference<>();

    // Timer shared by the completed-task accumulators created in registerTaskProcessor().
    private final Timer timer = new Timer("atomix-work-queue-completer");

    // True once a Register command has succeeded and no Unregister has succeeded since.
    private final AtomicBoolean isRegistered = new AtomicBoolean(false);

    /**
     * Creates a work queue resource backed by the given client.
     *
     * @param client copycat client used to submit commands
     * @param options resource options, passed through to the super class
     */
    protected AtomixWorkQueue(CopycatClient client, Properties options) {
        super(client, options);
    }

    /**
     * Opens the resource and installs the client listeners.
     * <p>
     * After the resource is opened, a state change listener re-submits the
     * {@code Register} command whenever the client reports
     * {@link CopycatClient.State#CONNECTED} while this instance is registered,
     * and a {@link #TASK_AVAILABLE} event listener resumes taking tasks.
     *
     * @return future completed with this resource once it is open
     */
    @Override
    public CompletableFuture<AtomixWorkQueue> open() {
        return super.open().thenApply(result -> {
            client.onStateChange(state -> {
                if (state == CopycatClient.State.CONNECTED && isRegistered.get()) {
                    client.submit(new Register());
                }
            });
            client.onEvent(TASK_AVAILABLE, this::resumeWork);
            return result;
        });
    }

    /**
     * Adds a collection of items to the queue.
     *
     * @param items items to add; an empty collection completes immediately
     *              without submitting a command
     * @return future completed when the {@code Add} command completes
     */
    @Override
    public CompletableFuture<Void> addMultiple(Collection<byte[]> items) {
        if (items.isEmpty()) {
            return CompletableFuture.completedFuture(null);
        }
        return client.submit(new Add(items));
    }

    /**
     * Takes up to {@code maxTasks} tasks from the queue.
     *
     * @param maxTasks maximum number of tasks to take; values of zero or less
     *                 yield an empty collection without submitting a command
     * @return future completed with the tasks that were taken
     */
    @Override
    public CompletableFuture<Collection<Task<byte[]>>> take(int maxTasks) {
        if (maxTasks <= 0) {
            return CompletableFuture.completedFuture(ImmutableList.of());
        }
        return client.submit(new Take(maxTasks));
    }

    /**
     * Marks the given tasks as completed.
     *
     * @param taskIds identifiers of the completed tasks; an empty collection
     *                completes immediately without submitting a command
     * @return future completed when the {@code Complete} command completes
     */
    @Override
    public CompletableFuture<Void> complete(Collection<String> taskIds) {
        if (taskIds.isEmpty()) {
            return CompletableFuture.completedFuture(null);
        }
        return client.submit(new Complete(taskIds));
    }

    /**
     * Registers a task processor and starts feeding it tasks.
     * <p>
     * The processor replaces any previously registered one. Once registration
     * succeeds, an initial batch of up to {@code parallelism} tasks is taken and
     * dispatched to the new processor.
     *
     * @param callback consumer invoked with the payload of each task
     * @param parallelism initial number of tasks that may be outstanding at once
     * @param executor executor on which {@code callback} is invoked
     * @return future completed once the initial batch has been handed to the processor
     */
    @Override
    public CompletableFuture<Void> registerTaskProcessor(Consumer<byte[]> callback,
                                          int parallelism,
                                          Executor executor) {
        Accumulator<String> completedTaskAccumulator =
                new CompletedTaskAccumulator(timer, 50, 50); // TODO: make configurable
        // Keep the new processor in a local so the initial batch is always delivered to
        // the processor created by this call, even if the shared reference is replaced
        // or cleared concurrently.
        TaskProcessor processor = new TaskProcessor(callback,
                                                    parallelism,
                                                    executor,
                                                    completedTaskAccumulator);
        taskProcessor.set(processor);
        return register().thenCompose(v -> take(parallelism))
                         .thenAccept(processor);
    }

    /**
     * Stops task processing.
     * <p>
     * The active processor is cleared before the {@code Unregister} command is
     * submitted, so tasks that are still running no longer pull additional work
     * when they finish. Tasks that were already taken are still executed and
     * reported as completed.
     *
     * @return future completed when the {@code Unregister} command completes
     */
    @Override
    public CompletableFuture<Void> stopProcessing() {
        // Clear eagerly rather than in a completion callback: a callback could otherwise
        // wipe out a processor registered right after this call.
        taskProcessor.set(null);
        return unregister();
    }

    /**
     * Retrieves work queue statistics.
     *
     * @return future completed with the result of the {@code Stats} command
     */
    @Override
    public CompletableFuture<WorkQueueStats> stats() {
        return client.submit(new Stats());
    }

    /**
     * Takes as many tasks as the active processor currently has head room for
     * and hands them to it on the internal executor. Does nothing when no
     * processor is active or the processor has no head room.
     */
    private void resumeWork() {
        TaskProcessor activeProcessor = taskProcessor.get();
        if (activeProcessor == null) {
            return;
        }
        int capacity = activeProcessor.headRoom();
        if (capacity <= 0) {
            // take() would return an empty batch; skip the pointless executor hop.
            return;
        }
        take(capacity).whenCompleteAsync((tasks, error) -> {
            if (error != null) {
                log.warn("Failed to take tasks from work queue", error);
                return;
            }
            activeProcessor.accept(tasks);
        }, executor);
    }

    /**
     * Submits a {@code Register} command and records the registration on success.
     *
     * @return future completed once the registration flag has been set
     */
    private CompletableFuture<Void> register() {
        return client.submit(new Register()).thenRun(() -> isRegistered.set(true));
    }

    /**
     * Submits an {@code Unregister} command and clears the registration flag on success.
     *
     * @return future completed once the registration flag has been cleared
     */
    private CompletableFuture<Void> unregister() {
        return client.submit(new Unregister()).thenRun(() -> isRegistered.set(false));
    }

    // TaskId accumulator for paced triggering of task completion calls.
    private class CompletedTaskAccumulator extends AbstractAccumulator<String> {
        CompletedTaskAccumulator(Timer timer, int maxTasksToBatch, int maxBatchMillis) {
            super(timer, maxTasksToBatch, maxBatchMillis, Integer.MAX_VALUE);
        }

        /**
         * Reports the accumulated task ids as completed and logs a failure of
         * the completion call instead of dropping it silently.
         *
         * @param items identifiers of the tasks that finished
         */
        @Override
        public void processItems(List<String> items) {
            complete(items).whenComplete((result, error) -> {
                if (error != null) {
                    log.warn("Failed to mark tasks as completed", error);
                }
            });
        }
    }

    /**
     * Dispatches batches of tasks to a user supplied consumer while tracking
     * how many more tasks may be taken.
     */
    private class TaskProcessor implements Consumer<Collection<Task<byte[]>>> {

        // Number of additional tasks that may be taken; starts at the requested parallelism.
        private final AtomicInteger headRoom;
        private final Consumer<byte[]> backingConsumer;
        private final Executor executor;
        private final Accumulator<String> taskCompleter;

        /**
         * Creates a task processor.
         *
         * @param backingConsumer consumer invoked with each task payload
         * @param parallelism initial head room
         * @param executor executor on which the consumer is invoked
         * @param taskCompleter accumulator receiving the ids of successfully processed tasks
         */
        public TaskProcessor(Consumer<byte[]> backingConsumer,
                             int parallelism,
                             Executor executor,
                             Accumulator<String> taskCompleter) {
            this.backingConsumer = backingConsumer;
            this.headRoom = new AtomicInteger(parallelism);
            this.executor = executor;
            this.taskCompleter = taskCompleter;
        }

        /**
         * Returns the number of additional tasks this processor can currently accept.
         *
         * @return current head room; may be zero or negative
         */
        public int headRoom() {
            return headRoom.get();
        }

        /**
         * Schedules every task of the batch on the executor.
         * <p>
         * Head room is reduced by the batch size up front and restored one task
         * at a time as tasks finish. A task whose consumer throws is logged and
         * not reported as completed.
         *
         * @param tasks batch of tasks to process; {@code null} is ignored
         */
        @Override
        public void accept(Collection<Task<byte[]>> tasks) {
            if (tasks == null) {
                return;
            }
            headRoom.addAndGet(-tasks.size());
            tasks.forEach(task ->
                executor.execute(() -> {
                    try {
                        backingConsumer.accept(task.payload());
                        taskCompleter.add(task.taskId());
                    } catch (Exception e) {
                        log.debug("Task execution failed", e);
                    } finally {
                        // Free the slot and try to pull more work, whether or not the task succeeded.
                        headRoom.incrementAndGet();
                        resumeWork();
                    }
                }));
        }
    }
}
