FELIX-2647 : Implement Coordinator Service - further fixes for the CT tests cases, 53 of 61 passing

git-svn-id: https://svn.apache.org/repos/asf/felix/trunk@1550893 13f79535-47bb-0310-9956-ffa450edef68
diff --git a/coordinator/src/main/java/org/apache/felix/coordinator/impl/Activator.java b/coordinator/src/main/java/org/apache/felix/coordinator/impl/Activator.java
index f6ea23c..33ba33f 100644
--- a/coordinator/src/main/java/org/apache/felix/coordinator/impl/Activator.java
+++ b/coordinator/src/main/java/org/apache/felix/coordinator/impl/Activator.java
@@ -20,20 +20,13 @@
 
 import java.util.Hashtable;
 
-import javax.management.MBeanServer;
-import javax.management.MalformedObjectNameException;
-import javax.management.ObjectName;
-
-import org.apache.felix.jmx.service.coordinator.CoordinatorMBean;
 import org.osgi.framework.Bundle;
 import org.osgi.framework.BundleActivator;
 import org.osgi.framework.BundleContext;
 import org.osgi.framework.Constants;
 import org.osgi.framework.ServiceFactory;
-import org.osgi.framework.ServiceReference;
 import org.osgi.framework.ServiceRegistration;
 import org.osgi.service.coordinator.Coordinator;
-import org.osgi.util.tracker.ServiceTracker;
 
 @SuppressWarnings("deprecation")
 public class Activator implements BundleActivator
@@ -41,16 +34,16 @@
 
     private CoordinationMgr mgr;
 
-    private ServiceTracker mbeanServerTracker;
+//    private ServiceTracker mbeanServerTracker;
 
     private ServiceRegistration coordinatorService;
 
-    private ServiceRegistration coordinatorCommand;
+//    private ServiceRegistration coordinatorCommand;
 
     public void start(BundleContext context)
     {
         mgr = new CoordinationMgr();
-
+/*
         try
         {
             mbeanServerTracker = new MBeanServerTracker(context, mgr);
@@ -60,13 +53,13 @@
         {
             // TODO log
         }
-
-        ServiceFactory factory = new CoordinatorFactory(mgr);
-        Hashtable<String, String> props = new Hashtable<String, String>();
+*/
+        final ServiceFactory factory = new CoordinatorFactory(mgr);
+        final Hashtable<String, String> props = new Hashtable<String, String>();
         props.put(Constants.SERVICE_DESCRIPTION, "Coordinator Service Implementation");
         props.put(Constants.SERVICE_VENDOR, "The Apache Software Foundation");
         coordinatorService = context.registerService(Coordinator.class.getName(), factory, props);
-
+/*
         try
         {
             coordinatorCommand = CrdCommand.create(context, mgr);
@@ -75,28 +68,29 @@
         {
             // most probably missing resolved packages, ignore
         }
+*/
     }
 
     public void stop(BundleContext context)
     {
-        if (coordinatorCommand != null)
+/*        if (coordinatorCommand != null)
         {
             coordinatorCommand.unregister();
             coordinatorCommand = null;
         }
-
+*/
         if (coordinatorService != null)
         {
             coordinatorService.unregister();
             coordinatorService = null;
         }
-
+/*
         if (mbeanServerTracker != null)
         {
             mbeanServerTracker.close();
             mbeanServerTracker = null;
         }
-
+*/
         mgr.cleanUp();
     }
 
@@ -121,7 +115,7 @@
         }
 
     }
-
+/*
     static final class MBeanServerTracker extends ServiceTracker
     {
 
@@ -168,4 +162,5 @@
             super.removedService(reference, service);
         }
     }
+*/
 }
