blob: 279091ca8267e52e41d85997401550354cf22245 [file] [log] [blame]
Yuta HIGUCHIe8f48d82016-08-24 20:27:48 -07001/*
Brian O'Connora09fe5b2017-08-03 21:12:30 -07002 * Copyright 2016-present Open Networking Foundation
Yuta HIGUCHIe8f48d82016-08-24 20:27:48 -07003 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
16package org.onlab.util;
17
pierff0a45a2019-05-17 20:47:06 +020018import com.google.common.util.concurrent.MoreExecutors;
19
Yuta HIGUCHIe8f48d82016-08-24 20:27:48 -070020import static com.google.common.base.Preconditions.checkArgument;
21import static com.google.common.base.Preconditions.checkNotNull;
22
23import java.time.Duration;
24import java.time.Instant;
25import java.time.temporal.ChronoUnit;
26import java.util.ArrayList;
27import java.util.List;
28import java.util.Objects;
29import java.util.concurrent.AbstractExecutorService;
30import java.util.concurrent.Callable;
31import java.util.concurrent.ExecutorService;
32import java.util.concurrent.Executors;
33import java.util.concurrent.FutureTask;
34import java.util.concurrent.ThreadFactory;
35import java.util.concurrent.TimeUnit;
36import java.util.function.Function;
37import java.util.stream.Collectors;
38
39/**
40 * (Somewhat) predictable ExecutorService.
41 * <p>
42 * ExecutorService which behaves similar to the one created by
43 * {@link Executors#newFixedThreadPool(int, ThreadFactory)},
44 * but assigns command to specific thread based on
45 * it's {@link PickyTask#hint()}, {@link Object#hashCode()}, or hint value explicitly
46 * specified when the command was passed to this {@link PredictableExecutor}.
47 */
48public class PredictableExecutor
49 extends AbstractExecutorService
50 implements ExecutorService {
51
52 private final List<ExecutorService> backends;
53
54 /**
55 * Creates {@link PredictableExecutor} instance.
56 *
57 * @param buckets number of buckets or 0 to match available processors
58 * @param threadFactory {@link ThreadFactory} to use to create threads
59 * @return {@link PredictableExecutor}
60 */
61 public static PredictableExecutor newPredictableExecutor(int buckets, ThreadFactory threadFactory) {
62 return new PredictableExecutor(buckets, threadFactory);
63 }
64
65 /**
66 * Creates {@link PredictableExecutor} instance.
67 *
68 * @param buckets number of buckets or 0 to match available processors
69 * @param threadFactory {@link ThreadFactory} to use to create threads
70 */
71 public PredictableExecutor(int buckets, ThreadFactory threadFactory) {
pierff0a45a2019-05-17 20:47:06 +020072 this(buckets, threadFactory, false);
73 }
74
75 /**
76 * Creates {@link PredictableExecutor} instance.
77 * Meant for testing purposes.
78 *
79 * @param buckets number of buckets or 0 to match available processors
80 * @param threadFactory {@link ThreadFactory} to use to create threads
81 * @param directExec direct executors
82 */
83 public PredictableExecutor(int buckets, ThreadFactory threadFactory, boolean directExec) {
Yuta HIGUCHIe8f48d82016-08-24 20:27:48 -070084 checkArgument(buckets >= 0, "number of buckets must be non zero");
85 checkNotNull(threadFactory);
86 if (buckets == 0) {
87 buckets = Runtime.getRuntime().availableProcessors();
88 }
89 this.backends = new ArrayList<>(buckets);
90
91 for (int i = 0; i < buckets; ++i) {
pierff0a45a2019-05-17 20:47:06 +020092 this.backends.add(backendExecutorService(threadFactory, directExec));
Yuta HIGUCHIe8f48d82016-08-24 20:27:48 -070093 }
94 }
95
96 /**
97 * Creates {@link PredictableExecutor} instance with
98 * bucket size set to number of available processors.
99 *
100 * @param threadFactory {@link ThreadFactory} to use to create threads
101 */
102 public PredictableExecutor(ThreadFactory threadFactory) {
103 this(0, threadFactory);
104 }
105
106 /**
107 * Creates a single thread {@link ExecutorService} to use in the backend.
108 *
109 * @param threadFactory {@link ThreadFactory} to use to create threads
pierff0a45a2019-05-17 20:47:06 +0200110 * @param direct direct executors
111 * @return single thread {@link ExecutorService} or direct executor
Yuta HIGUCHIe8f48d82016-08-24 20:27:48 -0700112 */
pierff0a45a2019-05-17 20:47:06 +0200113 protected ExecutorService backendExecutorService(ThreadFactory threadFactory, boolean direct) {
114 return direct ? MoreExecutors.newDirectExecutorService() : Executors.newSingleThreadExecutor(threadFactory);
Yuta HIGUCHIe8f48d82016-08-24 20:27:48 -0700115 }
116
117
118 /**
119 * Executes given command at some time in the future.
120 *
121 * @param command the {@link Runnable} task
122 * @param hint value to pick thread to run on.
123 */
124 public void execute(Runnable command, int hint) {
125 int index = Math.abs(hint) % backends.size();
126 backends.get(index).execute(command);
127 }
128
129 /**
130 * Executes given command at some time in the future.
131 *
132 * @param command the {@link Runnable} task
133 * @param hintFunction Function to compute hint value
134 */
135 public void execute(Runnable command, Function<Runnable, Integer> hintFunction) {
136 execute(command, hintFunction.apply(command));
137 }
138
139
140 private static int hint(Runnable command) {
141 if (command instanceof PickyTask) {
142 return ((PickyTask) command).hint();
143 } else {
144 return Objects.hashCode(command);
145 }
146 }
147
148 @Override
149 public void execute(Runnable command) {
150 execute(command, PredictableExecutor::hint);
151 }
152
153 @Override
154 public void shutdown() {
Sho SHIMIZU8ebb04a2016-10-06 15:58:29 -0700155 backends.forEach(ExecutorService::shutdown);
Yuta HIGUCHIe8f48d82016-08-24 20:27:48 -0700156 }
157
158 @Override
159 public List<Runnable> shutdownNow() {
160 return backends.stream()
161 .map(ExecutorService::shutdownNow)
162 .flatMap(List::stream)
163 .collect(Collectors.toList());
164 }
165
166 @Override
167 public boolean isShutdown() {
168 return backends.stream().allMatch(ExecutorService::isShutdown);
169 }
170
171 @Override
172 public boolean isTerminated() {
173 return backends.stream().allMatch(ExecutorService::isTerminated);
174 }
175
176 /**
177 * {@inheritDoc}
178 * <p>
179 * Note: It'll try, but is not assured that the method will return by specified timeout.
180 */
181 @Override
182 public boolean awaitTermination(long timeout, TimeUnit unit)
183 throws InterruptedException {
184
185 final Duration timeoutD = Duration.of(unit.toMillis(timeout), ChronoUnit.MILLIS);
186 final Instant start = Instant.now();
187
188 return backends.parallelStream()
189 .filter(es -> !es.isTerminated())
190 .map(es -> {
191 long timeoutMs = timeoutD.minus(Duration.between(Instant.now(), start)).toMillis();
192 try {
193 return es.awaitTermination(timeoutMs, TimeUnit.MILLISECONDS);
194 } catch (InterruptedException e) {
195 Thread.currentThread().interrupt();
196 return false;
197 }
198 })
199 .allMatch(result -> result);
200 }
201
202 @Override
203 protected <T> PickyFutureTask<T> newTaskFor(Callable<T> callable) {
204 return new PickyFutureTask<>(callable);
205 }
206
207 @Override
208 protected <T> PickyFutureTask<T> newTaskFor(Runnable runnable, T value) {
209 return new PickyFutureTask<>(runnable, value);
210 }
211
212 /**
213 * {@link Runnable} also implementing {@link PickyTask}.
214 */
215 public static interface PickyRunnable extends PickyTask, Runnable { }
216
217 /**
218 * {@link Callable} also implementing {@link PickyTask}.
219 *
220 * @param <T> result type
221 */
222 public static interface PickyCallable<T> extends PickyTask, Callable<T> { }
223
224 /**
225 * Wraps the given {@link Runnable} into {@link PickyRunnable} returning supplied hint.
226 *
227 * @param runnable {@link Runnable}
228 * @param hint hint value
229 * @return {@link PickyRunnable}
230 */
231 public static PickyRunnable picky(Runnable runnable, int hint) {
232 return picky(runnable, (r) -> hint);
233 }
234
235 /**
236 * Wraps the given {@link Runnable} into {@link PickyRunnable} returning supplied hint.
237 *
238 * @param runnable {@link Runnable}
239 * @param hint hint function
240 * @return {@link PickyRunnable}
241 */
242 public static PickyRunnable picky(Runnable runnable, Function<Runnable, Integer> hint) {
243 checkNotNull(runnable);
244 checkNotNull(hint);
245 return new PickyRunnable() {
246
247 @Override
248 public void run() {
249 runnable.run();
250 }
251
252 @Override
253 public int hint() {
254 return hint.apply(runnable);
255 }
256 };
257 }
258
259 /**
260 * Wraps the given {@link Callable} into {@link PickyCallable} returning supplied hint.
261 *
262 * @param callable {@link Callable}
263 * @param hint hint value
Ray Milkeyef794342016-11-09 16:20:29 -0800264 * @param <T> entity type
Yuta HIGUCHIe8f48d82016-08-24 20:27:48 -0700265 * @return {@link PickyCallable}
266 */
267 public static <T> PickyCallable<T> picky(Callable<T> callable, int hint) {
268 return picky(callable, (c) -> hint);
269 }
270
271 /**
272 * Wraps the given {@link Callable} into {@link PickyCallable} returning supplied hint.
273 *
274 * @param callable {@link Callable}
275 * @param hint hint function
Ray Milkeyef794342016-11-09 16:20:29 -0800276 * @param <T> entity type
Yuta HIGUCHIe8f48d82016-08-24 20:27:48 -0700277 * @return {@link PickyCallable}
278 */
279 public static <T> PickyCallable<T> picky(Callable<T> callable, Function<Callable<T>, Integer> hint) {
280 checkNotNull(callable);
281 checkNotNull(hint);
282 return new PickyCallable<T>() {
283
284 @Override
285 public T call() throws Exception {
286 return callable.call();
287 }
288
289 @Override
290 public int hint() {
291 return hint.apply(callable);
292 }
293
294 };
295 }
296
297 /**
298 * Abstraction to give a task a way to express it's preference to run on
299 * certain thread.
300 */
301 public static interface PickyTask {
302
303 /**
304 * Returns hint for choosing which Thread to run this task on.
305 *
306 * @return hint value
307 */
308 int hint();
309 }
310
311 /**
312 * A {@link FutureTask} implementing {@link PickyTask}.
313 * <p>
314 * Note: if the wrapped {@link Callable} or {@link Runnable} was an instance of
315 * {@link PickyTask}, it will use {@link PickyTask#hint()} value, if not use {@link Object#hashCode()}.
316 *
317 * @param <T> result type.
318 */
319 public static class PickyFutureTask<T>
320 extends FutureTask<T>
321 implements PickyTask {
322
323 private final Object runnableOrCallable;
324
325 /**
326 * Same as {@link FutureTask#FutureTask(Runnable, Object)}.
Ray Milkeyef794342016-11-09 16:20:29 -0800327 *
328 * @param runnable work to do
329 * @param value result
Yuta HIGUCHIe8f48d82016-08-24 20:27:48 -0700330 */
331 public PickyFutureTask(Runnable runnable, T value) {
332 super(runnable, value);
333 runnableOrCallable = checkNotNull(runnable);
334 }
335
336 /**
337 * Same as {@link FutureTask#FutureTask(Callable)}.
Ray Milkeyef794342016-11-09 16:20:29 -0800338 *
339 * @param callable work to be done
Yuta HIGUCHIe8f48d82016-08-24 20:27:48 -0700340 */
341 public PickyFutureTask(Callable<T> callable) {
342 super(callable);
343 runnableOrCallable = checkNotNull(callable);
344 }
345
346 @Override
347 public int hint() {
348 if (runnableOrCallable instanceof PickyTask) {
349 return ((PickyTask) runnableOrCallable).hint();
350 } else {
351 return runnableOrCallable.hashCode();
352 }
353 }
354 }
355}