FELIX-2647 : Implement Coordinator Service - further fixes for the CT tests cases, 56 of 61 passing (implemented orphaned conditions)

git-svn-id: https://svn.apache.org/repos/asf/felix/trunk@1551135 13f79535-47bb-0310-9956-ffa450edef68
diff --git a/coordinator/src/main/java/org/apache/felix/coordinator/impl/CoordinationImpl.java b/coordinator/src/main/java/org/apache/felix/coordinator/impl/CoordinationImpl.java
index 0b14a71..bd9ad7f 100644
--- a/coordinator/src/main/java/org/apache/felix/coordinator/impl/CoordinationImpl.java
+++ b/coordinator/src/main/java/org/apache/felix/coordinator/impl/CoordinationImpl.java
@@ -492,4 +492,14 @@
 	void setAssociatedThread(final Thread t) {
 	    this.associatedThread = t;
 	}
+
+    @Override
+    protected void finalize() throws Throwable {
+        if ( !this.isTerminated() )
+        {
+            this.fail(Coordination.ORPHANED);
+        }
+        super.finalize();
+    }
+
 }
diff --git a/coordinator/src/main/java/org/apache/felix/coordinator/impl/CoordinationMgr.java b/coordinator/src/main/java/org/apache/felix/coordinator/impl/CoordinationMgr.java
index add4096..e3225e9 100644
--- a/coordinator/src/main/java/org/apache/felix/coordinator/impl/CoordinationMgr.java
+++ b/coordinator/src/main/java/org/apache/felix/coordinator/impl/CoordinationMgr.java
@@ -19,6 +19,7 @@
 package org.apache.felix.coordinator.impl;
 
 import java.io.IOException;
+import java.lang.ref.WeakReference;
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.Date;
@@ -56,13 +57,13 @@
 public class CoordinationMgr implements CoordinatorMBean
 {
 
-    private ThreadLocal<Stack<Coordination>> threadStacks;
+    private ThreadLocal<Stack<CoordinationImpl>> perThreadStack;
 
     private final AtomicLong ctr;
 
-    private final Map<Long, CoordinationImpl> coordinations;
+    private final Map<Long, WeakReference<CoordinationImpl>> coordinations;
 
-    private final Map<Participant, CoordinationImpl> participants;
+    private final Map<Participant, WeakReference<CoordinationImpl>> participants;
 
     private final Timer coordinationTimer;
 
@@ -83,10 +84,10 @@
 
     CoordinationMgr()
     {
-        threadStacks = new ThreadLocal<Stack<Coordination>>();
+        perThreadStack = new ThreadLocal<Stack<CoordinationImpl>>();
         ctr = new AtomicLong(-1);
-        coordinations = new HashMap<Long, CoordinationImpl>();
-        participants = new IdentityHashMap<Participant, CoordinationImpl>();
+        coordinations = new HashMap<Long, WeakReference<CoordinationImpl>>();
+        participants = new IdentityHashMap<Participant, WeakReference<CoordinationImpl>>();
         coordinationTimer = new Timer("Coordination Timer", true);
     }
 
@@ -97,14 +98,15 @@
         coordinationTimer.cancel();
 
         // terminate all active coordinations
-        final List<Coordination> coords = new ArrayList<Coordination>();
+        final List<WeakReference<CoordinationImpl>> refs = new ArrayList<WeakReference<CoordinationImpl>>();
         synchronized ( this.coordinations ) {
-            coords.addAll(this.coordinations.values());
+            refs.addAll(this.coordinations.values());
             this.coordinations.clear();
         }
-        for(final Coordination c : coords)
+        for(final WeakReference<CoordinationImpl> r : refs)
         {
-            if ( !c.isTerminated() )
+            final Coordination c = r.get();
+            if ( c != null && !c.isTerminated() )
             {
                 c.fail(Coordination.RELEASED);
             }
@@ -117,7 +119,22 @@
         }
 
         // cannot really clear out the thread local but we can let it go
-        threadStacks = null;
+        perThreadStack = null;
+    }
+
+    private Stack<CoordinationImpl> getThreadStack(final boolean create)
+    {
+        final ThreadLocal<Stack<CoordinationImpl>> tl = this.perThreadStack;
+        Stack<CoordinationImpl> stack = null;
+        if ( tl != null )
+        {
+            stack = tl.get();
+            if ( stack == null && create ) {
+                stack = new Stack<CoordinationImpl>();
+                tl.set(stack);
+            }
+        }
+        return stack;
     }
 
     void configure(final long coordinationTimeout, final long participationTimeout)
