blob: 9eef6609a0086e1bc00b1fed35ab0d0458b873e3 [file] [log] [blame]
Brian O'Connorc6713a82015-02-24 11:55:48 -08001/*
2 * Copyright 2015 Open Networking Laboratory
3 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
16package org.onlab.util;
17
18import org.slf4j.LoggerFactory;
19
20import java.util.concurrent.Callable;
21import java.util.concurrent.Future;
22import java.util.concurrent.LinkedBlockingQueue;
23import java.util.concurrent.RejectedExecutionHandler;
24import java.util.concurrent.ThreadFactory;
25import java.util.concurrent.ThreadPoolExecutor;
26import java.util.concurrent.TimeUnit;
27import java.util.concurrent.atomic.AtomicLong;
28
29/**
30 * Implementation of ThreadPoolExecutor that bounds the work queue.
31 * <p>
32 * When a new job would exceed the queue bound, the job is run on the caller's
33 * thread rather than on a thread from the pool.
34 * </p>
35 */
36public final class BoundedThreadPool extends ThreadPoolExecutor {
37
38 private static final org.slf4j.Logger log = LoggerFactory.getLogger(BoundedThreadPool.class);
39
40 protected static int maxQueueSize = 80_000; //TODO tune this value
41 //private static final RejectedExecutionHandler DEFAULT_HANDLER = new CallerFeedbackPolicy();
42 private static final long STATS_INTERVAL = 5_000; //ms
43
44 private final BlockingBoolean underHighLoad;
45
46 private BoundedThreadPool(int numberOfThreads,
47 ThreadFactory threadFactory) {
48 super(numberOfThreads, numberOfThreads,
49 0L, TimeUnit.MILLISECONDS,
50 new LinkedBlockingQueue<>(maxQueueSize),
51 threadFactory,
52 new CallerFeedbackPolicy());
53 underHighLoad = ((CallerFeedbackPolicy) getRejectedExecutionHandler()).load();
54 }
55
56 /**
57 * Returns a single-thread, bounded executor service.
58 *
59 * @param threadFactory thread factory for the worker thread.
60 * @return the bounded thread pool
61 */
62 public static BoundedThreadPool newSingleThreadExecutor(ThreadFactory threadFactory) {
63 return new BoundedThreadPool(1, threadFactory);
64 }
65
66 /**
67 * Returns a fixed-size, bounded executor service.
68 *
Thomas Vachuska3e2b6512015-03-05 09:25:03 -080069 * @param numberOfThreads number of threads in the pool
70 * @param threadFactory thread factory for the worker threads.
Brian O'Connorc6713a82015-02-24 11:55:48 -080071 * @return the bounded thread pool
72 */
73 public static BoundedThreadPool newFixedThreadPool(int numberOfThreads, ThreadFactory threadFactory) {
74 return new BoundedThreadPool(numberOfThreads, threadFactory);
75 }
76
77 //TODO Might want to switch these to use Metrics class Meter and/or Gauge instead.
78 private final Counter submitted = new Counter();
79 private final Counter taken = new Counter();
80
81 @Override
82 public Future<?> submit(Runnable task) {
83 submitted.add(1);
84 return super.submit(task);
85 }
86
87 @Override
88 public <T> Future<T> submit(Runnable task, T result) {
89 submitted.add(1);
90 return super.submit(task, result);
91 }
92
93 @Override
94 public void execute(Runnable command) {
95 submitted.add(1);
96 super.execute(command);
97 }
98
99 @Override
100 public <T> Future<T> submit(Callable<T> task) {
101 submitted.add(1);
102 return super.submit(task);
103 }
104
105
106 @Override
107 protected void beforeExecute(Thread t, Runnable r) {
108 super.beforeExecute(t, r);
109 taken.add(1);
110 periodicallyPrintStats();
111 updateLoad();
112 }
113
114 // TODO schedule this with a fixed delay from a scheduled executor
115 private final AtomicLong lastPrinted = new AtomicLong(0L);
Thomas Vachuska3e2b6512015-03-05 09:25:03 -0800116
Brian O'Connorc6713a82015-02-24 11:55:48 -0800117 private void periodicallyPrintStats() {
118 long now = System.currentTimeMillis();
119 long prev = lastPrinted.get();
120 if (now - prev > STATS_INTERVAL) {
121 if (lastPrinted.compareAndSet(prev, now)) {
Madan Jampanif2af7712015-05-29 18:43:52 -0700122 log.debug("queue size: {} jobs, submitted: {} jobs/s, taken: {} jobs/s",
Brian O'Connorc6713a82015-02-24 11:55:48 -0800123 getQueue().size(),
124 submitted.throughput(), taken.throughput());
125 submitted.reset();
126 taken.reset();
127 }
128 }
129 }
130
131 // TODO consider updating load whenever queue changes
132 private void updateLoad() {
133 underHighLoad.set(getQueue().remainingCapacity() / (double) maxQueueSize < 0.2);
134 }
135
136 /**
137 * Feedback policy that delays the caller's thread until the executor's work
138 * queue falls below a threshold, then runs the job on the caller's thread.
139 */
140 private static final class CallerFeedbackPolicy implements RejectedExecutionHandler {
141
142 private final BlockingBoolean underLoad = new BlockingBoolean(false);
143
144 public BlockingBoolean load() {
145 return underLoad;
146 }
147
148 /**
149 * Executes task r in the caller's thread, unless the executor
150 * has been shut down, in which case the task is discarded.
151 *
152 * @param r the runnable task requested to be executed
153 * @param e the executor attempting to execute this task
154 */
155 public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
156 if (!e.isShutdown()) {
157 // Wait for up to 1 second while the queue drains...
158 boolean notified = false;
159 try {
160 notified = underLoad.await(false, 1, TimeUnit.SECONDS);
161 } catch (InterruptedException exception) {
162 log.debug("Got exception waiting for notification:", exception);
163 } finally {
164 if (!notified) {
165 log.info("Waited for 1 second on {}. Proceeding with work...",
166 Thread.currentThread().getName());
167 } else {
168 log.info("FIXME we got a notice");
169 }
170 }
171 // Do the work on the submitter's thread
172 r.run();
173 }
174 }
175 }
176}