Add job control

git-svn-id: https://svn.apache.org/repos/asf/felix/trunk@1736004 13f79535-47bb-0310-9956-ffa450edef68
diff --git a/gogo/jline/src/main/java/org/apache/felix/gogo/jline/Builtin.java b/gogo/jline/src/main/java/org/apache/felix/gogo/jline/Builtin.java
index 7153dbf..91dcf17 100644
--- a/gogo/jline/src/main/java/org/apache/felix/gogo/jline/Builtin.java
+++ b/gogo/jline/src/main/java/org/apache/felix/gogo/jline/Builtin.java
@@ -34,6 +34,7 @@
 import java.nio.file.StandardOpenOption;
 import java.util.ArrayList;
 import java.util.Arrays;
+import java.util.Collections;
 import java.util.HashSet;
 import java.util.LinkedHashSet;
 import java.util.List;
@@ -43,6 +44,7 @@
 import java.util.TreeMap;
 import java.util.TreeSet;
 
+import org.apache.felix.gogo.runtime.Job;
 import org.apache.felix.service.command.CommandSession;
 import org.apache.felix.service.command.Converter;
 import org.jline.builtins.Options;
@@ -52,7 +54,7 @@
  */
 public class Builtin {
 
-    static final String[] functions = {"format", "getopt", "new", "set", "tac", "type"};
+    static final String[] functions = {"format", "getopt", "new", "set", "tac", "type", "jobs", "fg", "bg"};
 
     private static final String[] packages = {"java.lang", "java.io", "java.net",
             "java.util"};
@@ -439,6 +441,63 @@
         return false;
     }
 
+    public void jobs(CommandSession session, String[] argv) {
+        List<Job> jobs = session.jobs();
+        Job current = session.currentJob();
+        for (Job job : jobs) {
+            if (job != current) {
+                System.out.println("[" + job.id() + "] " + job.status().toString().toLowerCase()
+                        + " " + job.command());
+            }
+        }
+    }
+
+    public void fg(CommandSession session, String[] argv) {
+        List<Job> jobs = session.jobs();
+        Collections.reverse(jobs);
+        Job current = session.currentJob();
+        if (argv.length == 0) {
+            Job job = jobs.stream().filter(j -> j != current)
+                    .findFirst().orElse(null);
+            if (job != null) {
+                job.foreground();
+            } else {
+                System.err.println("fg: no current job");
+            }
+        } else {
+            Job job = jobs.stream().filter(j -> j != current && argv[0].equals(Integer.toString(j.id())))
+                    .findFirst().orElse(null);
+            if (job != null) {
+                job.foreground();
+            } else {
+                System.err.println("fg: job not found: " + argv[0]);
+            }
+        }
+    }
+
+    public void bg(CommandSession session, String[] argv) {
+        List<Job> jobs = session.jobs();
+        Collections.reverse(jobs);
+        Job current = session.currentJob();
+        if (argv.length == 0) {
+            Job job = jobs.stream().filter(j -> j != current)
+                    .findFirst().orElse(null);
+            if (job != null) {
+                job.background();
+            } else {
+                System.err.println("fg: no current job");
+            }
+        } else {
+            Job job = jobs.stream().filter(j -> j != current && argv[0].equals(Integer.toString(j.id())))
+                    .findFirst().orElse(null);
+            if (job != null) {
+                job.background();
+            } else {
+                System.err.println("fg: job not found: " + argv[0]);
+            }
+        }
+    }
+
     private boolean isClosure(Object target) {
         return target.getClass().getSimpleName().equals("Closure");
     }
diff --git a/gogo/jline/src/main/java/org/apache/felix/gogo/jline/Shell.java b/gogo/jline/src/main/java/org/apache/felix/gogo/jline/Shell.java
index d5a9a3c..740242d 100644
--- a/gogo/jline/src/main/java/org/apache/felix/gogo/jline/Shell.java
+++ b/gogo/jline/src/main/java/org/apache/felix/gogo/jline/Shell.java
@@ -37,11 +37,14 @@
 import java.util.Map.Entry;
 import java.util.Set;
 import java.util.TreeMap;
+import java.util.concurrent.atomic.AtomicBoolean;
 
 import org.apache.felix.gogo.runtime.Closure;
 import org.apache.felix.gogo.runtime.CommandProxy;
 import org.apache.felix.gogo.runtime.CommandSessionImpl;
 import org.apache.felix.gogo.runtime.Expander;
+import org.apache.felix.gogo.runtime.Job;
+import org.apache.felix.gogo.runtime.Job.Status;
 import org.apache.felix.gogo.runtime.Reflective;
 import org.apache.felix.service.command.CommandProcessor;
 import org.apache.felix.service.command.CommandSession;
