Support redirections
Work in progress (missing support for &&, || and &)

git-svn-id: https://svn.apache.org/repos/asf/felix/trunk@1735996 13f79535-47bb-0310-9956-ffa450edef68
diff --git a/gogo/runtime/pom.xml b/gogo/runtime/pom.xml
index 4ee7ff9..00b1635 100644
--- a/gogo/runtime/pom.xml
+++ b/gogo/runtime/pom.xml
@@ -28,6 +28,9 @@
     <name>Apache Felix Gogo Runtime</name>
     <artifactId>org.apache.felix.gogo.runtime</artifactId>
     <version>0.16.3-SNAPSHOT</version>
+    <properties>
+        <felix.java.version>8</felix.java.version>
+    </properties>
     <dependencies>
         <dependency>
             <groupId>org.osgi</groupId>
diff --git a/gogo/runtime/src/main/java/org/apache/felix/gogo/runtime/Closure.java b/gogo/runtime/src/main/java/org/apache/felix/gogo/runtime/Closure.java
index e945cdc..7e1cc89 100644
--- a/gogo/runtime/src/main/java/org/apache/felix/gogo/runtime/Closure.java
+++ b/gogo/runtime/src/main/java/org/apache/felix/gogo/runtime/Closure.java
@@ -19,9 +19,14 @@
 package org.apache.felix.gogo.runtime;
 
 import java.io.EOFException;
+import java.io.PipedInputStream;
+import java.io.PipedOutputStream;
+import java.nio.channels.Channel;
+import java.nio.channels.Channels;
 import java.util.AbstractList;
 import java.util.ArrayList;
 import java.util.Collections;
+import java.util.Iterator;
 import java.util.LinkedHashMap;
 import java.util.List;
 import java.util.Map;
@@ -29,6 +34,7 @@
 
 import org.apache.felix.gogo.runtime.Parser.Array;
 import org.apache.felix.gogo.runtime.Parser.Executable;
+import org.apache.felix.gogo.runtime.Parser.Operator;
 import org.apache.felix.gogo.runtime.Parser.Pipeline;
 import org.apache.felix.gogo.runtime.Parser.Program;
 import org.apache.felix.gogo.runtime.Parser.Sequence;
@@ -173,52 +179,100 @@
         }
 
         Pipe last = null;
