blob: 6fb1c900fa03489fdb7283565fa5bd3af3fa5d27 [file] [log] [blame]
/*
 * Copyright 2015 Open Networking Laboratory
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
16package org.onlab.util;
17
18import org.slf4j.LoggerFactory;
19
20import java.util.concurrent.Callable;
21import java.util.concurrent.Future;
22import java.util.concurrent.LinkedBlockingQueue;
23import java.util.concurrent.RejectedExecutionHandler;
24import java.util.concurrent.ThreadFactory;
25import java.util.concurrent.ThreadPoolExecutor;
26import java.util.concurrent.TimeUnit;
27import java.util.concurrent.atomic.AtomicLong;
28
29/**
30 * Implementation of ThreadPoolExecutor that bounds the work queue.
31 * <p>
32 * When a new job would exceed the queue bound, the job is run on the caller's
33 * thread rather than on a thread from the pool.
34 * </p>
35 */
36public final class BoundedThreadPool extends ThreadPoolExecutor {
37
38 private static final org.slf4j.Logger log = LoggerFactory.getLogger(BoundedThreadPool.class);
39
40 protected static int maxQueueSize = 80_000; //TODO tune this value
41 //private static final RejectedExecutionHandler DEFAULT_HANDLER = new CallerFeedbackPolicy();
42 private static final long STATS_INTERVAL = 5_000; //ms
43
44 private final BlockingBoolean underHighLoad;
45
46 private BoundedThreadPool(int numberOfThreads,
47 ThreadFactory threadFactory) {
48 super(numberOfThreads, numberOfThreads,
49 0L, TimeUnit.MILLISECONDS,
50 new LinkedBlockingQueue<>(maxQueueSize),
51 threadFactory,
52 new CallerFeedbackPolicy());
53 underHighLoad = ((CallerFeedbackPolicy) getRejectedExecutionHandler()).load();
54 }
55
56 /**
57 * Returns a single-thread, bounded executor service.
58 *
59 * @param threadFactory thread factory for the worker thread.
60 * @return the bounded thread pool
61 */
62 public static BoundedThreadPool newSingleThreadExecutor(ThreadFactory threadFactory) {
63 return new BoundedThreadPool(1, threadFactory);
64 }
65
66 /**
67 * Returns a fixed-size, bounded executor service.
68 *
69 * @param threadFactory thread factory for the worker threads.
70 * @return the bounded thread pool
71 */
72 public static BoundedThreadPool newFixedThreadPool(int numberOfThreads, ThreadFactory threadFactory) {
73 return new BoundedThreadPool(numberOfThreads, threadFactory);
74 }
75
76 //TODO Might want to switch these to use Metrics class Meter and/or Gauge instead.
77 private final Counter submitted = new Counter();
78 private final Counter taken = new Counter();
79
80 @Override
81 public Future<?> submit(Runnable task) {
82 submitted.add(1);
83 return super.submit(task);
84 }
85
86 @Override
87 public <T> Future<T> submit(Runnable task, T result) {
88 submitted.add(1);
89 return super.submit(task, result);
90 }
91
92 @Override
93 public void execute(Runnable command) {
94 submitted.add(1);
95 super.execute(command);
96 }
97
98 @Override
99 public <T> Future<T> submit(Callable<T> task) {
100 submitted.add(1);
101 return super.submit(task);
102 }
103
104
105 @Override
106 protected void beforeExecute(Thread t, Runnable r) {
107 super.beforeExecute(t, r);
108 taken.add(1);
109 periodicallyPrintStats();
110 updateLoad();
111 }
112
113 // TODO schedule this with a fixed delay from a scheduled executor
114 private final AtomicLong lastPrinted = new AtomicLong(0L);
115 private void periodicallyPrintStats() {
116 long now = System.currentTimeMillis();
117 long prev = lastPrinted.get();
118 if (now - prev > STATS_INTERVAL) {
119 if (lastPrinted.compareAndSet(prev, now)) {
120 log.warn("queue size: {} jobs, submitted: {} jobs/s, taken: {} jobs/s",
121 getQueue().size(),
122 submitted.throughput(), taken.throughput());
123 submitted.reset();
124 taken.reset();
125 }
126 }
127 }
128
129 // TODO consider updating load whenever queue changes
130 private void updateLoad() {
131 underHighLoad.set(getQueue().remainingCapacity() / (double) maxQueueSize < 0.2);
132 }
133
134 /**
135 * Feedback policy that delays the caller's thread until the executor's work
136 * queue falls below a threshold, then runs the job on the caller's thread.
137 */
138 private static final class CallerFeedbackPolicy implements RejectedExecutionHandler {
139
140 private final BlockingBoolean underLoad = new BlockingBoolean(false);
141
142 public BlockingBoolean load() {
143 return underLoad;
144 }
145
146 /**
147 * Executes task r in the caller's thread, unless the executor
148 * has been shut down, in which case the task is discarded.
149 *
150 * @param r the runnable task requested to be executed
151 * @param e the executor attempting to execute this task
152 */
153 public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
154 if (!e.isShutdown()) {
155 // Wait for up to 1 second while the queue drains...
156 boolean notified = false;
157 try {
158 notified = underLoad.await(false, 1, TimeUnit.SECONDS);
159 } catch (InterruptedException exception) {
160 log.debug("Got exception waiting for notification:", exception);
161 } finally {
162 if (!notified) {
163 log.info("Waited for 1 second on {}. Proceeding with work...",
164 Thread.currentThread().getName());
165 } else {
166 log.info("FIXME we got a notice");
167 }
168 }
169 // Do the work on the submitter's thread
170 r.run();
171 }
172 }
173 }
174}