@@ -56,8 +59,11 @@
 import org.jline.reader.LineReaderBuilder;
 import org.jline.reader.ParsedLine;
 import org.jline.reader.UserInterruptException;
+import org.jline.reader.impl.LineReaderImpl;
 import org.jline.reader.impl.history.history.FileHistory;
 import org.jline.terminal.Terminal;
+import org.jline.terminal.Terminal.Signal;
+import org.jline.terminal.Terminal.SignalHandler;
 
 public class Shell {
 
@@ -253,7 +259,7 @@
         newSession.put("#LINES", (Function) (s, arguments) -> terminal.getHeight());
         newSession.put("#PWD", (Function) (s, arguments) -> s.currentDir().toString());
 
-        LineReader reader = null;
+        LineReader reader;
         if (args.isEmpty() && interactive) {
             reader = LineReaderBuilder.builder()
                     .terminal(terminal)
@@ -265,6 +271,8 @@
                     .build();
             newSession.put(Shell.VAR_READER, reader);
             newSession.put(Shell.VAR_COMPLETIONS, new HashMap());
+        } else {
+            reader = null;
         }
 
         if (login || interactive) {
@@ -285,34 +293,94 @@
 
         if (args.isEmpty()) {
             if (interactive) {
-                while (true) {
-                    try {
-                        reader.readLine(Shell.getPrompt(session), Shell.getRPrompt(session), null, null);
-                        ParsedLine parsedLine = reader.getParsedLine();
-                        if (parsedLine == null) {
-                            throw new EndOfFileException();
+                AtomicBoolean reading = new AtomicBoolean();
+                newSession.setJobListener((job, previous, current) -> {
+                    if (previous == Status.Background || current == Status.Background
+                            || previous == Status.Suspended || current == Status.Suspended) {
+                        int w = terminal.getWidth();
+                        String status = current.name().toLowerCase();
+                        StringBuilder sb = new StringBuilder();
+                        for (int i = 0; i < w - 1; i++) {
+                            sb.append(' ');
                         }
-                        try {
-                            result = session.execute(((ParsedLineImpl) parsedLine).program());
-                            session.put(Shell.VAR_RESULT, result); // set $_ to last result
-
-                            if (result != null && !Boolean.FALSE.equals(session.get(".Gogo.format"))) {
-                                System.out.println(session.format(result, Converter.INSPECT));
-                            }
-                        } catch (Exception e) {
-                            session.put(Shell.VAR_EXCEPTION, e);
+                        sb.append('\r');
+                        sb.append("[").append(job.id()).append("]  ");
+                        sb.append(status);
+                        for (int i = status.length(); i < "background".length(); i++) {
+                            sb.append(' ');
                         }
-
-                    } catch (UserInterruptException e) {
-                        // continue;
-                    } catch (EndOfFileException e) {
-                        try {
-                            reader.getHistory().flush();
-                        } catch (IOException e1) {
-                            e.addSuppressed(e1);
+                        sb.append("  ").append(job.command()).append("\n");
+                        terminal.writer().write(sb.toString());
+                        terminal.flush();
+                        if (reading.get()) {
+                            ((LineReaderImpl) reader).redrawLine();
+                            ((LineReaderImpl) reader).redisplay();
                         }
-                        break;
                     }
+                });
+                SignalHandler intHandler = terminal.handle(Signal.INT, s -> {
+                    Job current = newSession.foregroundJob();
+                    if (current != null) {
+                        current.interrupt();
+                    }
+                });
+                SignalHandler suspHandler = terminal.handle(Signal.TSTP, s -> {
+                    Job current = newSession.foregroundJob();
+                    if (current != null) {
+                        current.suspend();
+                    }
+                });
+                try {
+                    while (true) {
+                        try {
+                            reading.set(true);
+                            try {
+                                reader.readLine(Shell.getPrompt(session), Shell.getRPrompt(session), null, null);
+                            } finally {
+                                reading.set(false);
+                            }
+                            ParsedLine parsedLine = reader.getParsedLine();
+                            if (parsedLine == null) {
+                                throw new EndOfFileException();
+                            }
+                            try {
+                                result = session.execute(((ParsedLineImpl) parsedLine).program());
+                                session.put(Shell.VAR_RESULT, result); // set $_ to last result
+
+                                if (result != null && !Boolean.FALSE.equals(session.get(".Gogo.format"))) {
+                                    System.out.println(session.format(result, Converter.INSPECT));
+                                }
+                            } catch (Exception e) {
+                                session.put(Shell.VAR_EXCEPTION, e);
+                            }
+
+                            while (true) {
+                                Job job = session.foregroundJob();
+                                if (job != null) {
+                                    synchronized (job) {
+                                        if (job.status() == Status.Foreground) {
+                                            job.wait();
+                                        }
+                                    }
+                                } else {
+                                    break;
+                                }
+                            }
+
+                        } catch (UserInterruptException e) {
+                            // continue;
+                        } catch (EndOfFileException e) {
+                            try {
+                                reader.getHistory().flush();
+                            } catch (IOException e1) {
+                                e.addSuppressed(e1);
+                            }
+                            break;
+                        }
+                    }
+                } finally {
+                    terminal.handle(Signal.INT, intHandler);
+                    terminal.handle(Signal.TSTP, suspHandler);
                 }
             }
         } else {
@@ -341,7 +409,7 @@
                 program = readScript(script);
             }
 
-            result = newSession.execute(program);
+                result = newSession.execute(program);
         }
 
         if (login && interactive && !opt.isSet("noshutdown")) {
diff --git a/gogo/runtime/pom.xml b/gogo/runtime/pom.xml
index 90b83b2..1a407d4 100644
--- a/gogo/runtime/pom.xml
+++ b/gogo/runtime/pom.xml
@@ -29,6 +29,8 @@
     <artifactId>org.apache.felix.gogo.runtime</artifactId>
     <version>0.16.3-SNAPSHOT</version>
     <properties>
+        <!-- Skip because of NPE -->
+        <animal.sniffer.skip>true</animal.sniffer.skip>
         <felix.java.version>8</felix.java.version>
     </properties>
     <dependencies>
@@ -55,6 +57,7 @@
             <plugin>
                 <groupId>org.apache.felix</groupId>
                 <artifactId>maven-bundle-plugin</artifactId>
+                <version>3.0.0</version>
                 <extensions>true</extensions>
                 <configuration>
                     <instructions>
diff --git a/gogo/runtime/src/main/java/org/apache/felix/gogo/api/JobListener.java b/gogo/runtime/src/main/java/org/apache/felix/gogo/api/JobListener.java
new file mode 100644
index 0000000..179ebd5
--- /dev/null
+++ b/gogo/runtime/src/main/java/org/apache/felix/gogo/api/JobListener.java
@@ -0,0 +1,35 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.felix.gogo.api;
+
+import org.apache.felix.gogo.runtime.Job;
+import org.apache.felix.gogo.runtime.Job.Status;
+import org.apache.felix.service.command.CommandSession;
+
+/**
+ * Listener for command executions.
+ *
+ * Such listeners must be registered in the OSGi registry and will be called
+ * by the CommandProcessor when a command line is executed in a given session.
+ */
+public interface JobListener {
+
+    void jobChanged(Job job, Status previous, Status current);
+
+}
diff --git a/gogo/runtime/src/main/java/org/apache/felix/gogo/runtime/Closure.java b/gogo/runtime/src/main/java/org/apache/felix/gogo/runtime/Closure.java
index 2a63c11..2b121cc 100644
--- a/gogo/runtime/src/main/java/org/apache/felix/gogo/runtime/Closure.java
+++ b/gogo/runtime/src/main/java/org/apache/felix/gogo/runtime/Closure.java
@@ -31,9 +31,8 @@
 import java.util.List;
 import java.util.Map;
 import java.util.Map.Entry;
-import java.util.concurrent.ExecutionException;
-import java.util.concurrent.Future;
 
+import org.apache.felix.gogo.runtime.Job.Status;
 import org.apache.felix.gogo.runtime.Parser.Array;
 import org.apache.felix.gogo.runtime.Parser.Executable;
 import org.apache.felix.gogo.runtime.Parser.Operator;
@@ -254,42 +253,23 @@
                 pipes.add(new Pipe(this, (Statement) executable, streams, toclose));
             }
 
+            // Create job
+            Token s = pipes.get(0).statement;
+            Token e = pipes.get(pipes.size() - 1).statement;
+            Token t = program.subSequence(s.start - program.start, e.start + e.length - program.start);
+            Job job = session().createJob(t, pipes);
+
             // Start pipe in background
             if (operator != null && Token.eq("&", operator)) {
-
-                for (Pipe pipe : pipes) {
-                    session().getExecutor().submit(pipe);
-                }
-
+                job.start(Status.Background);
                 last = new Result((Object) null);
-
             }
             // Start in foreground and wait for results
             else {
-                List<Future<Result>> results = session().getExecutor().invokeAll(pipes);
-
-                // Get pipe exceptions
-                Exception pipeException = null;
-                for (int i = 0; i < results.size() - 1; i++) {
-                    Future<Result> future = results.get(i);
-                    Throwable e;
-                    try {
-                        Result r = future.get();
-                        e = r.exception;
-                    } catch (ExecutionException ee) {
-                        e = ee.getCause();
-                    }
-                    if (e != null) {
-                        if (pipeException == null) {
-                            pipeException = new Exception("Exception caught during pipe execution");
-                        }
-                        pipeException.addSuppressed(e);
-                    }
-                }
-                session.put(PIPE_EXCEPTION, pipeException);
-
-                last = results.get(results.size() - 1).get();
-                if (last.exception != null) {
+                last = job.start(Status.Foreground);
+                if (last == null) {
+                    last = new Result((Object) null);
+                } else if (last.exception != null) {
                     throw last.exception;
                 }
             }
diff --git a/gogo/runtime/src/main/java/org/apache/felix/gogo/runtime/CommandProcessorImpl.java b/gogo/runtime/src/main/java/org/apache/felix/gogo/runtime/CommandProcessorImpl.java
index 8f07108..3243429 100644
--- a/gogo/runtime/src/main/java/org/apache/felix/gogo/runtime/CommandProcessorImpl.java
+++ b/gogo/runtime/src/main/java/org/apache/felix/gogo/runtime/CommandProcessorImpl.java
@@ -21,8 +21,6 @@
 import java.io.InputStream;
 import java.io.OutputStream;
 import java.lang.reflect.Method;
-import java.nio.channels.ReadableByteChannel;
-import java.nio.channels.WritableByteChannel;
 import java.util.Collections;
 import java.util.LinkedHashMap;
 import java.util.Map;
@@ -73,20 +71,6 @@
         }
     }
 
-    public CommandSessionImpl createSession(ReadableByteChannel in, WritableByteChannel out, WritableByteChannel err)
-    {
-        synchronized (sessions)
-        {
-            if (stopped)
-            {
-                throw new IllegalStateException("CommandProcessor has been stopped");
-            }
-            CommandSessionImpl session = new CommandSessionImpl(this, in, out, err);
-            sessions.put(session, null);
-            return session;
-        }
-    }
-
     public CommandSessionImpl createSession(InputStream in, OutputStream out, OutputStream err)
     {
         synchronized (sessions)
diff --git a/gogo/runtime/src/main/java/org/apache/felix/gogo/runtime/CommandSessionImpl.java b/gogo/runtime/src/main/java/org/apache/felix/gogo/runtime/CommandSessionImpl.java
index f914a23..cdffab7 100644
--- a/gogo/runtime/src/main/java/org/apache/felix/gogo/runtime/CommandSessionImpl.java
+++ b/gogo/runtime/src/main/java/org/apache/felix/gogo/runtime/CommandSessionImpl.java
@@ -32,6 +32,7 @@
 import java.nio.channels.WritableByteChannel;
 import java.nio.file.Path;
 import java.nio.file.Paths;
+import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collection;
 import java.util.Collections;
@@ -39,12 +40,20 @@
 import java.util.Enumeration;
 import java.util.Formatter;
 import java.util.HashMap;
+import java.util.List;
 import java.util.Map;
+import java.util.concurrent.Callable;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.ExecutionException;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import java.util.stream.Collectors;
 
+import org.apache.felix.gogo.api.JobListener;
+import org.apache.felix.gogo.runtime.Job.Status;
+import org.apache.felix.gogo.runtime.Pipe.Result;
 import org.apache.felix.service.command.CommandProcessor;
 import org.apache.felix.service.command.CommandSession;
 import org.apache.felix.service.command.Converter;
@@ -70,6 +79,9 @@
     private final CommandProcessorImpl processor;
     protected final ConcurrentMap<String, Object> variables = new ConcurrentHashMap<>();
     private volatile boolean closed;
+    private final List<JobImpl> jobs = new ArrayList<>();
+    private final ThreadLocal<JobImpl> currentJob = new InheritableThreadLocal<>();
+    private JobListener jobListener;
 
     private final ExecutorService executor;
 
@@ -88,19 +100,6 @@
         this.perr = parent.perr;
     }
 
-    protected CommandSessionImpl(CommandProcessorImpl shell, ReadableByteChannel in, WritableByteChannel out, WritableByteChannel err)
-    {
-        this.currentDir = Paths.get(System.getProperty("user.dir")).toAbsolutePath().normalize();
-        this.executor = Executors.newCachedThreadPool();
-        this.processor = shell;
-        this.channels = new Channel[] { in, out, err };
-        this.in = Channels.newInputStream(in);
-        this.out = Channels.newOutputStream(out);
-        this.err = out == err ? this.out : Channels.newOutputStream(err);
-        this.pout = new PrintStream(this.out, true);
-        this.perr = out == err ? pout : new PrintStream(this.err, true);
-    }
-
     protected CommandSessionImpl(CommandProcessorImpl shell, InputStream in, OutputStream out, OutputStream err)
     {
         this.currentDir = Paths.get(System.getProperty("user.dir")).toAbsolutePath().normalize();
@@ -150,10 +149,6 @@
         }
     }
 
-    ExecutorService getExecutor() {
-        return executor;
-    }
-
     public Object execute(CharSequence commandline) throws Exception
     {
         assert processor != null;
@@ -370,7 +365,7 @@
         }
         if (target instanceof Dictionary)
         {
-            Map<Object, Object> result = new HashMap<Object, Object>();
+            Map<Object, Object> result = new HashMap<>();
             for (Enumeration e = ((Dictionary) target).keys(); e.hasMoreElements();)
             {
                 Object key = e.nextElement();
@@ -496,4 +491,255 @@
         return processor.expr(this, expr);
     }
 
+    @Override
+    public List<Job> jobs() {
+        synchronized (jobs) {
+            return new ArrayList<>(jobs);
+        }
+    }
+
+    @Override
+    public JobImpl currentJob() {
+        JobImpl job = currentJob.get();
+        while (job != null && job.parent != null) {
+            job = job.parent;
+        }
+        return job;
+    }
+
+    @Override
+    public JobImpl foregroundJob() {
+        List<JobImpl> jobs;
+        synchronized (this.jobs) {
+            jobs = new ArrayList<>(this.jobs);
+        }
+        return jobs.stream()
+                    .filter(j -> j.parent == null && j.status() == Status.Foreground)
+                    .findFirst()
+                    .orElse(null);
+    }
+
+    @Override
+    public void setJobListener(JobListener listener) {
+        synchronized (jobs) {
+            jobListener = listener;
+        }
+    }
+
+    public Job createJob(CharSequence command, List<Pipe> pipes) {
+        synchronized (jobs) {
+            int id = 1;
+            synchronized (jobs) {
+                boolean found;
+                do {
+                    found = false;
+                    for (Job job : jobs) {
+                        if (job.id() == id) {
+                            found = true;
+                            id++;
+                            break;
+                        }
+                    }
+                } while (found);
+            }
+            JobImpl cur = currentJob();
+            JobImpl job = new JobImpl(id, cur, command, pipes);
+            if (cur == null) {
+                jobs.add(job);
+            }
+            return job;
+        }
+    }
+
+    private class JobImpl implements Job {
+        private final int id;
+        private final JobImpl parent;
+        private final CharSequence command;
+        private final List<Pipe> pipes;
+        private Status status = Status.Created;
+        private Future<?> future;
+        private Result result;
+
+        public JobImpl(int id, JobImpl parent, CharSequence command, List<Pipe> pipes) {
+            this.id = id;
+            this.parent = parent;
+            this.command = command;
+            this.pipes = pipes;
+        }
+
+        @Override
+        public int id() {
+            return id;
+        }
+
+        public CharSequence command() {
+            return command;
+        }
+
+        @Override
+        public synchronized Status status() {
+            return status;
+        }
+
+        @Override
+        public synchronized void suspend() {
+            if (status == Status.Done) {
+                throw new IllegalStateException("Job is finished");
+            }
+            if (status != Status.Suspended) {
+                setStatus(Status.Suspended);
+            }
+        }
+
+        @Override
+        public synchronized void background() {
+            if (status == Status.Done) {
+                throw new IllegalStateException("Job is finished");
+            }
+            if (status != Status.Background) {
+                setStatus(Status.Background);
+            }
+        }
+
+        @Override
+        public synchronized void foreground() {
+            if (status == Status.Done) {
+                throw new IllegalStateException("Job is finished");
+            }
+            JobImpl cr = currentJob();
+            JobImpl fg = foregroundJob();
+            if (parent == null && fg != null && fg != this && fg != cr) {
+                throw new IllegalStateException("A job is already in foreground");
+            }
+            if (status != Status.Foreground) {
+                setStatus(Status.Foreground);
+            }
+        }
+
+        @Override
+        public void interrupt() {
+            Future future;
+            synchronized (this) {
+                future = this.future;
+            }
+            if (future != null) {
+                future.cancel(true);
+            }
+        }
+
+        protected synchronized void done() {
+            if (status == Status.Done) {
+                throw new IllegalStateException("Job is finished");
+            }
+            setStatus(Status.Done);
+        }
+
+        private void setStatus(Status newStatus) {
+            setStatus(newStatus, true);
+        }
+
+        private void setStatus(Status newStatus, boolean callListeners) {
+            Status previous;
+            synchronized (this) {
+                previous = this.status;
+                status = newStatus;
+            }
+            if (callListeners) {
+                JobListener listener;
+                synchronized (jobs) {
+                    listener = jobListener;
+                    if (newStatus == Status.Done) {
+                        jobs.remove(this);
+                    }
+                }
+                if (listener != null) {
+                    listener.jobChanged(this, previous, newStatus);
+                }
+            }
+            synchronized (this) {
+                JobImpl.this.notifyAll();
+            }
+        }
+
+        @Override
+        public synchronized Result result() {
+            return result;
+        }
+
+        @Override
+        public synchronized Result start(Status status) throws InterruptedException {
+            if (status == Status.Created || status == Status.Done) {
+                throw new IllegalArgumentException("Illegal start status");
+            }
+            if (this.status != Status.Created) {
+                throw new IllegalStateException("Job already started");
+            }
+            switch (status) {
+                case Suspended:
+                    suspend();
+                    break;
+                case Background:
+                    background();
+                    break;
+                case Foreground:
+                    foreground();
+                    break;
+            }
+            future = executor.submit(this::call);
+            while (this.status == Status.Foreground) {
+                JobImpl.this.wait();
+            }
+            return result;
+        }
+
+        private Void call() throws Exception {
+            Thread thread = Thread.currentThread();
+            String name = thread.getName();
+            try {
+                thread.setName("job controller " + id);
+
+                List<Callable<Result>> wrapped = pipes.stream().map(this::wrap).collect(Collectors.toList());
+                List<Future<Result>> results = executor.invokeAll(wrapped);
+
+                // Get pipe exceptions
+                Exception pipeException = null;
+                for (int i = 0; i < results.size() - 1; i++) {
+                    Future<Result> future = results.get(i);
+                    Throwable e;
+                    try {
+                        Result r = future.get();
+                        e = r.exception;
+                    } catch (ExecutionException ee) {
+                        e = ee.getCause();
+                    }
+                    if (e != null) {
+                        if (pipeException == null) {
+                            pipeException = new Exception("Exception caught during pipe execution");
+                        }
+                        pipeException.addSuppressed(e);
+                    }
+                }
+                put(Closure.PIPE_EXCEPTION, pipeException);
+
+                result = results.get(results.size() - 1).get();
+            } finally {
+                done();
+                thread.setName(name);
+            }
+            return null;
+        }
+
+        private Callable<Result> wrap(Pipe pipe) {
+            return () -> {
+                JobImpl prevJob = currentJob.get();
+                try {
+                    currentJob.set(this);
+                    return pipe.call();
+                } finally {
+                    currentJob.set(prevJob);
+                }
+            };
+        }
+
+    }
 }
diff --git a/gogo/runtime/src/main/java/org/apache/felix/gogo/runtime/Job.java b/gogo/runtime/src/main/java/org/apache/felix/gogo/runtime/Job.java
new file mode 100644
index 0000000..721fa8c
--- /dev/null
+++ b/gogo/runtime/src/main/java/org/apache/felix/gogo/runtime/Job.java
@@ -0,0 +1,62 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.felix.gogo.runtime;
+
+import org.apache.felix.gogo.runtime.Pipe.Result;
+
+public interface Job {
+
+    enum Status {
+        Created,
+        Suspended,
+        Background,
+        Foreground,
+        Done
+    }
+
+    int id();
+
+    CharSequence command();
+
+    Status status();
+
+    void suspend();
+
+    void background();
+
+    void foreground();
+
+    void interrupt();
+
+    Result result();
+
+    /**
+     * Start the job.
+     * If the job is started in foreground,
+     * waits for the job to finish or to be
+     * suspended or moved to background.
+     *
+     * @param status the desired job status
+     * @return <code>null</code> if the job
+     *   has been suspended or moved to background,
+     *
+     */
+    Result start(Status status) throws InterruptedException;
+
+}
diff --git a/gogo/runtime/src/main/java/org/apache/felix/gogo/runtime/Pipe.java b/gogo/runtime/src/main/java/org/apache/felix/gogo/runtime/Pipe.java
index e92b329..fac281f 100644
--- a/gogo/runtime/src/main/java/org/apache/felix/gogo/runtime/Pipe.java
+++ b/gogo/runtime/src/main/java/org/apache/felix/gogo/runtime/Pipe.java
@@ -20,8 +20,10 @@
 
 import java.io.IOException;
 import java.io.InputStream;
+import java.io.InterruptedIOException;
 import java.io.PrintStream;
 import java.nio.ByteBuffer;
+import java.nio.channels.ByteChannel;
 import java.nio.channels.Channel;
 import java.nio.channels.Channels;
 import java.nio.channels.ReadableByteChannel;
@@ -38,6 +40,7 @@
 import java.util.regex.Matcher;
 import java.util.regex.Pattern;
 
+import org.apache.felix.gogo.runtime.Job.Status;
 import org.apache.felix.gogo.runtime.Parser.Statement;
 import org.apache.felix.gogo.runtime.Pipe.Result;
 import org.apache.felix.service.command.Converter;
@@ -114,6 +117,9 @@
     private static final int WRITE = 2;
 
     private void setStream(Channel ch, int fd, int readWrite, boolean begOfPipe, boolean endOfPipe) throws IOException {
+        if ((readWrite & (READ | WRITE)) == 0) {
+            throw new IllegalArgumentException("Should specify READ and/or WRITE");
+        }
         if ((readWrite & READ) != 0 && !(ch instanceof ReadableByteChannel)) {
             throw new IllegalArgumentException("Channel is not readable");
         }
@@ -135,43 +141,24 @@
             if (streams[fd] != null && (readWrite & READ) != 0 && (readWrite & WRITE) != 0) {
                 throw new IllegalArgumentException("Can not do multios with read/write streams");
             }
-            if ((readWrite & READ) != 0) {
-                MultiReadableByteChannel mrbc;
-                if (streams[fd] instanceof MultiReadableByteChannel) {
-                    mrbc = (MultiReadableByteChannel) streams[fd];
-                } else {
-                    mrbc = new MultiReadableByteChannel();
-                    if (streams[fd] != null && begOfPipe) {
-                        if (toclose[fd]) {
-                            streams[fd].close();
-                        }
-                    } else {
-                        mrbc.addChannel((ReadableByteChannel) streams[fd], toclose[fd]);
-                    }
-                    streams[fd] = mrbc;
-                    toclose[fd] = true;
-                }
-                mrbc.addChannel((ReadableByteChannel) ch, true);
-            } else if ((readWrite & WRITE) != 0) {
-                MultiWritableByteChannel mrbc;
-                if (streams[fd] instanceof MultiWritableByteChannel) {
-                    mrbc = (MultiWritableByteChannel) streams[fd];
-                } else {
-                    mrbc = new MultiWritableByteChannel();
-                    if (streams[fd] != null && endOfPipe) {
-                        if (toclose[fd]) {
-                            streams[fd].close();
-                        }
-                    } else {
-                        mrbc.addChannel((WritableByteChannel) streams[fd], toclose[fd]);
-                    }
-                    streams[fd] = mrbc;
-                    toclose[fd] = true;
-                }
-                mrbc.addChannel((WritableByteChannel) ch, true);
+            MultiChannel mrbc;
+            if (streams[fd] instanceof MultiChannel) {
+                mrbc = (MultiChannel) streams[fd];
             } else {
-                throw new IllegalStateException();
+                mrbc = new MultiChannel();
+                if (streams[fd] != null
+                        && ((begOfPipe && (readWrite & READ) != 0)
+                            || (endOfPipe && (readWrite & WRITE) != 0))) {
+                    if (toclose[fd]) {
+                        streams[fd].close();
+                    }
+                } else {
+                    mrbc.addChannel(streams[fd], toclose[fd]);
+                }
+                streams[fd] = mrbc;
+                toclose[fd] = true;
             }
+            mrbc.addChannel(ch, true);
         }
         else {
             if (streams[fd] != null && toclose[fd]) {
@@ -182,59 +169,6 @@
         }
     }
 
-    private static class MultiChannel<T extends Channel> implements Channel {
-        protected final List<T> channels = new ArrayList<>();
-        protected final List<T> toClose = new ArrayList<>();
-        protected final AtomicBoolean opened = new AtomicBoolean(true);
-        public void addChannel(T channel, boolean toclose) {
-            channels.add(channel);
-            if (toclose) {
-                toClose.add(channel);
-            }
-        }
-
-        public boolean isOpen() {
-            return opened.get();
-        }
-
-        public void close() throws IOException {
-            if (opened.compareAndSet(true, false)) {
-                for (T channel : toClose) {
-                    channel.close();
-                }
-            }
-        }
-    }
-
-    private static class MultiReadableByteChannel extends MultiChannel<ReadableByteChannel> implements ReadableByteChannel {
-        int index = 0;
-        public int read(ByteBuffer dst) throws IOException {
-            int nbRead = -1;
-            while (nbRead < 0 && index < channels.size()) {
-                nbRead = channels.get(index).read(dst);
-                if (nbRead < 0) {
-                    index++;
-                } else {
-                    break;
-                }
-            }
-            return nbRead;
-        }
-    }
-
-    private static class MultiWritableByteChannel extends MultiChannel<WritableByteChannel> implements WritableByteChannel {
-        public int write(ByteBuffer src) throws IOException {
-            int pos = src.position();
-            for (WritableByteChannel ch : channels) {
-                src.position(pos);
-                while (src.hasRemaining()) {
-                    ch.write(src);
-                }
-            }
-            return src.position() - pos;
-        }
-    }
-
     @Override
     public Result call() throws Exception {
         Thread thread = Thread.currentThread();
@@ -247,7 +181,7 @@
         }
     }
 
-    public Result doCall()
+    private Result doCall()
     {
         InputStream in;
         PrintStream out = null;
@@ -332,6 +266,10 @@
                 }
             }
 
+            for (int i = 0; i < streams.length; i++) {
+                streams[i] = wrap(streams[i]);
+            }
+
             // Create streams
             in = Channels.newInputStream((ReadableByteChannel) streams[0]);
             out = new PrintStream(Channels.newOutputStream((WritableByteChannel) streams[1]), true);
@@ -396,4 +334,96 @@
             }
         }
     }
