FELIX-2647 : Implement Coordinator Service - further fixes for the CT tests cases, 53 of 61 passing
git-svn-id: https://svn.apache.org/repos/asf/felix/trunk@1550893 13f79535-47bb-0310-9956-ffa450edef68
diff --git a/coordinator/src/main/java/org/apache/felix/coordinator/impl/Activator.java b/coordinator/src/main/java/org/apache/felix/coordinator/impl/Activator.java
index f6ea23c..33ba33f 100644
--- a/coordinator/src/main/java/org/apache/felix/coordinator/impl/Activator.java
+++ b/coordinator/src/main/java/org/apache/felix/coordinator/impl/Activator.java
@@ -20,20 +20,13 @@
import java.util.Hashtable;
-import javax.management.MBeanServer;
-import javax.management.MalformedObjectNameException;
-import javax.management.ObjectName;
-
-import org.apache.felix.jmx.service.coordinator.CoordinatorMBean;
import org.osgi.framework.Bundle;
import org.osgi.framework.BundleActivator;
import org.osgi.framework.BundleContext;
import org.osgi.framework.Constants;
import org.osgi.framework.ServiceFactory;
-import org.osgi.framework.ServiceReference;
import org.osgi.framework.ServiceRegistration;
import org.osgi.service.coordinator.Coordinator;
-import org.osgi.util.tracker.ServiceTracker;
@SuppressWarnings("deprecation")
public class Activator implements BundleActivator
@@ -41,16 +34,16 @@
private CoordinationMgr mgr;
- private ServiceTracker mbeanServerTracker;
+// private ServiceTracker mbeanServerTracker;
private ServiceRegistration coordinatorService;
- private ServiceRegistration coordinatorCommand;
+// private ServiceRegistration coordinatorCommand;
public void start(BundleContext context)
{
mgr = new CoordinationMgr();
-
+/*
try
{
mbeanServerTracker = new MBeanServerTracker(context, mgr);
@@ -60,13 +53,13 @@
{
// TODO log
}
-
- ServiceFactory factory = new CoordinatorFactory(mgr);
- Hashtable<String, String> props = new Hashtable<String, String>();
+*/
+ final ServiceFactory factory = new CoordinatorFactory(mgr);
+ final Hashtable<String, String> props = new Hashtable<String, String>();
props.put(Constants.SERVICE_DESCRIPTION, "Coordinator Service Implementation");
props.put(Constants.SERVICE_VENDOR, "The Apache Software Foundation");
coordinatorService = context.registerService(Coordinator.class.getName(), factory, props);
-
+/*
try
{
coordinatorCommand = CrdCommand.create(context, mgr);
@@ -75,28 +68,29 @@
{
// most probably missing resolved packages, ignore
}
+*/
}
public void stop(BundleContext context)
{
- if (coordinatorCommand != null)
+/* if (coordinatorCommand != null)
{
coordinatorCommand.unregister();
coordinatorCommand = null;
}
-
+*/
if (coordinatorService != null)
{
coordinatorService.unregister();
coordinatorService = null;
}
-
+/*
if (mbeanServerTracker != null)
{
mbeanServerTracker.close();
mbeanServerTracker = null;
}
-
+*/
mgr.cleanUp();
}
@@ -121,7 +115,7 @@
}
}
-
+/*
static final class MBeanServerTracker extends ServiceTracker
{
@@ -168,4 +162,5 @@
super.removedService(reference, service);
}
}
+*/
}
diff --git a/coordinator/src/main/java/org/apache/felix/coordinator/impl/CoordinationImpl.java b/coordinator/src/main/java/org/apache/felix/coordinator/impl/CoordinationImpl.java
index 3c65ece..3bd2589 100644
--- a/coordinator/src/main/java/org/apache/felix/coordinator/impl/CoordinationImpl.java
+++ b/coordinator/src/main/java/org/apache/felix/coordinator/impl/CoordinationImpl.java
@@ -21,6 +21,7 @@
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
+import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.TimerTask;
@@ -66,9 +67,9 @@
private Throwable failReason;
- private ArrayList<Participant> participants;
+ private final ArrayList<Participant> participants;
- private HashMap<Class<?>, Object> variables;
+ private final HashMap<Class<?>, Object> variables;
private TimerTask timeoutTask;
@@ -76,8 +77,6 @@
public CoordinationImpl(final CoordinatorImpl owner, final long id, final String name, final long timeOutInMs)
{
- // TODO: validate name against Bundle Symbolic Name pattern
-
this.owner = owner;
this.id = id;
this.name = name;
@@ -89,17 +88,26 @@
scheduleTimeout(deadLine);
}
+ /**
+ * @see org.osgi.service.coordinator.Coordination#getId()
+ */
public long getId()
{
return this.id;
}
+ /**
+ * @see org.osgi.service.coordinator.Coordination#getName()
+ */
public String getName()
{
return name;
}
- public boolean fail(Throwable reason)
+ /**
+ * @see org.osgi.service.coordinator.Coordination#fail(java.lang.Throwable)
+ */
+ public boolean fail(final Throwable reason)
{
if ( reason == null)
{
@@ -109,10 +117,16 @@
{
this.failReason = reason;
- // consider failure reason (if not null)
- for (int i=participants.size()-1;i>=0;i--)
+ final List<Participant> releaseList = new ArrayList<Participant>();
+ synchronized ( this.participants )
{
- final Participant part = participants.get(i);
+ releaseList.addAll(this.participants);
+ this.participants.clear();
+ }
+ // consider failure reason (if not null)
+ for (int i=releaseList.size()-1;i>=0;i--)
+ {
+ final Participant part = releaseList.get(i);
try
{
part.failed(this);
@@ -139,23 +153,32 @@
return false;
}
+ /**
+ * @see org.osgi.service.coordinator.Coordination#end()
+ */
public void end()
{
if (startTermination())
{
// TODO check for WRONG_THREAD
- this.owner.endNestedCoordinations(this);
+ boolean partialFailure = this.owner.endNestedCoordinations(this);
this.owner.unregister(this, true);
- boolean partialFailure = false;
- for (int i=participants.size()-1;i>=0;i--)
+ final List<Participant> releaseList = new ArrayList<Participant>();
+ synchronized ( this.participants )
{
- final Participant part = participants.get(i);
+ releaseList.addAll(this.participants);
+ this.participants.clear();
+ }
+ // consider failure reason (if not null)
+ for (int i=releaseList.size()-1;i>=0;i--)
+ {
+ final Participant part = releaseList.get(i);
try
{
part.ended(this);
}
- catch (Exception e)
+ catch (final Exception e)
{
// TODO: log
partialFailure = true;
@@ -193,6 +216,9 @@
}
+ /**
+ * @see org.osgi.service.coordinator.Coordination#getParticipants()
+ */
public List<Participant> getParticipants()
{
// synchronize access to the state to prevent it from being changed
@@ -201,13 +227,19 @@
{
if (state == State.ACTIVE)
{
- return new ArrayList<Participant>(participants);
+ synchronized ( this.participants )
+ {
+ return new ArrayList<Participant>(participants);
+ }
}
}
return Collections.<Participant> emptyList();
}
+ /**
+ * @see org.osgi.service.coordinator.Coordination#getFailure()
+ */
public Throwable getFailure()
{
return failReason;
@@ -215,24 +247,16 @@
/**
- * Adds the participant to the end of the list of participants of this
- * coordination.
- * <p>
- * This method blocks if the given participant is currently participating in
- * another coordination.
- * <p>
- * Participants can only be added to a coordination if it is active.
- *
- * @throws org.apache.felix.service.coordination.CoordinationException if
- * the participant cannot currently participate in this
- * coordination
+ * @see org.osgi.service.coordinator.Coordination#addParticipant(org.osgi.service.coordinator.Participant)
*/
- public void addParticipant(Participant p)
+ public void addParticipant(final Participant p)
{
-
- // ensure participant only pariticipates on a single coordination
+ if ( p == null ) {
+ throw new IllegalArgumentException("Participant must not be null");
+ }
+ // ensure participant only participates on a single coordination
// this blocks until the participant can participate or until
- // a timeout occurrs (or a deadlock is detected)
+ // a timeout occurs (or a deadlock is detected)
owner.lockParticipant(p, this);
// synchronize access to the state to prevent it from being changed
@@ -244,29 +268,53 @@
owner.releaseParticipant(p);
throw new CoordinationException("Cannot add Participant " + p + " to terminated Coordination", this,
- (getFailure() != null) ? CoordinationException.FAILED : CoordinationException.ALREADY_ENDED);
+ (getFailure() != null) ? CoordinationException.FAILED : CoordinationException.ALREADY_ENDED, getFailure());
}
- if (!participants.contains(p))
+ synchronized ( this.participants )
{
- participants.add(p);
+ boolean found = false;
+ final Iterator<Participant> iter = this.participants.iterator();
+ while ( !found && iter.hasNext() )
+ {
+ if ( iter.next() == p )
+ {
+ found = true;
+ }
+ }
+ if (!found)
+ {
+ participants.add(p);
+ }
+
}
}
}
+ /**
+ * @see org.osgi.service.coordinator.Coordination#getVariables()
+ */
public Map<Class<?>, Object> getVariables()
{
return variables;
}
- public long extendTimeout(long timeOutInMs)
+ /**
+ * @see org.osgi.service.coordinator.Coordination#extendTimeout(long)
+ */
+ public long extendTimeout(final long timeOutInMs)
{
+ if ( timeOutInMs < 0 )
+ {
+ throw new IllegalArgumentException("Timeout must not be negative");
+ }
+
synchronized (this)
{
if (isTerminated())
{
throw new CoordinationException("Cannot extend timeout on terminated Coordination", this,
- (getFailure() != null) ? CoordinationException.FAILED : CoordinationException.ALREADY_ENDED);
+ (getFailure() != null) ? CoordinationException.FAILED : CoordinationException.ALREADY_ENDED, getFailure());
}
if (timeOutInMs > 0)
@@ -280,21 +328,24 @@
}
/**
- * Returns whether the coordination has ended.
- * <p>
- * The return value of <code>false</code> may be a transient situation if
- * the coordination is in the process of terminating.
+ * @see org.osgi.service.coordinator.Coordination#isTerminated()
*/
public boolean isTerminated()
{
return state != State.ACTIVE;
}
+ /**
+ * @see org.osgi.service.coordinator.Coordination#getThread()
+ */
public Thread getThread()
{
return associatedThread;
}
+ /**
+ * @see org.osgi.service.coordinator.Coordination#join(long)
+ */
public void join(long timeoutInMillis) throws InterruptedException
{
synchronized (this)
@@ -306,6 +357,9 @@
}
}
+ /**
+ * @see org.osgi.service.coordinator.Coordination#push()
+ */
public Coordination push()
{
if ( isTerminated() )
@@ -313,7 +367,24 @@
throw new CoordinationException("Coordination already ended", this, CoordinationException.ALREADY_ENDED);
}
- return owner.push(this);
+ owner.push(this);
+ return this;
+ }
+
+ /**
+ * @see org.osgi.service.coordinator.Coordination#getBundle()
+ */
+ public Bundle getBundle()
+ {
+ return this.owner.getBundle();
+ }
+
+ /**
+ * @see org.osgi.service.coordinator.Coordination#getEnclosingCoordination()
+ */
+ public Coordination getEnclosingCoordination()
+ {
+ return this.owner.getEnclosingCoordination(this);
}
//-------
@@ -322,19 +393,14 @@
* Initiates a coordination timeout. Called from the timer task scheduled by
* the {@link #scheduleTimeout(long)} method.
* <p>
- * This method is inteded to only be called from the scheduled timer task.
+ * This method is intended to only be called from the scheduled timer task.
*/
- void timeout()
+ private void timeout()
{
// Fail the Coordination upon timeout
fail(TIMEOUT);
}
- long getDeadLine()
- {
- return this.deadLine;
- }
-
/**
* If this coordination is still active, this method initiates the
* termination of the coordination by setting the state to
@@ -390,16 +456,6 @@
}
}
- public Bundle getBundle()
- {
- return this.owner.getBundle();
- }
-
- public Coordination getEnclosingCoordination()
- {
- return this.owner.getEnclosingCoordination(this);
- }
-
@Override
public int hashCode()
{
diff --git a/coordinator/src/main/java/org/apache/felix/coordinator/impl/CoordinationMgr.java b/coordinator/src/main/java/org/apache/felix/coordinator/impl/CoordinationMgr.java
index 7e1dd34..0a372bb 100644
--- a/coordinator/src/main/java/org/apache/felix/coordinator/impl/CoordinationMgr.java
+++ b/coordinator/src/main/java/org/apache/felix/coordinator/impl/CoordinationMgr.java
@@ -23,6 +23,7 @@
import java.util.Collection;
import java.util.Date;
import java.util.HashMap;
+import java.util.IdentityHashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
@@ -41,7 +42,7 @@
import org.osgi.service.coordinator.Participant;
/**
- * The <code>CoordinationMgr</code> is the actual backend manager of all
+ * The <code>CoordinationMgr</code> is the actual back-end manager of all
* Coordinations created by the Coordinator implementation. The methods in this
* class fall into three categories:
* <ul>
@@ -55,8 +56,6 @@
public class CoordinationMgr implements CoordinatorMBean
{
- // TODO - sync access to coordinations
-
private ThreadLocal<Stack<Coordination>> threadStacks;
private final AtomicLong ctr;
@@ -87,7 +86,7 @@
threadStacks = new ThreadLocal<Stack<Coordination>>();
ctr = new AtomicLong(-1);
coordinations = new HashMap<Long, CoordinationImpl>();
- participants = new HashMap<Participant, CoordinationImpl>();
+ participants = new IdentityHashMap<Participant, CoordinationImpl>();
coordinationTimer = new Timer("Coordination Timer", true);
}
@@ -98,15 +97,24 @@
coordinationTimer.cancel();
// terminate all active coordinations
- final Exception reason = new Exception();
- for (Coordination c : coordinations.values())
- {
- c.fail(reason);
+ final List<Coordination> coords = new ArrayList<Coordination>();
+ synchronized ( this.coordinations ) {
+ coords.addAll(this.coordinations.values());
+ this.coordinations.clear();
}
- coordinations.clear();
+ for(final Coordination c : coords)
+ {
+ if ( !c.isTerminated() )
+ {
+ c.fail(Coordination.RELEASED);
+ }
+ }
// release all participants
- participants.clear();
+ synchronized ( this.participants )
+ {
+ participants.clear();
+ }
// cannot really clear out the thread local but we can let it go
threadStacks = null;
@@ -137,6 +145,7 @@
// wait for participant to be released
long cutOff = System.currentTimeMillis() + participationTimeOut;
long waitTime = (participationTimeOut > 500) ? participationTimeOut / 500 : participationTimeOut;
+ // TODO - the above wait time looks wrong e.g. if it's 800, the wait time 1ms
CoordinationImpl current = participants.get(p);
while (current != null && current != c)
{
@@ -187,15 +196,21 @@
Coordination create(final CoordinatorImpl owner, final String name, final long timeout)
{
- long id = ctr.incrementAndGet();
- CoordinationImpl c = new CoordinationImpl(owner, id, name, timeout);
- coordinations.put(id, c);
+ final long id = ctr.incrementAndGet();
+ final CoordinationImpl c = new CoordinationImpl(owner, id, name, timeout);
+ synchronized ( this.coordinations )
+ {
+ coordinations.put(id, c);
+ }
return c;
}
void unregister(final CoordinationImpl c, final boolean removeFromThread)
{
- coordinations.remove(c.getId());
+ synchronized ( this.coordinations )
+ {
+ coordinations.remove(c.getId());
+ }
if ( removeFromThread )
{
Stack<Coordination> stack = threadStacks.get();
@@ -206,7 +221,7 @@
}
}
- Coordination push(final CoordinationImpl c)
+ void push(final CoordinationImpl c)
{
Stack<Coordination> stack = threadStacks.get();
if (stack == null)
@@ -223,7 +238,7 @@
}
c.setAssociatedThread(Thread.currentThread());
- return stack.push(c);
+ stack.push(c);
}
Coordination pop()
@@ -262,8 +277,11 @@
Coordination getCoordinationById(final long id)
{
- CoordinationImpl c = coordinations.get(id);
- return (c == null || c.isTerminated()) ? null : c;
+ synchronized ( this.coordinations )
+ {
+ final CoordinationImpl c = coordinations.get(id);
+ return (c == null || c.isTerminated()) ? null : c;
+ }
}
// ---------- CoordinatorMBean interface
@@ -353,8 +371,9 @@
return null;
}
- public void endNestedCoordinations(final CoordinationImpl c)
+ public boolean endNestedCoordinations(final CoordinationImpl c)
{
+ boolean partiallyFailed = false;
Stack<Coordination> stack = threadStacks.get();
if ( stack != null )
{
@@ -365,10 +384,18 @@
for(int i=0;i<count;i++)
{
final Coordination nested = stack.pop();
- nested.end();
+ try
+ {
+ nested.end();
+ }
+ catch ( final CoordinationException ce)
+ {
+ partiallyFailed = true;
+ }
}
}
}
+ return partiallyFailed;
}
/**
diff --git a/coordinator/src/main/java/org/apache/felix/coordinator/impl/CoordinatorImpl.java b/coordinator/src/main/java/org/apache/felix/coordinator/impl/CoordinatorImpl.java
index b280c36..eb78b76 100644
--- a/coordinator/src/main/java/org/apache/felix/coordinator/impl/CoordinatorImpl.java
+++ b/coordinator/src/main/java/org/apache/felix/coordinator/impl/CoordinatorImpl.java
@@ -23,15 +23,20 @@
import org.osgi.framework.Bundle;
import org.osgi.service.coordinator.Coordination;
-import org.osgi.service.coordinator.CoordinationException;
import org.osgi.service.coordinator.Coordinator;
import org.osgi.service.coordinator.Participant;
+/**
+ * The coordinator implementation is a per bundle wrapper for the
+ * coordination manager.
+ */
public class CoordinatorImpl implements Coordinator
{
+ /** The bundle that requested this service. */
private final Bundle owner;
+ /** The coordination mgr. */
private final CoordinationMgr mgr;
CoordinatorImpl(final Bundle owner, final CoordinationMgr mgr)
@@ -117,6 +122,9 @@
}
}
+ /**
+ * @see org.osgi.service.coordinator.Coordinator#create(java.lang.String, long)
+ */
public Coordination create(final String name, final long timeout)
{
// TODO: check permission
@@ -134,13 +142,19 @@
return c;
}
+ /**
+ * @see org.osgi.service.coordinator.Coordinator#getCoordinations()
+ */
public Collection<Coordination> getCoordinations()
{
// TODO: check permission
return mgr.getCoordinations();
}
- public boolean fail(Throwable reason)
+ /**
+ * @see org.osgi.service.coordinator.Coordinator#fail(java.lang.Throwable)
+ */
+ public boolean fail(final Throwable reason)
{
// TODO: check permission
CoordinationImpl current = (CoordinationImpl) peek();
@@ -151,25 +165,39 @@
return false;
}
+ /**
+ * @see org.osgi.service.coordinator.Coordinator#peek()
+ */
public Coordination peek()
{
// TODO: check permission
return mgr.peek();
}
+ /**
+ * @see org.osgi.service.coordinator.Coordinator#begin(java.lang.String, long)
+ */
public Coordination begin(final String name, final long timeoutInMillis)
{
// TODO: check permission
- return push((CoordinationImpl)create(name, timeoutInMillis));
+ final Coordination c = create(name, timeoutInMillis);
+ this.mgr.push((CoordinationImpl)c);
+ return c;
}
+ /**
+ * @see org.osgi.service.coordinator.Coordinator#pop()
+ */
public Coordination pop()
{
// TODO: check permission
return mgr.pop();
}
- public boolean addParticipant(Participant participant) throws CoordinationException
+ /**
+ * @see org.osgi.service.coordinator.Coordinator#addParticipant(org.osgi.service.coordinator.Participant)
+ */
+ public boolean addParticipant(Participant participant)
{
// TODO: check permission
Coordination current = peek();
@@ -181,6 +209,9 @@
return false;
}
+ /**
+ * @see org.osgi.service.coordinator.Coordinator#getCoordination(long)
+ */
public Coordination getCoordination(long id)
{
// TODO: check permission
@@ -189,10 +220,9 @@
//----------
- Coordination push(final CoordinationImpl c)
+ void push(final CoordinationImpl c)
{
- // TODO: check permission
- return mgr.push(c);
+ mgr.push(c);
}
void unregister(final CoordinationImpl c, final boolean removeFromStack)
@@ -220,13 +250,13 @@
return this.owner;
}
- public Coordination getEnclosingCoordination(final CoordinationImpl c)
+ Coordination getEnclosingCoordination(final CoordinationImpl c)
{
return mgr.getEnclosingCoordination(c);
}
- public void endNestedCoordinations(final CoordinationImpl c)
+ boolean endNestedCoordinations(final CoordinationImpl c)
{
- this.mgr.endNestedCoordinations(c);
+ return this.mgr.endNestedCoordinations(c);
}
}