Add job control
git-svn-id: https://svn.apache.org/repos/asf/felix/trunk@1736004 13f79535-47bb-0310-9956-ffa450edef68
diff --git a/gogo/jline/src/main/java/org/apache/felix/gogo/jline/Builtin.java b/gogo/jline/src/main/java/org/apache/felix/gogo/jline/Builtin.java
index 7153dbf..91dcf17 100644
--- a/gogo/jline/src/main/java/org/apache/felix/gogo/jline/Builtin.java
+++ b/gogo/jline/src/main/java/org/apache/felix/gogo/jline/Builtin.java
@@ -34,6 +34,7 @@
import java.nio.file.StandardOpenOption;
import java.util.ArrayList;
import java.util.Arrays;
+import java.util.Collections;
import java.util.HashSet;
import java.util.LinkedHashSet;
import java.util.List;
@@ -43,6 +44,7 @@
import java.util.TreeMap;
import java.util.TreeSet;
+import org.apache.felix.gogo.runtime.Job;
import org.apache.felix.service.command.CommandSession;
import org.apache.felix.service.command.Converter;
import org.jline.builtins.Options;
@@ -52,7 +54,7 @@
*/
public class Builtin {
- static final String[] functions = {"format", "getopt", "new", "set", "tac", "type"};
+ static final String[] functions = {"format", "getopt", "new", "set", "tac", "type", "jobs", "fg", "bg"};
private static final String[] packages = {"java.lang", "java.io", "java.net",
"java.util"};
@@ -439,6 +441,63 @@
return false;
}
+ public void jobs(CommandSession session, String[] argv) {
+ List<Job> jobs = session.jobs();
+ Job current = session.currentJob();
+ for (Job job : jobs) {
+ if (job != current) {
+ System.out.println("[" + job.id() + "] " + job.status().toString().toLowerCase()
+ + " " + job.command());
+ }
+ }
+ }
+
+ public void fg(CommandSession session, String[] argv) {
+ List<Job> jobs = session.jobs();
+ Collections.reverse(jobs);
+ Job current = session.currentJob();
+ if (argv.length == 0) {
+ Job job = jobs.stream().filter(j -> j != current)
+ .findFirst().orElse(null);
+ if (job != null) {
+ job.foreground();
+ } else {
+ System.err.println("fg: no current job");
+ }
+ } else {
+ Job job = jobs.stream().filter(j -> j != current && argv[0].equals(Integer.toString(j.id())))
+ .findFirst().orElse(null);
+ if (job != null) {
+ job.foreground();
+ } else {
+ System.err.println("fg: job not found: " + argv[0]);
+ }
+ }
+ }
+
+ public void bg(CommandSession session, String[] argv) {
+ List<Job> jobs = session.jobs();
+ Collections.reverse(jobs);
+ Job current = session.currentJob();
+ if (argv.length == 0) {
+ Job job = jobs.stream().filter(j -> j != current)
+ .findFirst().orElse(null);
+ if (job != null) {
+ job.background();
+ } else {
+ System.err.println("fg: no current job");
+ }
+ } else {
+ Job job = jobs.stream().filter(j -> j != current && argv[0].equals(Integer.toString(j.id())))
+ .findFirst().orElse(null);
+ if (job != null) {
+ job.background();
+ } else {
+ System.err.println("fg: job not found: " + argv[0]);
+ }
+ }
+ }
+
private boolean isClosure(Object target) {
return target.getClass().getSimpleName().equals("Closure");
}
diff --git a/gogo/jline/src/main/java/org/apache/felix/gogo/jline/Shell.java b/gogo/jline/src/main/java/org/apache/felix/gogo/jline/Shell.java
index d5a9a3c..740242d 100644
--- a/gogo/jline/src/main/java/org/apache/felix/gogo/jline/Shell.java
+++ b/gogo/jline/src/main/java/org/apache/felix/gogo/jline/Shell.java
@@ -37,11 +37,14 @@
import java.util.Map.Entry;
import java.util.Set;
import java.util.TreeMap;
+import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.felix.gogo.runtime.Closure;
import org.apache.felix.gogo.runtime.CommandProxy;
import org.apache.felix.gogo.runtime.CommandSessionImpl;
import org.apache.felix.gogo.runtime.Expander;
+import org.apache.felix.gogo.runtime.Job;
+import org.apache.felix.gogo.runtime.Job.Status;
import org.apache.felix.gogo.runtime.Reflective;
import org.apache.felix.service.command.CommandProcessor;
import org.apache.felix.service.command.CommandSession;
@@ -56,8 +59,11 @@
import org.jline.reader.LineReaderBuilder;
import org.jline.reader.ParsedLine;
import org.jline.reader.UserInterruptException;
+import org.jline.reader.impl.LineReaderImpl;
import org.jline.reader.impl.history.history.FileHistory;
import org.jline.terminal.Terminal;
+import org.jline.terminal.Terminal.Signal;
+import org.jline.terminal.Terminal.SignalHandler;
public class Shell {
@@ -253,7 +259,7 @@
newSession.put("#LINES", (Function) (s, arguments) -> terminal.getHeight());
newSession.put("#PWD", (Function) (s, arguments) -> s.currentDir().toString());
- LineReader reader = null;
+ LineReader reader;
if (args.isEmpty() && interactive) {
reader = LineReaderBuilder.builder()
.terminal(terminal)
@@ -265,6 +271,8 @@
.build();
newSession.put(Shell.VAR_READER, reader);
newSession.put(Shell.VAR_COMPLETIONS, new HashMap());
+ } else {
+ reader = null;
}
if (login || interactive) {
@@ -285,34 +293,94 @@
if (args.isEmpty()) {
if (interactive) {
- while (true) {
- try {
- reader.readLine(Shell.getPrompt(session), Shell.getRPrompt(session), null, null);
- ParsedLine parsedLine = reader.getParsedLine();
- if (parsedLine == null) {
- throw new EndOfFileException();
+ AtomicBoolean reading = new AtomicBoolean();
+ newSession.setJobListener((job, previous, current) -> {
+ if (previous == Status.Background || current == Status.Background
+ || previous == Status.Suspended || current == Status.Suspended) {
+ int w = terminal.getWidth();
+ String status = current.name().toLowerCase();
+ StringBuilder sb = new StringBuilder();
+ for (int i = 0; i < w - 1; i++) {
+ sb.append(' ');
}
- try {
- result = session.execute(((ParsedLineImpl) parsedLine).program());
- session.put(Shell.VAR_RESULT, result); // set $_ to last result
-
- if (result != null && !Boolean.FALSE.equals(session.get(".Gogo.format"))) {
- System.out.println(session.format(result, Converter.INSPECT));
- }
- } catch (Exception e) {
- session.put(Shell.VAR_EXCEPTION, e);
+ sb.append('\r');
+ sb.append("[").append(job.id()).append("] ");
+ sb.append(status);
+ for (int i = status.length(); i < "background".length(); i++) {
+ sb.append(' ');
}
-
- } catch (UserInterruptException e) {
- // continue;
- } catch (EndOfFileException e) {
- try {
- reader.getHistory().flush();
- } catch (IOException e1) {
- e.addSuppressed(e1);
+ sb.append(" ").append(job.command()).append("\n");
+ terminal.writer().write(sb.toString());
+ terminal.flush();
+ if (reading.get()) {
+ ((LineReaderImpl) reader).redrawLine();
+ ((LineReaderImpl) reader).redisplay();
}
- break;
}
+ });
+ SignalHandler intHandler = terminal.handle(Signal.INT, s -> {
+ Job current = newSession.foregroundJob();
+ if (current != null) {
+ current.interrupt();
+ }
+ });
+ SignalHandler suspHandler = terminal.handle(Signal.TSTP, s -> {
+ Job current = newSession.foregroundJob();
+ if (current != null) {
+ current.suspend();
+ }
+ });
+ try {
+ while (true) {
+ try {
+ reading.set(true);
+ try {
+ reader.readLine(Shell.getPrompt(session), Shell.getRPrompt(session), null, null);
+ } finally {
+ reading.set(false);
+ }
+ ParsedLine parsedLine = reader.getParsedLine();
+ if (parsedLine == null) {
+ throw new EndOfFileException();
+ }
+ try {
+ result = session.execute(((ParsedLineImpl) parsedLine).program());
+ session.put(Shell.VAR_RESULT, result); // set $_ to last result
+
+ if (result != null && !Boolean.FALSE.equals(session.get(".Gogo.format"))) {
+ System.out.println(session.format(result, Converter.INSPECT));
+ }
+ } catch (Exception e) {
+ session.put(Shell.VAR_EXCEPTION, e);
+ }
+
+ while (true) {
+ Job job = session.foregroundJob();
+ if (job != null) {
+ synchronized (job) {
+ if (job.status() == Status.Foreground) {
+ job.wait();
+ }
+ }
+ } else {
+ break;
+ }
+ }
+
+ } catch (UserInterruptException e) {
+ // continue;
+ } catch (EndOfFileException e) {
+ try {
+ reader.getHistory().flush();
+ } catch (IOException e1) {
+ e.addSuppressed(e1);
+ }
+ break;
+ }
+ }
+ } finally {
+ terminal.handle(Signal.INT, intHandler);
+ terminal.handle(Signal.TSTP, suspHandler);
}
}
} else {
@@ -341,7 +409,7 @@
program = readScript(script);
}
- result = newSession.execute(program);
+ result = newSession.execute(program);
}
if (login && interactive && !opt.isSet("noshutdown")) {
diff --git a/gogo/runtime/pom.xml b/gogo/runtime/pom.xml
index 90b83b2..1a407d4 100644
--- a/gogo/runtime/pom.xml
+++ b/gogo/runtime/pom.xml
@@ -29,6 +29,8 @@
<artifactId>org.apache.felix.gogo.runtime</artifactId>
<version>0.16.3-SNAPSHOT</version>
<properties>
+ <!-- Skip because of NPE -->
+ <animal.sniffer.skip>true</animal.sniffer.skip>
<felix.java.version>8</felix.java.version>
</properties>
<dependencies>
@@ -55,6 +57,7 @@
<plugin>
<groupId>org.apache.felix</groupId>
<artifactId>maven-bundle-plugin</artifactId>
+ <version>3.0.0</version>
<extensions>true</extensions>
<configuration>
<instructions>
diff --git a/gogo/runtime/src/main/java/org/apache/felix/gogo/api/JobListener.java b/gogo/runtime/src/main/java/org/apache/felix/gogo/api/JobListener.java
new file mode 100644
index 0000000..179ebd5
--- /dev/null
+++ b/gogo/runtime/src/main/java/org/apache/felix/gogo/api/JobListener.java
@@ -0,0 +1,35 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.felix.gogo.api;
+
+import org.apache.felix.gogo.runtime.Job;
+import org.apache.felix.gogo.runtime.Job.Status;
+import org.apache.felix.service.command.CommandSession;
+
+/**
+ * Listener for command executions.
+ *
+ * Such listeners must be registered in the OSGi registry and will be called
+ * by the CommandProcessor when a command line is executed in a given session.
+ */
+public interface JobListener {
+
+ void jobChanged(Job job, Status previous, Status current);
+
+}
diff --git a/gogo/runtime/src/main/java/org/apache/felix/gogo/runtime/Closure.java b/gogo/runtime/src/main/java/org/apache/felix/gogo/runtime/Closure.java
index 2a63c11..2b121cc 100644
--- a/gogo/runtime/src/main/java/org/apache/felix/gogo/runtime/Closure.java
+++ b/gogo/runtime/src/main/java/org/apache/felix/gogo/runtime/Closure.java
@@ -31,9 +31,8 @@
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
-import java.util.concurrent.ExecutionException;
-import java.util.concurrent.Future;
+import org.apache.felix.gogo.runtime.Job.Status;
import org.apache.felix.gogo.runtime.Parser.Array;
import org.apache.felix.gogo.runtime.Parser.Executable;
import org.apache.felix.gogo.runtime.Parser.Operator;
@@ -254,42 +253,23 @@
pipes.add(new Pipe(this, (Statement) executable, streams, toclose));
}
+ // Create job
+ Token s = pipes.get(0).statement;
+ Token e = pipes.get(pipes.size() - 1).statement;
+ Token t = program.subSequence(s.start - program.start, e.start + e.length - program.start);
+ Job job = session().createJob(t, pipes);
+
// Start pipe in background
if (operator != null && Token.eq("&", operator)) {
-
- for (Pipe pipe : pipes) {
- session().getExecutor().submit(pipe);
- }
-
+ job.start(Status.Background);
last = new Result((Object) null);
-
}
// Start in foreground and wait for results
else {
- List<Future<Result>> results = session().getExecutor().invokeAll(pipes);
-
- // Get pipe exceptions
- Exception pipeException = null;
- for (int i = 0; i < results.size() - 1; i++) {
- Future<Result> future = results.get(i);
- Throwable e;
- try {
- Result r = future.get();
- e = r.exception;
- } catch (ExecutionException ee) {
- e = ee.getCause();
- }
- if (e != null) {
- if (pipeException == null) {
- pipeException = new Exception("Exception caught during pipe execution");
- }
- pipeException.addSuppressed(e);
- }
- }
- session.put(PIPE_EXCEPTION, pipeException);
-
- last = results.get(results.size() - 1).get();
- if (last.exception != null) {
+ last = job.start(Status.Foreground);
+ if (last == null) {
+ last = new Result((Object) null);
+ } else if (last.exception != null) {
throw last.exception;
}
}
diff --git a/gogo/runtime/src/main/java/org/apache/felix/gogo/runtime/CommandProcessorImpl.java b/gogo/runtime/src/main/java/org/apache/felix/gogo/runtime/CommandProcessorImpl.java
index 8f07108..3243429 100644
--- a/gogo/runtime/src/main/java/org/apache/felix/gogo/runtime/CommandProcessorImpl.java
+++ b/gogo/runtime/src/main/java/org/apache/felix/gogo/runtime/CommandProcessorImpl.java
@@ -21,8 +21,6 @@
import java.io.InputStream;
import java.io.OutputStream;
import java.lang.reflect.Method;
-import java.nio.channels.ReadableByteChannel;
-import java.nio.channels.WritableByteChannel;
import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.Map;
@@ -73,20 +71,6 @@
}
}
- public CommandSessionImpl createSession(ReadableByteChannel in, WritableByteChannel out, WritableByteChannel err)
- {
- synchronized (sessions)
- {
- if (stopped)
- {
- throw new IllegalStateException("CommandProcessor has been stopped");
- }
- CommandSessionImpl session = new CommandSessionImpl(this, in, out, err);
- sessions.put(session, null);
- return session;
- }
- }
-
public CommandSessionImpl createSession(InputStream in, OutputStream out, OutputStream err)
{
synchronized (sessions)
diff --git a/gogo/runtime/src/main/java/org/apache/felix/gogo/runtime/CommandSessionImpl.java b/gogo/runtime/src/main/java/org/apache/felix/gogo/runtime/CommandSessionImpl.java
index f914a23..cdffab7 100644
--- a/gogo/runtime/src/main/java/org/apache/felix/gogo/runtime/CommandSessionImpl.java
+++ b/gogo/runtime/src/main/java/org/apache/felix/gogo/runtime/CommandSessionImpl.java
@@ -32,6 +32,7 @@
import java.nio.channels.WritableByteChannel;
import java.nio.file.Path;
import java.nio.file.Paths;
+import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
@@ -39,12 +40,20 @@
import java.util.Enumeration;
import java.util.Formatter;
import java.util.HashMap;
+import java.util.List;
import java.util.Map;
+import java.util.concurrent.Callable;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import java.util.stream.Collectors;
+import org.apache.felix.gogo.api.JobListener;
+import org.apache.felix.gogo.runtime.Job.Status;
+import org.apache.felix.gogo.runtime.Pipe.Result;
import org.apache.felix.service.command.CommandProcessor;
import org.apache.felix.service.command.CommandSession;
import org.apache.felix.service.command.Converter;
@@ -70,6 +79,9 @@
private final CommandProcessorImpl processor;
protected final ConcurrentMap<String, Object> variables = new ConcurrentHashMap<>();
private volatile boolean closed;
+ private final List<JobImpl> jobs = new ArrayList<>();
+ private final ThreadLocal<JobImpl> currentJob = new InheritableThreadLocal<>();
+ private JobListener jobListener;
private final ExecutorService executor;
@@ -88,19 +100,6 @@
this.perr = parent.perr;
}
- protected CommandSessionImpl(CommandProcessorImpl shell, ReadableByteChannel in, WritableByteChannel out, WritableByteChannel err)
- {
- this.currentDir = Paths.get(System.getProperty("user.dir")).toAbsolutePath().normalize();
- this.executor = Executors.newCachedThreadPool();
- this.processor = shell;
- this.channels = new Channel[] { in, out, err };
- this.in = Channels.newInputStream(in);
- this.out = Channels.newOutputStream(out);
- this.err = out == err ? this.out : Channels.newOutputStream(err);
- this.pout = new PrintStream(this.out, true);
- this.perr = out == err ? pout : new PrintStream(this.err, true);
- }
-
protected CommandSessionImpl(CommandProcessorImpl shell, InputStream in, OutputStream out, OutputStream err)
{
this.currentDir = Paths.get(System.getProperty("user.dir")).toAbsolutePath().normalize();
@@ -150,10 +149,6 @@
}
}
- ExecutorService getExecutor() {
- return executor;
- }
-
public Object execute(CharSequence commandline) throws Exception
{
assert processor != null;
@@ -370,7 +365,7 @@
}
if (target instanceof Dictionary)
{
- Map<Object, Object> result = new HashMap<Object, Object>();
+ Map<Object, Object> result = new HashMap<>();
for (Enumeration e = ((Dictionary) target).keys(); e.hasMoreElements();)
{
Object key = e.nextElement();
@@ -496,4 +491,255 @@
return processor.expr(this, expr);
}
+ @Override
+ public List<Job> jobs() {
+ synchronized (jobs) {
+ return new ArrayList<>(jobs);
+ }
+ }
+
+ @Override
+ public JobImpl currentJob() {
+ JobImpl job = currentJob.get();
+ while (job != null && job.parent != null) {
+ job = job.parent;
+ }
+ return job;
+ }
+
+ @Override
+ public JobImpl foregroundJob() {
+ List<JobImpl> jobs;
+ synchronized (this.jobs) {
+ jobs = new ArrayList<>(this.jobs);
+ }
+ return jobs.stream()
+ .filter(j -> j.parent == null && j.status() == Status.Foreground)
+ .findFirst()
+ .orElse(null);
+ }
+
+ @Override
+ public void setJobListener(JobListener listener) {
+ synchronized (jobs) {
+ jobListener = listener;
+ }
+ }
+
+ public Job createJob(CharSequence command, List<Pipe> pipes) {
+ synchronized (jobs) {
+ int id = 1;
+ synchronized (jobs) {
+ boolean found;
+ do {
+ found = false;
+ for (Job job : jobs) {
+ if (job.id() == id) {
+ found = true;
+ id++;
+ break;
+ }
+ }
+ } while (found);
+ }
+ JobImpl cur = currentJob();
+ JobImpl job = new JobImpl(id, cur, command, pipes);
+ if (cur == null) {
+ jobs.add(job);
+ }
+ return job;
+ }
+ }
+
+ private class JobImpl implements Job {
+ private final int id;
+ private final JobImpl parent;
+ private final CharSequence command;
+ private final List<Pipe> pipes;
+ private Status status = Status.Created;
+ private Future<?> future;
+ private Result result;
+
+ public JobImpl(int id, JobImpl parent, CharSequence command, List<Pipe> pipes) {
+ this.id = id;
+ this.parent = parent;
+ this.command = command;
+ this.pipes = pipes;
+ }
+
+ @Override
+ public int id() {
+ return id;
+ }
+
+ public CharSequence command() {
+ return command;
+ }
+
+ @Override
+ public synchronized Status status() {
+ return status;
+ }
+
+ @Override
+ public synchronized void suspend() {
+ if (status == Status.Done) {
+ throw new IllegalStateException("Job is finished");
+ }
+ if (status != Status.Suspended) {
+ setStatus(Status.Suspended);
+ }
+ }
+
+ @Override
+ public synchronized void background() {
+ if (status == Status.Done) {
+ throw new IllegalStateException("Job is finished");
+ }
+ if (status != Status.Background) {
+ setStatus(Status.Background);
+ }
+ }
+
+ @Override
+ public synchronized void foreground() {
+ if (status == Status.Done) {
+ throw new IllegalStateException("Job is finished");
+ }
+ JobImpl cr = currentJob();
+ JobImpl fg = foregroundJob();
+ if (parent == null && fg != null && fg != this && fg != cr) {
+ throw new IllegalStateException("A job is already in foreground");
+ }
+ if (status != Status.Foreground) {
+ setStatus(Status.Foreground);
+ }
+ }
+
+ @Override
+ public void interrupt() {
+ Future future;
+ synchronized (this) {
+ future = this.future;
+ }
+ if (future != null) {
+ future.cancel(true);
+ }
+ }
+
+ protected synchronized void done() {
+ if (status == Status.Done) {
+ throw new IllegalStateException("Job is finished");
+ }
+ setStatus(Status.Done);
+ }
+
+ private void setStatus(Status newStatus) {
+ setStatus(newStatus, true);
+ }
+
+ private void setStatus(Status newStatus, boolean callListeners) {
+ Status previous;
+ synchronized (this) {
+ previous = this.status;
+ status = newStatus;
+ }
+ if (callListeners) {
+ JobListener listener;
+ synchronized (jobs) {
+ listener = jobListener;
+ if (newStatus == Status.Done) {
+ jobs.remove(this);
+ }
+ }
+ if (listener != null) {
+ listener.jobChanged(this, previous, newStatus);
+ }
+ }
+ synchronized (this) {
+ JobImpl.this.notifyAll();
+ }
+ }
+
+ @Override
+ public synchronized Result result() {
+ return result;
+ }
+
+ @Override
+ public synchronized Result start(Status status) throws InterruptedException {
+ if (status == Status.Created || status == Status.Done) {
+ throw new IllegalArgumentException("Illegal start status");
+ }
+ if (this.status != Status.Created) {
+ throw new IllegalStateException("Job already started");
+ }
+ switch (status) {
+ case Suspended:
+ suspend();
+ break;
+ case Background:
+ background();
+ break;
+ case Foreground:
+ foreground();
+ break;
+ }
+ future = executor.submit(this::call);
+ while (this.status == Status.Foreground) {
+ JobImpl.this.wait();
+ }
+ return result;
+ }
+
+ private Void call() throws Exception {
+ Thread thread = Thread.currentThread();
+ String name = thread.getName();
+ try {
+ thread.setName("job controller " + id);
+
+ List<Callable<Result>> wrapped = pipes.stream().map(this::wrap).collect(Collectors.toList());
+ List<Future<Result>> results = executor.invokeAll(wrapped);
+
+ // Get pipe exceptions
+ Exception pipeException = null;
+ for (int i = 0; i < results.size() - 1; i++) {
+ Future<Result> future = results.get(i);
+ Throwable e;
+ try {
+ Result r = future.get();
+ e = r.exception;
+ } catch (ExecutionException ee) {
+ e = ee.getCause();
+ }
+ if (e != null) {
+ if (pipeException == null) {
+ pipeException = new Exception("Exception caught during pipe execution");
+ }
+ pipeException.addSuppressed(e);
+ }
+ }
+ put(Closure.PIPE_EXCEPTION, pipeException);
+
+ result = results.get(results.size() - 1).get();
+ } finally {
+ done();
+ thread.setName(name);
+ }
+ return null;
+ }
+
+ private Callable<Result> wrap(Pipe pipe) {
+ return () -> {
+ JobImpl prevJob = currentJob.get();
+ try {
+ currentJob.set(this);
+ return pipe.call();
+ } finally {
+ currentJob.set(prevJob);
+ }
+ };
+ }
+
+ }
}
diff --git a/gogo/runtime/src/main/java/org/apache/felix/gogo/runtime/Job.java b/gogo/runtime/src/main/java/org/apache/felix/gogo/runtime/Job.java
new file mode 100644
index 0000000..721fa8c
--- /dev/null
+++ b/gogo/runtime/src/main/java/org/apache/felix/gogo/runtime/Job.java
@@ -0,0 +1,62 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.felix.gogo.runtime;
+
+import org.apache.felix.gogo.runtime.Pipe.Result;
+
+public interface Job {
+
+ enum Status {
+ Created,
+ Suspended,
+ Background,
+ Foreground,
+ Done
+ }
+
+ int id();
+
+ CharSequence command();
+
+ Status status();
+
+ void suspend();
+
+ void background();
+
+ void foreground();
+
+ void interrupt();
+
+ Result result();
+
+ /**
+ * Start the job.
+ * If the job is started in foreground,
+ * waits for the job to finish or to be
+ * suspended or moved to background.
+ *
+ * @param status the desired job status
+ * @return <code>null</code> if the job
+ * has been suspended or moved to background,
+ *
+ */
+ Result start(Status status) throws InterruptedException;
+
+}
diff --git a/gogo/runtime/src/main/java/org/apache/felix/gogo/runtime/Pipe.java b/gogo/runtime/src/main/java/org/apache/felix/gogo/runtime/Pipe.java
index e92b329..fac281f 100644
--- a/gogo/runtime/src/main/java/org/apache/felix/gogo/runtime/Pipe.java
+++ b/gogo/runtime/src/main/java/org/apache/felix/gogo/runtime/Pipe.java
@@ -20,8 +20,10 @@
import java.io.IOException;
import java.io.InputStream;
+import java.io.InterruptedIOException;
import java.io.PrintStream;
import java.nio.ByteBuffer;
+import java.nio.channels.ByteChannel;
import java.nio.channels.Channel;
import java.nio.channels.Channels;
import java.nio.channels.ReadableByteChannel;
@@ -38,6 +40,7 @@
import java.util.regex.Matcher;
import java.util.regex.Pattern;
+import org.apache.felix.gogo.runtime.Job.Status;
import org.apache.felix.gogo.runtime.Parser.Statement;
import org.apache.felix.gogo.runtime.Pipe.Result;
import org.apache.felix.service.command.Converter;
@@ -114,6 +117,9 @@
private static final int WRITE = 2;
private void setStream(Channel ch, int fd, int readWrite, boolean begOfPipe, boolean endOfPipe) throws IOException {
+ if ((readWrite & (READ | WRITE)) == 0) {
+ throw new IllegalArgumentException("Should specify READ and/or WRITE");
+ }
if ((readWrite & READ) != 0 && !(ch instanceof ReadableByteChannel)) {
throw new IllegalArgumentException("Channel is not readable");
}
@@ -135,43 +141,24 @@
if (streams[fd] != null && (readWrite & READ) != 0 && (readWrite & WRITE) != 0) {
throw new IllegalArgumentException("Can not do multios with read/write streams");
}
- if ((readWrite & READ) != 0) {
- MultiReadableByteChannel mrbc;
- if (streams[fd] instanceof MultiReadableByteChannel) {
- mrbc = (MultiReadableByteChannel) streams[fd];
- } else {
- mrbc = new MultiReadableByteChannel();
- if (streams[fd] != null && begOfPipe) {
- if (toclose[fd]) {
- streams[fd].close();
- }
- } else {
- mrbc.addChannel((ReadableByteChannel) streams[fd], toclose[fd]);
- }
- streams[fd] = mrbc;
- toclose[fd] = true;
- }
- mrbc.addChannel((ReadableByteChannel) ch, true);
- } else if ((readWrite & WRITE) != 0) {
- MultiWritableByteChannel mrbc;
- if (streams[fd] instanceof MultiWritableByteChannel) {
- mrbc = (MultiWritableByteChannel) streams[fd];
- } else {
- mrbc = new MultiWritableByteChannel();
- if (streams[fd] != null && endOfPipe) {
- if (toclose[fd]) {
- streams[fd].close();
- }
- } else {
- mrbc.addChannel((WritableByteChannel) streams[fd], toclose[fd]);
- }
- streams[fd] = mrbc;
- toclose[fd] = true;
- }
- mrbc.addChannel((WritableByteChannel) ch, true);
+ MultiChannel mrbc;
+ if (streams[fd] instanceof MultiChannel) {
+ mrbc = (MultiChannel) streams[fd];
} else {
- throw new IllegalStateException();
+ mrbc = new MultiChannel();
+ if (streams[fd] != null
+ && ((begOfPipe && (readWrite & READ) != 0)
+ || (endOfPipe && (readWrite & WRITE) != 0))) {
+ if (toclose[fd]) {
+ streams[fd].close();
+ }
+ } else {
+ mrbc.addChannel(streams[fd], toclose[fd]);
+ }
+ streams[fd] = mrbc;
+ toclose[fd] = true;
}
+ mrbc.addChannel(ch, true);
}
else {
if (streams[fd] != null && toclose[fd]) {
@@ -182,59 +169,6 @@
}
}
- private static class MultiChannel<T extends Channel> implements Channel {
- protected final List<T> channels = new ArrayList<>();
- protected final List<T> toClose = new ArrayList<>();
- protected final AtomicBoolean opened = new AtomicBoolean(true);
- public void addChannel(T channel, boolean toclose) {
- channels.add(channel);
- if (toclose) {
- toClose.add(channel);
- }
- }
-
- public boolean isOpen() {
- return opened.get();
- }
-
- public void close() throws IOException {
- if (opened.compareAndSet(true, false)) {
- for (T channel : toClose) {
- channel.close();
- }
- }
- }
- }
-
- private static class MultiReadableByteChannel extends MultiChannel<ReadableByteChannel> implements ReadableByteChannel {
- int index = 0;
- public int read(ByteBuffer dst) throws IOException {
- int nbRead = -1;
- while (nbRead < 0 && index < channels.size()) {
- nbRead = channels.get(index).read(dst);
- if (nbRead < 0) {
- index++;
- } else {
- break;
- }
- }
- return nbRead;
- }
- }
-
- private static class MultiWritableByteChannel extends MultiChannel<WritableByteChannel> implements WritableByteChannel {
- public int write(ByteBuffer src) throws IOException {
- int pos = src.position();
- for (WritableByteChannel ch : channels) {
- src.position(pos);
- while (src.hasRemaining()) {
- ch.write(src);
- }
- }
- return src.position() - pos;
- }
- }
-
@Override
public Result call() throws Exception {
Thread thread = Thread.currentThread();
@@ -247,7 +181,7 @@
}
}
- public Result doCall()
+ private Result doCall()
{
InputStream in;
PrintStream out = null;
@@ -332,6 +266,10 @@
}
}
+ for (int i = 0; i < streams.length; i++) {
+ streams[i] = wrap(streams[i]);
+ }
+
// Create streams
in = Channels.newInputStream((ReadableByteChannel) streams[0]);
out = new PrintStream(Channels.newOutputStream((WritableByteChannel) streams[1]), true);
@@ -396,4 +334,96 @@
}
}
}
+
+ private Channel wrap(Channel channel) {
+ if (channel == null) {
+ return null;
+ }
+ if (channel instanceof MultiChannel) {
+ return channel;
+ }
+ MultiChannel mch = new MultiChannel();
+ mch.addChannel(channel, true);
+ return mch;
+ }
+
+ private class MultiChannel implements ByteChannel {
+ protected final List<Channel> channels = new ArrayList<>();
+ protected final List<Channel> toClose = new ArrayList<>();
+ protected final AtomicBoolean opened = new AtomicBoolean(true);
+ int index = 0;
+
+ public void addChannel(Channel channel, boolean toclose) {
+ channels.add(channel);
+ if (toclose) {
+ toClose.add(channel);
+ }
+ }
+
+ public boolean isOpen() {
+ return opened.get();
+ }
+
+ public void close() throws IOException {
+ if (opened.compareAndSet(true, false)) {
+ for (Channel channel : toClose) {
+ channel.close();
+ }
+ }
+ }
+
+ public int read(ByteBuffer dst) throws IOException {
+ int nbRead = -1;
+ while (nbRead < 0 && index < channels.size()) {
+ Channel ch = channels.get(index);
+ checkSuspend(ch);
+ nbRead = ((ReadableByteChannel) ch).read(dst);
+ if (nbRead < 0) {
+ index++;
+ } else {
+ break;
+ }
+ }
+ return nbRead;
+ }
+
+ public int write(ByteBuffer src) throws IOException {
+ int pos = src.position();
+ for (Channel ch : channels) {
+ checkSuspend(ch);
+ src.position(pos);
+ while (src.hasRemaining()) {
+ ((WritableByteChannel) ch).write(src);
+ }
+ }
+ return src.position() - pos;
+ }
+
+ private void checkSuspend(Channel ch) throws IOException {
+ Job cur = closure.session().currentJob();
+ if (cur != null) {
+ Channel[] sch = closure.session().channels;
+ if (ch == sch[0] || ch == sch[1] || ch == sch[2]) {
+ synchronized (cur) {
+ if (cur.status() == Status.Background) {
+ // TODO: Send SIGTIN / SIGTOU
+ cur.suspend();
+ }
+ }
+ }
+ synchronized (cur) {
+ while (cur.status() == Status.Suspended) {
+ try {
+ cur.wait();
+ } catch (InterruptedException e) {
+ throw (IOException) new InterruptedIOException().initCause(e);
+ }
+ }
+ }
+ } else {
+ String msg = "This is definitely not expected";
+ }
+ }
+ }
+
}
diff --git a/gogo/runtime/src/main/java/org/apache/felix/service/command/CommandSession.java b/gogo/runtime/src/main/java/org/apache/felix/service/command/CommandSession.java
index 58dbd22..71e5ddc 100644
--- a/gogo/runtime/src/main/java/org/apache/felix/service/command/CommandSession.java
+++ b/gogo/runtime/src/main/java/org/apache/felix/service/command/CommandSession.java
@@ -21,6 +21,10 @@
import java.io.InputStream;
import java.io.PrintStream;
import java.nio.file.Path;
+import java.util.List;
+
+import org.apache.felix.gogo.api.JobListener;
+import org.apache.felix.gogo.runtime.Job;
public interface CommandSession
{
@@ -98,4 +102,13 @@
*/
Object convert(Class<?> type, Object instance);
+
+ List<Job> jobs();
+
+ Job currentJob();
+
+ Job foregroundJob();
+
+ void setJobListener(JobListener listener);
+
}