blob: 627f1416ed54c5e9b1ec979a6d9339f2cf117f47 [file] [log] [blame]
Brian O'Connorc6713a82015-02-24 11:55:48 -08001/*
2 * Copyright 2015 Open Networking Laboratory
3 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
16package org.onlab.util;
17
Brian O'Connorc6713a82015-02-24 11:55:48 -080018import java.util.concurrent.Callable;
19import java.util.concurrent.Future;
20import java.util.concurrent.LinkedBlockingQueue;
21import java.util.concurrent.RejectedExecutionHandler;
22import java.util.concurrent.ThreadFactory;
23import java.util.concurrent.ThreadPoolExecutor;
24import java.util.concurrent.TimeUnit;
25import java.util.concurrent.atomic.AtomicLong;
26
Ray Milkey9f87e512016-01-05 10:00:22 -080027import org.slf4j.LoggerFactory;
28
29import static org.onlab.util.SonarSuppressionConstants.SONAR_CALL_RUN;
30
Brian O'Connorc6713a82015-02-24 11:55:48 -080031/**
32 * Implementation of ThreadPoolExecutor that bounds the work queue.
33 * <p>
34 * When a new job would exceed the queue bound, the job is run on the caller's
35 * thread rather than on a thread from the pool.
36 * </p>
37 */
38public final class BoundedThreadPool extends ThreadPoolExecutor {
39
40 private static final org.slf4j.Logger log = LoggerFactory.getLogger(BoundedThreadPool.class);
41
42 protected static int maxQueueSize = 80_000; //TODO tune this value
43 //private static final RejectedExecutionHandler DEFAULT_HANDLER = new CallerFeedbackPolicy();
44 private static final long STATS_INTERVAL = 5_000; //ms
45
46 private final BlockingBoolean underHighLoad;
47
48 private BoundedThreadPool(int numberOfThreads,
49 ThreadFactory threadFactory) {
50 super(numberOfThreads, numberOfThreads,
51 0L, TimeUnit.MILLISECONDS,
52 new LinkedBlockingQueue<>(maxQueueSize),
53 threadFactory,
54 new CallerFeedbackPolicy());
55 underHighLoad = ((CallerFeedbackPolicy) getRejectedExecutionHandler()).load();
56 }
57
58 /**
59 * Returns a single-thread, bounded executor service.
60 *
61 * @param threadFactory thread factory for the worker thread.
62 * @return the bounded thread pool
63 */
64 public static BoundedThreadPool newSingleThreadExecutor(ThreadFactory threadFactory) {
65 return new BoundedThreadPool(1, threadFactory);
66 }
67
68 /**
69 * Returns a fixed-size, bounded executor service.
70 *
Thomas Vachuska3e2b6512015-03-05 09:25:03 -080071 * @param numberOfThreads number of threads in the pool
72 * @param threadFactory thread factory for the worker threads.
Brian O'Connorc6713a82015-02-24 11:55:48 -080073 * @return the bounded thread pool
74 */
75 public static BoundedThreadPool newFixedThreadPool(int numberOfThreads, ThreadFactory threadFactory) {
76 return new BoundedThreadPool(numberOfThreads, threadFactory);
77 }
78
79 //TODO Might want to switch these to use Metrics class Meter and/or Gauge instead.
80 private final Counter submitted = new Counter();
81 private final Counter taken = new Counter();
82
83 @Override
84 public Future<?> submit(Runnable task) {
85 submitted.add(1);
86 return super.submit(task);
87 }
88
89 @Override
90 public <T> Future<T> submit(Runnable task, T result) {
91 submitted.add(1);
92 return super.submit(task, result);
93 }
94
95 @Override
96 public void execute(Runnable command) {
97 submitted.add(1);
98 super.execute(command);
99 }
100
101 @Override
102 public <T> Future<T> submit(Callable<T> task) {
103 submitted.add(1);
104 return super.submit(task);
105 }
106
107
108 @Override
109 protected void beforeExecute(Thread t, Runnable r) {
110 super.beforeExecute(t, r);
111 taken.add(1);
112 periodicallyPrintStats();
113 updateLoad();
114 }
115
116 // TODO schedule this with a fixed delay from a scheduled executor
117 private final AtomicLong lastPrinted = new AtomicLong(0L);
Thomas Vachuska3e2b6512015-03-05 09:25:03 -0800118
Brian O'Connorc6713a82015-02-24 11:55:48 -0800119 private void periodicallyPrintStats() {
120 long now = System.currentTimeMillis();
121 long prev = lastPrinted.get();
122 if (now - prev > STATS_INTERVAL) {
123 if (lastPrinted.compareAndSet(prev, now)) {
Madan Jampanif2af7712015-05-29 18:43:52 -0700124 log.debug("queue size: {} jobs, submitted: {} jobs/s, taken: {} jobs/s",
Brian O'Connorc6713a82015-02-24 11:55:48 -0800125 getQueue().size(),
126 submitted.throughput(), taken.throughput());
127 submitted.reset();
128 taken.reset();
129 }
130 }
131 }
132
133 // TODO consider updating load whenever queue changes
134 private void updateLoad() {
135 underHighLoad.set(getQueue().remainingCapacity() / (double) maxQueueSize < 0.2);
136 }
137
138 /**
139 * Feedback policy that delays the caller's thread until the executor's work
140 * queue falls below a threshold, then runs the job on the caller's thread.
141 */
Ray Milkey9f87e512016-01-05 10:00:22 -0800142 @java.lang.SuppressWarnings(SONAR_CALL_RUN) // We really do mean to call run()
Brian O'Connorc6713a82015-02-24 11:55:48 -0800143 private static final class CallerFeedbackPolicy implements RejectedExecutionHandler {
144
145 private final BlockingBoolean underLoad = new BlockingBoolean(false);
146
147 public BlockingBoolean load() {
148 return underLoad;
149 }
150
151 /**
152 * Executes task r in the caller's thread, unless the executor
153 * has been shut down, in which case the task is discarded.
154 *
155 * @param r the runnable task requested to be executed
156 * @param e the executor attempting to execute this task
157 */
158 public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
159 if (!e.isShutdown()) {
160 // Wait for up to 1 second while the queue drains...
161 boolean notified = false;
162 try {
163 notified = underLoad.await(false, 1, TimeUnit.SECONDS);
164 } catch (InterruptedException exception) {
165 log.debug("Got exception waiting for notification:", exception);
166 } finally {
167 if (!notified) {
168 log.info("Waited for 1 second on {}. Proceeding with work...",
169 Thread.currentThread().getName());
170 } else {
171 log.info("FIXME we got a notice");
172 }
173 }
174 // Do the work on the submitter's thread
175 r.run();
176 }
177 }
178 }
Ray Milkey676249c2015-12-18 09:27:03 -0800179}