+
+    private Channel wrap(Channel channel) {
+        if (channel == null) {
+            return null;
+        }
+        if (channel instanceof MultiChannel) {
+            return channel;
+        }
+        MultiChannel mch = new MultiChannel();
+        mch.addChannel(channel, true);
+        return mch;
+    }
+
+    private class MultiChannel implements ByteChannel {
+        protected final List<Channel> channels = new ArrayList<>();
+        protected final List<Channel> toClose = new ArrayList<>();
+        protected final AtomicBoolean opened = new AtomicBoolean(true);
+        int index = 0;
+
+        public void addChannel(Channel channel, boolean toclose) {
+            channels.add(channel);
+            if (toclose) {
+                toClose.add(channel);
+            }
+        }
+
+        public boolean isOpen() {
+            return opened.get();
+        }
+
+        public void close() throws IOException {
+            if (opened.compareAndSet(true, false)) {
+                for (Channel channel : toClose) {
+                    channel.close();
+                }
+            }
+        }
+
+        public int read(ByteBuffer dst) throws IOException {
+            int nbRead = -1;
+            while (nbRead < 0 && index < channels.size()) {
+                Channel ch = channels.get(index);
+                checkSuspend(ch);
+                nbRead = ((ReadableByteChannel) ch).read(dst);
+                if (nbRead < 0) {
+                    index++;
+                } else {
+                    break;
+                }
+            }
+            return nbRead;
+        }
+
+        public int write(ByteBuffer src) throws IOException {
+            int pos = src.position();
+            for (Channel ch : channels) {
+                checkSuspend(ch);
+                src.position(pos);
+                while (src.hasRemaining()) {
+                    ((WritableByteChannel) ch).write(src);
+                }
+            }
+            return src.position() - pos;
+        }
+
+        private void checkSuspend(Channel ch) throws IOException {
+            Job cur = closure.session().currentJob();
+            if (cur != null) {
+                Channel[] sch = closure.session().channels;
+                if (ch == sch[0] || ch == sch[1] || ch == sch[2]) {
+                    synchronized (cur) {
+                        if (cur.status() == Status.Background) {
+                            // TODO: Send SIGTIN / SIGTOU
+                            cur.suspend();
+                        }
+                    }
+                }
+                synchronized (cur) {
+                    while (cur.status() == Status.Suspended) {
+                        try {
+                            cur.wait();
+                        } catch (InterruptedException e) {
+                            throw (IOException) new InterruptedIOException().initCause(e);
+                        }
+                    }
+                }
+            } else {
+                String msg = "This is definitely not expected";
+            }
+        }
+    }
+
 }
diff --git a/gogo/runtime/src/main/java/org/apache/felix/service/command/CommandSession.java b/gogo/runtime/src/main/java/org/apache/felix/service/command/CommandSession.java
index 58dbd22..71e5ddc 100644
--- a/gogo/runtime/src/main/java/org/apache/felix/service/command/CommandSession.java
+++ b/gogo/runtime/src/main/java/org/apache/felix/service/command/CommandSession.java
@@ -21,6 +21,10 @@
 import java.io.InputStream;
 import java.io.PrintStream;
 import java.nio.file.Path;
+import java.util.List;
+
+import org.apache.felix.gogo.api.JobListener;
+import org.apache.felix.gogo.runtime.Job;
 
 public interface CommandSession
 {
@@ -98,4 +102,13 @@
      */
 
     Object convert(Class<?> type, Object instance);
+
+    List<Job> jobs();
+
+    Job currentJob();
+
+    Job foregroundJob();
+
+    void setJobListener(JobListener listener);
+
 }