/*
 * Copyright 2015-present Open Networking Foundation
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.onlab.util;

import java.util.concurrent.Callable;
import java.util.concurrent.CancellationException;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.RejectedExecutionHandler;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;

import org.slf4j.LoggerFactory;

/**
 * Implementation of ThreadPoolExecutor that bounds the work queue.
 * <p>
 * When a new job would exceed the queue bound, the job is run on the caller's
 * thread rather than on a thread from the pool.
 * </p>
 */
public final class BoundedThreadPool extends ThreadPoolExecutor {

    private static final org.slf4j.Logger log = LoggerFactory.getLogger(BoundedThreadPool.class);

    static int maxQueueSize = 80_000; //TODO tune this value
    //private static final RejectedExecutionHandler DEFAULT_HANDLER = new CallerFeedbackPolicy();
    private static final long STATS_INTERVAL = 5_000; //ms

    private final BlockingBoolean underHighLoad;

    private BoundedThreadPool(int numberOfThreads,
                              ThreadFactory threadFactory) {
        super(numberOfThreads, numberOfThreads,
              0L, TimeUnit.MILLISECONDS,
              new LinkedBlockingQueue<>(maxQueueSize),
              threadFactory,
              new CallerFeedbackPolicy());
        underHighLoad = ((CallerFeedbackPolicy) getRejectedExecutionHandler()).load();
    }

    /**
     * Returns a single-thread, bounded executor service.
     *
     * @param threadFactory thread factory for the worker thread.
     * @return the bounded thread pool
     */
    public static BoundedThreadPool newSingleThreadExecutor(ThreadFactory threadFactory) {
        return new BoundedThreadPool(1, threadFactory);
    }

    /**
     * Returns a fixed-size, bounded executor service.
     *
     * @param numberOfThreads number of threads in the pool
     * @param threadFactory   thread factory for the worker threads.
     * @return the bounded thread pool
     */
    public static BoundedThreadPool newFixedThreadPool(int numberOfThreads, ThreadFactory threadFactory) {
        return new BoundedThreadPool(numberOfThreads, threadFactory);
    }

    //TODO Might want to switch these to use Metrics class Meter and/or Gauge instead.
    private final Counter submitted = new Counter();
    private final Counter taken = new Counter();

    @Override
    public Future<?> submit(Runnable task) {
        submitted.add(1);
        return super.submit(task);
    }

    @Override
    public <T> Future<T> submit(Runnable task, T result) {
        submitted.add(1);
        return super.submit(task, result);
    }

    @Override
    public void execute(Runnable command) {
        submitted.add(1);
        super.execute(command);
    }

    @Override
    public <T> Future<T> submit(Callable<T> task) {
        submitted.add(1);
        return super.submit(task);
    }


    @Override
    protected void beforeExecute(Thread t, Runnable r) {
        super.beforeExecute(t, r);
        taken.add(1);
        periodicallyPrintStats();
        updateLoad();
    }

    @Override
    protected void afterExecute(Runnable r, Throwable t) {
        super.afterExecute(r, t);
        if (t == null && r instanceof Future<?>) {
            try {
                Future<?> future = (Future<?>) r;
                if (future.isDone()) {
                    future.get();
                }
            } catch (CancellationException ce) {
                t = ce;
            } catch (ExecutionException ee) {
                t = ee.getCause();
            } catch (InterruptedException ie) {
                Thread.currentThread().interrupt();
            }
        }
        if (t != null) {
            log.error("Uncaught exception on " + r.getClass().getSimpleName(), t);
        }
    }

    // TODO schedule this with a fixed delay from a scheduled executor
    private final AtomicLong lastPrinted = new AtomicLong(0L);

    private void periodicallyPrintStats() {
        long now = System.currentTimeMillis();
        long prev = lastPrinted.get();
        if (now - prev > STATS_INTERVAL) {
            if (lastPrinted.compareAndSet(prev, now)) {
                log.debug("queue size: {} jobs, submitted: {} jobs/s, taken: {} jobs/s",
                         getQueue().size(),
                         submitted.throughput(), taken.throughput());
                submitted.reset();
                taken.reset();
            }
        }
    }

    // TODO consider updating load whenever queue changes
    private void updateLoad() {
        underHighLoad.set(getQueue().remainingCapacity() / (double) maxQueueSize < 0.2);
    }

    /**
     * Feedback policy that delays the caller's thread until the executor's work
     * queue falls below a threshold, then runs the job on the caller's thread.
     */
    @java.lang.SuppressWarnings("squid:S1217") // We really do mean to call run()
    private static final class CallerFeedbackPolicy implements RejectedExecutionHandler {

        private final BlockingBoolean underLoad = new BlockingBoolean(false);

        public BlockingBoolean load() {
            return underLoad;
        }

        /**
         * Executes task r in the caller's thread, unless the executor
         * has been shut down, in which case the task is discarded.
         *
         * @param r the runnable task requested to be executed
         * @param e the executor attempting to execute this task
         */
        public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
            if (!e.isShutdown()) {
                // Wait for up to 1 second while the queue drains...
                boolean notified = false;
                try {
                    notified = underLoad.await(false, 1, TimeUnit.SECONDS);
                } catch (InterruptedException exception) {
                    log.debug("Got exception waiting for notification:", exception);
                } finally {
                    if (!notified) {
                        log.info("Waited for 1 second on {}. Proceeding with work...",
                                 Thread.currentThread().getName());
                    } else {
                        log.info("FIXME we got a notice");
                    }
                }
                // Do the work on the submitter's thread
                r.run();
            }
        }
    }
}