@@ -146,7 +163,16 @@
             long cutOff = System.currentTimeMillis() + participationTimeOut;
             long waitTime = (participationTimeOut > 500) ? participationTimeOut / 500 : participationTimeOut;
             // TODO - the above wait time looks wrong e.g. if it's 800, the wait time 1ms
-            CoordinationImpl current = participants.get(p);
+            WeakReference<CoordinationImpl> currentRef = participants.get(p);
+            CoordinationImpl current = null;
+            if ( currentRef != null )
+            {
+                current = currentRef.get();
+                if ( current == null )
+                {
+                    participants.remove(p);
+                }
+            }
             while (current != null && current != c)
             {
                 if (current.getThread() != null && current.getThread() == c.getThread())
@@ -175,11 +201,20 @@
                 }
 
                 // check again
-                current = participants.get(p);
+                current = null;
+                currentRef = participants.get(p);
+                if ( currentRef != null )
+                {
+                    current = currentRef.get();
+                    if ( current == null )
+                    {
+                        participants.remove(p);
+                    }
+                }
             }
 
             // lock participant into coordination
-            participants.put(p, c);
+            participants.put(p, new WeakReference<CoordinationImpl>(c));
         }
     }
 
@@ -200,7 +235,7 @@
         final CoordinationImpl c = new CoordinationImpl(owner, id, name, timeout);
         synchronized ( this.coordinations )
         {
-            coordinations.put(id, c);
+            coordinations.put(id, new WeakReference<CoordinationImpl>(c));
         }
         return c;
     }
