FELIX-2647 : Implement Coordinator Service - further fixes for the CT tests cases, up from 18 to 45 of 61 passing

git-svn-id: https://svn.apache.org/repos/asf/felix/trunk@1550692 13f79535-47bb-0310-9956-ffa450edef68
diff --git a/coordinator/src/main/java/org/apache/felix/coordinator/impl/CoordinationImpl.java b/coordinator/src/main/java/org/apache/felix/coordinator/impl/CoordinationImpl.java
index 1da5428..3c65ece 100644
--- a/coordinator/src/main/java/org/apache/felix/coordinator/impl/CoordinationImpl.java
+++ b/coordinator/src/main/java/org/apache/felix/coordinator/impl/CoordinationImpl.java
@@ -33,14 +33,19 @@
 public class CoordinationImpl implements Coordination
 {
 
-    /** Active */
-    private static final int ACTIVE = 1;
+    private enum State {
+        /** Active */
+        ACTIVE,
 
-    /** Coordination termination started */
-    private static final int TERMINATING = 2;
+        /** failed() called */
+        FAILED,
 
-    /** Coordination completed */
-    private static final int TERMINATED = 3;
+        /** Coordination termination started */
+        TERMINATING,
+
+        /** Coordination completed */
+        TERMINATED
+    }
 
     private final CoordinatorImpl owner;
 
@@ -53,11 +58,11 @@
 
     /**
      * Access to this field must be synchronized as long as the expected state
-     * is {@link #ACTIVE}. Once the state has changed, further updates to this
+     * is {@link State#ACTIVE}. Once the state has changed, further updates to this
      * instance will not take place any more and the state will only be modified
-     * by the thread successfully setting the state to {@link #TERMINATING}.
+     * by the thread successfully setting the state to {@link State#TERMINATING}.
      */
-    private volatile int state;
+    private volatile State state;
 
     private Throwable failReason;
 
@@ -67,7 +72,7 @@
 
     private TimerTask timeoutTask;
 
-    private Thread initiatorThread;
+    private Thread associatedThread;
 
     public CoordinationImpl(final CoordinatorImpl owner, final long id, final String name, final long timeOutInMs)
     {
@@ -76,11 +81,10 @@
         this.owner = owner;
         this.id = id;
         this.name = name;
-        this.state = ACTIVE;
+        this.state = State.ACTIVE;
         this.participants = new ArrayList<Participant>();
         this.variables = new HashMap<Class<?>, Object>();
         this.deadLine = (timeOutInMs > 0) ? System.currentTimeMillis() + timeOutInMs : 0;
-        this.initiatorThread = Thread.currentThread();
 
         scheduleTimeout(deadLine);
     }
@@ -97,9 +101,12 @@
 
     public boolean fail(Throwable reason)
     {
+        if ( reason == null)
+        {
+            throw new IllegalArgumentException("Reason must not be null");
+        }
         if (startTermination())
         {
-            this.owner.unregister(this);
             this.failReason = reason;
 
             // consider failure reason (if not null)
@@ -119,7 +126,8 @@
                 owner.releaseParticipant(part);
             }
 
-            state = TERMINATED;
+            this.owner.unregister(this, false);
+            state = State.FAILED;
 
             synchronized (this)
             {
@@ -133,11 +141,11 @@
 
     public void end()
     {
-    	
         if (startTermination())
         {
+            // TODO check for WRONG_THREAD
         	this.owner.endNestedCoordinations(this);
-            this.owner.unregister(this);
+            this.owner.unregister(this, true);
 
         	boolean partialFailure = false;
             for (int i=participants.size()-1;i>=0;i--)
@@ -157,7 +165,7 @@
                 owner.releaseParticipant(part);
             }
 
-            state = TERMINATED;
+            state = State.TERMINATED;
 
             synchronized (this)
             {
@@ -170,6 +178,12 @@
                     CoordinationException.PARTIALLY_ENDED);
             }
         }
+        else if ( state == State.FAILED )
+        {
+            this.owner.unregister(this, true);
+            state = State.TERMINATED;
+            throw new CoordinationException("Coordination failed", this, CoordinationException.FAILED, failReason);
+        }
         else
         {
             // already terminated
@@ -185,7 +199,7 @@
         // while we create a copy of the participant list
         synchronized (this)
         {
-            if (state == ACTIVE)
+            if (state == State.ACTIVE)
             {
                 return new ArrayList<Participant>(participants);
             }
@@ -273,12 +287,12 @@
      */
     public boolean isTerminated()
     {
-        return state != ACTIVE;
+        return state != State.ACTIVE;
     }
 
     public Thread getThread()
     {
-        return initiatorThread;
+        return associatedThread;
     }
 
     public void join(long timeoutInMillis) throws InterruptedException
@@ -335,9 +349,9 @@
      */
     private synchronized boolean startTermination()
     {
-        if (state == ACTIVE)
+        if (state == State.ACTIVE)
         {
-            state = TERMINATING;
+            state = State.TERMINATING;
             scheduleTimeout(-1);
             return true;
         }
@@ -387,7 +401,7 @@
     }
 
 	@Override
-	public int hashCode() 
+	public int hashCode()
 	{
 		final int prime = 31;
 		int result = 1;
@@ -396,7 +410,7 @@
 	}
 
 	@Override
-	public boolean equals(final Object obj) 
+	public boolean equals(final Object obj)
 	{
 		if (this == obj)
 			return true;
@@ -409,4 +423,8 @@
 			return false;
 		return true;
 	}
+
+	void setAssociatedThread(final Thread t) {
+	    this.associatedThread = t;
+	}
 }
diff --git a/coordinator/src/main/java/org/apache/felix/coordinator/impl/CoordinationMgr.java b/coordinator/src/main/java/org/apache/felix/coordinator/impl/CoordinationMgr.java
index 5cd66ec..7e1dd34 100644
--- a/coordinator/src/main/java/org/apache/felix/coordinator/impl/CoordinationMgr.java
+++ b/coordinator/src/main/java/org/apache/felix/coordinator/impl/CoordinationMgr.java
@@ -23,6 +23,8 @@
 import java.util.Collection;
 import java.util.Date;
 import java.util.HashMap;
+import java.util.Iterator;
+import java.util.List;
 import java.util.Map;
 import java.util.Stack;
 import java.util.Timer;
@@ -33,6 +35,7 @@
 import javax.management.openmbean.TabularData;
 
 import org.apache.felix.jmx.service.coordinator.CoordinatorMBean;
+import org.osgi.framework.Bundle;
 import org.osgi.service.coordinator.Coordination;
 import org.osgi.service.coordinator.CoordinationException;
 import org.osgi.service.coordinator.Participant;
@@ -52,6 +55,8 @@
 public class CoordinationMgr implements CoordinatorMBean
 {
 
+    // TODO - sync access to coordinations
+
     private ThreadLocal<Stack<Coordination>> threadStacks;
 
     private final AtomicLong ctr;
@@ -135,7 +140,7 @@
             CoordinationImpl current = participants.get(p);
             while (current != null && current != c)
             {
-                if (current.getThread() == c.getThread())
+                if (current.getThread() != null && current.getThread() == c.getThread())
                 {
                     throw new CoordinationException("Participant " + p + " already participating in Coordination "
                         + current.getId() + "/" + current.getName() + " in this thread", c,
@@ -188,17 +193,20 @@
         return c;
     }
 
-    void unregister(final CoordinationImpl c)
+    void unregister(final CoordinationImpl c, final boolean removeFromThread)
     {
         coordinations.remove(c.getId());
-        Stack<Coordination> stack = threadStacks.get();
-        if (stack != null)
+        if ( removeFromThread )
         {
-            stack.remove(c);
+            Stack<Coordination> stack = threadStacks.get();
+            if (stack != null)
+            {
+                stack.remove(c);
+            }
         }
     }
 
-    Coordination push(Coordination c)
+    Coordination push(final CoordinationImpl c)
     {
         Stack<Coordination> stack = threadStacks.get();
         if (stack == null)
@@ -208,11 +216,13 @@
         }
         else
         {
-            if ( stack.contains(c) ) 
+            if ( stack.contains(c) )
             {
                 throw new CoordinationException("Coordination already pushed", c, CoordinationException.ALREADY_PUSHED);
             }
         }
+
+        c.setAssociatedThread(Thread.currentThread());
         return stack.push(c);
     }
 
@@ -221,7 +231,11 @@
         Stack<Coordination> stack = threadStacks.get();
         if (stack != null && !stack.isEmpty())
         {
-            return stack.pop();
+            final CoordinationImpl c = (CoordinationImpl)stack.pop();
+            if ( c != null ) {
+                c.setAssociatedThread(null);
+            }
+            return c;
         }
         return null;
     }
@@ -238,11 +252,10 @@
 
     Collection<Coordination> getCoordinations()
     {
-        ArrayList<Coordination> result = new ArrayList<Coordination>();
-        Stack<Coordination> stack = threadStacks.get();
-        if (stack != null)
+        final ArrayList<Coordination> result = new ArrayList<Coordination>();
+        synchronized ( this.coordinations )
         {
-            result.addAll(stack);
+            result.addAll(this.coordinations.values());
         }
         return result;
     }
@@ -326,7 +339,7 @@
     }
     */
 
-	public Coordination getEnclosingCoordination(final CoordinationImpl c) 
+	public Coordination getEnclosingCoordination(final CoordinationImpl c)
 	{
         Stack<Coordination> stack = threadStacks.get();
         if ( stack != null )
@@ -340,7 +353,7 @@
 		return null;
 	}
 
-	public void endNestedCoordinations(final CoordinationImpl c) 
+	public void endNestedCoordinations(final CoordinationImpl c)
 	{
         Stack<Coordination> stack = threadStacks.get();
         if ( stack != null )
@@ -349,12 +362,50 @@
         	if ( index > 0 && stack.size() > index )
         	{
         		final int count = stack.size()-index;
-        		for(int i=0;i<count;i++) 
+        		for(int i=0;i<count;i++)
         		{
-        			final Coordination nested = (Coordination)stack.pop();
+        			final Coordination nested = stack.pop();
         			nested.end();
         		}
         	}
         }
 	}
+
+	/**
+	 * Dispose all coordinations for that bundle
+	 * @param owner The owner bundle
+	 */
+    public void dispose(final Bundle owner) {
+        final List<CoordinationImpl> candidates = new ArrayList<CoordinationImpl>();
+        synchronized ( this.coordinations )
+        {
+            final Iterator<Map.Entry<Long, CoordinationImpl>> iter = this.coordinations.entrySet().iterator();
+            while ( iter.hasNext() )
+            {
+                final Map.Entry<Long, CoordinationImpl> entry = iter.next();
+                if ( entry.getValue().getBundle().getBundleId() == owner.getBundleId() )
+                {
+                    candidates.add(entry.getValue());
+                }
+            }
+        }
+        if ( candidates.size() > 0 )
+        {
+            for(final CoordinationImpl c : candidates)
+            {
+                try {
+                if ( !c.isTerminated() )
+                {
+                    c.fail(Coordination.RELEASED);
+                }
+                else
+                {
+                    this.unregister(c, true);
+                }
+                } catch (Exception e) {
+                    e.printStackTrace();
+                }
+            }
+        }
+    }
 }
diff --git a/coordinator/src/main/java/org/apache/felix/coordinator/impl/CoordinatorImpl.java b/coordinator/src/main/java/org/apache/felix/coordinator/impl/CoordinatorImpl.java
index 504b764..b280c36 100644
--- a/coordinator/src/main/java/org/apache/felix/coordinator/impl/CoordinatorImpl.java
+++ b/coordinator/src/main/java/org/apache/felix/coordinator/impl/CoordinatorImpl.java
@@ -19,29 +19,25 @@
 package org.apache.felix.coordinator.impl;
 
 import java.util.Collection;
-import java.util.HashSet;
-import java.util.Set;
 import java.util.TimerTask;
 
 import org.osgi.framework.Bundle;
 import org.osgi.service.coordinator.Coordination;
 import org.osgi.service.coordinator.CoordinationException;
+import org.osgi.service.coordinator.Coordinator;
 import org.osgi.service.coordinator.Participant;
 
-public class CoordinatorImpl implements org.osgi.service.coordinator.Coordinator
+public class CoordinatorImpl implements Coordinator
 {
 
     private final Bundle owner;
 
     private final CoordinationMgr mgr;
 
-    private final Set<Coordination> coordinations;
-
     CoordinatorImpl(final Bundle owner, final CoordinationMgr mgr)
     {
         this.owner = owner;
         this.mgr = mgr;
-        this.coordinations = new HashSet<Coordination>();
     }
 
     /**
@@ -55,28 +51,7 @@
      */
     void dispose()
     {
-        final Coordination[] active;
-        synchronized (coordinations)
-        {
-            if (coordinations.isEmpty())
-            {
-                active = null;
-            }
-            else
-            {
-                active = coordinations.toArray(new CoordinationImpl[coordinations.size()]);
-                coordinations.clear();
-            }
-        }
-
-        if (active != null)
-        {
-            Throwable reason = new Exception("Coordinator service released");
-            for (int i = active.length -1; i >=0; i--)
-            {
-                active[i].fail(reason);
-            }
-        }
+        this.mgr.dispose(this.owner);
     }
 
     /**
@@ -110,7 +85,7 @@
         for(final String p : parts)
         {
         	boolean valid = true;
-        	if ( p.length() == 0 ) 
+        	if ( p.length() == 0 )
         	{
         		valid = false;
         	}
@@ -135,30 +110,27 @@
 	            	break;
 	            }
         	}
-        	if ( !valid ) 
+        	if ( !valid )
         	{
-                throw new IllegalArgumentException( "Name [" + name + "] does not comply with the symbolic-name definition." );        		        		
+                throw new IllegalArgumentException( "Name [" + name + "] does not comply with the symbolic-name definition." );
         	}
         }
     }
-    
+
     public Coordination create(final String name, final long timeout)
     {
         // TODO: check permission
 
     	// check arguments
     	checkName(name);
-    	if ( timeout < 0 ) 
+    	if ( timeout < 0 )
     	{
     		throw new IllegalArgumentException("Timeout must not be negative");
     	}
-    	
+
     	// create coordination
         final Coordination c = mgr.create(this, name, timeout);
-        synchronized (coordinations)
-        {
-            coordinations.add(c);
-        }
+
         return c;
     }
 
@@ -188,7 +160,7 @@
     public Coordination begin(final String name, final long timeoutInMillis)
     {
         // TODO: check permission
-        return push(create(name, timeoutInMillis));
+        return push((CoordinationImpl)create(name, timeoutInMillis));
     }
 
     public Coordination pop()
@@ -217,19 +189,15 @@
 
     //----------
 
-    Coordination push(Coordination c)
+    Coordination push(final CoordinationImpl c)
     {
         // TODO: check permission
         return mgr.push(c);
     }
 
-    void unregister(final CoordinationImpl c)
+    void unregister(final CoordinationImpl c, final boolean removeFromStack)
     {
-        mgr.unregister(c);
-        synchronized (coordinations)
-        {
-            coordinations.remove(c);
-        }
+        mgr.unregister(c, removeFromStack);
     }
 
     void schedule(final TimerTask task, final long deadLine)
@@ -252,13 +220,13 @@
         return this.owner;
     }
 
-	public Coordination getEnclosingCoordination(final CoordinationImpl c) 
+	public Coordination getEnclosingCoordination(final CoordinationImpl c)
 	{
 		return mgr.getEnclosingCoordination(c);
 	}
 
-	public void endNestedCoordinations(final CoordinationImpl c) 
+	public void endNestedCoordinations(final CoordinationImpl c)
 	{
-		this.mgr.endNestedCoordinations(c);		
+		this.mgr.endNestedCoordinations(c);
 	}
 }
diff --git a/coordinator/src/test/java/org/apache/felix/coordinator/impl/CoordinatorImplTest.java b/coordinator/src/test/java/org/apache/felix/coordinator/impl/CoordinatorImplTest.java
index 21433e1..d2bf362 100644
--- a/coordinator/src/test/java/org/apache/felix/coordinator/impl/CoordinatorImplTest.java
+++ b/coordinator/src/test/java/org/apache/felix/coordinator/impl/CoordinatorImplTest.java
@@ -61,12 +61,12 @@
         try
         {
             c1.end();
-            fail("Expected CoordinationException.ALREADY_ENDED on end() after fail()");
+            fail("Expected CoordinationException.FAILED on end() after fail()");
         }
         catch (CoordinationException ce)
         {
-            // expected already terminated
-            assertEquals(CoordinationException.ALREADY_ENDED, ce.getType());
+            // expected failed
+            assertEquals(CoordinationException.FAILED, ce.getType());
         }
 
         final Coordination c2 = coordinator.create(name, 0);
@@ -106,7 +106,7 @@
         assertEquals(c1, coordinator.pop());
 
         assertNull(coordinator.peek());
-        coordinator.push(c1);
+        coordinator.push((CoordinationImpl)c1);
         assertEquals(c1, coordinator.peek());
 
         c1.end();
@@ -116,7 +116,14 @@
         assertNotNull(c2);
         assertEquals(name, c2.getName());
         assertEquals(c2, coordinator.peek());
-        c2.fail(null);
+        c2.fail(new Exception());
+        assertNotNull(coordinator.peek());
+        try {
+            c2.end();
+            fail("Exception should be thrown");
+        } catch (CoordinationException ce) {
+            // ignore
+        }
         assertNull(coordinator.peek());
     }
 
@@ -231,7 +238,7 @@
         assertTrue(c1.getParticipants().contains(p1));
         assertEquals(1, c1.getParticipants().size());
 
-        c1.fail(null);
+        c1.fail(new Exception());
         assertFalse(p1.ended);
         assertTrue(p1.failed);
         assertEquals(c1, p1.c);
@@ -246,7 +253,7 @@
         assertTrue(c2.getParticipants().contains(p22));
         assertEquals(2, c2.getParticipants().size());
 
-        c2.fail(null);
+        c2.fail(new Exception());
         assertTrue(p21.failed);
         assertEquals(c2, p21.c);
         assertTrue(p22.failed);
@@ -264,7 +271,7 @@
         assertTrue(c3.getParticipants().contains(p32));
         assertEquals(2, c3.getParticipants().size());
 
-        c3.fail(null);
+        c3.fail(new Exception());
         assertTrue(p31.failed);
         assertEquals(c3, p31.c);
         assertTrue(p32.failed);