blob: b18a2fbad6181313087e21310025c63e2fc85e65 [file] [log] [blame]
Yuta HIGUCHIe8f48d82016-08-24 20:27:48 -07001/*
Brian O'Connora09fe5b2017-08-03 21:12:30 -07002 * Copyright 2016-present Open Networking Foundation
Yuta HIGUCHIe8f48d82016-08-24 20:27:48 -07003 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
16package org.onlab.util;
17
pierb84e9192019-05-17 20:47:06 +020018import com.google.common.util.concurrent.MoreExecutors;
19
Yuta HIGUCHIe8f48d82016-08-24 20:27:48 -070020import static com.google.common.base.Preconditions.checkArgument;
21import static com.google.common.base.Preconditions.checkNotNull;
22
23import java.time.Duration;
24import java.time.Instant;
25import java.time.temporal.ChronoUnit;
26import java.util.ArrayList;
27import java.util.List;
28import java.util.Objects;
29import java.util.concurrent.AbstractExecutorService;
30import java.util.concurrent.Callable;
Charles Chan9797ebb2020-02-14 13:23:57 -080031import java.util.concurrent.CompletableFuture;
Yuta HIGUCHIe8f48d82016-08-24 20:27:48 -070032import java.util.concurrent.ExecutorService;
33import java.util.concurrent.Executors;
34import java.util.concurrent.FutureTask;
35import java.util.concurrent.ThreadFactory;
36import java.util.concurrent.TimeUnit;
37import java.util.function.Function;
38import java.util.stream.Collectors;
39
/**
 * (Somewhat) predictable ExecutorService.
 * <p>
 * ExecutorService which behaves similarly to the one created by
 * {@link Executors#newFixedThreadPool(int, ThreadFactory)},
 * but assigns each command to a specific thread based on
 * its {@link PickyTask#hint()}, {@link Object#hashCode()}, or the hint value explicitly
 * specified when the command was passed to this {@link PredictableExecutor}.
 */