-        Object[] mark = Pipe.mark();
-
-        for (Executable executable : program.tokens())
+        Operator operator = null;
+        for (Iterator<Executable> iterator = program.tokens().iterator(); iterator.hasNext();)
         {
-            List<Pipe> pipes = toPipes(executable);
-
-            for (int i = 0; i < pipes.size(); i++)
-            {
-                Pipe current = pipes.get(i);
-                if (i == 0)
-                {
-                    if (current.out == null)
-                    {
-                        current.setIn(session.in);
-                        current.setOut(session.out);
-                        current.setErr(session.err);
+            if (operator != null) {
+                if (Token.eq("&&", operator)) {
+                    if (!isSuccess(last)) {
+                        continue;
                     }
                 }
-                else
-                {
-                    Pipe previous = pipes.get(i - 1);
-                    previous.connect(current);
+                else if (Token.eq("||", operator)) {
+                    if (isSuccess(last)) {
+                        continue;
+                    }
                 }
             }
+            Executable executable = iterator.next();
+            if (iterator.hasNext()) {
+                operator = (Operator) iterator.next();
+            } else {
+                operator = null;
+            }
 
+            if (operator != null && Token.eq("&", operator)) {
+                // TODO: need to start in background
+            }
+
+            Channel[] mark = Pipe.mark();
+            Channel[] streams;
+            boolean[] toclose = new boolean[10];
+            if (mark == null) {
+                streams = new Channel[10];
+                streams[0] = Channels.newChannel(session.in);
+                streams[1] = Channels.newChannel(session.out);
+                streams[2] = Channels.newChannel(session.err);
+            } else {
+                streams = mark.clone();
+            }
+
+            List<Pipe> pipes = new ArrayList<Pipe>();
+            if (executable instanceof Pipeline) {
+                Pipeline pipeline = (Pipeline) executable;
+                List<Executable> exec = pipeline.tokens();
+                for (int i = 0; i < exec.size(); i++) {
+                    Executable ex = exec.get(i);
+                    Operator op = i < exec.size() - 1 ? (Operator) exec.get(++i) : null;
+                    Channel[] nstreams;
+                    boolean[] ntoclose;
+                    if (i == exec.size() - 1) {
+                        nstreams = streams;
+                        ntoclose = toclose;
+                    } else if (Token.eq("|", op)) {
+                        PipedInputStream pis = new PipedInputStream();
+                        PipedOutputStream pos = new PipedOutputStream(pis);
+                        nstreams = streams.clone();
+                        nstreams[1] = Channels.newChannel(pos);
+                        ntoclose = toclose.clone();
+                        ntoclose[1] = true;
+                        streams[0] = Channels.newChannel(pis);
+                        toclose[0] = true;
+                    } else if (Token.eq("|&", op)) {
+                        PipedInputStream pis = new PipedInputStream();
+                        PipedOutputStream pos = new PipedOutputStream(pis);
+                        nstreams = streams.clone();
+                        nstreams[1] = nstreams[2] = Channels.newChannel(pos);
+                        ntoclose = toclose.clone();
+                        ntoclose[1] = ntoclose[2] = true;
+                        streams[0] = Channels.newChannel(pis);
+                        toclose[0] = true;
+                    } else {
+                        throw new IllegalStateException("Unrecognized pipe operator: '" + op + "'");
+                    }
+                    pipes.add(new Pipe(this, ex, nstreams, ntoclose));
+                }
+            } else {
+                pipes.add(new Pipe(this, executable, streams, toclose));
+            }
+
+            // Don't start a thread if we have a single pipe
             if (pipes.size() == 1)
             {
                 pipes.get(0).run();
             }
-            else if (pipes.size() > 1)
-            {
-                for (Pipe pipe : pipes)
-                {
+            else {
+                // Start threads
+                for (Pipe pipe : pipes) {
                     pipe.start();
                 }
-                try
-                {
-                    for (Pipe pipe : pipes)
-                    {
+                // Wait for them
+                try {
+                    for (Pipe pipe : pipes) {
                         pipe.join();
                     }
-                }
-                catch (InterruptedException e)
-                {
-                    for (Pipe pipe : pipes)
-                    {
+                } catch (InterruptedException e) {
+                    for (Pipe pipe : pipes) {
                         pipe.interrupt();
                     }
                     throw e;
@@ -231,42 +285,23 @@
                 if (pipe.exception != null)
                 {
                     // can't throw exception, as result is defined by last pipe
-                    Object oloc = session.get(LOCATION);
-                    String loc = (String.valueOf(oloc).contains(":") ? oloc + ": "
-                        : "pipe: ");
-                    session.err.println(loc + pipe.exception);
                     session.put("pipe-exception", pipe.exception);
                 }
             }
             last = pipes.get(pipes.size() - 1);
-            if (last.exception != null)
+
+            boolean errReturn = true;
+            if (last.exit != 0 && errReturn)
             {
-                Pipe.reset(mark);
                 throw last.exception;
             }
         }
 
-        Pipe.reset(mark); // reset IO in case same thread used for new client
-
         return last == null ? null : last.result;
     }
 
-    private List<Pipe> toPipes(Executable executable)
-    {
-        if (executable instanceof Pipeline)
-        {
-            List<Pipe> pipes = new ArrayList<Pipe>();
-            Pipeline pipeline = (Pipeline) executable;
-            for (Executable ex : pipeline.tokens())
-            {
-                pipes.add(new Pipe(this, ex));
-            }
-            return pipes;
-        }
-        else
-        {
-            return Collections.singletonList(new Pipe(this, executable));
-        }
+    private boolean isSuccess(Pipe pipe) {
+        return pipe.exit == 0;
     }
 
     private Object eval(Object v)
diff --git a/gogo/runtime/src/main/java/org/apache/felix/gogo/runtime/CommandSessionImpl.java b/gogo/runtime/src/main/java/org/apache/felix/gogo/runtime/CommandSessionImpl.java
index 97ed681..cc38c8d 100644
--- a/gogo/runtime/src/main/java/org/apache/felix/gogo/runtime/CommandSessionImpl.java
+++ b/gogo/runtime/src/main/java/org/apache/felix/gogo/runtime/CommandSessionImpl.java
@@ -25,6 +25,8 @@
 import java.io.PrintStream;
 import java.lang.reflect.Method;
 import java.lang.reflect.Modifier;
+import java.nio.file.Path;
+import java.nio.file.Paths;
 import java.util.Arrays;
 import java.util.Collection;
 import java.util.Collections;
@@ -58,12 +60,15 @@
     protected final ConcurrentMap<String, Object> variables = new ConcurrentHashMap<String, Object>();
     private volatile boolean closed;
 
+    private Path currentDir;
+
     protected CommandSessionImpl(CommandProcessorImpl shell, InputStream in, PrintStream out, PrintStream err)
     {
         this.processor = shell;
         this.in = in;
         this.out = out;
         this.err = err;
+        this.currentDir = Paths.get(System.getProperty("user.dir")).toAbsolutePath().normalize();
     }
 
     ThreadIO threadIO()
@@ -81,6 +86,14 @@
         return variables;
     }
 
+    public Path currentDir() {
+        return currentDir;
+    }
+
+    public void currentDir(Path path) {
+        currentDir = path;
+    }
+
     public void close()
     {
         if (!this.closed)
@@ -353,6 +366,10 @@
                 }
             }
         }
+        if (target instanceof Path)
+        {
+            return target.toString();
+        }
         if (level == Converter.INSPECT)
         {
             return inspect(target);
diff --git a/gogo/runtime/src/main/java/org/apache/felix/gogo/runtime/Parser.java b/gogo/runtime/src/main/java/org/apache/felix/gogo/runtime/Parser.java
index 3b9192c..de317fc 100644
--- a/gogo/runtime/src/main/java/org/apache/felix/gogo/runtime/Parser.java
+++ b/gogo/runtime/src/main/java/org/apache/felix/gogo/runtime/Parser.java
@@ -25,6 +25,7 @@
 import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
+import java.util.regex.Pattern;
 
 
 public class Parser
@@ -38,20 +39,33 @@
         }
     }
 
+    public static class Operator extends Executable
+    {
+        public Operator(Token cs) {
+            super(cs);
+        }
+    }
+
     public static class Statement extends Executable
     {
         private final List<Token> tokens;
+        private final List<Token> redirections;
 
-        public Statement(Token cs, List<Token> tokens)
+        public Statement(Token cs, List<Token> tokens, List<Token> redirections)
         {
             super(cs);
             this.tokens = tokens;
+            this.redirections = redirections;
         }
 
         public List<Token> tokens()
         {
             return tokens;
         }
+
+        public List<Token> redirections() {
+            return redirections;
+        }
     }
 
     /**
@@ -222,7 +236,7 @@
                 ex = statement();
             }
             t = next();
-            if (t == null || Token.eq(";", t) || Token.eq("\n", t))
+            if (t == null || Token.eq(";", t) || Token.eq("\n", t) || Token.eq("&", t) || Token.eq("&&", t) || Token.eq("||", t))
             {
                 if (pipes != null)
                 {
@@ -238,14 +252,18 @@
                 {
                     return new Program(whole(tokens, start), tokens);
                 }
+                else {
+                    tokens.add(new Operator(t));
+                }
             }
-            else if (Token.eq("|", t))
+            else if (Token.eq("|", t) || Token.eq("|&", t))
             {
                 if (pipes == null)
                 {
                     pipes = new ArrayList<Executable>();
                 }
                 pipes.add(ex);
+                pipes.add(new Operator(t));
             }
             else
             {
@@ -295,21 +313,33 @@
         return new Closure(whole(start, end), program);
     }
 
+    private final Pattern redirNoArg = Pattern.compile("[0-9]?>&[0-9-]|[0-9-]?<&[0-9-]");
+    private final Pattern redirArg = Pattern.compile("[0-9&]?>|[0-9]?>>|[0-9]?<|[0-9]?<>");
+
     public Statement statement()
     {
         List<Token> tokens = new ArrayList<Token>();
+        List<Token> redirs = new ArrayList<Token>();
+        boolean needRedirArg = false;
         int start = tz.index;
         while (true)
         {
             Token t = next();
             if (t == null
-                    || Token.eq("|", t)
                     || Token.eq("\n", t)
                     || Token.eq(";", t)
+                    || Token.eq("&", t)
+                    || Token.eq("&&", t)
+                    || Token.eq("||", t)
+                    || Token.eq("|", t)
+                    || Token.eq("|&", t)
                     || Token.eq("}", t)
                     || Token.eq(")", t)
                     || Token.eq("]", t))
             {
+                if (needRedirArg) {
+                    throw new EOFError(tz.line, tz.column, "Expected file name for redirection", "redir", "foo");
+                }
                 push(t);
                 break;
             }
@@ -328,12 +358,26 @@
                 push(t);
                 tokens.add(sequence());
             }
+            else if (needRedirArg)
+            {
+                redirs.add(t);
+                needRedirArg = false;
+            }
+            else if (redirNoArg.matcher(t).matches())
+            {
+                redirs.add(t);
+            }
+            else if (redirArg.matcher(t).matches())
+            {
+                redirs.add(t);
+                needRedirArg = true;
+            }
             else
             {
                 tokens.add(t);
             }
         }
-        Statement statement = new Statement(whole(tokens, start), tokens);
+        Statement statement = new Statement(whole(tokens, start), tokens, redirs);
         statements.add(statement);
         return statement;
     }
@@ -360,8 +404,8 @@
             {
                 continue;
             }
-            if (Token.eq("{", key) || Token.eq(";", key)
-                    || Token.eq("|", key) || Token.eq(")", key) || Token.eq("}", key) || Token.eq("=", key))
+            if (Token.eq("{", key) || Token.eq(";", key) || Token.eq("&", key) || Token.eq("&&", key) || Token.eq("||", key)
+                    || Token.eq("|", key) || Token.eq("|&", key) || Token.eq(")", key) || Token.eq("}", key) || Token.eq("=", key))
             {
                 throw new SyntaxError(key.line(), key.column(), "unexpected token '" + key + "' while looking for array key");
             }
@@ -393,7 +437,7 @@
                 {
                     throw new EOFError(tz.line, tz.column, "unexpected EOF while looking for array value", getMissing(), "0");
                 }
-                else if (Token.eq(";", val) || Token.eq("|", val)
+                else if (Token.eq(";", val) || Token.eq("&", val) || Token.eq("&&", val) || Token.eq("||", val) || Token.eq("|", val) || Token.eq("|&", val)
                         || Token.eq(")", key) || Token.eq("}", key) || Token.eq("=", key))
                 {
                     throw new SyntaxError(key.line(), key.column(), "unexpected token '" + key + "' while looking for array value");
@@ -528,4 +572,25 @@
         return tz.text.subSequence(b.start, e.start + e.length());
     }
 
+    protected boolean isPiped(Token t) {
+        return Token.eq("|", t) || Token.eq("|&", t);
+    }
+
+    /*
+    protected boolean isRedirection(Token t) {
+        return Token.eq("<", t)
+                || Token.eq("<>", t)
+                || Token.eq(">", t)
+                || Token.eq(">|", t)
+                || Token.eq(">!", t)
+                || Token.eq(">>", t)
+                || Token.eq(">>|", t)
+                || Token.eq(">>!", t)
+                || Token.eq("<<", t)
+                || Token.eq("<<-", t)
+                || Token.eq("<&", t.subSequence(0, 2)) &&
+                || Token.eq("<&1", t)
+
+    }
+    */
 }
diff --git a/gogo/runtime/src/main/java/org/apache/felix/gogo/runtime/Pipe.java b/gogo/runtime/src/main/java/org/apache/felix/gogo/runtime/Pipe.java
index ed2dba9..eb2bf81 100644
--- a/gogo/runtime/src/main/java/org/apache/felix/gogo/runtime/Pipe.java
+++ b/gogo/runtime/src/main/java/org/apache/felix/gogo/runtime/Pipe.java
@@ -20,125 +20,307 @@
 
 import java.io.IOException;
 import java.io.InputStream;
-import java.io.PipedInputStream;
-import java.io.PipedOutputStream;
 import java.io.PrintStream;
-import java.lang.reflect.Method;
+import java.nio.ByteBuffer;
+import java.nio.channels.Channel;
+import java.nio.channels.Channels;
+import java.nio.channels.ReadableByteChannel;
+import java.nio.channels.WritableByteChannel;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.nio.file.StandardOpenOption;
+import java.util.ArrayList;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
 
 import org.apache.felix.gogo.runtime.Parser.Executable;
+import org.apache.felix.gogo.runtime.Parser.Statement;
 import org.apache.felix.service.command.Converter;
 
 public class Pipe extends Thread
 {
-    static final ThreadLocal<InputStream> tIn = new ThreadLocal<InputStream>();
-    static final ThreadLocal<PrintStream> tOut = new ThreadLocal<PrintStream>();
-    static final ThreadLocal<PrintStream> tErr = new ThreadLocal<PrintStream>();
-    InputStream in;
-    PrintStream out;
-    PrintStream err;
-    PipedOutputStream pout;
-    Closure closure;
-    Exception exception;
+    static final ThreadLocal<Channel[]> tStreams = new ThreadLocal<Channel[]>();
+
+    public static Channel[] mark() {
+        return tStreams.get();
+    }
+
+    public static void reset(Channel[] streams) {
+        tStreams.set(streams);
+    }
+
+    final Closure closure;
+    final Executable executable;
+    final Channel[] streams;
+    final boolean[] toclose;
     Object result;
-    Executable executable;
+    Exception exception;
+    int exit = 0;
 
-    public static Object[] mark()
-    {
-        Object[] mark = { tIn.get(), tOut.get(), tErr.get() };
-        return mark;
-    }
-
-    public static void reset(Object[] mark)
-    {
-        tIn.set((InputStream) mark[0]);
-        tOut.set((PrintStream) mark[1]);
-        tErr.set((PrintStream) mark[2]);
-    }
-
-    public Pipe(Closure closure, Executable executable)
+    public Pipe(Closure closure, Executable executable, Channel[] streams, boolean[] toclose)
     {
         super("pipe-" + executable);
         this.closure = closure;
         this.executable = executable;
-
-        in = tIn.get();
-        out = tOut.get();
-        err = tErr.get();
+        this.streams = streams;
+        this.toclose = toclose;
     }
 
     public String toString()
     {
-        return "pipe<" + executable + "> out=" + out;
+        return "pipe<" + executable + "> out=" + streams[1];
     }
 
-    public void setIn(InputStream in)
-    {
-        this.in = in;
+    private static final int READ = 1;
+    private static final int WRITE = 2;
+
+    private void setStream(Channel ch, int fd, int readWrite) throws IOException {
+        if ((readWrite & READ) != 0 && !(ch instanceof ReadableByteChannel)) {
+            throw new IllegalArgumentException("Channel is not readable");
+        }
+        if ((readWrite & WRITE) != 0 && !(ch instanceof WritableByteChannel)) {
+            throw new IllegalArgumentException("Channel is not writable");
+        }
+        if (fd == 0 && !(ch instanceof ReadableByteChannel)) {
+            throw new IllegalArgumentException("Stdin is not readable");
+        }
+        if (fd == 1 && !(ch instanceof WritableByteChannel)) {
+            throw new IllegalArgumentException("Stdout is not writable");
+        }
+        if (fd == 2 && !(ch instanceof WritableByteChannel)) {
+            throw new IllegalArgumentException("Stderr is not writable");
+        }
+        // TODO: externalize
+        boolean multios = true;
+        if (multios) {
+            if (streams[fd] != null && (readWrite & READ) != 0 && (readWrite & WRITE) != 0) {
+                throw new IllegalArgumentException("Can not do multios with read/write streams");
+            }
+            if ((readWrite & READ) != 0) {
+                MultiReadableByteChannel mrbc;
+                if (streams[fd] instanceof MultiReadableByteChannel) {
+                    mrbc = (MultiReadableByteChannel) streams[fd];
+                } else {
+                    mrbc = new MultiReadableByteChannel();
+                    mrbc.addChannel((ReadableByteChannel) streams[fd], toclose[fd]);
+                    streams[fd] = mrbc;
+                    toclose[fd] = true;
+                }
+                mrbc.addChannel((ReadableByteChannel) ch, true);
+            } else if ((readWrite & WRITE) != 0) {
+                MultiWritableByteChannel mrbc;
+                if (streams[fd] instanceof MultiWritableByteChannel) {
+                    mrbc = (MultiWritableByteChannel) streams[fd];
+                } else {
+                    mrbc = new MultiWritableByteChannel();
+                    mrbc.addChannel((WritableByteChannel) streams[fd], toclose[fd]);
+                    streams[fd] = mrbc;
+                    toclose[fd] = true;
+                }
+                mrbc.addChannel((WritableByteChannel) ch, true);
+            } else {
+                throw new IllegalStateException();
+            }
+        }
+        else {
+            if (streams[fd] != null && toclose[fd]) {
+                streams[fd].close();
+            }
+            streams[fd] = ch;
+            toclose[fd] = true;
+        }
     }
 
-    public void setOut(PrintStream out)
-    {
-        this.out = out;
+    private static class MultiChannel<T extends Channel> implements Channel {
+        protected final List<T> channels = new ArrayList<T>();
+        protected final List<T> toClose = new ArrayList<T>();
+        protected final AtomicBoolean opened = new AtomicBoolean(true);
+        public void addChannel(T channel, boolean toclose) {
+            channels.add(channel);
+            if (toclose) {
+                toClose.add(channel);
+            }
+        }
+
+        public boolean isOpen() {
+            return opened.get();
+        }
+
+        public void close() throws IOException {
+            if (opened.compareAndSet(true, false)) {
+                for (T channel : toClose) {
+                    channel.close();
+                }
+            }
+        }
     }
 
-    public void setErr(PrintStream err)
-    {
-        this.err = err;
+    private static class MultiReadableByteChannel extends MultiChannel<ReadableByteChannel> implements ReadableByteChannel {
+        int index = 0;
+        public int read(ByteBuffer dst) throws IOException {
+            int nbRead = -1;
+            while (nbRead < 0 && index < channels.size()) {
+                nbRead = channels.get(index).read(dst);
+                if (nbRead < 0) {
+                    index++;
+                } else {
+                    break;
+                }
+            }
+            return nbRead;
+        }
     }
 
-    public Pipe connect(Pipe next) throws IOException
-    {
-        next.setOut(out);
-        next.setErr(err);
-        pout = new PipedOutputStream();
-        next.setIn(new PipedInputStream(pout));
-        out = new PrintStream(pout);
-        return next;
+    private static class MultiWritableByteChannel extends MultiChannel<WritableByteChannel> implements WritableByteChannel {
+        public int write(ByteBuffer src) throws IOException {
+            int pos = src.position();
+            for (WritableByteChannel ch : channels) {
+                src.position(pos);
+                while (src.hasRemaining()) {
+                    ch.write(src);
+                }
+            }
+            return src.position() - pos;
+        }
     }
 
     public void run()
     {
-        tIn.set(in);
-        tOut.set(out);
-        tErr.set(err);
-        closure.session().threadIO().setStreams(in, out, err);
+        InputStream in = null;
+        PrintStream out = null;
+        PrintStream err = null;
+        WritableByteChannel errChannel = (WritableByteChannel) streams[2];
 
+        Channel[] prevStreams = tStreams.get();
         try
         {
-            result = closure.execute(executable);
-            if (result != null && pout != null)
-            {
-                if (!Boolean.FALSE.equals(closure.session().get(".FormatPipe")))
-                {
-                    out.println(closure.session().format(result, Converter.INSPECT));
+            if (executable instanceof Statement) {
+                Statement statement = (Statement) executable;
+                List<Token> tokens = statement.redirections();
+                for (int i = 0; i < tokens.size(); i++) {
+                    Token t = tokens.get(i);
+                    Matcher m;
+                    if ((m = Pattern.compile("(?:([0-9])?|(&)?)>(>)?").matcher(t)).matches()) {
+                        int fd;
+                        if (m.group(1) != null) {
+                            fd = Integer.parseInt(m.group(1));
+                        }
+                        else if (m.group(2) != null) {
+                            fd = -1; // both 1 and 2
+                        } else {
+                            fd = 1;
+                        }
+                        boolean append = m.group(3) != null;
+                        Token file = tokens.get(++i);
+                        Path outPath = closure.session().currentDir().resolve(file.toString());
+                        Set<StandardOpenOption> options = new HashSet<StandardOpenOption>();
+                        options.add(StandardOpenOption.WRITE);
+                        options.add(StandardOpenOption.CREATE);
+                        if (append) {
+                            options.add(StandardOpenOption.APPEND);
+                        } else {
+                            options.add(StandardOpenOption.TRUNCATE_EXISTING);
+                        }
+                        Channel ch = Files.newByteChannel(outPath, options);
+                        if (fd >= 0) {
+                            setStream(ch, fd, WRITE);
+                        } else {
+                            setStream(ch, 1, WRITE);
+                            setStream(ch, 2, WRITE);
+                        }
+                    }
+                    else if ((m = Pattern.compile("([0-9])?>&([0-9])").matcher(t)).matches()) {
+                        int fd0 = 1;
+                        if (m.group(1) != null) {
+                            fd0 = Integer.parseInt(m.group(1));
+                        }
+                        int fd1 = Integer.parseInt(m.group(2));
+                        if (streams[fd0] != null && toclose[fd0]) {
+                            streams[fd0].close();
+                        }
+                        streams[fd0] = streams[fd1];
+                        // TODO: this is wrong, we should keep a counter somehow so that the
+                        // stream is closed when both are closed
+                        toclose[fd0] = false;
+                    }
+                    else if ((m = Pattern.compile("([0-9])?<(>)?").matcher(t)).matches()) {
+                        int fd = 0;
+                        if (m.group(1) != null) {
+                            fd = Integer.parseInt(m.group(1));
+                        }
+                        boolean output = m.group(2) != null;
+                        Token file = tokens.get(++i);
+                        Path inPath = closure.session().currentDir().resolve(file.toString());
+                        Set<StandardOpenOption> options = new HashSet<StandardOpenOption>();
+                        options.add(StandardOpenOption.READ);
+                        if (output) {
+                            options.add(StandardOpenOption.WRITE);
+                            options.add(StandardOpenOption.CREATE);
+                        }
+                        Channel ch = Files.newByteChannel(inPath, options);
+                        setStream(ch, fd, READ + (output ? WRITE : 0));
+                    }
                 }
+            } else {
+                new UnsupportedOperationException("what to do ?").printStackTrace();
+            }
+
+            tStreams.set(streams);
+
+            // TODO: not sure this is the correct way
+            boolean endOfPipe = !toclose[1];
+
+            in = Channels.newInputStream((ReadableByteChannel) streams[0]);
+            out = new PrintStream(Channels.newOutputStream((WritableByteChannel) streams[1]), true);
+            err = new PrintStream(Channels.newOutputStream((WritableByteChannel) streams[2]), true);
+            errChannel = (WritableByteChannel) streams[2];
+
+            closure.session().threadIO().setStreams(in, out, err);
+
+            result = closure.execute(executable);
+            // We don't print the result if toclose[1] == false, which means we're at the end of the pipe
+            if (result != null && !endOfPipe && !Boolean.FALSE.equals(closure.session().get(".FormatPipe"))) {
+                out.println(closure.session().format(result, Converter.INSPECT));
             }
         }
         catch (Exception e)
         {
             exception = e;
+            if (exit == 0) {
+                exit = 1; // failure
+            }
+            // TODO: use shell name instead of 'gogo'
+            // TODO: use color if not redirected
+            // TODO: use conversion ?
+            String msg = "gogo: " + e.getClass().getSimpleName() + ": " + e.getMessage() + "\n";
+            try {
+                errChannel.write(ByteBuffer.wrap(msg.getBytes()));
+            } catch (IOException ioe) {
+                e.addSuppressed(ioe);
+            }
         }
         finally
         {
-            out.flush();
+            if (out != null) {
+                out.flush();
+            }
+            if (err != null) {
+                err.flush();
+            }
             closure.session().threadIO().close();
 
+            tStreams.set(prevStreams);
+
             try
             {
-                if (pout != null)
-                {
-                    pout.close();
-                }
-
-                if (in instanceof PipedInputStream)
-                {
-                    in.close();
-
-                    // avoid writer waiting when reader has given up (FELIX-2380)
-                    Method m = in.getClass().getDeclaredMethod("receivedLast",
-                        (Class<?>[]) null);
-                    m.setAccessible(true);
-                    m.invoke(in, (Object[]) null);
+                for (int i = 0; i < 10; i++) {
+                    if (toclose[i] && streams[i] != null) {
+                        streams[i].close();
+                    }
                 }
             }
             catch (Exception e)
diff --git a/gogo/runtime/src/main/java/org/apache/felix/gogo/runtime/Tokenizer.java b/gogo/runtime/src/main/java/org/apache/felix/gogo/runtime/Tokenizer.java
index 390becc..ab1b164 100644
--- a/gogo/runtime/src/main/java/org/apache/felix/gogo/runtime/Tokenizer.java
+++ b/gogo/runtime/src/main/java/org/apache/felix/gogo/runtime/Tokenizer.java
@@ -18,9 +18,13 @@
  */
 package org.apache.felix.gogo.runtime;
 
+import java.util.regex.Pattern;
+
 public class Tokenizer extends BaseTokenizer
 {
 
+    private final Pattern redir = Pattern.compile("[0-9&]?>|[0-9]?>>|[0-9]?>&|[0-9]?<|[0-9]?<>");
+
     protected boolean inArray;
     protected int word = 0;
 
@@ -47,6 +51,7 @@
         }
         skipSpace(last == null || Token.eq(last, "\n"));
         int start = index - 1;
+        Token t, tn;
         while (true)
         {
             switch (ch)
@@ -81,8 +86,46 @@
                         getch();
                         break;
                     }
-                case ';':
+                case '>':
+                case '<':
+                    t = text.subSequence(start, index);
+                    tn = text.subSequence(start, index + 1);
+                    if (redir.matcher(tn).matches()) {
+                        getch();
+                        break;
+                    }
+                    if (redir.matcher(t).matches() && start < index - 1) {
+                        getch();
+                    }
+                    word = 0;
+                    return token(start);
+                case '&':
+                    // beginning of token
+                    if (start == index - 1) {
+                        if (peek() == '&' || peek() == '>') {
+                            getch();
+                            getch();
+                        }
+                        word = 0;
+                        return token(start);
+                    }
+                    // in the middle of a redirection
+                    else if (redir.matcher(text.subSequence(start, index)).matches()) {
+                        getch();
+                        break;
+                    }
+                    else {
+                        word = 0;
+                        return token(start);
+                    }
                 case '|':
+                    if (start == index - 1 && (peek() == '|' || peek() == '&')) {
+                        getch();
+                        getch();
+                    }
+                    word = 0;
+                    return token(start);
+                case ';':
                     word = 0;
                     return token(start);
                 case '}':
diff --git a/gogo/runtime/src/main/java/org/apache/felix/service/command/CommandSession.java b/gogo/runtime/src/main/java/org/apache/felix/service/command/CommandSession.java
index c81c8af..58dbd22 100644
--- a/gogo/runtime/src/main/java/org/apache/felix/service/command/CommandSession.java
+++ b/gogo/runtime/src/main/java/org/apache/felix/service/command/CommandSession.java
@@ -20,9 +20,14 @@
 
 import java.io.InputStream;
 import java.io.PrintStream;
+import java.nio.file.Path;
 
 public interface CommandSession
 {
+    Path currentDir();
+
+    void currentDir(Path path);
+
     /**
      * Execute a program in this session.
      *
diff --git a/gogo/runtime/src/test/java/org/apache/felix/gogo/runtime/AbstractParserTest.java b/gogo/runtime/src/test/java/org/apache/felix/gogo/runtime/AbstractParserTest.java
index 7522397..5d74295 100644
--- a/gogo/runtime/src/test/java/org/apache/felix/gogo/runtime/AbstractParserTest.java
+++ b/gogo/runtime/src/test/java/org/apache/felix/gogo/runtime/AbstractParserTest.java
@@ -18,16 +18,28 @@
  */
 package org.apache.felix.gogo.runtime;
 
+import java.io.FilterInputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.io.PrintStream;
+
 import junit.framework.TestCase;
 import org.apache.felix.gogo.runtime.threadio.ThreadIOImpl;
 
 public abstract class AbstractParserTest extends TestCase {
 
     private ThreadIOImpl threadIO;
+    private InputStream sin;
+    private PrintStream sout;
+    private PrintStream serr;
 
     @Override
     protected void setUp() throws Exception {
         super.setUp();
+        sin = new NoCloseInputStream(System.in);
+        sout = new NoClosePrintStream(System.out);
+        serr = new NoClosePrintStream(System.err);
         threadIO = new ThreadIOImpl();
         threadIO.start();
     }
@@ -40,7 +52,25 @@
 
     public class Context extends org.apache.felix.gogo.runtime.Context {
         public Context() {
-            super(AbstractParserTest.this.threadIO);
+            super(AbstractParserTest.this.threadIO, sin, sout, serr);
+        }
+    }
+
+    private static class NoCloseInputStream extends FilterInputStream {
+        public NoCloseInputStream(InputStream in) {
+            super(in);
+        }
+        @Override
+        public void close() throws IOException {
+        }
+    }
+
+    private static class NoClosePrintStream extends PrintStream {
+        public NoClosePrintStream(OutputStream out) {
+            super(out);
+        }
+        @Override
+        public void close() {
         }
     }
 
diff --git a/gogo/runtime/src/test/java/org/apache/felix/gogo/runtime/Context.java b/gogo/runtime/src/test/java/org/apache/felix/gogo/runtime/Context.java
index e4096d2..4db6b5f 100644
--- a/gogo/runtime/src/test/java/org/apache/felix/gogo/runtime/Context.java
+++ b/gogo/runtime/src/test/java/org/apache/felix/gogo/runtime/Context.java
@@ -18,6 +18,10 @@
  */
 package org.apache.felix.gogo.runtime;
 
+import java.io.InputStream;
+import java.io.PrintStream;
+import java.nio.file.Path;
+
 import org.apache.felix.service.command.CommandSession;
 import org.apache.felix.service.threadio.ThreadIO;
 
@@ -27,13 +31,13 @@
     
     private final CommandSession session;
 
-    public Context(ThreadIO threadio)
+    public Context(ThreadIO threadio, InputStream in, PrintStream out, PrintStream err)
     {
         super(threadio);
         addCommand("osgi", this, "addCommand");
         addCommand("osgi", this, "removeCommand");
         addCommand("osgi", this, "eval");
-        session = createSession(System.in, System.out, System.err);
+        session = createSession(in, out, err);
     }
 
     public Object execute(CharSequence source) throws Exception
@@ -66,4 +70,11 @@
         return session.get(name);
     }
 
+    public void currentDir(Path path) {
+        session.currentDir(path);
+    }
+
+    public Path currentDir() {
+        return session.currentDir();
+    }
 }
diff --git a/gogo/runtime/src/test/java/org/apache/felix/gogo/runtime/TestParser.java b/gogo/runtime/src/test/java/org/apache/felix/gogo/runtime/TestParser.java
index a034f2b..2dffb5a 100644
--- a/gogo/runtime/src/test/java/org/apache/felix/gogo/runtime/TestParser.java
+++ b/gogo/runtime/src/test/java/org/apache/felix/gogo/runtime/TestParser.java
@@ -113,15 +113,14 @@
         c.addCommand("grep", this);
         c.addCommand("echoout", this);
         c.execute("myecho = { echoout $args }");
+
         assertEquals("def", c.execute("echo def|grep d.*|capture"));
         assertEquals("def", c.execute("echoout def|grep d.*|capture"));
         assertEquals("def", c.execute("myecho def|grep d.*|capture"));
-        assertEquals("def",
-            c.execute("(echoout abc; echoout def; echoout ghi)|grep d.*|capture"));
+        assertEquals("def", c.execute("(echoout abc; echoout def; echoout ghi)|grep d.*|capture"));
         assertEquals("", c.execute("echoout def; echoout ghi | grep d.* | capture"));
         assertEquals("hello world", c.execute("echo hello world|capture"));
-        assertEquals("defghi",
-            c.execute("(echoout abc; echoout def; echoout ghi)|grep 'def|ghi'|capture"));
+        assertEquals("defghi", c.execute("(echoout abc; echoout def; echoout ghi)|grep 'def|ghi'|capture"));
     }
 
     public void testAssignment() throws Exception
@@ -314,10 +313,13 @@
         Program x = new Parser("abc def|ghi jkl;mno pqr|stu vwx").program();
         Pipeline p0 = (Pipeline) x.tokens().get(0);
         Statement s00 = (Statement) p0.tokens().get(0);
-        Statement s01 = (Statement) p0.tokens().get(1);
-        Pipeline p1 = (Pipeline) x.tokens().get(1);
+        assertEquals("|", p0.tokens().get(1).toString());
+        Statement s01 = (Statement) p0.tokens().get(2);
+        assertEquals(";", x.tokens().get(1).toString());
+        Pipeline p1 = (Pipeline) x.tokens().get(2);
         Statement s10 = (Statement) p1.tokens().get(0);
-        Statement s11 = (Statement) p1.tokens().get(1);
+        assertEquals("|", p1.tokens().get(1).toString());
+        Statement s11 = (Statement) p1.tokens().get(2);
         assertEquals("abc", s00.tokens().get(0).toString());
         assertEquals("def", s00.tokens().get(1).toString());
         assertEquals("ghi", s01.tokens().get(0).toString());
@@ -333,8 +335,8 @@
         Program x = new Parser("abc def|ghi jkl|mno pqr").program();
         Pipeline p0 = (Pipeline) x.tokens().get(0);
         Statement s00 = (Statement) p0.tokens().get(0);
-        Statement s01 = (Statement) p0.tokens().get(1);
-        Statement s02 = (Statement) p0.tokens().get(2);
+        Statement s01 = (Statement) p0.tokens().get(2);
+        Statement s02 = (Statement) p0.tokens().get(4);
         assertEquals("abc", s00.tokens().get(0).toString());
         assertEquals("def", s00.tokens().get(1).toString());
         assertEquals("ghi", s01.tokens().get(0).toString());
@@ -343,15 +345,78 @@
         assertEquals("pqr", s02.tokens().get(1).toString());
     }
 
+    public void testPipeRedir()
+    {
+        Program x = new Parser("abc def|&ghi").program();
+        Pipeline p0 = (Pipeline) x.tokens().get(0);
+        Statement s00 = (Statement) p0.tokens().get(0);
+        assertEquals("|&", p0.tokens().get(1).toString());
+        Statement s01 = (Statement) p0.tokens().get(2);
+        assertEquals("abc", s00.tokens().get(0).toString());
+        assertEquals("def", s00.tokens().get(1).toString());
+        assertEquals("ghi", s01.tokens().get(0).toString());
+    }
+
+    public void testPipeAndOr()
+    {
+        Program x = new Parser("abc|def&&ghi || jkl").program();
+        Pipeline p0 = (Pipeline) x.tokens().get(0);
+        Statement s00 = (Statement) p0.tokens().get(0);
+        assertEquals("|", p0.tokens().get(1).toString());
+        Statement s01 = (Statement) p0.tokens().get(2);
+        assertEquals("&&", x.tokens().get(1).toString());
+        Statement s1 = (Statement) x.tokens().get(2);
+        assertEquals("||", x.tokens().get(3).toString());
+        Statement s2 = (Statement) x.tokens().get(4);
+        assertEquals("abc", s00.tokens().get(0).toString());
+        assertEquals("def", s01.tokens().get(0).toString());
+        assertEquals("ghi", s1.tokens().get(0).toString());
+        assertEquals("jkl", s2.tokens().get(0).toString());
+    }
+
+    public void testBackground() {
+        Program x = new Parser("echo foo&echo bar").program();
+        Statement s0 = (Statement) x.tokens().get(0);
+        assertEquals("&", x.tokens().get(1).toString());
+        Statement s1 = (Statement) x.tokens().get(2);
+        assertEquals("echo", s0.tokens().get(0).toString());
+        assertEquals("foo", s0.tokens().get(1).toString());
+        assertEquals("echo", s1.tokens().get(0).toString());
+        assertEquals("bar", s1.tokens().get(1).toString());
+    }
+
+    public void testRedir() {
+        Program x = new Parser("echo foo&>bar").program();
+        Statement s0 = (Statement) x.tokens().get(0);
+        assertEquals("echo", s0.tokens().get(0).toString());
+        assertEquals("foo", s0.tokens().get(1).toString());
+        assertEquals("&>", s0.redirections().get(0).toString());
+        assertEquals("bar", s0.redirections().get(1).toString());
+
+        x = new Parser("echo foo1>bar").program();
+        s0 = (Statement) x.tokens().get(0);
+        assertEquals("echo", s0.tokens().get(0).toString());
+        assertEquals("foo1", s0.tokens().get(1).toString());
+        assertEquals(">", s0.redirections().get(0).toString());
+        assertEquals("bar", s0.redirections().get(1).toString());
+
+        x = new Parser("echo foo 1>bar").program();
+        s0 = (Statement) x.tokens().get(0);
+        assertEquals("echo", s0.tokens().get(0).toString());
+        assertEquals("foo", s0.tokens().get(1).toString());
+        assertEquals("1>", s0.redirections().get(0).toString());
+        assertEquals("bar", s0.redirections().get(1).toString());
+    }
+
     public void testSimpleValue()
     {
         Program p = new Parser(
-            "abc def.ghi http://www.osgi.org?abc=&x=1 [1,2,3] {{{{{{{xyz}}}}}}} (immediate) {'{{{{{'} {\\{} 'abc{}'")
+            "abc def.ghi http://www.osgi.org?abc=\\&x=1 [1,2,3] {{{{{{{xyz}}}}}}} (immediate) {'{{{{{'} {\\{} 'abc{}'")
             .program();
         List<Token> x = ((Statement) p.tokens().get(0)).tokens();
         assertEquals("abc", x.get(0).toString());
         assertEquals("def.ghi", x.get(1).toString());
-        assertEquals("http://www.osgi.org?abc=&x=1", x.get(2).toString());
+        assertEquals("http://www.osgi.org?abc=\\&x=1", x.get(2).toString());
         assertEquals("[1,2,3]", x.get(3).toString());
         assertEquals("{{{{{{{xyz}}}}}}}", x.get(4).toString());
         assertEquals("(immediate)", x.get(5).toString());
diff --git a/gogo/runtime/src/test/java/org/apache/felix/gogo/runtime/TestTokenizer.java b/gogo/runtime/src/test/java/org/apache/felix/gogo/runtime/TestTokenizer.java
index c7f17f7..f7b818e 100644
--- a/gogo/runtime/src/test/java/org/apache/felix/gogo/runtime/TestTokenizer.java
+++ b/gogo/runtime/src/test/java/org/apache/felix/gogo/runtime/TestTokenizer.java
@@ -18,6 +18,18 @@
  */
 package org.apache.felix.gogo.runtime;
 
+import java.io.ByteArrayInputStream;
+import java.io.File;
+import java.io.PrintStream;
+import java.net.URI;
+import java.util.HashMap;
+import java.util.Map;
+
+import org.apache.felix.gogo.runtime.threadio.ThreadIOImpl;
+import org.junit.Test;
+import org.osgi.framework.Bundle;
+import org.osgi.framework.BundleContext;
+
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertNull;
 import static org.junit.Assert.fail;
@@ -25,20 +37,6 @@
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.when;
 
-import java.io.ByteArrayInputStream;
-import java.io.File;
-import java.net.URI;
-import java.util.HashMap;
-import java.util.Map;
-
-import junit.framework.TestCase;
-
-import org.apache.felix.gogo.runtime.threadio.ThreadIOImpl;
-import org.junit.Ignore;
-import org.junit.Test;
-import org.osgi.framework.Bundle;
-import org.osgi.framework.BundleContext;
-
 public class TestTokenizer
 {
     private final Map<String, Object> vars = new HashMap<String, Object>();
@@ -304,6 +302,16 @@
     {
         String script = "addcommand system (((${.context} bundles) 0) loadclass java.lang.System)";
 
+        PrintStream sout = new PrintStream(System.out) {
+            @Override
+            public void close() {
+            }
+        };
+        PrintStream serr = new PrintStream(System.err) {
+            @Override
+            public void close() {
+            }
+        };
         ThreadIOImpl tio = new ThreadIOImpl();
         tio.start();
 
@@ -315,7 +323,7 @@
             processor.addCommand("gogo", processor, "addcommand");
             processor.addConstant(".context", bc);
 
-            CommandSessionImpl session = new CommandSessionImpl(processor, new ByteArrayInputStream(script.getBytes()), System.out, System.err);
+            CommandSessionImpl session = new CommandSessionImpl(processor, new ByteArrayInputStream(script.getBytes()), sout, serr);
 
             Closure c = new Closure(session, null, script);
             assertNull(c.execute(session, null));