blob: de5e1bd1fb91355fd110b2ffbd49eb63d6188c9b [file] [log] [blame]
/*
 * Copyright 2015-present Open Networking Foundation
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
16package org.onlab.util;
17
Brian O'Connorc6713a82015-02-24 11:55:48 -080018import java.util.concurrent.Callable;
Jian Lie0ae3fb2016-03-02 01:05:23 -080019import java.util.concurrent.CancellationException;
20import java.util.concurrent.ExecutionException;
Brian O'Connorc6713a82015-02-24 11:55:48 -080021import java.util.concurrent.Future;
22import java.util.concurrent.LinkedBlockingQueue;
23import java.util.concurrent.RejectedExecutionHandler;
24import java.util.concurrent.ThreadFactory;
25import java.util.concurrent.ThreadPoolExecutor;
26import java.util.concurrent.TimeUnit;
27import java.util.concurrent.atomic.AtomicLong;
28
Ray Milkey9f87e512016-01-05 10:00:22 -080029import org.slf4j.LoggerFactory;
30
Brian O'Connorc6713a82015-02-24 11:55:48 -080031/**
32 * Implementation of ThreadPoolExecutor that bounds the work queue.
33 * <p>
34 * When a new job would exceed the queue bound, the job is run on the caller's
35 * thread rather than on a thread from the pool.
36 * </p>
37 */
38public final class BoundedThreadPool extends ThreadPoolExecutor {
39
40 private static final org.slf4j.Logger log = LoggerFactory.getLogger(BoundedThreadPool.class);
41
Ray Milkey9c9cde42018-01-12 14:22:06 -080042 static int maxQueueSize = 80_000; //TODO tune this value
Brian O'Connorc6713a82015-02-24 11:55:48 -080043 //private static final RejectedExecutionHandler DEFAULT_HANDLER = new CallerFeedbackPolicy();
44 private static final long STATS_INTERVAL = 5_000; //ms
45
46 private final BlockingBoolean underHighLoad;
47
48 private BoundedThreadPool(int numberOfThreads,
49 ThreadFactory threadFactory) {
50 super(numberOfThreads, numberOfThreads,
51 0L, TimeUnit.MILLISECONDS,
52 new LinkedBlockingQueue<>(maxQueueSize),
53 threadFactory,
54 new CallerFeedbackPolicy());
55 underHighLoad = ((CallerFeedbackPolicy) getRejectedExecutionHandler()).load();
56 }
57
58 /**
59 * Returns a single-thread, bounded executor service.
60 *
61 * @param threadFactory thread factory for the worker thread.
62 * @return the bounded thread pool
63 */
64 public static BoundedThreadPool newSingleThreadExecutor(ThreadFactory threadFactory) {
65 return new BoundedThreadPool(1, threadFactory);
66 }
67
68 /**
69 * Returns a fixed-size, bounded executor service.
70 *
Thomas Vachuska3e2b6512015-03-05 09:25:03 -080071 * @param numberOfThreads number of threads in the pool
72 * @param threadFactory thread factory for the worker threads.
Brian O'Connorc6713a82015-02-24 11:55:48 -080073 * @return the bounded thread pool
74 */
75 public static BoundedThreadPool newFixedThreadPool(int numberOfThreads, ThreadFactory threadFactory) {
76 return new BoundedThreadPool(numberOfThreads, threadFactory);
77 }
78
79 //TODO Might want to switch these to use Metrics class Meter and/or Gauge instead.
80 private final Counter submitted = new Counter();
81 private final Counter taken = new Counter();
82
83 @Override
84 public Future<?> submit(Runnable task) {
85 submitted.add(1);
86 return super.submit(task);
87 }
88
89 @Override
90 public <T> Future<T> submit(Runnable task, T result) {
91 submitted.add(1);
92 return super.submit(task, result);
93 }
94
95 @Override
96 public void execute(Runnable command) {
97 submitted.add(1);
98 super.execute(command);
99 }
100
101 @Override
102 public <T> Future<T> submit(Callable<T> task) {
103 submitted.add(1);
104 return super.submit(task);
105 }
106
107
108 @Override
109 protected void beforeExecute(Thread t, Runnable r) {
110 super.beforeExecute(t, r);
111 taken.add(1);
112 periodicallyPrintStats();
113 updateLoad();
114 }
115
Jian Lie0ae3fb2016-03-02 01:05:23 -0800116 @Override
117 protected void afterExecute(Runnable r, Throwable t) {
118 super.afterExecute(r, t);
119 if (t == null && r instanceof Future<?>) {
120 try {
121 Future<?> future = (Future<?>) r;
122 if (future.isDone()) {
123 future.get();
124 }
125 } catch (CancellationException ce) {
126 t = ce;
127 } catch (ExecutionException ee) {
128 t = ee.getCause();
129 } catch (InterruptedException ie) {
130 Thread.currentThread().interrupt();
131 }
132 }
133 if (t != null) {
134 log.error("Uncaught exception on " + r.getClass().getSimpleName(), t);
135 }
136 }
137
Brian O'Connorc6713a82015-02-24 11:55:48 -0800138 // TODO schedule this with a fixed delay from a scheduled executor
139 private final AtomicLong lastPrinted = new AtomicLong(0L);
Thomas Vachuska3e2b6512015-03-05 09:25:03 -0800140
Brian O'Connorc6713a82015-02-24 11:55:48 -0800141 private void periodicallyPrintStats() {
142 long now = System.currentTimeMillis();
143 long prev = lastPrinted.get();
144 if (now - prev > STATS_INTERVAL) {
145 if (lastPrinted.compareAndSet(prev, now)) {
Madan Jampanif2af7712015-05-29 18:43:52 -0700146 log.debug("queue size: {} jobs, submitted: {} jobs/s, taken: {} jobs/s",
Brian O'Connorc6713a82015-02-24 11:55:48 -0800147 getQueue().size(),
148 submitted.throughput(), taken.throughput());
149 submitted.reset();
150 taken.reset();
151 }
152 }
153 }
154
155 // TODO consider updating load whenever queue changes
156 private void updateLoad() {
157 underHighLoad.set(getQueue().remainingCapacity() / (double) maxQueueSize < 0.2);
158 }
159
160 /**
161 * Feedback policy that delays the caller's thread until the executor's work
162 * queue falls below a threshold, then runs the job on the caller's thread.
163 */
Ray Milkeyaef45852016-01-11 17:13:19 -0800164 @java.lang.SuppressWarnings("squid:S1217") // We really do mean to call run()
Brian O'Connorc6713a82015-02-24 11:55:48 -0800165 private static final class CallerFeedbackPolicy implements RejectedExecutionHandler {
166
167 private final BlockingBoolean underLoad = new BlockingBoolean(false);
168
169 public BlockingBoolean load() {
170 return underLoad;
171 }
172
173 /**
174 * Executes task r in the caller's thread, unless the executor
175 * has been shut down, in which case the task is discarded.
176 *
177 * @param r the runnable task requested to be executed
178 * @param e the executor attempting to execute this task
179 */
180 public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
181 if (!e.isShutdown()) {
182 // Wait for up to 1 second while the queue drains...
183 boolean notified = false;
184 try {
185 notified = underLoad.await(false, 1, TimeUnit.SECONDS);
186 } catch (InterruptedException exception) {
187 log.debug("Got exception waiting for notification:", exception);
188 } finally {
189 if (!notified) {
190 log.info("Waited for 1 second on {}. Proceeding with work...",
191 Thread.currentThread().getName());
192 } else {
193 log.info("FIXME we got a notice");
194 }
195 }
196 // Do the work on the submitter's thread
197 r.run();
198 }
199 }
200 }
Ray Milkey676249c2015-12-18 09:27:03 -0800201}