package aQute.libg.forker;
import java.util.*;
import java.util.concurrent.*;
import java.util.concurrent.atomic.*;
/**
* A Forker is useful for scheduling tasks with dependencies in parallel. Tasks
* are added with {@link #doWhen(Collection, Object, Runnable)}: the collection
* holds the dependencies, the object is the target, and the runnable is run to
* update the target. A runnable only runs when all of its dependencies have
* run their associated runnables.
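* <p>
* A minimal usage sketch (the string targets and runnables below are
* illustrative, not part of this API):
*
* <pre>{@code
* Forker<String> forker = new Forker<String>();
* forker.doWhen(Collections.<String> emptySet(), "b", new Runnable() {
*     public void run() {
*         System.out.println("built b");
*     }
* });
* forker.doWhen(Arrays.asList("b"), "a", new Runnable() {
*     public void run() {
*         System.out.println("built a, after b");
*     }
* });
* forker.join(); // blocks until both runnables have run
* }</pre>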
*
* @author aqute
*
* @param <T> the type of the targets and dependencies
*/
public class Forker<T> {
final Executor executor;
final Set<T> done = new HashSet<T>();
final List<Job> waiting = new ArrayList<Job>();
final Semaphore semaphore = new Semaphore(0);
final AtomicInteger outstanding = new AtomicInteger();
final AtomicBoolean canceled = new AtomicBoolean();
/**
* Helper class to model a Job
*/
class Job implements Runnable {
T target;
Set<T> dependencies;
Runnable runnable;
Throwable exception;
volatile Thread t;
final AtomicBoolean canceled = new AtomicBoolean(false);
/**
* Run when the job's dependencies are done.
*/
public void run() {
Thread.interrupted(); // clear the interrupt flag
try {
synchronized (this) {
// Check if we got canceled
if (canceled.get())
return;
t = Thread.currentThread();
}
runnable.run();
} catch (Exception e) {
exception = e;
} finally {
synchronized (this) {
t = null;
}
Thread.interrupted(); // clear the interrupt flag
done(target);
}
}
/**
* Cancel this job. If it is currently running, its thread is interrupted; if
* it has not started yet, its runnable will not be run.
*/
private void cancel() {
if (!canceled.getAndSet(true)) {
synchronized (this) {
if (t != null)
t.interrupt();
}
}
}
}
/**
* Constructor
*
* @param executor the executor used to run the jobs
*/
public Forker(Executor executor) {
this.executor = executor;
}
/**
* Constructor that uses a fixed thread pool of 4 threads.
*/
public Forker() {
this.executor = Executors.newFixedThreadPool(4);
}
/**
* Schedule a job for execution once all of its dependencies are done.
*
* @param dependencies the targets that must have run before this job runs
* @param target the target; once its job has run it is removed from the
*            dependencies of all waiting jobs
* @param runnable the runnable to run for the target
*/
public synchronized void doWhen(Collection<? extends T> dependencies, T target,
Runnable runnable) {
System.out.println("doWhen " + dependencies);
outstanding.incrementAndGet();
Job job = new Job();
job.dependencies = new HashSet<T>(dependencies);
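// drop dependencies whose targets have already run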
job.dependencies.removeAll(done);
job.target = target;
job.runnable = runnable;
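// If every dependency has already run, submit the job right away;
// otherwise park it until done() removes its last dependency.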
if (job.dependencies.isEmpty()) {
executor.execute(job);
} else {
waiting.add(job);
}
}
/**
* Called by a Job when its target's runnable has run. Records the target as
* done and schedules any waiting jobs whose dependencies are now all
* satisfied.
*
* @param target the target that has run
*/
private void done(T target) {
List<Runnable> torun = new ArrayList<Runnable>();
synchronized (this) {
System.out.println("done " + target);
// record the target so later calls to doWhen() see it as already done
done.add(target);
semaphore.release();
for (Iterator<Job> e = waiting.iterator(); e.hasNext();) {
Job job = e.next();
if (job.dependencies.remove(target) && job.dependencies.isEmpty()) {
System.out.println("scheduling " + job.target);
torun.add(job);
e.remove();
}
}
}
for (Runnable r : torun)
executor.execute(r);
}
/**
* Wait until all jobs have run.
*
* @throws InterruptedException
*/
public void join() throws InterruptedException {
System.out.println("join " + outstanding + " " + semaphore);
check();
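// Each job releases one permit when it is done, so acquiring one permit per
// scheduled job blocks until every job has run; the counter is reset so that
// join() can be called again for jobs scheduled later.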
semaphore.acquire(outstanding.getAndSet(0));
}
/**
* Check that there are no jobs that can never be satisfied, i.e. jobs whose
* dependencies contain a target that was never scheduled.
*/
private void check() {
// TODO
}
/**
* Return the number of outstanding jobs
* @return outstanding jobs
*/
public int getOutstanding() {
return semaphore.availablePermits();
}
/**
* Cancel the forker. Jobs that are still waiting for their dependencies are
* canceled and will not run; this method then waits until all remaining jobs
* have finished.
*
* @throws InterruptedException
*/
public void cancel() throws InterruptedException {
System.out.println("canceled " + outstanding + " " + semaphore);
if (!canceled.getAndSet(true)) {
synchronized (this) {
// release permits for jobs that will never be submitted, so join() below returns
for (Job job : waiting) {
job.cancel();
semaphore.release();
}
waiting.clear();
}
}
join();
}
}