blob: 530feb07f4948bcdbdcc9178e10f795a07534f1d [file] [log] [blame]
Stuart McCulloch26e7a5a2011-10-17 10:31:43 +00001package aQute.libg.forker;
2
3import java.util.*;
4import java.util.concurrent.*;
5import java.util.concurrent.atomic.*;
6
/**
 * A Forker schedules tasks with dependencies in parallel. Tasks are added with
 * {@link #doWhen(Collection, Object, Runnable)}: the collection lists the
 * targets the task depends on, the object is the task's own target, and the
 * runnable is run to update that target. A runnable is only handed to the
 * executor once the jobs of all its dependencies have finished.
 * <p>
 * The bookkeeping collections are guarded by the Forker's own monitor; the
 * executor is always invoked outside that monitor.
 *
 * @author aqute
 *
 * @param <T> the type used to identify targets and dependencies
 */
public class Forker<T> {
    /** Number of threads in the pool created by the no-argument constructor. */
    private static final int DEFAULT_THREADS = 4;
    /** Seconds an idle thread of the default pool lingers before it is released. */
    private static final long DEFAULT_KEEP_ALIVE_SECONDS = 60L;

    final Executor executor;
    /** Targets whose job has finished; guarded by {@code this}. */
    final Set<T> done = new HashSet<T>();
    /** Jobs that still have unfinished dependencies; guarded by {@code this}. */
    final List<Job> waiting = new ArrayList<Job>();
    /** Jobs handed to the executor that have not finished yet; guarded by {@code this}. */
    final Set<Job> executing = new HashSet<Job>();
    /** Receives one permit for every finished job; drained by {@link #join()}. */
    final Semaphore semaphore = new Semaphore(0);
    /** Number of jobs added since the last {@link #join()} took its count. */
    final AtomicInteger outstanding = new AtomicInteger();
    /** Set by the first call to {@link #cancel()}. */
    final AtomicBoolean canceled = new AtomicBoolean();

    /**
     * Helper class to model a Job: a target, the dependencies it still waits
     * for, and the runnable that updates the target.
     */
    class Job implements Runnable {
        T target;
        Set<T> dependencies;
        Runnable runnable;
        /** The exception thrown by the runnable, if any. */
        Throwable exception;
        /** The thread currently running the runnable, or null when not running. */
        volatile Thread t;
        final AtomicBoolean canceled = new AtomicBoolean(false);

        /**
         * Run when the job's dependencies are done. The runnable is skipped
         * when the job was canceled before it started. In every case the job
         * is reported as finished so dependents and {@link Forker#join()} can
         * make progress.
         */
        public void run() {
            Thread.interrupted(); // clear the interrupt flag

            try {
                synchronized (this) {
                    // Check if we got canceled
                    if (canceled.get()) {
                        return;
                    }

                    // Publish the thread so cancel() can interrupt it
                    t = Thread.currentThread();
                }
                runnable.run();
            } catch (Exception e) {
                exception = e;
            } finally {
                synchronized (this) {
                    t = null;
                }
                Thread.interrupted(); // clear the interrupt flag
                finished(this);
            }
        }

        /**
         * Cancel this job. A job that has not started yet will skip its
         * runnable; a job that is running gets its thread interrupted.
         */
        private void cancel() {
            if (!canceled.getAndSet(true)) {
                synchronized (this) {
                    if (t != null) {
                        t.interrupt();
                    }
                }
            }
        }
    }

    /**
     * Create a Forker that runs its jobs on the given executor.
     *
     * @param executor the executor the jobs are handed to
     */
    public Forker(Executor executor) {
        this.executor = executor;
    }

    /**
     * Create a Forker that runs its jobs on its own pool of at most
     * {@value #DEFAULT_THREADS} threads.
     */
    public Forker() {
        this(newDefaultExecutor());
    }

