/*
 * Copyright 2015-present Open Networking Laboratory
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.onlab.util;

import com.google.common.collect.ImmutableList;
import com.google.common.collect.Lists;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.List;
import java.util.Timer;
import java.util.TimerTask;
import java.util.concurrent.atomic.AtomicReference;

import static com.google.common.base.Preconditions.checkArgument;
import static com.google.common.base.Preconditions.checkNotNull;

/**
 * Base implementation of an item accumulator. It allows triggering based on
 * item inter-arrival time threshold, maximum batch life threshold and maximum
 * batch size.
 */
public abstract class AbstractAccumulator<T> implements Accumulator<T> {

    private Logger log = LoggerFactory.getLogger(AbstractAccumulator.class);

    private final Timer timer;
    private final int maxItems;
    private final int maxBatchMillis;
    private final int maxIdleMillis;

    private final AtomicReference<TimerTask> idleTask = new AtomicReference<>();
    private final AtomicReference<TimerTask> maxTask = new AtomicReference<>();

    private final List<T> items;

    /**
     * Creates an item accumulator capable of triggering on the specified
     * thresholds.
     *
     * @param timer          timer to use for scheduling check-points
     * @param maxItems       maximum number of items to accumulate before
     *                       processing is triggered
     *                       <p>
     *                       NB: It is possible that processItems will contain
     *                       more than maxItems under high load or if isReady()
     *                       can return false.
     *                       </p>
     * @param maxBatchMillis maximum number of millis allowed since the first
     *                       item before processing is triggered
     * @param maxIdleMillis  maximum number millis between items before
     *                       processing is triggered
     */
    protected AbstractAccumulator(Timer timer, int maxItems,
                                  int maxBatchMillis, int maxIdleMillis) {
        this.timer = checkNotNull(timer, "Timer cannot be null");

        checkArgument(maxItems > 1, "Maximum number of items must be > 1");
        checkArgument(maxBatchMillis > 0, "Maximum millis must be positive");
        checkArgument(maxIdleMillis > 0, "Maximum idle millis must be positive");

        this.maxItems = maxItems;
        this.maxBatchMillis = maxBatchMillis;
        this.maxIdleMillis = maxIdleMillis;

        items = Lists.newArrayListWithExpectedSize(maxItems);
    }

    @Override
    public void add(T item) {
        final int sizeAtTimeOfAdd;
        synchronized (items) {
            items.add(item);
            sizeAtTimeOfAdd = items.size();
        }

        /*
            WARNING: It is possible that the item that was just added to the list
            has been processed by an existing idle task at this point.

            By rescheduling the following timers, it is possible that a
            superfluous maxTask is generated now OR that the idle task and max
            task are scheduled at their specified delays. This could result in
            calls to processItems sooner than expected.
         */

        // Did we hit the max item threshold?
        if (sizeAtTimeOfAdd >= maxItems) {
            if (maxIdleMillis < maxBatchMillis) {
                cancelTask(idleTask);
            }
            rescheduleTask(maxTask, 0 /* now! */);
        } else {
            // Otherwise, schedule idle task and if this is a first item
            // also schedule the max batch age task.
            if (maxIdleMillis < maxBatchMillis) {
                rescheduleTask(idleTask, maxIdleMillis);
            }
            if (sizeAtTimeOfAdd == 1) {
                rescheduleTask(maxTask, maxBatchMillis);
            }
        }
    }

    /**
     * Reschedules the specified task, cancelling existing one if applicable.
     *
     * @param taskRef task reference
     * @param millis delay in milliseconds
     */
    private void rescheduleTask(AtomicReference<TimerTask> taskRef, long millis) {
        ProcessorTask newTask = new ProcessorTask();
        timer.schedule(newTask, millis);
        swapAndCancelTask(taskRef, newTask);
    }

    /**
     * Cancels the specified task if it has not run or is not running.
     *
     * @param taskRef task reference
     */
    private void cancelTask(AtomicReference<TimerTask> taskRef) {
        swapAndCancelTask(taskRef, null);
    }

    /**
     * Sets the new task and attempts to cancelTask the old one.
     *
     * @param taskRef task reference
     * @param newTask new task
     */
    private void swapAndCancelTask(AtomicReference<TimerTask> taskRef,
                                        TimerTask newTask) {
        TimerTask oldTask = taskRef.getAndSet(newTask);
        if (oldTask != null) {
            oldTask.cancel();
        }
    }

    // Task for triggering processing of accumulated items
    private class ProcessorTask extends TimerTask {
        @Override
        public void run() {
            try {
                if (isReady()) {

                    List<T> batch = finalizeCurrentBatch();
                    if (!batch.isEmpty()) {
                        processItems(batch);
                    }
                } else {
                    rescheduleTask(idleTask, maxIdleMillis);
                }
            } catch (Exception e) {
                log.warn("Unable to process batch due to", e);
            }
        }
    }

    /**
     * Returns an immutable copy of the existing items and clear the list.
     *
     * @return list of existing items
     */
    private List<T> finalizeCurrentBatch() {
        List<T> finalizedList;
        synchronized (items) {
            finalizedList = ImmutableList.copyOf(items);
            items.clear();
            /*
             * To avoid reprocessing being triggered on an empty list.
             */
            cancelTask(maxTask);
            cancelTask(idleTask);
        }
        return finalizedList;
    }

    @Override
    public boolean isReady() {
        return true;
    }

    /**
     * Returns the backing timer.
     *
     * @return backing timer
     */
    public Timer timer() {
        return timer;
    }

    /**
     * Returns the maximum number of items allowed to accumulate before
     * processing is triggered.
     *
     * @return max number of items
     */
    public int maxItems() {
        return maxItems;
    }

    /**
     * Returns the maximum number of millis allowed to expire since the first
     * item before processing is triggered.
     *
     * @return max number of millis a batch is allowed to last
     */
    public int maxBatchMillis() {
        return maxBatchMillis;
    }

    /**
     * Returns the maximum number of millis allowed to expire since the last
     * item arrival before processing is triggered.
     *
     * @return max number of millis since the last item
     */
    public int maxIdleMillis() {
        return maxIdleMillis;
    }

}
