blob: 936a33ff2a6c315cff2279b9a366c3f96591023e [file] [log] [blame]
Jordan Halterman544d1d52017-04-19 23:45:12 -07001/*
2 * Copyright 2017-present Open Networking Laboratory
3 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
16package org.onlab.util;
17
18import java.util.LinkedList;
19import java.util.concurrent.Executor;
20
/**
 * Executor that executes tasks in serial on a shared thread pool, falling back to parallel execution when threads
 * are blocked.
 * <p>
 * This executor attempts to execute tasks in serial as if they occur on a single thread. However, in the event tasks
 * are blocking a thread (a thread is in the {@link Thread.State#WAITING} or {@link Thread.State#TIMED_WAITING} state)
 * the executor will execute tasks in parallel on the underlying {@link Executor}. This is useful for ensuring blocked
 * threads cannot block events, but mimics a single-threaded model otherwise.
 * <p>
 * At most one drain loop is scheduled on the parent executor at any time. A {@link RuntimeException} thrown by a
 * task is handed to the uncaught exception handler of the thread that ran it, and the remaining tasks keep being
 * processed.
 */
public class BestEffortSerialExecutor implements Executor {
    private final Executor parent;
    private final LinkedList<Runnable> tasks = new LinkedList<>();

    /** Thread currently running the drain loop, or {@code null} when no loop has started. Written under the tasks lock. */
    private volatile Thread thread;

    /**
     * True from the moment a drain loop is handed to the parent until that loop gives up ownership.
     * Guarded by the tasks lock. Tracking this separately from {@link #thread} prevents a second loop from
     * being scheduled in the window before the first one has started running.
     */
    private boolean draining;

    /**
     * Creates a serial executor that runs its tasks on the given parent executor.
     *
     * @param parent the executor on which tasks are ultimately executed
     * @throws NullPointerException if {@code parent} is null
     */
    public BestEffortSerialExecutor(Executor parent) {
        if (parent == null) {
            throw new NullPointerException("parent");
        }
        this.parent = parent;
    }

    /**
     * Drain loop: runs queued tasks one after another on the current thread until the queue is empty.
     */
    private void run() {
        synchronized (tasks) {
            thread = Thread.currentThread();
        }
        boolean released = false;
        try {
            for (;;) {
                final Runnable task;
                synchronized (tasks) {
                    task = tasks.poll();
                    if (task == null) {
                        // Give up ownership in the same critical section that observed the empty queue,
                        // so a concurrent execute() either sees its task polled here or schedules a new loop.
                        thread = null;
                        draining = false;
                        released = true;
                        return;
                    }
                }
                runGuarded(task);
            }
        } finally {
            if (!released) {
                // Abnormal exit (an Error, or the exception handler itself threw): release ownership so
                // the next execute() call can schedule a fresh drain loop for whatever is still queued.
                synchronized (tasks) {
                    thread = null;
                    draining = false;
                }
            }
        }
    }

    /**
     * Runs at most one queued task on the current thread. Used for parallel execution while the drain thread
     * is blocked.
     *
     * @return {@code true} if a task was taken from the queue and run, {@code false} if the queue was empty
     */
    private boolean runTask() {
        final Runnable task;
        synchronized (tasks) {
            task = tasks.poll();
        }
        if (task == null) {
            return false;
        }
        runGuarded(task);
        return true;
    }

    /**
     * Runs a task, routing any {@link RuntimeException} it throws to the current thread's uncaught exception
     * handler so that one failing task does not abort processing of the tasks queued behind it.
     *
     * @param task the task to run
     */
    private static void runGuarded(Runnable task) {
        try {
            task.run();
        } catch (RuntimeException e) {
            Thread current = Thread.currentThread();
            current.getUncaughtExceptionHandler().uncaughtException(current, e);
        }
    }

    /**
     * Queues the command for execution. If no drain loop is active one is scheduled on the parent executor;
     * if the drain thread is in the {@code WAITING} or {@code TIMED_WAITING} state, a single task is additionally
     * dispatched to the parent so that queued work is not held up.
     * <p>
     * If the parent throws while a drain loop is being scheduled, the exception is rethrown to the caller; the
     * command remains queued and is picked up by the next successfully scheduled drain loop.
     *
     * @param command the task to execute
     * @throws NullPointerException if {@code command} is null
     */
    @Override
    public void execute(Runnable command) {
        if (command == null) {
            throw new NullPointerException("command");
        }
        synchronized (tasks) {
            tasks.add(command);
            if (!draining) {
                draining = true;
                try {
                    parent.execute(this::run);
                } catch (RuntimeException | Error e) {
                    // Scheduling failed: clear the flag so a later submission can retry.
                    draining = false;
                    throw e;
                }
            } else {
                final Thread primary = thread;
                if (primary != null) {
                    final Thread.State state = primary.getState();
                    if (state == Thread.State.WAITING || state == Thread.State.TIMED_WAITING) {
                        parent.execute(this::runTask);
                    }
                }
            }
        }
    }
}