diff --git a/coordinator/src/main/java/org/apache/felix/coordinator/impl/CoordinationImpl.java b/coordinator/src/main/java/org/apache/felix/coordinator/impl/CoordinationImpl.java
index 3c65ece..3bd2589 100644
--- a/coordinator/src/main/java/org/apache/felix/coordinator/impl/CoordinationImpl.java
+++ b/coordinator/src/main/java/org/apache/felix/coordinator/impl/CoordinationImpl.java
@@ -21,6 +21,7 @@
 import java.util.ArrayList;
 import java.util.Collections;
 import java.util.HashMap;
+import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
 import java.util.TimerTask;
@@ -66,9 +67,9 @@
 
     private Throwable failReason;
 
-    private ArrayList<Participant> participants;
+    private final ArrayList<Participant> participants;
 
-    private HashMap<Class<?>, Object> variables;
+    private final HashMap<Class<?>, Object> variables;
 
     private TimerTask timeoutTask;
 
@@ -76,8 +77,6 @@
 
     public CoordinationImpl(final CoordinatorImpl owner, final long id, final String name, final long timeOutInMs)
     {
-        // TODO: validate name against Bundle Symbolic Name pattern
-
         this.owner = owner;
         this.id = id;
         this.name = name;
@@ -89,17 +88,26 @@
         scheduleTimeout(deadLine);
     }
 
+    /**
+     * @see org.osgi.service.coordinator.Coordination#getId()
+     */
     public long getId()
     {
         return this.id;
     }
 
+    /**
+     * @see org.osgi.service.coordinator.Coordination#getName()
+     */
     public String getName()
     {
         return name;
     }
 
