Refactored remote shell to improve service handling and concurrency handling.
It probably still could be refactored even more, but I don't have the time.
git-svn-id: https://svn.apache.org/repos/asf/felix/trunk@768564 13f79535-47bb-0310-9956-ffa450edef68
diff --git a/shell.remote/src/main/java/org/apache/felix/shell/remote/Activator.java b/shell.remote/src/main/java/org/apache/felix/shell/remote/Activator.java
index a25cd81..d4ef917 100644
--- a/shell.remote/src/main/java/org/apache/felix/shell/remote/Activator.java
+++ b/shell.remote/src/main/java/org/apache/felix/shell/remote/Activator.java
@@ -24,41 +24,29 @@
*/
public class Activator implements BundleActivator
{
- private static ServiceMediator c_services;
+ private ServiceMediator m_services;
private Listener m_listener;
- public void start(BundleContext bundleContext) throws Exception
+ public void start(BundleContext context) throws Exception
{
//1. Prepare mediator
- c_services = new ServiceMediator();
- c_services.activate(bundleContext);
+ m_services = new ServiceMediator(context);
//2. Prepare the listener
- m_listener = new Listener();
- m_listener.activate(bundleContext);
+ m_listener = new Listener(context, m_services);
}
- public void stop(BundleContext bundleContext) throws Exception
+ public void stop(BundleContext context) throws Exception
{
if (m_listener != null)
{
m_listener.deactivate();
m_listener = null;
}
- if (c_services != null)
+ if (m_services != null)
{
- c_services.deactivate();
- c_services = null;
+ m_services.deactivate();
+ m_services = null;
}
}
-
- /**
- * Returns a reference to the {@link ServiceMediator} instance used in this bundle.
- *
- * @return a {@link ServiceMediator} instance.
- */
- static ServiceMediator getServices()
- {
- return c_services;
- }
}
\ No newline at end of file
diff --git a/shell.remote/src/main/java/org/apache/felix/shell/remote/Latch.java b/shell.remote/src/main/java/org/apache/felix/shell/remote/Latch.java
deleted file mode 100644
index e39c41b..0000000
--- a/shell.remote/src/main/java/org/apache/felix/shell/remote/Latch.java
+++ /dev/null
@@ -1,124 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.felix.shell.remote;
-
-/**
- * A latch is a boolean condition that is set at most once, ever.
- * Once a single release is issued, all acquires will pass.
- * <p/>
- * <b>Sample usage.</b> Here are a set of classes that use
- * a latch as a start signal for a group of worker threads that
- * are created and started beforehand, and then later enabled.
- * <pre>
- * class Worker implements Runnable {
- * private final Latch startSignal;
- * Worker(Latch l) { startSignal = l; }
- * public void run() {
- * startSignal.acquire();
- * doWork();
- * }
- * void doWork() { ... }
- * }
- * <p/>
- * class Driver { // ...
- * void main() {
- * Latch go = new Latch();
- * for (int i = 0; i < N; ++i) // make threads
- * new Thread(new Worker(go)).start();
- * doSomethingElse(); // don't let run yet
- * go.release(); // let all threads proceed
- * }
- * }
- * </pre>
- */
-class Latch
-{
- private boolean m_latched = false;
-
- /*
- This could use double-check, but doesn't.
- If the latch is being used as an indicator of
- the presence or state of an object, the user would
- not necessarily get the memory barrier that comes with synch
- that would be needed to correctly use that object. This
- would lead to errors that users would be very hard to track down. So, to
- be conservative, we always use synch.
- */
- public void acquire() throws InterruptedException
- {
- if (Thread.interrupted())
- {
- throw new InterruptedException();
- }
- synchronized (this)
- {
- while (!m_latched)
- {
- wait();
- }
- }
- }//acquire
-
- public boolean attempt(long msecs) throws InterruptedException
- {
- if (Thread.interrupted())
- {
- throw new InterruptedException();
- }
- synchronized (this)
- {
- if (m_latched)
- {
- return true;
- }
- else if (msecs <= 0)
- {
- return false;
- }
- else
- {
- long waitTime = msecs;
- long start = System.currentTimeMillis();
- for (;;)
- {
- wait(waitTime);
- if (m_latched)
- {
- return true;
- }
- else
- {
- waitTime = msecs - (System.currentTimeMillis() - start);
- if (waitTime <= 0)
- {
- return false;
- }
- }
- }
- }
- }
- }//attempt
-
- /**
- * Enable all current and future acquires to pass *
- */
- public synchronized void release()
- {
- m_latched = true;
- notifyAll();
- }//release
-}//class Latch
\ No newline at end of file
diff --git a/shell.remote/src/main/java/org/apache/felix/shell/remote/Listener.java b/shell.remote/src/main/java/org/apache/felix/shell/remote/Listener.java
index 983204d..8aba389 100644
--- a/shell.remote/src/main/java/org/apache/felix/shell/remote/Listener.java
+++ b/shell.remote/src/main/java/org/apache/felix/shell/remote/Listener.java
@@ -33,31 +33,37 @@
*/
class Listener
{
- private int m_port;
- private String m_ip;
- private Thread m_listenerThread;
- private boolean m_stop = false;
- private ServerSocket m_serverSocket;
- private AtomicInteger m_useCounter;
- private int m_maxConnections;
- private int m_soTimeout;
- private Set m_connections;
+ private final int m_port;
+ private final String m_ip;
+ private final Thread m_listenerThread;
+ private final Acceptor m_acceptor;
+ private final AtomicInteger m_useCounter;
+ private final int m_maxConnections;
+ private final int m_soTimeout;
+ private final Set m_connections;
+ private final ServiceMediator m_services;
/**
* Activates this listener on a listener thread (telnetconsole.Listener).
*/
- public void activate(BundleContext bundleContext)
+ public Listener(BundleContext context, ServiceMediator services) throws IOException
{
+ m_services = services;
//configure from framework property
- m_ip = getProperty(bundleContext, "osgi.shell.telnet.ip", "127.0.0.1");
- m_port = getProperty(bundleContext, "osgi.shell.telnet.port", 6666);
- m_soTimeout = getProperty(bundleContext, "osgi.shell.telnet.socketTimeout", 0);
- m_maxConnections = getProperty(bundleContext, "osgi.shell.telnet.maxconn", 2);
+ m_ip = getProperty(context, "osgi.shell.telnet.ip", "127.0.0.1");
+ m_port = getProperty(context, "osgi.shell.telnet.port", 6666);
+ m_soTimeout = getProperty(context, "osgi.shell.telnet.socketTimeout", 0);
+ m_maxConnections = getProperty(context, "osgi.shell.telnet.maxconn", 2);
m_useCounter = new AtomicInteger(0);
m_connections = new HashSet();
- m_listenerThread = new Thread(new Acceptor(), "telnetconsole.Listener");
+ m_acceptor = new Acceptor();
+ m_listenerThread = new Thread(m_acceptor, "telnetconsole.Listener");
m_listenerThread.start();
}//activate
+ public ServiceMediator getServices()
+ {
+ return m_services;
+ }
/**
* Deactivates this listener.
@@ -70,14 +76,13 @@
{
try
{
- m_stop = true;
//wait for the listener thread
- m_serverSocket.close();
+ m_acceptor.close();
m_listenerThread.join();
}
catch (Exception ex)
{
- Activator.getServices().error("Listener::deactivate()", ex);
+ m_services.error("Listener::deactivate()", ex);
}
// get the active connections (and clear the list)
@@ -103,6 +108,21 @@
*/
private class Acceptor implements Runnable
{
+ private volatile boolean m_stop = false;
+ private final ServerSocket m_serverSocket;
+
+ Acceptor() throws IOException
+ {
+ m_serverSocket = new ServerSocket(m_port, 1, InetAddress.getByName(m_ip));
+ m_serverSocket.setSoTimeout(m_soTimeout);
+ }
+
+ public void close() throws IOException
+ {
+ m_stop = true;
+ m_serverSocket.close();
+ }
+
/**
* Listens constantly to a server socket and handles incoming connections.
* One connection will be accepted and routed into the shell, all others will
@@ -122,8 +142,6 @@
should be handled properly, but denial of service attacks via massive parallel
program logins should be prevented with this.
*/
- m_serverSocket = new ServerSocket(m_port, 1, InetAddress.getByName(m_ip));
- m_serverSocket.setSoTimeout(m_soTimeout);
do
{
try
@@ -161,7 +179,7 @@
}
catch (IOException e)
{
- Activator.getServices().error("Listener.Acceptor::activate()", e);
+ m_services.error("Listener.Acceptor::run()", e);
}
}//run
}//inner class Acceptor
@@ -179,7 +197,7 @@
}
catch (NumberFormatException ex)
{
- Activator.getServices().error("Listener::activate()", ex);
+ m_services.error("Listener::activate()", ex);
}
}
@@ -233,4 +251,4 @@
m_connections.remove(connection);
}
}
-}//class Listener
+}//class Listener
\ No newline at end of file
diff --git a/shell.remote/src/main/java/org/apache/felix/shell/remote/ServiceMediator.java b/shell.remote/src/main/java/org/apache/felix/shell/remote/ServiceMediator.java
index 7f7d73d..994a4ff 100644
--- a/shell.remote/src/main/java/org/apache/felix/shell/remote/ServiceMediator.java
+++ b/shell.remote/src/main/java/org/apache/felix/shell/remote/ServiceMediator.java
@@ -25,17 +25,42 @@
import org.osgi.framework.ServiceListener;
import org.osgi.framework.ServiceReference;
import org.osgi.service.log.LogService;
+import org.osgi.util.tracker.ServiceTracker;
/**
* Implements a mediator pattern class for services from the OSGi container.
*/
class ServiceMediator
{
- private BundleContext m_bundleContext;
- private ShellService m_felixShellService;
- private Latch m_felixShellServiceLatch;
- private LogService m_logService;
- private Latch m_logServiceLatch;
+ private final String m_bundleName;
+ private final long m_bundleId;
+ private final BundleContext m_context;
+ private final ServiceTracker m_logTracker;
+ private final ServiceTracker m_shellTracker;
+
+ ServiceMediator(BundleContext context)
+ {
+ m_context = context;
+ m_bundleName = (m_context.getBundle().getSymbolicName() == null)
+ ? m_context.getBundle().getLocation()
+ : m_context.getBundle().getSymbolicName();
+ m_bundleId = m_context.getBundle().getBundleId();
+ ServiceTracker logTracker = null;
+ try
+ {
+ logTracker = new ServiceTracker(m_context, LogService.class.getName(), null);
+ logTracker.open();
+ }
+ catch (Throwable ex)
+ {
+ // This means we don't have access to the log service package since it
+ // is optional, so don't track log services.
+ logTracker = null;
+ }
+ m_logTracker = logTracker;
+ m_shellTracker = new ServiceTracker(m_context, ShellService.class.getName(), null);
+ m_shellTracker.open();
+ }
/**
* Returns a reference to the <tt>ShellService</tt> (Felix).
@@ -45,15 +70,16 @@
*/
public ShellService getFelixShellService(long wait)
{
+ ShellService shell = null;
try
{
if (wait < 0)
{
- m_felixShellServiceLatch.acquire();
+ shell = (ShellService) m_shellTracker.getService();
}
- else if (wait > 0)
+ else
{
- m_felixShellServiceLatch.attempt(wait);
+ shell = (ShellService) m_shellTracker.waitForService(wait);
}
}
catch (InterruptedException e)
@@ -61,34 +87,39 @@
e.printStackTrace(System.err);
}
- return m_felixShellService;
+ return shell;
}//getFelixShellService
- public LogService getLogServiceLatch(long wait)
+ public Object getLogServiceLatch(long wait)
{
- try
+ Object log = null;
+ if (m_logTracker != null)
{
- if (wait < 0)
+ try
{
- m_logServiceLatch.acquire();
+ if (wait < 0)
+ {
+ log = m_logTracker.getService();
+ }
+ else
+ {
+ log = m_logTracker.waitForService(wait);
+ }
}
- else if (wait > 0)
+ catch (InterruptedException e)
{
- m_logServiceLatch.attempt(wait);
+ e.printStackTrace(System.err);
}
}
- catch (InterruptedException e)
- {
- e.printStackTrace(System.err);
- }
- return m_logService;
+ return log;
}//getLogService
public void info(String msg)
{
- if (m_logService != null)
+ Object log = getLogServiceLatch(NO_WAIT);
+ if (log != null)
{
- m_logService.log(LogService.LOG_INFO, msg);
+ ((LogService) log).log(LogService.LOG_INFO, msg);
}
else
{
@@ -98,9 +129,10 @@
public void error(String msg, Throwable t)
{
- if (m_logService != null)
+ Object log = getLogServiceLatch(NO_WAIT);
+ if (log != null)
{
- m_logService.log(LogService.LOG_ERROR, msg, t);
+ ((LogService) log).log(LogService.LOG_ERROR, msg);
}
else
{
@@ -110,9 +142,10 @@
public void error(String msg)
{
- if (m_logService != null)
+ Object log = getLogServiceLatch(NO_WAIT);
+ if (log != null)
{
- m_logService.log(LogService.LOG_ERROR, msg);
+ ((LogService) log).log(LogService.LOG_ERROR, msg);
}
else
{
@@ -122,9 +155,10 @@
public void debug(String msg)
{
- if (m_logService != null)
+ Object log = getLogServiceLatch(NO_WAIT);
+ if (log != null)
{
- m_logService.log(LogService.LOG_DEBUG, msg);
+ ((LogService) log).log(LogService.LOG_DEBUG, msg);
}
else
{
@@ -134,9 +168,10 @@
public void warn(String msg)
{
- if (m_logService != null)
+ Object log = getLogServiceLatch(NO_WAIT);
+ if (log != null)
{
- m_logService.log(LogService.LOG_WARNING, msg);
+ ((LogService) log).log(LogService.LOG_WARNING, msg);
}
else
{
@@ -148,10 +183,9 @@
{
//Assemble String
StringBuffer sbuf = new StringBuffer();
- Bundle b = m_bundleContext.getBundle();
- sbuf.append(b.getHeaders().get(Constants.BUNDLE_NAME));
+ sbuf.append(m_bundleName);
sbuf.append(" [");
- sbuf.append(b.getBundleId());
+ sbuf.append(m_bundleId);
sbuf.append("] ");
sbuf.append(msg);
System.out.println(sbuf.toString());
@@ -161,10 +195,9 @@
{
//Assemble String
StringBuffer sbuf = new StringBuffer();
- Bundle b = m_bundleContext.getBundle();
- sbuf.append(b.getHeaders().get(Constants.BUNDLE_NAME));
+ sbuf.append(m_bundleName);
sbuf.append(" [");
- sbuf.append(b.getBundleId());
+ sbuf.append(m_bundleId);
sbuf.append("] ");
sbuf.append(msg);
System.err.println(sbuf.toString());
@@ -175,136 +208,19 @@
}//logToSystem
/**
- * Activates this mediator to start tracking the required services using the
- * OSGi service layer.
- *
- * @param bc the bundle's context.
- * @return true if activated successfully, false otherwise.
- */
- public boolean activate(BundleContext bc)
- {
- //get the context
- m_bundleContext = bc;
-
- m_felixShellServiceLatch = createWaitLatch();
- m_logServiceLatch = createWaitLatch();
-
- //prepareDefinitions listener
- ServiceListener serviceListener = new ServiceListenerImpl();
-
- //prepareDefinitions the filter, ShellService is required,
- //LogService may be missing, in which case we only use the
- // ShellService part of the filter
- String filter = "(objectclass=" + ShellService.class.getName() + ")";
- try
- {
- filter = "(|" + filter + "(objectclass=" + LogService.class.getName() + "))";
- }
- catch (Throwable t)
- {
- // ignore
- }
-
- try
- {
- //add the listener to the bundle context.
- bc.addServiceListener(serviceListener, filter);
-
- //ensure that already registered Service instances are registered with
- //the manager
- ServiceReference[] srl = bc.getServiceReferences(null, filter);
- for (int i = 0; srl != null && i < srl.length; i++)
- {
- serviceListener.serviceChanged(new ServiceEvent(ServiceEvent.REGISTERED, srl[i]));
- }
- }
- catch (InvalidSyntaxException ex)
- {
- ex.printStackTrace(System.err);
- return false;
- }
- return true;
- }//activate
-
- /**
* Deactivates this mediator, nulling out all references.
* If called when the bundle is stopped, the framework should actually take
* care of unregistering the <tt>ServiceListener</tt>.
*/
public void deactivate()
{
- m_felixShellService = null;
- m_felixShellServiceLatch = null;
- m_bundleContext = null;
+ if (m_logTracker != null)
+ {
+ m_logTracker.close();
+ }
+ m_shellTracker.close();
}//deactivate
- /**
- * Creates a simple wait latch to be used for the mechanism that allows entities
- * in the bundles to wait for references.
- *
- * @return a new Latch instance.
- */
- private Latch createWaitLatch()
- {
- return new Latch();
- }//createWaitLatch
-
- /**
- * The <tt>ServiceListener</tt> implementation.
- */
- private class ServiceListenerImpl implements ServiceListener
- {
- public void serviceChanged(ServiceEvent ev)
- {
- ServiceReference sr = ev.getServiceReference();
- Object o = null;
- switch (ev.getType())
- {
- case ServiceEvent.REGISTERED:
- o = m_bundleContext.getService(sr);
- if (o == null)
- {
- return;
- }
- else if (o instanceof ShellService)
- {
- m_felixShellService = (ShellService) o;
- m_felixShellServiceLatch.release();
- }
- else if (o instanceof LogService)
- {
- m_logService = (LogService) o;
- m_logServiceLatch.release();
- }
- else
- {
- m_bundleContext.ungetService(sr);
- }
- break;
- case ServiceEvent.UNREGISTERING:
- o = m_bundleContext.getService(sr);
- if (o == null)
- {
- return;
- }
- else if (o instanceof ShellService)
- {
- m_felixShellService = null;
- m_felixShellServiceLatch = createWaitLatch();
- }
- else if (o instanceof LogService)
- {
- m_logService = null;
- m_logServiceLatch = createWaitLatch();
- }
- else
- {
- m_bundleContext.ungetService(sr);
- }
- break;
- }
- }
- }//inner class ServiceListenerImpl
- public static long WAIT_UNLIMITED = -1;
- public static long NO_WAIT = 0;
-}//class ServiceMediator
+ public static long WAIT_UNLIMITED = 0;
+ public static long NO_WAIT = -1;
+}//class ServiceMediator
\ No newline at end of file
diff --git a/shell.remote/src/main/java/org/apache/felix/shell/remote/Shell.java b/shell.remote/src/main/java/org/apache/felix/shell/remote/Shell.java
index 382cb21..6530aa3 100644
--- a/shell.remote/src/main/java/org/apache/felix/shell/remote/Shell.java
+++ b/shell.remote/src/main/java/org/apache/felix/shell/remote/Shell.java
@@ -37,9 +37,10 @@
*/
class Shell implements Runnable
{
- private Listener m_owner;
- private Socket m_socket;
- private AtomicInteger m_useCounter;
+ private final Listener m_owner;
+ private final Socket m_socket;
+ private final AtomicInteger m_useCounter;
+ private volatile TerminalPrintStream m_out;
public Shell(Listener owner, Socket s, AtomicInteger counter)
{
@@ -63,27 +64,29 @@
try
{
- PrintStream out = new TerminalPrintStream(m_socket.getOutputStream());
- BufferedReader in = new BufferedReader(new TerminalReader(m_socket.getInputStream(), out));
+ m_out = new TerminalPrintStream(
+ m_owner.getServices(), m_socket.getOutputStream());
+ BufferedReader in = new BufferedReader(
+ new TerminalReader(m_socket.getInputStream(), m_out));
ReentrantLock lock = new ReentrantLock();
// Print welcome banner.
- out.println();
- out.println("Felix Remote Shell Console:");
- out.println("============================");
- out.println("");
+ m_out.println();
+ m_out.println("Felix Remote Shell Console:");
+ m_out.println("============================");
+ m_out.println("");
do
{
- out.print("-> ");
String line = "";
try
{
+ m_out.print("-> ");
line = in.readLine();
//make sure to capture end of stream
if (line == null)
{
- out.println("exit");
+ m_out.println("exit");
return;
}
}
@@ -98,15 +101,15 @@
return;
}
- ShellService shs = Activator.getServices().getFelixShellService(ServiceMediator.NO_WAIT);
+ ShellService shs = m_owner.getServices().getFelixShellService(ServiceMediator.NO_WAIT);
try
{
lock.acquire();
- shs.executeCommand(line, out, out);
+ shs.executeCommand(line, m_out, m_out);
}
catch (Exception ex)
{
- Activator.getServices().error("Shell::run()", ex);
+ m_owner.getServices().error("Shell::run()", ex);
}
finally
{
@@ -117,7 +120,7 @@
}
catch (IOException ex)
{
- Activator.getServices().error("Shell::run()", ex);
+ m_owner.getServices().error("Shell::run()", ex);
}
finally
{
@@ -129,20 +132,12 @@
private void exit(String message)
{
// farewell message
- try
+ if (message != null)
{
- PrintStream out = new TerminalPrintStream(m_socket.getOutputStream());
- if (message != null)
- {
- out.println(message);
- }
- out.println("Good Bye!");
- out.close();
+ m_out.println(message);
}
- catch (IOException ioe)
- {
- // ignore
- }
+ m_out.println("Good Bye!");
+ m_out.close();
try
{
@@ -150,7 +145,7 @@
}
catch (IOException ex)
{
- Activator.getServices().error("Shell::exit()", ex);
+ m_owner.getServices().error("Shell::exit()", ex);
}
m_owner.unregisterConnection(this);
m_useCounter.decrement();
diff --git a/shell.remote/src/main/java/org/apache/felix/shell/remote/TerminalPrintStream.java b/shell.remote/src/main/java/org/apache/felix/shell/remote/TerminalPrintStream.java
index ec01858..430e6d9 100644
--- a/shell.remote/src/main/java/org/apache/felix/shell/remote/TerminalPrintStream.java
+++ b/shell.remote/src/main/java/org/apache/felix/shell/remote/TerminalPrintStream.java
@@ -19,20 +19,25 @@
import java.io.IOException;
import java.io.OutputStream;
import java.io.PrintStream;
+import java.net.SocketException;
/**
* Class implementing a <tt>TerminalPrintStream</tt>.
*/
class TerminalPrintStream extends PrintStream
{
+ private final ServiceMediator m_services;
+ private volatile boolean m_isClosed = false;
+
/**
* Constructs a new instance wrapping the given <tt>OutputStream</tt>.
*
* @param tout the <tt>OutputStream</tt> to be wrapped.
*/
- public TerminalPrintStream(OutputStream tout)
+ public TerminalPrintStream(ServiceMediator services, OutputStream tout)
{
super(tout);
+ m_services = services;
}//constructor
public void print(String str)
@@ -43,9 +48,12 @@
out.write(bytes, 0, bytes.length);
flush();
}
- catch (IOException ex)
+ catch (Exception ex)
{
- Activator.getServices().error("TerminalPrintStream::print()", ex);
+ if (!m_isClosed)
+ {
+ m_services.error("TerminalPrintStream::print()", ex);
+ }
}
}//print
@@ -62,7 +70,13 @@
}
catch (IOException ex)
{
- Activator.getServices().error("TerminalPrintStream::println()", ex);
+ m_services.error("TerminalPrintStream::println()", ex);
}
}//flush
+
+ public void close()
+ {
+ m_isClosed = true;
+ super.close();
+ }
}//class TerminalPrintStream