49public class PredictableExecutor
50 extends AbstractExecutorService
51 implements ExecutorService {
52
53 private final List<ExecutorService> backends;
54
55 /**
56 * Creates {@link PredictableExecutor} instance.
57 *
58 * @param buckets number of buckets or 0 to match available processors
59 * @param threadFactory {@link ThreadFactory} to use to create threads
60 * @return {@link PredictableExecutor}
61 */
62 public static PredictableExecutor newPredictableExecutor(int buckets, ThreadFactory threadFactory) {
63 return new PredictableExecutor(buckets, threadFactory);
64 }
65
66 /**
67 * Creates {@link PredictableExecutor} instance.
68 *
69 * @param buckets number of buckets or 0 to match available processors
70 * @param threadFactory {@link ThreadFactory} to use to create threads
71 */
72 public PredictableExecutor(int buckets, ThreadFactory threadFactory) {
pierb84e9192019-05-17 20:47:06 +020073 this(buckets, threadFactory, false);
74 }
75
76 /**
77 * Creates {@link PredictableExecutor} instance.
78 * Meant for testing purposes.
79 *
80 * @param buckets number of buckets or 0 to match available processors
81 * @param threadFactory {@link ThreadFactory} to use to create threads
82 * @param directExec direct executors
83 */
84 public PredictableExecutor(int buckets, ThreadFactory threadFactory, boolean directExec) {
Yuta HIGUCHIe8f48d82016-08-24 20:27:48 -070085 checkArgument(buckets >= 0, "number of buckets must be non zero");
86 checkNotNull(threadFactory);
87 if (buckets == 0) {
88 buckets = Runtime.getRuntime().availableProcessors();
89 }
90 this.backends = new ArrayList<>(buckets);
91
92 for (int i = 0; i < buckets; ++i) {
pierb84e9192019-05-17 20:47:06 +020093 this.backends.add(backendExecutorService(threadFactory, directExec));
Yuta HIGUCHIe8f48d82016-08-24 20:27:48 -070094 }
95 }
96
97 /**
98 * Creates {@link PredictableExecutor} instance with
99 * bucket size set to number of available processors.
100 *
101 * @param threadFactory {@link ThreadFactory} to use to create threads
102 */
103 public PredictableExecutor(ThreadFactory threadFactory) {
104 this(0, threadFactory);
105 }
106
107 /**
108 * Creates a single thread {@link ExecutorService} to use in the backend.
109 *
110 * @param threadFactory {@link ThreadFactory} to use to create threads
pierb84e9192019-05-17 20:47:06 +0200111 * @param direct direct executors
112 * @return single thread {@link ExecutorService} or direct executor
Yuta HIGUCHIe8f48d82016-08-24 20:27:48 -0700113 */
pierb84e9192019-05-17 20:47:06 +0200114 protected ExecutorService backendExecutorService(ThreadFactory threadFactory, boolean direct) {
115 return direct ? MoreExecutors.newDirectExecutorService() : Executors.newSingleThreadExecutor(threadFactory);
Yuta HIGUCHIe8f48d82016-08-24 20:27:48 -0700116 }
117
118
119 /**
120 * Executes given command at some time in the future.
121 *
122 * @param command the {@link Runnable} task
123 * @param hint value to pick thread to run on.
124 */
125 public void execute(Runnable command, int hint) {
126 int index = Math.abs(hint) % backends.size();
127 backends.get(index).execute(command);
128 }
129
130 /**
131 * Executes given command at some time in the future.
132 *
133 * @param command the {@link Runnable} task
134 * @param hintFunction Function to compute hint value
135 */
136 public void execute(Runnable command, Function<Runnable, Integer> hintFunction) {
137 execute(command, hintFunction.apply(command));
138 }
139
Charles Chan9797ebb2020-02-14 13:23:57 -0800140 /**
141 * Submits a value-returning task for execution and returns a
142 * Future representing the pending results of the task. The
143 * Future's {@code get} method will return the task's result upon
144 * successful completion.
145 *
146 * @param command the {@link Runnable} task
147 * @param hint value to pick thread to run on.
148 * @return completable future representing the pending results
149 */
150 public CompletableFuture<Void> submit(Runnable command, int hint) {
151 int index = Math.abs(hint) % backends.size();
152 return CompletableFuture.runAsync(command, backends.get(index));
153 }
154
155 /**
156 * Submits a value-returning task for execution and returns a
157 * Future representing the pending results of the task. The
158 * Future's {@code get} method will return the task's result upon
159 * successful completion.
160 *
161 * @param command the {@link Runnable} task
162 * @param hintFunction Function to compute hint value
163 * @return completable future representing the pending results
164 */
165 public CompletableFuture<Void> submit(Runnable command, Function<Runnable, Integer> hintFunction) {
166 int hint = hintFunction.apply(command);
167 return submit(command, hint);
168 }
Yuta HIGUCHIe8f48d82016-08-24 20:27:48 -0700169
170 private static int hint(Runnable command) {
171 if (command instanceof PickyTask) {
172 return ((PickyTask) command).hint();
173 } else {
174 return Objects.hashCode(command);
175 }
176 }
177
178 @Override
179 public void execute(Runnable command) {
180 execute(command, PredictableExecutor::hint);
181 }
182
183 @Override
184 public void shutdown() {
Sho SHIMIZU8ebb04a2016-10-06 15:58:29 -0700185 backends.forEach(ExecutorService::shutdown);
Yuta HIGUCHIe8f48d82016-08-24 20:27:48 -0700186 }
187
188 @Override
189 public List<Runnable> shutdownNow() {
190 return backends.stream()
191 .map(ExecutorService::shutdownNow)
192 .flatMap(List::stream)
193 .collect(Collectors.toList());
194 }
195
196 @Override
197 public boolean isShutdown() {
198 return backends.stream().allMatch(ExecutorService::isShutdown);
199 }
200
201 @Override
202 public boolean isTerminated() {
203 return backends.stream().allMatch(ExecutorService::isTerminated);
204 }
205
206 /**
207 * {@inheritDoc}
208 * <p>
209 * Note: It'll try, but is not assured that the method will return by specified timeout.
210 */
211 @Override
212 public boolean awaitTermination(long timeout, TimeUnit unit)
213 throws InterruptedException {
214
215 final Duration timeoutD = Duration.of(unit.toMillis(timeout), ChronoUnit.MILLIS);
216 final Instant start = Instant.now();
217
218 return backends.parallelStream()
219 .filter(es -> !es.isTerminated())
220 .map(es -> {
221 long timeoutMs = timeoutD.minus(Duration.between(Instant.now(), start)).toMillis();
222 try {
223 return es.awaitTermination(timeoutMs, TimeUnit.MILLISECONDS);
224 } catch (InterruptedException e) {
225 Thread.currentThread().interrupt();
226 return false;
227 }
228 })
229 .allMatch(result -> result);
230 }
231
232 @Override
233 protected <T> PickyFutureTask<T> newTaskFor(Callable<T> callable) {
234 return new PickyFutureTask<>(callable);
235 }
236
237 @Override
238 protected <T> PickyFutureTask<T> newTaskFor(Runnable runnable, T value) {
239 return new PickyFutureTask<>(runnable, value);
240 }
241
242 /**
243 * {@link Runnable} also implementing {@link PickyTask}.
244 */
245 public static interface PickyRunnable extends PickyTask, Runnable { }
246
247 /**
248 * {@link Callable} also implementing {@link PickyTask}.
249 *
250 * @param <T> result type
251 */
252 public static interface PickyCallable<T> extends PickyTask, Callable<T> { }
253
254 /**
255 * Wraps the given {@link Runnable} into {@link PickyRunnable} returning supplied hint.
256 *
257 * @param runnable {@link Runnable}
258 * @param hint hint value
259 * @return {@link PickyRunnable}
260 */
261 public static PickyRunnable picky(Runnable runnable, int hint) {
262 return picky(runnable, (r) -> hint);
263 }
264
265 /**
266 * Wraps the given {@link Runnable} into {@link PickyRunnable} returning supplied hint.
267 *
268 * @param runnable {@link Runnable}
269 * @param hint hint function
270 * @return {@link PickyRunnable}
271 */
272 public static PickyRunnable picky(Runnable runnable, Function<Runnable, Integer> hint) {
273 checkNotNull(runnable);
274 checkNotNull(hint);
275 return new PickyRunnable() {
276
277 @Override
278 public void run() {
279 runnable.run();
280 }
281
282 @Override
283 public int hint() {
284 return hint.apply(runnable);
285 }
286 };
287 }
288
289 /**
290 * Wraps the given {@link Callable} into {@link PickyCallable} returning supplied hint.
291 *
292 * @param callable {@link Callable}
293 * @param hint hint value
Ray Milkeyef794342016-11-09 16:20:29 -0800294 * @param <T> entity type
Yuta HIGUCHIe8f48d82016-08-24 20:27:48 -0700295 * @return {@link PickyCallable}
296 */
297 public static <T> PickyCallable<T> picky(Callable<T> callable, int hint) {
298 return picky(callable, (c) -> hint);
299 }
300
301 /**
302 * Wraps the given {@link Callable} into {@link PickyCallable} returning supplied hint.
303 *
304 * @param callable {@link Callable}
305 * @param hint hint function
Ray Milkeyef794342016-11-09 16:20:29 -0800306 * @param <T> entity type
Yuta HIGUCHIe8f48d82016-08-24 20:27:48 -0700307 * @return {@link PickyCallable}
308 */
309 public static <T> PickyCallable<T> picky(Callable<T> callable, Function<Callable<T>, Integer> hint) {
310 checkNotNull(callable);
311 checkNotNull(hint);
312 return new PickyCallable<T>() {
313
314 @Override
315 public T call() throws Exception {
316 return callable.call();
317 }
318
319 @Override
320 public int hint() {
321 return hint.apply(callable);
322 }
323
324 };
325 }
326
327 /**
328 * Abstraction to give a task a way to express it's preference to run on
329 * certain thread.
330 */
331 public static interface PickyTask {
332
333 /**
334 * Returns hint for choosing which Thread to run this task on.
335 *
336 * @return hint value
337 */
338 int hint();
339 }
340
341 /**
342 * A {@link FutureTask} implementing {@link PickyTask}.
343 * <p>
344 * Note: if the wrapped {@link Callable} or {@link Runnable} was an instance of
345 * {@link PickyTask}, it will use {@link PickyTask#hint()} value, if not use {@link Object#hashCode()}.
346 *
347 * @param <T> result type.
348 */
349 public static class PickyFutureTask<T>
350 extends FutureTask<T>
351 implements PickyTask {
352
353 private final Object runnableOrCallable;
354
355 /**
356 * Same as {@link FutureTask#FutureTask(Runnable, Object)}.
Ray Milkeyef794342016-11-09 16:20:29 -0800357 *
358 * @param runnable work to do
359 * @param value result
Yuta HIGUCHIe8f48d82016-08-24 20:27:48 -0700360 */
361 public PickyFutureTask(Runnable runnable, T value) {
362 super(runnable, value);
363 runnableOrCallable = checkNotNull(runnable);
364 }
365
366 /**
367 * Same as {@link FutureTask#FutureTask(Callable)}.
Ray Milkeyef794342016-11-09 16:20:29 -0800368 *
369 * @param callable work to be done
Yuta HIGUCHIe8f48d82016-08-24 20:27:48 -0700370 */
371 public PickyFutureTask(Callable<T> callable) {
372 super(callable);
373 runnableOrCallable = checkNotNull(callable);
374 }
375
376 @Override
377 public int hint() {
378 if (runnableOrCallable instanceof PickyTask) {
379 return ((PickyTask) runnableOrCallable).hint();
380 } else {
381 return runnableOrCallable.hashCode();
382 }
383 }
384 }
385}