-    public boolean fail(Throwable reason)
+    /**
+     * @see org.osgi.service.coordinator.Coordination#fail(java.lang.Throwable)
+     */
+    public boolean fail(final Throwable reason)
     {
         if ( reason == null)
         {
@@ -109,10 +117,16 @@
         {
             this.failReason = reason;
 
-            // consider failure reason (if not null)
-            for (int i=participants.size()-1;i>=0;i--)
+            final List<Participant> releaseList = new ArrayList<Participant>();
+            synchronized ( this.participants )
             {
-                final Participant part = participants.get(i);
+                releaseList.addAll(this.participants);
+                this.participants.clear();
+            }
+            // consider failure reason (if not null)
+            for (int i=releaseList.size()-1;i>=0;i--)
+            {
+                final Participant part = releaseList.get(i);
                 try
                 {
                     part.failed(this);
@@ -139,23 +153,32 @@
         return false;
     }
 
+    /**
+     * @see org.osgi.service.coordinator.Coordination#end()
+     */
     public void end()
     {
         if (startTermination())
         {
             // TODO check for WRONG_THREAD
-        	this.owner.endNestedCoordinations(this);
+            boolean partialFailure = this.owner.endNestedCoordinations(this);
             this.owner.unregister(this, true);
 
-        	boolean partialFailure = false;
-            for (int i=participants.size()-1;i>=0;i--)
+            final List<Participant> releaseList = new ArrayList<Participant>();
+            synchronized ( this.participants )
             {
-                final Participant part = participants.get(i);
+                releaseList.addAll(this.participants);
+                this.participants.clear();
+            }
+            // consider failure reason (if not null)
+            for (int i=releaseList.size()-1;i>=0;i--)
+            {
+                final Participant part = releaseList.get(i);
                 try
                 {
                     part.ended(this);
                 }
-                catch (Exception e)
+                catch (final Exception e)
                 {
                     // TODO: log
                     partialFailure = true;
@@ -193,6 +216,9 @@
     }
 
 
+    /**
+     * @see org.osgi.service.coordinator.Coordination#getParticipants()
+     */
     public List<Participant> getParticipants()
     {
         // synchronize access to the state to prevent it from being changed
@@ -201,13 +227,19 @@
         {
             if (state == State.ACTIVE)
             {
-                return new ArrayList<Participant>(participants);
+                synchronized ( this.participants )
+                {
+                    return new ArrayList<Participant>(participants);
+                }
             }
         }
 
         return Collections.<Participant> emptyList();
     }
 
+    /**
+     * @see org.osgi.service.coordinator.Coordination#getFailure()
+     */
     public Throwable getFailure()
     {
         return failReason;
@@ -215,24 +247,16 @@
 
 
     /**
-     * Adds the participant to the end of the list of participants of this
-     * coordination.
-     * <p>
-     * This method blocks if the given participant is currently participating in
-     * another coordination.
-     * <p>
-     * Participants can only be added to a coordination if it is active.
-     *
-     * @throws org.apache.felix.service.coordination.CoordinationException if
-     *             the participant cannot currently participate in this
-     *             coordination
+     * @see org.osgi.service.coordinator.Coordination#addParticipant(org.osgi.service.coordinator.Participant)
      */
-    public void addParticipant(Participant p)
+    public void addParticipant(final Participant p)
     {
-
-        // ensure participant only pariticipates on a single coordination
+        if ( p == null ) {
+            throw new IllegalArgumentException("Participant must not be null");
+        }
+        // ensure participant only participates on a single coordination
         // this blocks until the participant can participate or until
-        // a timeout occurrs (or a deadlock is detected)
+        // a timeout occurs (or a deadlock is detected)
         owner.lockParticipant(p, this);
 
         // synchronize access to the state to prevent it from being changed
@@ -244,29 +268,53 @@
                 owner.releaseParticipant(p);
 
                 throw new CoordinationException("Cannot add Participant " + p + " to terminated Coordination", this,
-                    (getFailure() != null) ? CoordinationException.FAILED : CoordinationException.ALREADY_ENDED);
+                    (getFailure() != null) ? CoordinationException.FAILED : CoordinationException.ALREADY_ENDED, getFailure());
             }
 
-            if (!participants.contains(p))
+            synchronized ( this.participants )
             {
-                participants.add(p);
+                boolean found = false;
+                final Iterator<Participant> iter = this.participants.iterator();
+                while ( !found && iter.hasNext() )
+                {
+                    if ( iter.next() == p )
+                    {
+                        found = true;
+                    }
+                }
+                if (!found)
+                {
+                    participants.add(p);
+                }
+
             }
         }
     }
 
+    /**
+     * @see org.osgi.service.coordinator.Coordination#getVariables()
+     */
     public Map<Class<?>, Object> getVariables()
     {
         return variables;
     }
 
-    public long extendTimeout(long timeOutInMs)
+    /**
+     * @see org.osgi.service.coordinator.Coordination#extendTimeout(long)
+     */
+    public long extendTimeout(final long timeOutInMs)
     {
+        if ( timeOutInMs < 0 )
+        {
+            throw new IllegalArgumentException("Timeout must not be negative");
+        }
+
         synchronized (this)
         {
             if (isTerminated())
             {
                 throw new CoordinationException("Cannot extend timeout on terminated Coordination", this,
-                    (getFailure() != null) ? CoordinationException.FAILED : CoordinationException.ALREADY_ENDED);
+                    (getFailure() != null) ? CoordinationException.FAILED : CoordinationException.ALREADY_ENDED, getFailure());
             }
 
             if (timeOutInMs > 0)
@@ -280,21 +328,24 @@
     }
 
     /**
-     * Returns whether the coordination has ended.
-     * <p>
-     * The return value of <code>false</code> may be a transient situation if
-     * the coordination is in the process of terminating.
+     * @see org.osgi.service.coordinator.Coordination#isTerminated()
      */
     public boolean isTerminated()
     {
         return state != State.ACTIVE;
     }
 
+    /**
+     * @see org.osgi.service.coordinator.Coordination#getThread()
+     */
     public Thread getThread()
     {
         return associatedThread;
     }
 
+    /**
+     * @see org.osgi.service.coordinator.Coordination#join(long)
+     */
     public void join(long timeoutInMillis) throws InterruptedException
     {
         synchronized (this)
@@ -306,6 +357,9 @@
         }
     }
 
+    /**
+     * @see org.osgi.service.coordinator.Coordination#push()
+     */
     public Coordination push()
     {
     	if ( isTerminated() )
@@ -313,7 +367,24 @@
             throw new CoordinationException("Coordination already ended", this, CoordinationException.ALREADY_ENDED);
     	}
 
-        return owner.push(this);
+        owner.push(this);
+        return this;
+    }
+
+    /**
+     * @see org.osgi.service.coordinator.Coordination#getBundle()
+     */
+    public Bundle getBundle()
+    {
+        return this.owner.getBundle();
+    }
+
+    /**
+     * @see org.osgi.service.coordinator.Coordination#getEnclosingCoordination()
+     */
+    public Coordination getEnclosingCoordination()
+    {
+        return this.owner.getEnclosingCoordination(this);
     }
 
     //-------
@@ -322,19 +393,14 @@
      * Initiates a coordination timeout. Called from the timer task scheduled by
      * the {@link #scheduleTimeout(long)} method.
      * <p>
-     * This method is inteded to only be called from the scheduled timer task.
+     * This method is intended to only be called from the scheduled timer task.
      */
-    void timeout()
+    private void timeout()
     {
         // Fail the Coordination upon timeout
         fail(TIMEOUT);
     }
 
-    long getDeadLine()
-    {
-        return this.deadLine;
-    }
-
     /**
      * If this coordination is still active, this method initiates the
      * termination of the coordination by setting the state to
@@ -390,16 +456,6 @@
         }
     }
 
-    public Bundle getBundle()
-    {
-        return this.owner.getBundle();
-    }
-
-    public Coordination getEnclosingCoordination()
-    {
-        return this.owner.getEnclosingCoordination(this);
-    }
-
 	@Override
 	public int hashCode()
 	{
diff --git a/coordinator/src/main/java/org/apache/felix/coordinator/impl/CoordinationMgr.java b/coordinator/src/main/java/org/apache/felix/coordinator/impl/CoordinationMgr.java
index 7e1dd34..0a372bb 100644
--- a/coordinator/src/main/java/org/apache/felix/coordinator/impl/CoordinationMgr.java
+++ b/coordinator/src/main/java/org/apache/felix/coordinator/impl/CoordinationMgr.java
@@ -23,6 +23,7 @@
 import java.util.Collection;
 import java.util.Date;
 import java.util.HashMap;
+import java.util.IdentityHashMap;
 import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
@@ -41,7 +42,7 @@
 import org.osgi.service.coordinator.Participant;
 
 /**
- * The <code>CoordinationMgr</code> is the actual backend manager of all
+ * The <code>CoordinationMgr</code> is the actual back-end manager of all
  * Coordinations created by the Coordinator implementation. The methods in this
  * class fall into three categories:
  * <ul>
@@ -55,8 +56,6 @@
 public class CoordinationMgr implements CoordinatorMBean
 {
 
-    // TODO - sync access to coordinations
-
     private ThreadLocal<Stack<Coordination>> threadStacks;
 
     private final AtomicLong ctr;
@@ -87,7 +86,7 @@
         threadStacks = new ThreadLocal<Stack<Coordination>>();
         ctr = new AtomicLong(-1);
         coordinations = new HashMap<Long, CoordinationImpl>();
-        participants = new HashMap<Participant, CoordinationImpl>();
+        participants = new IdentityHashMap<Participant, CoordinationImpl>();
         coordinationTimer = new Timer("Coordination Timer", true);
     }
 
@@ -98,15 +97,24 @@
         coordinationTimer.cancel();
 
         // terminate all active coordinations
-        final Exception reason = new Exception();
-        for (Coordination c : coordinations.values())
-        {
-            c.fail(reason);
+        final List<Coordination> coords = new ArrayList<Coordination>();
+        synchronized ( this.coordinations ) {
+            coords.addAll(this.coordinations.values());
+            this.coordinations.clear();
         }
-        coordinations.clear();
+        for(final Coordination c : coords)
+        {
+            if ( !c.isTerminated() )
+            {
+                c.fail(Coordination.RELEASED);
+            }
+        }
 
         // release all participants
-        participants.clear();
+        synchronized ( this.participants )
+        {
+            participants.clear();
+        }
 
         // cannot really clear out the thread local but we can let it go
         threadStacks = null;
@@ -137,6 +145,7 @@
             // wait for participant to be released
             long cutOff = System.currentTimeMillis() + participationTimeOut;
             long waitTime = (participationTimeOut > 500) ? participationTimeOut / 500 : participationTimeOut;
+            // TODO - the above wait time looks wrong e.g. if it's 800, the wait time 1ms
             CoordinationImpl current = participants.get(p);
             while (current != null && current != c)
             {
@@ -187,15 +196,21 @@
 
     Coordination create(final CoordinatorImpl owner, final String name, final long timeout)
     {
-        long id = ctr.incrementAndGet();
-        CoordinationImpl c = new CoordinationImpl(owner, id, name, timeout);
-        coordinations.put(id, c);
+        final long id = ctr.incrementAndGet();
+        final CoordinationImpl c = new CoordinationImpl(owner, id, name, timeout);
+        synchronized ( this.coordinations )
+        {
+            coordinations.put(id, c);
+        }
         return c;
     }
 
     void unregister(final CoordinationImpl c, final boolean removeFromThread)
     {
-        coordinations.remove(c.getId());
+        synchronized ( this.coordinations )
+        {
+            coordinations.remove(c.getId());
+        }
         if ( removeFromThread )
         {
             Stack<Coordination> stack = threadStacks.get();
@@ -206,7 +221,7 @@
         }
     }
 
-    Coordination push(final CoordinationImpl c)
+    void push(final CoordinationImpl c)
     {
         Stack<Coordination> stack = threadStacks.get();
         if (stack == null)
@@ -223,7 +238,7 @@
         }
 
         c.setAssociatedThread(Thread.currentThread());
-        return stack.push(c);
+        stack.push(c);
     }
 
     Coordination pop()
@@ -262,8 +277,11 @@
 
     Coordination getCoordinationById(final long id)
     {
-        CoordinationImpl c = coordinations.get(id);
-        return (c == null || c.isTerminated()) ? null : c;
+        synchronized ( this.coordinations )
+        {
+            final CoordinationImpl c = coordinations.get(id);
+            return (c == null || c.isTerminated()) ? null : c;
+        }
     }
 
     // ---------- CoordinatorMBean interface
@@ -353,8 +371,9 @@
 		return null;
 	}
 
-	public void endNestedCoordinations(final CoordinationImpl c)
+	public boolean endNestedCoordinations(final CoordinationImpl c)
 	{
+	    boolean partiallyFailed = false;
         Stack<Coordination> stack = threadStacks.get();
         if ( stack != null )
         {
@@ -365,10 +384,18 @@
         		for(int i=0;i<count;i++)
         		{
         			final Coordination nested = stack.pop();
-        			nested.end();
+        			try
+        			{
+        			    nested.end();
+        			}
+        			catch ( final CoordinationException ce)
+        			{
+        			    partiallyFailed = true;
+        			}
         		}
         	}
         }
+        return partiallyFailed;
 	}
 
 	/**
diff --git a/coordinator/src/main/java/org/apache/felix/coordinator/impl/CoordinatorImpl.java b/coordinator/src/main/java/org/apache/felix/coordinator/impl/CoordinatorImpl.java
index b280c36..eb78b76 100644
--- a/coordinator/src/main/java/org/apache/felix/coordinator/impl/CoordinatorImpl.java
+++ b/coordinator/src/main/java/org/apache/felix/coordinator/impl/CoordinatorImpl.java
@@ -23,15 +23,20 @@
 
 import org.osgi.framework.Bundle;
 import org.osgi.service.coordinator.Coordination;
-import org.osgi.service.coordinator.CoordinationException;
 import org.osgi.service.coordinator.Coordinator;
 import org.osgi.service.coordinator.Participant;
 
+/**
+ * The coordinator implementation is a per bundle wrapper for the
+ * coordination manager.
+ */
 public class CoordinatorImpl implements Coordinator
 {
 
+    /** The bundle that requested this service. */
     private final Bundle owner;
 
+    /** The coordination mgr. */
     private final CoordinationMgr mgr;
 
     CoordinatorImpl(final Bundle owner, final CoordinationMgr mgr)
@@ -117,6 +122,9 @@
         }
     }
 
+    /**
+     * @see org.osgi.service.coordinator.Coordinator#create(java.lang.String, long)
+     */
     public Coordination create(final String name, final long timeout)
     {
         // TODO: check permission
@@ -134,13 +142,19 @@
         return c;
     }
 
+    /**
+     * @see org.osgi.service.coordinator.Coordinator#getCoordinations()
+     */
     public Collection<Coordination> getCoordinations()
     {
         // TODO: check permission
         return mgr.getCoordinations();
     }
 
-    public boolean fail(Throwable reason)
+    /**
+     * @see org.osgi.service.coordinator.Coordinator#fail(java.lang.Throwable)
+     */
+    public boolean fail(final Throwable reason)
     {
         // TODO: check permission
         CoordinationImpl current = (CoordinationImpl) peek();
@@ -151,25 +165,39 @@
         return false;
     }
 
+    /**
+     * @see org.osgi.service.coordinator.Coordinator#peek()
+     */
     public Coordination peek()
     {
         // TODO: check permission
         return mgr.peek();
     }
 
+    /**
+     * @see org.osgi.service.coordinator.Coordinator#begin(java.lang.String, long)
+     */
     public Coordination begin(final String name, final long timeoutInMillis)
     {
         // TODO: check permission
-        return push((CoordinationImpl)create(name, timeoutInMillis));
+        final Coordination c = create(name, timeoutInMillis);
+        this.mgr.push((CoordinationImpl)c);
+        return c;
     }
 
+    /**
+     * @see org.osgi.service.coordinator.Coordinator#pop()
+     */
     public Coordination pop()
     {
         // TODO: check permission
         return mgr.pop();
     }
 
-    public boolean addParticipant(Participant participant) throws CoordinationException
+    /**
+     * @see org.osgi.service.coordinator.Coordinator#addParticipant(org.osgi.service.coordinator.Participant)
+     */
+    public boolean addParticipant(Participant participant)
     {
         // TODO: check permission
         Coordination current = peek();
@@ -181,6 +209,9 @@
         return false;
     }
 
+    /**
+     * @see org.osgi.service.coordinator.Coordinator#getCoordination(long)
+     */
     public Coordination getCoordination(long id)
     {
         // TODO: check permission
@@ -189,10 +220,9 @@
 
     //----------
 
-    Coordination push(final CoordinationImpl c)
+    void push(final CoordinationImpl c)
     {
-        // TODO: check permission
-        return mgr.push(c);
+        mgr.push(c);
     }
 
     void unregister(final CoordinationImpl c, final boolean removeFromStack)
@@ -220,13 +250,13 @@
         return this.owner;
     }
 
-	public Coordination getEnclosingCoordination(final CoordinationImpl c)
+	Coordination getEnclosingCoordination(final CoordinationImpl c)
 	{
 		return mgr.getEnclosingCoordination(c);
 	}
 
-	public void endNestedCoordinations(final CoordinationImpl c)
+	boolean endNestedCoordinations(final CoordinationImpl c)
 	{
-		this.mgr.endNestedCoordinations(c);
+		return this.mgr.endNestedCoordinations(c);
 	}
 }