blob: 2be1ee2ffdcbcf7d4f526719dd127cb0a7af1d21 [file] [log] [blame]
/*
 * Copyright 2016-present Open Networking Foundation
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
16package org.onlab.util;
17
18import static com.google.common.base.Preconditions.checkArgument;
19import static com.google.common.base.Preconditions.checkNotNull;
20
21import java.time.Duration;
22import java.time.Instant;
23import java.time.temporal.ChronoUnit;
24import java.util.ArrayList;
25import java.util.List;
26import java.util.Objects;
27import java.util.concurrent.AbstractExecutorService;
28import java.util.concurrent.Callable;
29import java.util.concurrent.ExecutorService;
30import java.util.concurrent.Executors;
31import java.util.concurrent.FutureTask;
32import java.util.concurrent.ThreadFactory;
33import java.util.concurrent.TimeUnit;
34import java.util.function.Function;
35import java.util.stream.Collectors;
36
/**
 * (Somewhat) predictable ExecutorService.
 * <p>
 * ExecutorService which behaves similarly to the one created by
 * {@link Executors#newFixedThreadPool(int, ThreadFactory)},
 * but assigns each command to a specific backend thread based on
 * its {@link PickyTask#hint()}, {@link Object#hashCode()}, or the hint value explicitly
 * specified when the command was passed to this {@link PredictableExecutor}.
 * <p>
 * Each bucket is backed by its own {@link ExecutorService} obtained from
 * {@link #backendExecutorService(ThreadFactory)}; commands whose hints map to
 * the same bucket are handed to the same backend.
 */