    /**
     * Build the pool used by the no-argument constructor. Nothing ever shuts
     * this pool down, so idle threads are allowed to time out; otherwise the
     * non-daemon worker threads would keep the JVM alive indefinitely.
     */
    private static Executor newDefaultExecutor() {
        ThreadPoolExecutor pool = new ThreadPoolExecutor(DEFAULT_THREADS, DEFAULT_THREADS,
                DEFAULT_KEEP_ALIVE_SECONDS, TimeUnit.SECONDS, new LinkedBlockingQueue<Runnable>());
        pool.allowCoreThreadTimeOut(true);
        return pool;
    }

    /**
     * Schedule a job for execution when the dependencies of the target are
     * done. Dependencies that have already finished are ignored, so a job may
     * be added after the jobs it depends on have completed.
     *
     * @param dependencies the dependencies that must have run
     * @param target the target, is removed from all the dependencies when it ran
     * @param runnable the runnable to run
     * @throws NullPointerException if {@code dependencies} is null
     */
    public void doWhen(Collection<? extends T> dependencies, T target, Runnable runnable) {
        // Build the job before touching any counters so that a null
        // dependency collection cannot leave the join() count inflated.
        Job job = new Job();
        job.dependencies = new HashSet<T>(dependencies);
        job.target = target;
        job.runnable = runnable;

        boolean ready;
        synchronized (this) {
            job.dependencies.removeAll(done);
            outstanding.incrementAndGet();
            ready = job.dependencies.isEmpty();
            if (ready) {
                executing.add(job);
            } else {
                waiting.add(job);
            }
        }

        // Hand over outside the monitor: the executor may run the job in
        // the calling thread or block.
        if (ready) {
            executor.execute(job);
        }
    }

    /**
     * Called by a Job when it has finished (whether it ran, failed or was
     * skipped because it was canceled). Records the target as done, releases
     * one permit for {@link #join()} and schedules every waiting job whose
     * last dependency was this target.
     *
     * @param job the job that finished
     */
    private void finished(Job job) {
        List<Job> ready = new ArrayList<Job>();
        synchronized (this) {
            executing.remove(job);
            done.add(job.target);
            semaphore.release();

            for (Iterator<Job> i = waiting.iterator(); i.hasNext();) {
                Job candidate = i.next();
                if (candidate.dependencies.remove(job.target) && candidate.dependencies.isEmpty()) {
                    i.remove();
                    executing.add(candidate);
                    ready.add(candidate);
                }
            }
        }
        for (Job next : ready) {
            executor.execute(next);
        }
    }

    /**
     * Wait until all jobs added so far have finished.
     *
     * @throws InterruptedException if the calling thread is interrupted while
     *         waiting; the jobs remain accounted for, so join can be called
     *         again
     */
    public void join() throws InterruptedException {
        check();
        int expected = outstanding.getAndSet(0);
        try {
            semaphore.acquire(expected);
        } catch (InterruptedException e) {
            // No permits were consumed; put the count back so that a later
            // join() still waits for these jobs.
            outstanding.addAndGet(expected);
            throw e;
        }
    }

    /**
     * Check that we have no jobs that can never be satisfied. I.e. if
     * the dependencies contain a target that is not listed.
     */
    private void check() {
        // TODO
    }

    /**
     * Return the number of outstanding jobs, i.e. jobs that are waiting for
     * dependencies or have been handed to the executor but have not finished.
     *
     * @return outstanding jobs
     */
    public synchronized int getOutstanding() {
        return waiting.size() + executing.size();
    }

    /**
     * Cancel the forker. On the first call, every job that is waiting or has
     * been handed to the executor is canceled: jobs that have not started skip
     * their runnable and running jobs get their thread interrupted. The call
     * then waits, like {@link #join()}, until all jobs have finished.
     *
     * @throws InterruptedException if the calling thread is interrupted while
     *         waiting for the jobs to finish
     */
    public void cancel() throws InterruptedException {
        if (!canceled.getAndSet(true)) {
            List<Job> victims;
            // Snapshot under the monitor; finished() mutates both collections.
            synchronized (this) {
                victims = new ArrayList<Job>(waiting);
                victims.addAll(executing);
            }
            for (Job job : victims) {
                job.cancel();
            }
        }
        join();
    }
}
184}