FELIX-2647 : Implement Coordinator Service - further fixes for the CT tests cases, 56 of 61 passing (implemented orphaned conditions)
git-svn-id: https://svn.apache.org/repos/asf/felix/trunk@1551135 13f79535-47bb-0310-9956-ffa450edef68
diff --git a/coordinator/src/main/java/org/apache/felix/coordinator/impl/CoordinationImpl.java b/coordinator/src/main/java/org/apache/felix/coordinator/impl/CoordinationImpl.java
index 0b14a71..bd9ad7f 100644
--- a/coordinator/src/main/java/org/apache/felix/coordinator/impl/CoordinationImpl.java
+++ b/coordinator/src/main/java/org/apache/felix/coordinator/impl/CoordinationImpl.java
@@ -492,4 +492,14 @@
void setAssociatedThread(final Thread t) {
this.associatedThread = t;
}
+
+ @Override
+ protected void finalize() throws Throwable {
+ if ( !this.isTerminated() )
+ {
+ this.fail(Coordination.ORPHANED);
+ }
+ super.finalize();
+ }
+
}
diff --git a/coordinator/src/main/java/org/apache/felix/coordinator/impl/CoordinationMgr.java b/coordinator/src/main/java/org/apache/felix/coordinator/impl/CoordinationMgr.java
index add4096..e3225e9 100644
--- a/coordinator/src/main/java/org/apache/felix/coordinator/impl/CoordinationMgr.java
+++ b/coordinator/src/main/java/org/apache/felix/coordinator/impl/CoordinationMgr.java
@@ -19,6 +19,7 @@
package org.apache.felix.coordinator.impl;
import java.io.IOException;
+import java.lang.ref.WeakReference;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Date;
@@ -56,13 +57,13 @@
public class CoordinationMgr implements CoordinatorMBean
{
- private ThreadLocal<Stack<Coordination>> threadStacks;
+ private ThreadLocal<Stack<CoordinationImpl>> perThreadStack;
private final AtomicLong ctr;
- private final Map<Long, CoordinationImpl> coordinations;
+ private final Map<Long, WeakReference<CoordinationImpl>> coordinations;
- private final Map<Participant, CoordinationImpl> participants;
+ private final Map<Participant, WeakReference<CoordinationImpl>> participants;
private final Timer coordinationTimer;
@@ -83,10 +84,10 @@
CoordinationMgr()
{
- threadStacks = new ThreadLocal<Stack<Coordination>>();
+ perThreadStack = new ThreadLocal<Stack<CoordinationImpl>>();
ctr = new AtomicLong(-1);
- coordinations = new HashMap<Long, CoordinationImpl>();
- participants = new IdentityHashMap<Participant, CoordinationImpl>();
+ coordinations = new HashMap<Long, WeakReference<CoordinationImpl>>();
+ participants = new IdentityHashMap<Participant, WeakReference<CoordinationImpl>>();
coordinationTimer = new Timer("Coordination Timer", true);
}
@@ -97,14 +98,15 @@
coordinationTimer.cancel();
// terminate all active coordinations
- final List<Coordination> coords = new ArrayList<Coordination>();
+ final List<WeakReference<CoordinationImpl>> refs = new ArrayList<WeakReference<CoordinationImpl>>();
synchronized ( this.coordinations ) {
- coords.addAll(this.coordinations.values());
+ refs.addAll(this.coordinations.values());
this.coordinations.clear();
}
- for(final Coordination c : coords)
+ for(final WeakReference<CoordinationImpl> r : refs)
{
- if ( !c.isTerminated() )
+ final Coordination c = r.get();
+ if ( c != null && !c.isTerminated() )
{
c.fail(Coordination.RELEASED);
}
@@ -117,7 +119,22 @@
}
// cannot really clear out the thread local but we can let it go
- threadStacks = null;
+ perThreadStack = null;
+ }
+
+ private Stack<CoordinationImpl> getThreadStack(final boolean create)
+ {
+ final ThreadLocal<Stack<CoordinationImpl>> tl = this.perThreadStack;
+ Stack<CoordinationImpl> stack = null;
+ if ( tl != null )
+ {
+ stack = tl.get();
+ if ( stack == null && create ) {
+ stack = new Stack<CoordinationImpl>();
+ tl.set(stack);
+ }
+ }
+ return stack;
}
void configure(final long coordinationTimeout, final long participationTimeout)
@@ -146,7 +163,16 @@
long cutOff = System.currentTimeMillis() + participationTimeOut;
long waitTime = (participationTimeOut > 500) ? participationTimeOut / 500 : participationTimeOut;
// TODO - the above wait time looks wrong e.g. if it's 800, the wait time 1ms
- CoordinationImpl current = participants.get(p);
+ WeakReference<CoordinationImpl> currentRef = participants.get(p);
+ CoordinationImpl current = null;
+ if ( currentRef != null )
+ {
+ current = currentRef.get();
+ if ( current == null )
+ {
+ participants.remove(p);
+ }
+ }
while (current != null && current != c)
{
if (current.getThread() != null && current.getThread() == c.getThread())
@@ -175,11 +201,20 @@
}
// check again
- current = participants.get(p);
+ current = null;
+ currentRef = participants.get(p);
+ if ( currentRef != null )
+ {
+ current = currentRef.get();
+ if ( current == null )
+ {
+ participants.remove(p);
+ }
+ }
}
// lock participant into coordination
- participants.put(p, c);
+ participants.put(p, new WeakReference<CoordinationImpl>(c));
}
}
@@ -200,7 +235,7 @@
final CoordinationImpl c = new CoordinationImpl(owner, id, name, timeout);
synchronized ( this.coordinations )
{
- coordinations.put(id, c);
+ coordinations.put(id, new WeakReference<CoordinationImpl>(c));
}
return c;
}
@@ -213,7 +248,7 @@
}
if ( removeFromThread )
{
- Stack<Coordination> stack = threadStacks.get();
+ final Stack<CoordinationImpl> stack = this.getThreadStack(false);
if (stack != null)
{
stack.remove(c);
@@ -223,30 +258,24 @@
void push(final CoordinationImpl c)
{
- Stack<Coordination> stack = threadStacks.get();
- if (stack == null)
- {
- stack = new Stack<Coordination>();
- threadStacks.set(stack);
- }
- else
+ Stack<CoordinationImpl> stack = this.getThreadStack(true);
+ if ( stack != null)
{
if ( stack.contains(c) )
{
throw new CoordinationException("Coordination already pushed", c, CoordinationException.ALREADY_PUSHED);
}
+ c.setAssociatedThread(Thread.currentThread());
+ stack.push(c);
}
-
- c.setAssociatedThread(Thread.currentThread());
- stack.push(c);
}
Coordination pop()
{
- Stack<Coordination> stack = threadStacks.get();
+ final Stack<CoordinationImpl> stack = this.getThreadStack(false);
if (stack != null && !stack.isEmpty())
{
- final CoordinationImpl c = (CoordinationImpl)stack.pop();
+ final CoordinationImpl c = stack.pop();
if ( c != null ) {
c.setAssociatedThread(null);
}
@@ -257,7 +286,7 @@
Coordination peek()
{
- Stack<Coordination> stack = threadStacks.get();
+ final Stack<CoordinationImpl> stack = this.getThreadStack(false);
if (stack != null && !stack.isEmpty())
{
return stack.peek();
@@ -270,7 +299,14 @@
final ArrayList<Coordination> result = new ArrayList<Coordination>();
synchronized ( this.coordinations )
{
- result.addAll(this.coordinations.values());
+ for(final WeakReference<CoordinationImpl> ref : this.coordinations.values() )
+ {
+ final CoordinationImpl c = ref.get();
+ if ( c != null )
+ {
+ result.add(c);
+ }
+ }
}
return result;
}
@@ -279,7 +315,8 @@
{
synchronized ( this.coordinations )
{
- final CoordinationImpl c = coordinations.get(id);
+ final WeakReference<CoordinationImpl> ref = coordinations.get(id);
+ final CoordinationImpl c = (ref == null) ? null : ref.get();
return (c == null || c.isTerminated()) ? null : c;
}
}
@@ -359,7 +396,7 @@
public Coordination getEnclosingCoordination(final CoordinationImpl c)
{
- Stack<Coordination> stack = threadStacks.get();
+ final Stack<CoordinationImpl> stack = this.getThreadStack(false);
if ( stack != null )
{
final int index = stack.indexOf(c);
@@ -374,7 +411,7 @@
public boolean endNestedCoordinations(final CoordinationImpl c)
{
boolean partiallyFailed = false;
- Stack<Coordination> stack = threadStacks.get();
+ final Stack<CoordinationImpl> stack = this.getThreadStack(false);
if ( stack != null )
{
final int index = stack.indexOf(c) + 1;
@@ -383,10 +420,10 @@
final int count = stack.size()-index;
for(int i=0;i<count;i++)
{
- final Coordination nested = stack.pop();
+ final CoordinationImpl nested = stack.pop();
try
{
- nested.end();
+ nested.end();
}
catch ( final CoordinationException ce)
{
@@ -406,13 +443,14 @@
final List<CoordinationImpl> candidates = new ArrayList<CoordinationImpl>();
synchronized ( this.coordinations )
{
- final Iterator<Map.Entry<Long, CoordinationImpl>> iter = this.coordinations.entrySet().iterator();
+ final Iterator<Map.Entry<Long, WeakReference<CoordinationImpl>>> iter = this.coordinations.entrySet().iterator();
while ( iter.hasNext() )
{
- final Map.Entry<Long, CoordinationImpl> entry = iter.next();
- if ( entry.getValue().getBundle().getBundleId() == owner.getBundleId() )
+ final Map.Entry<Long, WeakReference<CoordinationImpl>> entry = iter.next();
+ final CoordinationImpl c = entry.getValue().get();
+ if ( c != null && c.getBundle().getBundleId() == owner.getBundleId() )
{
- candidates.add(entry.getValue());
+ candidates.add(c);
}
}
}
@@ -420,7 +458,6 @@
{
for(final CoordinationImpl c : candidates)
{
- try {
if ( !c.isTerminated() )
{
c.fail(Coordination.RELEASED);
@@ -429,9 +466,6 @@
{
this.unregister(c, true);
}
- } catch (Exception e) {
- e.printStackTrace();
- }
}
}
}
diff --git a/coordinator/src/main/java/org/apache/felix/coordinator/impl/CoordinatorImpl.java b/coordinator/src/main/java/org/apache/felix/coordinator/impl/CoordinatorImpl.java
index eb78b76..0de2654 100644
--- a/coordinator/src/main/java/org/apache/felix/coordinator/impl/CoordinatorImpl.java
+++ b/coordinator/src/main/java/org/apache/felix/coordinator/impl/CoordinatorImpl.java
@@ -212,7 +212,7 @@
/**
* @see org.osgi.service.coordinator.Coordinator#getCoordination(long)
*/
- public Coordination getCoordination(long id)
+ public Coordination getCoordination(final long id)
{
// TODO: check permission
return mgr.getCoordinationById(id);