@@ -213,7 +248,7 @@
         }
         if ( removeFromThread )
         {
-            Stack<Coordination> stack = threadStacks.get();
+            final Stack<CoordinationImpl> stack = this.getThreadStack(false);
             if (stack != null)
             {
                 stack.remove(c);
@@ -223,30 +258,24 @@
 
     void push(final CoordinationImpl c)
     {
-        Stack<Coordination> stack = threadStacks.get();
-        if (stack == null)
-        {
-            stack = new Stack<Coordination>();
-            threadStacks.set(stack);
-        }
-        else
+        Stack<CoordinationImpl> stack = this.getThreadStack(true);
+        if ( stack != null)
         {
             if ( stack.contains(c) )
             {
                 throw new CoordinationException("Coordination already pushed", c, CoordinationException.ALREADY_PUSHED);
             }
+            c.setAssociatedThread(Thread.currentThread());
+            stack.push(c);
         }
-
-        c.setAssociatedThread(Thread.currentThread());
-        stack.push(c);
     }
 
     Coordination pop()
     {
-        Stack<Coordination> stack = threadStacks.get();
+        final Stack<CoordinationImpl> stack = this.getThreadStack(false);
         if (stack != null && !stack.isEmpty())
         {
-            final CoordinationImpl c = (CoordinationImpl)stack.pop();
+            final CoordinationImpl c = stack.pop();
             if ( c != null ) {
                 c.setAssociatedThread(null);
             }
@@ -257,7 +286,7 @@
 
     Coordination peek()
     {
-        Stack<Coordination> stack = threadStacks.get();
+        final Stack<CoordinationImpl> stack = this.getThreadStack(false);
         if (stack != null && !stack.isEmpty())
         {
             return stack.peek();
@@ -270,7 +299,14 @@
         final ArrayList<Coordination> result = new ArrayList<Coordination>();
         synchronized ( this.coordinations )
         {
-            result.addAll(this.coordinations.values());
+            for(final WeakReference<CoordinationImpl> ref : this.coordinations.values() )
+            {
+                final CoordinationImpl c = ref.get();
+                if ( c != null )
+                {
+                    result.add(c);
+                }
+            }
         }
         return result;
     }
@@ -279,7 +315,8 @@
     {
         synchronized ( this.coordinations )
         {
-            final CoordinationImpl c = coordinations.get(id);
+            final WeakReference<CoordinationImpl> ref = coordinations.get(id);
+            final CoordinationImpl c = (ref == null) ? null : ref.get();
             return (c == null || c.isTerminated()) ? null : c;
         }
     }
@@ -359,7 +396,7 @@
 
 	public Coordination getEnclosingCoordination(final CoordinationImpl c)
 	{
-        Stack<Coordination> stack = threadStacks.get();
+        final Stack<CoordinationImpl> stack = this.getThreadStack(false);
         if ( stack != null )
         {
         	final int index = stack.indexOf(c);
@@ -374,7 +411,7 @@
 	public boolean endNestedCoordinations(final CoordinationImpl c)
 	{
 	    boolean partiallyFailed = false;
-        Stack<Coordination> stack = threadStacks.get();
+        final Stack<CoordinationImpl> stack = this.getThreadStack(false);
         if ( stack != null )
         {
         	final int index = stack.indexOf(c) + 1;
@@ -383,10 +420,10 @@
         		final int count = stack.size()-index;
         		for(int i=0;i<count;i++)
         		{
-        			final Coordination nested = stack.pop();
+        			final CoordinationImpl nested = stack.pop();
         			try
         			{
-        			    nested.end();
+    			        nested.end();
         			}
         			catch ( final CoordinationException ce)
         			{
@@ -406,13 +443,14 @@
         final List<CoordinationImpl> candidates = new ArrayList<CoordinationImpl>();
         synchronized ( this.coordinations )
         {
-            final Iterator<Map.Entry<Long, CoordinationImpl>> iter = this.coordinations.entrySet().iterator();
+            final Iterator<Map.Entry<Long, WeakReference<CoordinationImpl>>> iter = this.coordinations.entrySet().iterator();
             while ( iter.hasNext() )
             {
-                final Map.Entry<Long, CoordinationImpl> entry = iter.next();
-                if ( entry.getValue().getBundle().getBundleId() == owner.getBundleId() )
+                final Map.Entry<Long, WeakReference<CoordinationImpl>> entry = iter.next();
+                final CoordinationImpl c = entry.getValue().get();
+                if ( c != null && c.getBundle().getBundleId() == owner.getBundleId() )
                 {
-                    candidates.add(entry.getValue());
+                    candidates.add(c);
                 }
             }
         }
@@ -420,7 +458,6 @@
         {
             for(final CoordinationImpl c : candidates)
             {
-                try {
                 if ( !c.isTerminated() )
                 {
                     c.fail(Coordination.RELEASED);
@@ -429,9 +466,6 @@
                 {
                     this.unregister(c, true);
                 }
-                } catch (Exception e) {
-                    e.printStackTrace();
-                }
             }
         }
     }
diff --git a/coordinator/src/main/java/org/apache/felix/coordinator/impl/CoordinatorImpl.java b/coordinator/src/main/java/org/apache/felix/coordinator/impl/CoordinatorImpl.java
index eb78b76..0de2654 100644
--- a/coordinator/src/main/java/org/apache/felix/coordinator/impl/CoordinatorImpl.java
+++ b/coordinator/src/main/java/org/apache/felix/coordinator/impl/CoordinatorImpl.java
@@ -212,7 +212,7 @@
     /**
      * @see org.osgi.service.coordinator.Coordinator#getCoordination(long)
      */
-    public Coordination getCoordination(long id)
+    public Coordination getCoordination(final long id)
     {
         // TODO: check permission
         return mgr.getCoordinationById(id);