FELIX-2647 Further improvements:
* JavaDoc
* Thread Safety
* Improved timeout support
* Improved support to ensure Participants only participate in a single coordination
git-svn-id: https://svn.apache.org/repos/asf/felix/trunk@1022014 13f79535-47bb-0310-9956-ffa450edef68
diff --git a/coordinator/src/main/java/org/apache/felix/coordination/impl/Activator.java b/coordinator/src/main/java/org/apache/felix/coordination/impl/Activator.java
index c84721a..5748a1d 100644
--- a/coordinator/src/main/java/org/apache/felix/coordination/impl/Activator.java
+++ b/coordinator/src/main/java/org/apache/felix/coordination/impl/Activator.java
@@ -20,24 +20,40 @@
import java.util.Hashtable;
+import javax.management.MBeanServer;
+import javax.management.MalformedObjectNameException;
+import javax.management.ObjectName;
+
+import org.apache.felix.jmx.service.coordination.CoordinatorMBean;
import org.apache.felix.service.coordination.Coordinator;
import org.osgi.framework.Bundle;
import org.osgi.framework.BundleActivator;
import org.osgi.framework.BundleContext;
import org.osgi.framework.Constants;
import org.osgi.framework.ServiceFactory;
+import org.osgi.framework.ServiceReference;
import org.osgi.framework.ServiceRegistration;
+import org.osgi.util.tracker.ServiceTracker;
@SuppressWarnings("deprecation")
public class Activator implements BundleActivator {
private CoordinationMgr mgr;
+ private ServiceTracker mbeanServerTracker;
+
private ServiceRegistration coordinatorService;
- public void start(BundleContext context) throws Exception {
+ public void start(BundleContext context) {
mgr = new CoordinationMgr();
+ try {
+ mbeanServerTracker = new MBeanServerTracker(context, mgr);
+ mbeanServerTracker.open();
+ } catch (MalformedObjectNameException e) {
+ // TODO log
+ }
+
ServiceFactory factory = new CoordinatorFactory(mgr);
Hashtable<String, String> props = new Hashtable<String, String>();
props.put(Constants.SERVICE_DESCRIPTION,
@@ -47,12 +63,17 @@
Coordinator.class.getName(), factory, props);
}
- public void stop(BundleContext context) throws Exception {
+ public void stop(BundleContext context) {
if (coordinatorService != null) {
coordinatorService.unregister();
coordinatorService = null;
}
+ if (mbeanServerTracker != null) {
+ mbeanServerTracker.close();
+ mbeanServerTracker = null;
+ }
+
mgr.cleanUp();
}
@@ -74,4 +95,42 @@
}
}
+
+ static final class MBeanServerTracker extends ServiceTracker {
+
+ private final CoordinationMgr mgr;
+
+ private final ObjectName objectName;
+
+ MBeanServerTracker(final BundleContext context,
+ final CoordinationMgr mgr) throws MalformedObjectNameException {
+ super(context, MBeanServer.class.getName(), null);
+ this.mgr = mgr;
+ this.objectName = new ObjectName(CoordinatorMBean.OBJECTNAME);
+ }
+
+ @Override
+ public Object addingService(ServiceReference reference) {
+ MBeanServer server = (MBeanServer) super.addingService(reference);
+
+ try {
+ server.registerMBean(mgr, objectName);
+ } catch (Exception e) {
+ // TODO: log
+ }
+
+ return server;
+ }
+
+ @Override
+ public void removedService(ServiceReference reference, Object service) {
+ try {
+ ((MBeanServer) service).unregisterMBean(objectName);
+ } catch (Exception e) {
+ // TODO: log
+ }
+
+ super.removedService(reference, service);
+ }
+ }
}
diff --git a/coordinator/src/main/java/org/apache/felix/coordination/impl/CoordinationImpl.java b/coordinator/src/main/java/org/apache/felix/coordination/impl/CoordinationImpl.java
index f8ac3b0..cb400e1 100644
--- a/coordinator/src/main/java/org/apache/felix/coordination/impl/CoordinationImpl.java
+++ b/coordinator/src/main/java/org/apache/felix/coordination/impl/CoordinationImpl.java
@@ -20,8 +20,10 @@
import java.util.ArrayList;
import java.util.Collection;
+import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
+import java.util.TimerTask;
import org.apache.felix.service.coordination.Coordination;
import org.apache.felix.service.coordination.Participant;
@@ -43,6 +45,8 @@
FAILED;
}
+ private final CoordinationMgr mgr;
+
private final long id;
private final String name;
@@ -50,22 +54,40 @@
// TODO: timeout must be enforced
private long timeOutInMs;
- private State state;
+ /**
+ * Access to this field must be synchronized as long as the expected state
+ * is {@link State#ACTIVE}. Once the state has changed, further updates to
+ * this instance will not take place any more and the state will only be
+ * modified by the thread successfully setting the state to
+ * {@link State#TERMINATING}.
+ */
+ private volatile State state;
- private boolean mustFail;
+ private int mustFail;
- private boolean timeout;
+ private Throwable failReason;
private ArrayList<Participant> participants;
private HashMap<Class<?>, Object> variables;
- public CoordinationImpl(final long id, final String name) {
+ private TimerTask timeoutTask;
+
+ private Thread initiatorThread;
+
+ public CoordinationImpl(final CoordinationMgr mgr, final long id,
+ final String name, final long defaultTimeOutInMs) {
+ this.mgr = mgr;
this.id = id;
this.name = name;
+ this.mustFail = 0;
this.state = State.ACTIVE;
this.participants = new ArrayList<Participant>();
this.variables = new HashMap<Class<?>, Object>();
+ this.timeOutInMs = -defaultTimeOutInMs;
+ this.initiatorThread = Thread.currentThread();
+
+ scheduleTimeout(defaultTimeOutInMs);
}
public String getName() {
@@ -76,12 +98,35 @@
return this.id;
}
- void mustFail() {
- this.mustFail = true;
+ void mustFail(final Throwable reason) {
+ this.mustFail = FAILED;
+ this.failReason = reason;
}
+ /**
+ * Initiates a coordination timeout. Called from the timer task scheduled by
+ * the {@link #scheduleTimeout(long)} method.
+ * <p>
+ * This method is inteded to only be called from the scheduled timer task.
+ */
void timeout() {
- this.timeout = true;
+ // If a timeout happens, the coordination thread is set to always fail
+ this.mustFail = TIMEOUT;
+
+ // and interrupted and a small delay happens to allow the initiator to
+ // clean up by reacting on the interrupt. If the initiator can do this
+ // clean up normally, the end() method will return TIMEOUT.
+ try {
+ initiatorThread.interrupt();
+ Thread.sleep(500); // half a second for now
+ } catch (SecurityException se) {
+ // thrown by interrupt -- no need to wait if interrupt fails
+ } catch (InterruptedException ie) {
+ // someone interrupted us while delaying, just continue
+ }
+
+ // After this delay the coordination is forcefully failed.
+ CoordinationImpl.this.fail(null);
}
long getTimeOut() {
@@ -89,24 +134,12 @@
}
public int end() throws IllegalStateException {
- if (state == State.ACTIVE) {
- int reason = OK;
- if (mustFail || timeout) {
- fail(new Exception());
- reason = mustFail ? FAILED : TIMEOUT;
- } else {
- state = State.TERMINATING;
- for (Participant part : participants) {
- try {
- part.ended(this);
- } catch (Exception e) {
- // TODO: log
- reason = PARTIALLY_ENDED;
- }
- }
- state = State.TERMINATED;
+ if (startTermination()) {
+ if (mustFail != 0) {
+ failInternal();
+ return mustFail;
}
- return reason;
+ return endInternal();
}
// already terminated
@@ -114,16 +147,9 @@
}
public boolean fail(Throwable reason) {
- if (state == State.ACTIVE) {
- state = State.TERMINATING;
- for (Participant part : participants) {
- try {
- part.failed(this);
- } catch (Exception e) {
- // TODO: log
- }
- }
- state = State.FAILED;
+ if (startTermination()) {
+ this.failReason = reason;
+ failInternal();
return true;
}
return false;
@@ -131,36 +157,182 @@
public boolean terminate() {
if (state == State.ACTIVE) {
- end();
- return true;
+ try {
+ end();
+ return true;
+ } catch (IllegalStateException ise) {
+ // another thread might have started the termination just
+ // after the current thread checked the state but before the
+ // end() method called on this thread was able to change the
+ // state. Just ignore this exception and continue.
+ }
}
return false;
}
+ /**
+ * Returns whether the coordination has ended in failure.
+ * <p>
+ * The return value of <code>false</code> may be a transient situation if
+ * the coordination is in the process of terminating due to a failure.
+ */
public boolean isFailed() {
return state == State.FAILED;
}
public void addTimeout(long timeOutInMs) {
+ if (this.timeOutInMs > 0) {
+ // already set, ignore
+ }
+
this.timeOutInMs = timeOutInMs;
+ scheduleTimeout(timeOutInMs);
}
+ /**
+ * Adds the participant to the end of the list of participants of this
+ * coordination.
+ * <p>
+ * This method blocks if the given participant is currently participating in
+ * another coordination.
+ * <p>
+ * Participants can only be added to a coordination if it is active.
+ *
+ * @throws org.apache.felix.service.coordination.CoordinationException if
+ * the participant cannot currently participate in this
+ * coordination
+ */
public boolean participate(Participant p) {
- if (state == State.ACTIVE) {
- if (!participants.contains(p)) {
- participants.add(p);
+
+ // ensure participant only pariticipates on a single coordination
+ // this blocks until the participant can participate or until
+ // a timeout occurrs (or a deadlock is detected)
+ mgr.lockParticipant(p, this);
+
+ // synchronize access to the state to prevent it from being changed
+ // while adding the participant
+ synchronized (this) {
+ if (state == State.ACTIVE) {
+ if (!participants.contains(p)) {
+ participants.add(p);
+ }
+ return true;
}
- return true;
+ return false;
}
- return false;
}
public Collection<Participant> getParticipants() {
- return new ArrayList<Participant>(participants);
+ // synchronize access to the state to prevent it from being changed
+ // while we create a copy of the participant list
+ synchronized (this) {
+ if (state == State.ACTIVE) {
+ return new ArrayList<Participant>(participants);
+ }
+ }
+
+ return Collections.<Participant> emptyList();
}
public Map<Class<?>, ?> getVariables() {
return variables;
}
+ /**
+ * If this coordination is still active, this method initiates the
+ * termination of the coordination by setting the state to
+ * {@value State#TERMINATING}, unregistering from the
+ * {@link CoordinationMgr} and ensuring there is no timeout task active any
+ * longer to timeout this coordination.
+ *
+ * @return <code>true</code> If the coordination was active and termination
+ * can continue. If <code>false</code> is returned, the coordination
+ * must be considered terminated (or terminating) in the current
+ * thread and no further termination processing must take place.
+ */
+ private synchronized boolean startTermination() {
+ if (state == State.ACTIVE) {
+ state = State.TERMINATING;
+ mgr.unregister(this);
+ scheduleTimeout(-1);
+ return true;
+ }
+
+ // this coordination is not active any longer, nothing to do
+ return false;
+ }
+
+ /**
+ * Internal implemenation of successful termination of the coordination.
+ * <p>
+ * This method must only be called after the {@link #state} field has been
+ * set to {@link State#TERMINATING} and only be the method successfully
+ * setting this state.
+ *
+ * @return OK or PARTIALLY_ENDED depending on whether all participants
+ * succeeded or some of them failed ending the coordination.
+ */
+ private int endInternal() {
+ int reason = OK;
+ for (Participant part : participants) {
+ try {
+ part.ended(this);
+ } catch (Exception e) {
+ // TODO: log
+ reason = PARTIALLY_ENDED;
+ }
+
+ // release the participant for other coordinations
+ mgr.releaseParticipant(part);
+ }
+ state = State.TERMINATED;
+ return reason;
+ }
+
+ /**
+ * Internal implemenation of coordination failure.
+ * <p>
+ * This method must only be called after the {@link #state} field has been
+ * set to {@link State#TERMINATING} and only be the method successfully
+ * setting this state.
+ */
+ private void failInternal() {
+ // consider failure reason (if not null)
+ for (Participant part : participants) {
+ try {
+ part.failed(this);
+ } catch (Exception e) {
+ // TODO: log
+ }
+
+ // release the participant for other coordinations
+ mgr.releaseParticipant(part);
+ }
+ state = State.FAILED;
+ }
+
+ /**
+ * Helper method for timeout scheduling. If a timer is currently scheduled
+ * it is canceled. If the new timeout value is a positive value a new timer
+ * is scheduled to fire of so many milliseconds from now.
+ *
+ * @param timeout The new timeout value
+ */
+ private void scheduleTimeout(final long timeout) {
+ if (timeoutTask != null) {
+ mgr.schedule(timeoutTask, -1);
+ timeoutTask = null;
+ }
+
+ if (timeout > 0) {
+ timeoutTask = new TimerTask() {
+ @Override
+ public void run() {
+ CoordinationImpl.this.timeout();
+ }
+ };
+
+ mgr.schedule(timeoutTask, timeout);
+ }
+ }
}
diff --git a/coordinator/src/main/java/org/apache/felix/coordination/impl/CoordinationMgr.java b/coordinator/src/main/java/org/apache/felix/coordination/impl/CoordinationMgr.java
index 77f3e90..b67c99e 100644
--- a/coordinator/src/main/java/org/apache/felix/coordination/impl/CoordinationMgr.java
+++ b/coordinator/src/main/java/org/apache/felix/coordination/impl/CoordinationMgr.java
@@ -24,6 +24,8 @@
import java.util.HashMap;
import java.util.Map;
import java.util.Stack;
+import java.util.Timer;
+import java.util.TimerTask;
import java.util.concurrent.atomic.AtomicLong;
import java.util.regex.Pattern;
@@ -38,6 +40,18 @@
import org.apache.felix.service.coordination.CoordinationException;
import org.apache.felix.service.coordination.Participant;
+/**
+ * The <code>CoordinationMgr</code> is the actual backend manager of all
+ * Coordinations created by the Coordinator implementation. The methods in this
+ * class fall into three categories:
+ * <ul>
+ * <li>Actual implementations of the Coordinator interface on behalf of the
+ * per-bundle Coordinator service instances</li>
+ * <li>Implementation of the CoordinatorMBean interface allowing JMX management
+ * of the coordinations</li>
+ * <li>Management support to timeout and cleanup coordinations</li>
+ * </ul>
+ */
@SuppressWarnings("deprecation")
public class CoordinationMgr implements CoordinatorMBean {
@@ -47,9 +61,30 @@
private final Map<Long, CoordinationImpl> coordinations;
+ private final Map<Participant, CoordinationImpl> participants;
+
+ private final Timer coordinationTimer;
+
+ /**
+ * Default coordination timeout. Currently hard coded to be 30s (the
+ * specified minimum timeout). Should be made configurable, but not less
+ * than 30s.
+ */
+ private long defaultTimeOut = 30 * 1000L;
+
+ /**
+ * Wait at most 60 seconds for participant to be eligible for participation
+ * in a coordination.
+ *
+ * @see #singularizeParticipant(Participant, CoordinationImpl)
+ */
+ private long participationTimeOut = 60 * 1000L;
+
CoordinationMgr() {
ctr = new AtomicLong(-1);
coordinations = new HashMap<Long, CoordinationImpl>();
+ participants = new HashMap<Participant, CoordinationImpl>();
+ coordinationTimer = new Timer("Coordination Timer", true);
}
void unregister(final CoordinationImpl c) {
@@ -61,25 +96,81 @@
}
void cleanUp() {
+ // terminate coordination timeout timer
+ coordinationTimer.purge();
+ coordinationTimer.cancel();
+
+ // terminate all active coordinations
final Exception reason = new Exception();
for (Coordination c : coordinations.values()) {
c.fail(reason);
}
coordinations.clear();
+
+ // release all participants
+ participants.clear();
}
- public Coordination create(String name) {
+ void configure(final long coordinationTimeout,
+ final long participationTimeout) {
+ this.defaultTimeOut = coordinationTimeout;
+ this.participationTimeOut = participationTimeout;
+ }
+
+ void schedule(final TimerTask task, final long delay) {
+ if (delay < 0) {
+ task.cancel();
+ } else {
+ coordinationTimer.schedule(task, delay);
+ }
+ }
+
+ void lockParticipant(final Participant p, final CoordinationImpl c) {
+ synchronized (participants) {
+ // wait for participant to be released
+ long cutOff = System.currentTimeMillis() + participationTimeOut;
+ while (participants.containsKey(p)) {
+ try {
+ participants.wait(participationTimeOut / 500);
+ } catch (InterruptedException ie) {
+ // don't worry, just keep on waiting
+ }
+
+ // timeout waiting for participation
+ if (System.currentTimeMillis() > cutOff) {
+ throw new CoordinationException(
+ "Timed out waiting to join coordinaton", c.getName(),
+ CoordinationException.TIMEOUT);
+ }
+ }
+
+ // lock participant into coordination
+ participants.put(p, c);
+ }
+ }
+
+ void releaseParticipant(final Participant p) {
+ synchronized (participants) {
+ participants.remove(p);
+ participants.notifyAll();
+ }
+ }
+
+ // ---------- Coordinator back end implementation
+
+ Coordination create(String name) {
long id = ctr.incrementAndGet();
- CoordinationImpl c = new CoordinationImpl(id, name);
+ CoordinationImpl c = new CoordinationImpl(this, id, name,
+ defaultTimeOut);
coordinations.put(id, c);
return c;
}
- public Coordination begin(String name) {
+ Coordination begin(String name) {
return push(create(name));
}
- public Coordination push(Coordination c) {
+ Coordination push(Coordination c) {
Stack<Coordination> stack = threadStacks.get();
if (stack == null) {
stack = new Stack<Coordination>();
@@ -88,7 +179,7 @@
return stack.push(c);
}
- public Coordination pop() {
+ Coordination pop() {
Stack<Coordination> stack = threadStacks.get();
if (stack != null && !stack.isEmpty()) {
return stack.pop();
@@ -96,7 +187,7 @@
return null;
}
- public Coordination getCurrentCoordination() {
+ Coordination getCurrentCoordination() {
Stack<Coordination> stack = threadStacks.get();
if (stack != null && !stack.isEmpty()) {
return stack.peek();
@@ -104,16 +195,16 @@
return null;
}
- public boolean alwaysFail(Throwable reason) {
+ boolean alwaysFail(Throwable reason) {
CoordinationImpl current = (CoordinationImpl) getCurrentCoordination();
if (current != null) {
- current.mustFail();
+ current.mustFail(reason);
return true;
}
return false;
}
- public Collection<Coordination> getCoordinations() {
+ Collection<Coordination> getCoordinations() {
ArrayList<Coordination> result = new ArrayList<Coordination>();
Stack<Coordination> stack = threadStacks.get();
if (stack != null) {
@@ -122,9 +213,7 @@
return result;
}
- public boolean participate(Participant participant)
- throws CoordinationException {
- // TODO: check for multi-pariticipation and block
+ boolean participate(Participant participant) throws CoordinationException {
Coordination current = getCurrentCoordination();
if (current != null) {
current.participate(participant);
@@ -133,8 +222,7 @@
return false;
}
- public Coordination participateOrBegin(Participant ifActive) {
- // TODO: check for multi-pariticipation and block
+ Coordination participateOrBegin(Participant ifActive) {
Coordination current = getCurrentCoordination();
if (current == null) {
current = begin("implicit");
diff --git a/coordinator/src/main/java/org/apache/felix/coordination/impl/CoordinatorImpl.java b/coordinator/src/main/java/org/apache/felix/coordination/impl/CoordinatorImpl.java
index dd5f5c0..10321ec 100644
--- a/coordinator/src/main/java/org/apache/felix/coordination/impl/CoordinatorImpl.java
+++ b/coordinator/src/main/java/org/apache/felix/coordination/impl/CoordinatorImpl.java
@@ -42,11 +42,6 @@
return mgr.create(name);
}
- void unregister(final CoordinationImpl c) {
- // TODO: check permission
- mgr.unregister(c);
- }
-
public Coordination begin(String name) {
// TODO: check permission
return mgr.begin(name);