blob: 5ca88c2de087e99efbf9656a3b2eb5db37aeb7fd [file] [log] [blame]
/*
* Copyright 2015 Open Networking Laboratory
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.onlab.util;
import java.util.concurrent.Callable;
import java.util.concurrent.Future;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.RejectedExecutionHandler;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import org.slf4j.LoggerFactory;
/**
* Implementation of ThreadPoolExecutor that bounds the work queue.
* <p>
* When a new job would exceed the queue bound, the job is run on the caller's
* thread rather than on a thread from the pool.
* </p>
*/
public final class BoundedThreadPool extends ThreadPoolExecutor {
private static final org.slf4j.Logger log = LoggerFactory.getLogger(BoundedThreadPool.class);
protected static int maxQueueSize = 80_000; //TODO tune this value
//private static final RejectedExecutionHandler DEFAULT_HANDLER = new CallerFeedbackPolicy();
private static final long STATS_INTERVAL = 5_000; //ms
private final BlockingBoolean underHighLoad;
private BoundedThreadPool(int numberOfThreads,
ThreadFactory threadFactory) {
super(numberOfThreads, numberOfThreads,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<>(maxQueueSize),
threadFactory,
new CallerFeedbackPolicy());
underHighLoad = ((CallerFeedbackPolicy) getRejectedExecutionHandler()).load();
}
/**
* Returns a single-thread, bounded executor service.
*
* @param threadFactory thread factory for the worker thread.
* @return the bounded thread pool
*/
public static BoundedThreadPool newSingleThreadExecutor(ThreadFactory threadFactory) {
return new BoundedThreadPool(1, threadFactory);
}
/**
* Returns a fixed-size, bounded executor service.
*
* @param numberOfThreads number of threads in the pool
* @param threadFactory thread factory for the worker threads.
* @return the bounded thread pool
*/
public static BoundedThreadPool newFixedThreadPool(int numberOfThreads, ThreadFactory threadFactory) {
return new BoundedThreadPool(numberOfThreads, threadFactory);
}
//TODO Might want to switch these to use Metrics class Meter and/or Gauge instead.
private final Counter submitted = new Counter();
private final Counter taken = new Counter();
@Override
public Future<?> submit(Runnable task) {
submitted.add(1);
return super.submit(task);
}
@Override
public <T> Future<T> submit(Runnable task, T result) {
submitted.add(1);
return super.submit(task, result);
}
@Override
public void execute(Runnable command) {
submitted.add(1);
super.execute(command);
}
@Override
public <T> Future<T> submit(Callable<T> task) {
submitted.add(1);
return super.submit(task);
}
@Override
protected void beforeExecute(Thread t, Runnable r) {
super.beforeExecute(t, r);
taken.add(1);
periodicallyPrintStats();
updateLoad();
}
// TODO schedule this with a fixed delay from a scheduled executor
private final AtomicLong lastPrinted = new AtomicLong(0L);
private void periodicallyPrintStats() {
long now = System.currentTimeMillis();
long prev = lastPrinted.get();
if (now - prev > STATS_INTERVAL) {
if (lastPrinted.compareAndSet(prev, now)) {
log.debug("queue size: {} jobs, submitted: {} jobs/s, taken: {} jobs/s",
getQueue().size(),
submitted.throughput(), taken.throughput());
submitted.reset();
taken.reset();
}
}
}
// TODO consider updating load whenever queue changes
private void updateLoad() {
underHighLoad.set(getQueue().remainingCapacity() / (double) maxQueueSize < 0.2);
}
/**
* Feedback policy that delays the caller's thread until the executor's work
* queue falls below a threshold, then runs the job on the caller's thread.
*/
@java.lang.SuppressWarnings("squid:S1217") // We really do mean to call run()
private static final class CallerFeedbackPolicy implements RejectedExecutionHandler {
private final BlockingBoolean underLoad = new BlockingBoolean(false);
public BlockingBoolean load() {
return underLoad;
}
/**
* Executes task r in the caller's thread, unless the executor
* has been shut down, in which case the task is discarded.
*
* @param r the runnable task requested to be executed
* @param e the executor attempting to execute this task
*/
public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
if (!e.isShutdown()) {
// Wait for up to 1 second while the queue drains...
boolean notified = false;
try {
notified = underLoad.await(false, 1, TimeUnit.SECONDS);
} catch (InterruptedException exception) {
log.debug("Got exception waiting for notification:", exception);
} finally {
if (!notified) {
log.info("Waited for 1 second on {}. Proceeding with work...",
Thread.currentThread().getName());
} else {
log.info("FIXME we got a notice");
}
}
// Do the work on the submitter's thread
r.run();
}
}
}
}