blob: 5ca88c2de087e99efbf9656a3b2eb5db37aeb7fd [file] [log] [blame]
/*
 * Copyright 2015 Open Networking Laboratory
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.onlab.util;

import java.util.concurrent.Callable;
import java.util.concurrent.Future;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.RejectedExecutionHandler;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
28
Brian O'Connorc6713a82015-02-24 11:55:48 -080029/**
30 * Implementation of ThreadPoolExecutor that bounds the work queue.
31 * <p>
32 * When a new job would exceed the queue bound, the job is run on the caller's
33 * thread rather than on a thread from the pool.
34 * </p>
35 */
36public final class BoundedThreadPool extends ThreadPoolExecutor {
37
38 private static final org.slf4j.Logger log = LoggerFactory.getLogger(BoundedThreadPool.class);
39
40 protected static int maxQueueSize = 80_000; //TODO tune this value
41 //private static final RejectedExecutionHandler DEFAULT_HANDLER = new CallerFeedbackPolicy();
42 private static final long STATS_INTERVAL = 5_000; //ms
43
44 private final BlockingBoolean underHighLoad;
45
46 private BoundedThreadPool(int numberOfThreads,
47 ThreadFactory threadFactory) {
48 super(numberOfThreads, numberOfThreads,
49 0L, TimeUnit.MILLISECONDS,
50 new LinkedBlockingQueue<>(maxQueueSize),
51 threadFactory,
52 new CallerFeedbackPolicy());
53 underHighLoad = ((CallerFeedbackPolicy) getRejectedExecutionHandler()).load();
54 }
55
56 /**
57 * Returns a single-thread, bounded executor service.
58 *
59 * @param threadFactory thread factory for the worker thread.
60 * @return the bounded thread pool
61 */
62 public static BoundedThreadPool newSingleThreadExecutor(ThreadFactory threadFactory) {
63 return new BoundedThreadPool(1, threadFactory);
64 }
65
66 /**
67 * Returns a fixed-size, bounded executor service.
68 *
Thomas Vachuska3e2b6512015-03-05 09:25:03 -080069 * @param numberOfThreads number of threads in the pool
70 * @param threadFactory thread factory for the worker threads.
Brian O'Connorc6713a82015-02-24 11:55:48 -080071 * @return the bounded thread pool
72 */
73 public static BoundedThreadPool newFixedThreadPool(int numberOfThreads, ThreadFactory threadFactory) {
74 return new BoundedThreadPool(numberOfThreads, threadFactory);
75 }
76
77 //TODO Might want to switch these to use Metrics class Meter and/or Gauge instead.
78 private final Counter submitted = new Counter();
79 private final Counter taken = new Counter();
80
81 @Override
82 public Future<?> submit(Runnable task) {
83 submitted.add(1);
84 return super.submit(task);
85 }
86
87 @Override
88 public <T> Future<T> submit(Runnable task, T result) {
89 submitted.add(1);
90 return super.submit(task, result);
91 }
92
93 @Override
94 public void execute(Runnable command) {
95 submitted.add(1);
96 super.execute(command);
97 }
98
99 @Override
100 public <T> Future<T> submit(Callable<T> task) {
101 submitted.add(1);
102 return super.submit(task);
103 }
104
105
106 @Override
107 protected void beforeExecute(Thread t, Runnable r) {
108 super.beforeExecute(t, r);
109 taken.add(1);
110 periodicallyPrintStats();
111 updateLoad();
112 }
113
114 // TODO schedule this with a fixed delay from a scheduled executor
115 private final AtomicLong lastPrinted = new AtomicLong(0L);
Thomas Vachuska3e2b6512015-03-05 09:25:03 -0800116
Brian O'Connorc6713a82015-02-24 11:55:48 -0800117 private void periodicallyPrintStats() {
118 long now = System.currentTimeMillis();
119 long prev = lastPrinted.get();
120 if (now - prev > STATS_INTERVAL) {
121 if (lastPrinted.compareAndSet(prev, now)) {
Madan Jampanif2af7712015-05-29 18:43:52 -0700122 log.debug("queue size: {} jobs, submitted: {} jobs/s, taken: {} jobs/s",
Brian O'Connorc6713a82015-02-24 11:55:48 -0800123 getQueue().size(),
124 submitted.throughput(), taken.throughput());
125 submitted.reset();
126 taken.reset();
127 }
128 }
129 }
130
131 // TODO consider updating load whenever queue changes
132 private void updateLoad() {
133 underHighLoad.set(getQueue().remainingCapacity() / (double) maxQueueSize < 0.2);
134 }
135
136 /**
137 * Feedback policy that delays the caller's thread until the executor's work
138 * queue falls below a threshold, then runs the job on the caller's thread.
139 */
Ray Milkeyaef45852016-01-11 17:13:19 -0800140 @java.lang.SuppressWarnings("squid:S1217") // We really do mean to call run()
Brian O'Connorc6713a82015-02-24 11:55:48 -0800141 private static final class CallerFeedbackPolicy implements RejectedExecutionHandler {
142
143 private final BlockingBoolean underLoad = new BlockingBoolean(false);
144
145 public BlockingBoolean load() {
146 return underLoad;
147 }
148
149 /**
150 * Executes task r in the caller's thread, unless the executor
151 * has been shut down, in which case the task is discarded.
152 *
153 * @param r the runnable task requested to be executed
154 * @param e the executor attempting to execute this task
155 */
156 public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
157 if (!e.isShutdown()) {
158 // Wait for up to 1 second while the queue drains...
159 boolean notified = false;
160 try {
161 notified = underLoad.await(false, 1, TimeUnit.SECONDS);
162 } catch (InterruptedException exception) {
163 log.debug("Got exception waiting for notification:", exception);
164 } finally {
165 if (!notified) {
166 log.info("Waited for 1 second on {}. Proceeding with work...",
167 Thread.currentThread().getName());
168 } else {
169 log.info("FIXME we got a notice");
170 }
171 }
172 // Do the work on the submitter's thread
173 r.run();
174 }
175 }
176 }
Ray Milkey676249c2015-12-18 09:27:03 -0800177}