public class PredictableExecutor
        extends AbstractExecutorService
        implements ExecutorService {

    /** One backend executor per bucket; the list is never modified after construction. */
    private final List<ExecutorService> backends;

    /**
     * Creates {@link PredictableExecutor} instance.
     *
     * @param buckets number of buckets or 0 to match available processors
     * @param threadFactory {@link ThreadFactory} to use to create threads
     * @return {@link PredictableExecutor}
     * @throws IllegalArgumentException if {@code buckets} is negative
     * @throws NullPointerException if {@code threadFactory} is null
     */
    public static PredictableExecutor newPredictableExecutor(int buckets, ThreadFactory threadFactory) {
        return new PredictableExecutor(buckets, threadFactory);
    }

    /**
     * Creates {@link PredictableExecutor} instance.
     *
     * @param buckets number of buckets or 0 to match available processors
     * @param threadFactory {@link ThreadFactory} to use to create threads
     * @throws IllegalArgumentException if {@code buckets} is negative
     * @throws NullPointerException if {@code threadFactory} is null
     */
    public PredictableExecutor(int buckets, ThreadFactory threadFactory) {
        if (buckets < 0) {
            // 0 is legal (it means "use the processor count"); only negatives are rejected.
            throw new IllegalArgumentException("number of buckets must be non-negative");
        }
        Objects.requireNonNull(threadFactory);
        if (buckets == 0) {
            buckets = Runtime.getRuntime().availableProcessors();
        }
        this.backends = new ArrayList<>(buckets);

        // Note: backendExecutorService is an overridable hook invoked during construction.
        for (int i = 0; i < buckets; ++i) {
            this.backends.add(backendExecutorService(threadFactory));
        }
    }

    /**
     * Creates {@link PredictableExecutor} instance with
     * bucket size set to number of available processors.
     *
     * @param threadFactory {@link ThreadFactory} to use to create threads
     * @throws NullPointerException if {@code threadFactory} is null
     */
    public PredictableExecutor(ThreadFactory threadFactory) {
        this(0, threadFactory);
    }

    /**
     * Creates a single thread {@link ExecutorService} to use in the backend.
     * <p>
     * Called once per bucket from the constructor.
     *
     * @param threadFactory {@link ThreadFactory} to use to create threads
     * @return single thread {@link ExecutorService}
     */
    protected ExecutorService backendExecutorService(ThreadFactory threadFactory) {
        return Executors.newSingleThreadExecutor(threadFactory);
    }

    /**
     * Executes given command at some time in the future.
     * <p>
     * The backend is selected by the absolute value of {@code hint} modulo the
     * number of buckets, so {@code hint} and {@code -hint} share a backend.
     *
     * @param command the {@link Runnable} task
     * @param hint value to pick thread to run on.
     */
    public void execute(Runnable command, int hint) {
        // Take the remainder first: Math.abs(Integer.MIN_VALUE) is still negative,
        // whereas (hint % size) always lies strictly between -size and size.
        int index = Math.abs(hint % backends.size());
        backends.get(index).execute(command);
    }

    /**
     * Executes given command at some time in the future.
     *
     * @param command the {@link Runnable} task
     * @param hintFunction Function to compute hint value
     */
    public void execute(Runnable command, Function<Runnable, Integer> hintFunction) {
        execute(command, hintFunction.apply(command));
    }

    /**
     * Computes the default hint of a command.
     *
     * @param command the {@link Runnable} task
     * @return {@link PickyTask#hint()} if the command is a {@link PickyTask},
     *         otherwise its hash code (0 for null)
     */
    private static int hint(Runnable command) {
        if (command instanceof PickyTask) {
            return ((PickyTask) command).hint();
        }
        return Objects.hashCode(command);
    }

    @Override
    public void execute(Runnable command) {
        execute(command, hint(command));
    }

    @Override
    public void shutdown() {
        backends.forEach(ExecutorService::shutdown);
    }

    @Override
    public List<Runnable> shutdownNow() {
        // Concatenate the never-started tasks reported by every backend.
        return backends.stream()
                .map(ExecutorService::shutdownNow)
                .flatMap(List::stream)
                .collect(Collectors.toList());
    }

    @Override
    public boolean isShutdown() {
        return backends.stream().allMatch(ExecutorService::isShutdown);
    }

    @Override
    public boolean isTerminated() {
        return backends.stream().allMatch(ExecutorService::isTerminated);
    }

    /**
     * {@inheritDoc}
     * <p>
     * Note: The backends are awaited one after another against a single shared
     * deadline. It'll try, but is not assured that the method will return
     * exactly by the specified timeout.
     */
    @Override
    public boolean awaitTermination(long timeout, TimeUnit unit)
            throws InterruptedException {

        final Instant deadline =
                Instant.now().plus(Duration.of(unit.toMillis(timeout), ChronoUnit.MILLIS));

        for (ExecutorService es : backends) {
            if (es.isTerminated()) {
                continue;
            }
            // Time left until the shared deadline, clamped so it never goes negative.
            long remainingMs = Math.max(0L, Duration.between(Instant.now(), deadline).toMillis());
            // An InterruptedException raised here propagates to the caller.
            if (!es.awaitTermination(remainingMs, TimeUnit.MILLISECONDS)) {
                return false;
            }
        }
        return true;
    }

    @Override
    protected <T> PickyFutureTask<T> newTaskFor(Callable<T> callable) {
        return new PickyFutureTask<>(callable);
    }

    @Override
    protected <T> PickyFutureTask<T> newTaskFor(Runnable runnable, T value) {
        return new PickyFutureTask<>(runnable, value);
    }

    /**
     * {@link Runnable} also implementing {@link PickyTask}.
     */
    public interface PickyRunnable extends PickyTask, Runnable { }

    /**
     * {@link Callable} also implementing {@link PickyTask}.
     *
     * @param <T> result type
     */
    public interface PickyCallable<T> extends PickyTask, Callable<T> { }

    /**
     * Wraps the given {@link Runnable} into {@link PickyRunnable} returning supplied hint.
     *
     * @param runnable {@link Runnable}
     * @param hint hint value
     * @return {@link PickyRunnable}
     */
    public static PickyRunnable picky(Runnable runnable, int hint) {
        return picky(runnable, (r) -> hint);
    }

    /**
     * Wraps the given {@link Runnable} into {@link PickyRunnable} whose hint is
     * computed by the supplied function each time {@link PickyTask#hint()} is called.
     *
     * @param runnable {@link Runnable}
     * @param hint hint function
     * @return {@link PickyRunnable}
     * @throws NullPointerException if either argument is null
     */
    public static PickyRunnable picky(Runnable runnable, Function<Runnable, Integer> hint) {
        Objects.requireNonNull(runnable);
        Objects.requireNonNull(hint);
        return new PickyRunnable() {

            @Override
            public void run() {
                runnable.run();
            }

            @Override
            public int hint() {
                return hint.apply(runnable);
            }
        };
    }

    /**
     * Wraps the given {@link Callable} into {@link PickyCallable} returning supplied hint.
     *
     * @param callable {@link Callable}
     * @param hint hint value
     * @param <T> entity type
     * @return {@link PickyCallable}
     */
    public static <T> PickyCallable<T> picky(Callable<T> callable, int hint) {
        return picky(callable, (c) -> hint);
    }

    /**
     * Wraps the given {@link Callable} into {@link PickyCallable} whose hint is
     * computed by the supplied function each time {@link PickyTask#hint()} is called.
     *
     * @param callable {@link Callable}
     * @param hint hint function
     * @param <T> entity type
     * @return {@link PickyCallable}
     * @throws NullPointerException if either argument is null
     */
    public static <T> PickyCallable<T> picky(Callable<T> callable, Function<Callable<T>, Integer> hint) {
        Objects.requireNonNull(callable);
        Objects.requireNonNull(hint);
        return new PickyCallable<T>() {

            @Override
            public T call() throws Exception {
                return callable.call();
            }

            @Override
            public int hint() {
                return hint.apply(callable);
            }
        };
    }

    /**
     * Abstraction to give a task a way to express its preference to run on
     * certain thread.
     */
    public interface PickyTask {

        /**
         * Returns hint for choosing which Thread to run this task on.
         *
         * @return hint value
         */
        int hint();
    }

    /**
     * A {@link FutureTask} implementing {@link PickyTask}.
     * <p>
     * Note: if the wrapped {@link Callable} or {@link Runnable} was an instance of
     * {@link PickyTask}, it will use {@link PickyTask#hint()} value, if not use {@link Object#hashCode()}.
     *
     * @param <T> result type.
     */
    public static class PickyFutureTask<T>
            extends FutureTask<T>
            implements PickyTask {

        /** The wrapped task, retained so that its hint can be consulted later. */
        private final Object runnableOrCallable;

        /**
         * Same as {@link FutureTask#FutureTask(Runnable, Object)}.
         *
         * @param runnable work to do
         * @param value result
         */
        public PickyFutureTask(Runnable runnable, T value) {
            super(runnable, value);
            runnableOrCallable = Objects.requireNonNull(runnable);
        }

        /**
         * Same as {@link FutureTask#FutureTask(Callable)}.
         *
         * @param callable work to be done
         */
        public PickyFutureTask(Callable<T> callable) {
            super(callable);
            runnableOrCallable = Objects.requireNonNull(callable);
        }

        @Override
        public int hint() {
            if (runnableOrCallable instanceof PickyTask) {
                return ((PickyTask) runnableOrCallable).hint();
            }
            return runnableOrCallable.hashCode();
        }
    }
}