FELIX-2647 : Implement Coordinator Service - further fixes for the CT tests cases, up from 18 to 45 of 61 passing
git-svn-id: https://svn.apache.org/repos/asf/felix/trunk@1550692 13f79535-47bb-0310-9956-ffa450edef68
diff --git a/coordinator/src/main/java/org/apache/felix/coordinator/impl/CoordinationImpl.java b/coordinator/src/main/java/org/apache/felix/coordinator/impl/CoordinationImpl.java
index 1da5428..3c65ece 100644
--- a/coordinator/src/main/java/org/apache/felix/coordinator/impl/CoordinationImpl.java
+++ b/coordinator/src/main/java/org/apache/felix/coordinator/impl/CoordinationImpl.java
@@ -33,14 +33,19 @@
public class CoordinationImpl implements Coordination
{
- /** Active */
- private static final int ACTIVE = 1;
+ private enum State {
+ /** Active */
+ ACTIVE,
- /** Coordination termination started */
- private static final int TERMINATING = 2;
+ /** failed() called */
+ FAILED,
- /** Coordination completed */
- private static final int TERMINATED = 3;
+ /** Coordination termination started */
+ TERMINATING,
+
+ /** Coordination completed */
+ TERMINATED
+ }
private final CoordinatorImpl owner;
@@ -53,11 +58,11 @@
/**
* Access to this field must be synchronized as long as the expected state
- * is {@link #ACTIVE}. Once the state has changed, further updates to this
+ * is {@link State#ACTIVE}. Once the state has changed, further updates to this
* instance will not take place any more and the state will only be modified
- * by the thread successfully setting the state to {@link #TERMINATING}.
+ * by the thread successfully setting the state to {@link State#TERMINATING}.
*/
- private volatile int state;
+ private volatile State state;
private Throwable failReason;
@@ -67,7 +72,7 @@
private TimerTask timeoutTask;
- private Thread initiatorThread;
+ private Thread associatedThread;
public CoordinationImpl(final CoordinatorImpl owner, final long id, final String name, final long timeOutInMs)
{
@@ -76,11 +81,10 @@
this.owner = owner;
this.id = id;
this.name = name;
- this.state = ACTIVE;
+ this.state = State.ACTIVE;
this.participants = new ArrayList<Participant>();
this.variables = new HashMap<Class<?>, Object>();
this.deadLine = (timeOutInMs > 0) ? System.currentTimeMillis() + timeOutInMs : 0;
- this.initiatorThread = Thread.currentThread();
scheduleTimeout(deadLine);
}
@@ -97,9 +101,12 @@
public boolean fail(Throwable reason)
{
+ if ( reason == null)
+ {
+ throw new IllegalArgumentException("Reason must not be null");
+ }
if (startTermination())
{
- this.owner.unregister(this);
this.failReason = reason;
// consider failure reason (if not null)
@@ -119,7 +126,8 @@
owner.releaseParticipant(part);
}
- state = TERMINATED;
+ this.owner.unregister(this, false);
+ state = State.FAILED;
synchronized (this)
{
@@ -133,11 +141,11 @@
public void end()
{
-
if (startTermination())
{
+ // TODO check for WRONG_THREAD
this.owner.endNestedCoordinations(this);
- this.owner.unregister(this);
+ this.owner.unregister(this, true);
boolean partialFailure = false;
for (int i=participants.size()-1;i>=0;i--)
@@ -157,7 +165,7 @@
owner.releaseParticipant(part);
}
- state = TERMINATED;
+ state = State.TERMINATED;
synchronized (this)
{
@@ -170,6 +178,12 @@
CoordinationException.PARTIALLY_ENDED);
}
}
+ else if ( state == State.FAILED )
+ {
+ this.owner.unregister(this, true);
+ state = State.TERMINATED;
+ throw new CoordinationException("Coordination failed", this, CoordinationException.FAILED, failReason);
+ }
else
{
// already terminated
@@ -185,7 +199,7 @@
// while we create a copy of the participant list
synchronized (this)
{
- if (state == ACTIVE)
+ if (state == State.ACTIVE)
{
return new ArrayList<Participant>(participants);
}
@@ -273,12 +287,12 @@
*/
public boolean isTerminated()
{
- return state != ACTIVE;
+ return state != State.ACTIVE;
}
public Thread getThread()
{
- return initiatorThread;
+ return associatedThread;
}
public void join(long timeoutInMillis) throws InterruptedException
@@ -335,9 +349,9 @@
*/
private synchronized boolean startTermination()
{
- if (state == ACTIVE)
+ if (state == State.ACTIVE)
{
- state = TERMINATING;
+ state = State.TERMINATING;
scheduleTimeout(-1);
return true;
}
@@ -387,7 +401,7 @@
}
@Override
- public int hashCode()
+ public int hashCode()
{
final int prime = 31;
int result = 1;
@@ -396,7 +410,7 @@
}
@Override
- public boolean equals(final Object obj)
+ public boolean equals(final Object obj)
{
if (this == obj)
return true;
@@ -409,4 +423,8 @@
return false;
return true;
}
+
+ void setAssociatedThread(final Thread t) {
+ this.associatedThread = t;
+ }
}
diff --git a/coordinator/src/main/java/org/apache/felix/coordinator/impl/CoordinationMgr.java b/coordinator/src/main/java/org/apache/felix/coordinator/impl/CoordinationMgr.java
index 5cd66ec..7e1dd34 100644
--- a/coordinator/src/main/java/org/apache/felix/coordinator/impl/CoordinationMgr.java
+++ b/coordinator/src/main/java/org/apache/felix/coordinator/impl/CoordinationMgr.java
@@ -23,6 +23,8 @@
import java.util.Collection;
import java.util.Date;
import java.util.HashMap;
+import java.util.Iterator;
+import java.util.List;
import java.util.Map;
import java.util.Stack;
import java.util.Timer;
@@ -33,6 +35,7 @@
import javax.management.openmbean.TabularData;
import org.apache.felix.jmx.service.coordinator.CoordinatorMBean;
+import org.osgi.framework.Bundle;
import org.osgi.service.coordinator.Coordination;
import org.osgi.service.coordinator.CoordinationException;
import org.osgi.service.coordinator.Participant;
@@ -52,6 +55,8 @@
public class CoordinationMgr implements CoordinatorMBean
{
+ // TODO - sync access to coordinations
+
private ThreadLocal<Stack<Coordination>> threadStacks;
private final AtomicLong ctr;
@@ -135,7 +140,7 @@
CoordinationImpl current = participants.get(p);
while (current != null && current != c)
{
- if (current.getThread() == c.getThread())
+ if (current.getThread() != null && current.getThread() == c.getThread())
{
throw new CoordinationException("Participant " + p + " already participating in Coordination "
+ current.getId() + "/" + current.getName() + " in this thread", c,
@@ -188,17 +193,20 @@
return c;
}
- void unregister(final CoordinationImpl c)
+ void unregister(final CoordinationImpl c, final boolean removeFromThread)
{
coordinations.remove(c.getId());
- Stack<Coordination> stack = threadStacks.get();
- if (stack != null)
+ if ( removeFromThread )
{
- stack.remove(c);
+ Stack<Coordination> stack = threadStacks.get();
+ if (stack != null)
+ {
+ stack.remove(c);
+ }
}
}
- Coordination push(Coordination c)
+ Coordination push(final CoordinationImpl c)
{
Stack<Coordination> stack = threadStacks.get();
if (stack == null)
@@ -208,11 +216,13 @@
}
else
{
- if ( stack.contains(c) )
+ if ( stack.contains(c) )
{
throw new CoordinationException("Coordination already pushed", c, CoordinationException.ALREADY_PUSHED);
}
}
+
+ c.setAssociatedThread(Thread.currentThread());
return stack.push(c);
}
@@ -221,7 +231,11 @@
Stack<Coordination> stack = threadStacks.get();
if (stack != null && !stack.isEmpty())
{
- return stack.pop();
+ final CoordinationImpl c = (CoordinationImpl)stack.pop();
+ if ( c != null ) {
+ c.setAssociatedThread(null);
+ }
+ return c;
}
return null;
}
@@ -238,11 +252,10 @@
Collection<Coordination> getCoordinations()
{
- ArrayList<Coordination> result = new ArrayList<Coordination>();
- Stack<Coordination> stack = threadStacks.get();
- if (stack != null)
+ final ArrayList<Coordination> result = new ArrayList<Coordination>();
+ synchronized ( this.coordinations )
{
- result.addAll(stack);
+ result.addAll(this.coordinations.values());
}
return result;
}
@@ -326,7 +339,7 @@
}
*/
- public Coordination getEnclosingCoordination(final CoordinationImpl c)
+ public Coordination getEnclosingCoordination(final CoordinationImpl c)
{
Stack<Coordination> stack = threadStacks.get();
if ( stack != null )
@@ -340,7 +353,7 @@
return null;
}
- public void endNestedCoordinations(final CoordinationImpl c)
+ public void endNestedCoordinations(final CoordinationImpl c)
{
Stack<Coordination> stack = threadStacks.get();
if ( stack != null )
@@ -349,12 +362,50 @@
if ( index > 0 && stack.size() > index )
{
final int count = stack.size()-index;
- for(int i=0;i<count;i++)
+ for(int i=0;i<count;i++)
{
- final Coordination nested = (Coordination)stack.pop();
+ final Coordination nested = stack.pop();
nested.end();
}
}
}
}
+
+ /**
+ * Dispose all coordinations for that bundle
+ * @param owner The owner bundle
+ */
+ public void dispose(final Bundle owner) {
+ final List<CoordinationImpl> candidates = new ArrayList<CoordinationImpl>();
+ synchronized ( this.coordinations )
+ {
+ final Iterator<Map.Entry<Long, CoordinationImpl>> iter = this.coordinations.entrySet().iterator();
+ while ( iter.hasNext() )
+ {
+ final Map.Entry<Long, CoordinationImpl> entry = iter.next();
+ if ( entry.getValue().getBundle().getBundleId() == owner.getBundleId() )
+ {
+ candidates.add(entry.getValue());
+ }
+ }
+ }
+ if ( candidates.size() > 0 )
+ {
+ for(final CoordinationImpl c : candidates)
+ {
+ try {
+ if ( !c.isTerminated() )
+ {
+ c.fail(Coordination.RELEASED);
+ }
+ else
+ {
+ this.unregister(c, true);
+ }
+ } catch (Exception e) {
+ e.printStackTrace();
+ }
+ }
+ }
+ }
}
diff --git a/coordinator/src/main/java/org/apache/felix/coordinator/impl/CoordinatorImpl.java b/coordinator/src/main/java/org/apache/felix/coordinator/impl/CoordinatorImpl.java
index 504b764..b280c36 100644
--- a/coordinator/src/main/java/org/apache/felix/coordinator/impl/CoordinatorImpl.java
+++ b/coordinator/src/main/java/org/apache/felix/coordinator/impl/CoordinatorImpl.java
@@ -19,29 +19,25 @@
package org.apache.felix.coordinator.impl;
import java.util.Collection;
-import java.util.HashSet;
-import java.util.Set;
import java.util.TimerTask;
import org.osgi.framework.Bundle;
import org.osgi.service.coordinator.Coordination;
import org.osgi.service.coordinator.CoordinationException;
+import org.osgi.service.coordinator.Coordinator;
import org.osgi.service.coordinator.Participant;
-public class CoordinatorImpl implements org.osgi.service.coordinator.Coordinator
+public class CoordinatorImpl implements Coordinator
{
private final Bundle owner;
private final CoordinationMgr mgr;
- private final Set<Coordination> coordinations;
-
CoordinatorImpl(final Bundle owner, final CoordinationMgr mgr)
{
this.owner = owner;
this.mgr = mgr;
- this.coordinations = new HashSet<Coordination>();
}
/**
@@ -55,28 +51,7 @@
*/
void dispose()
{
- final Coordination[] active;
- synchronized (coordinations)
- {
- if (coordinations.isEmpty())
- {
- active = null;
- }
- else
- {
- active = coordinations.toArray(new CoordinationImpl[coordinations.size()]);
- coordinations.clear();
- }
- }
-
- if (active != null)
- {
- Throwable reason = new Exception("Coordinator service released");
- for (int i = active.length -1; i >=0; i--)
- {
- active[i].fail(reason);
- }
- }
+ this.mgr.dispose(this.owner);
}
/**
@@ -110,7 +85,7 @@
for(final String p : parts)
{
boolean valid = true;
- if ( p.length() == 0 )
+ if ( p.length() == 0 )
{
valid = false;
}
@@ -135,30 +110,27 @@
break;
}
}
- if ( !valid )
+ if ( !valid )
{
- throw new IllegalArgumentException( "Name [" + name + "] does not comply with the symbolic-name definition." );
+ throw new IllegalArgumentException( "Name [" + name + "] does not comply with the symbolic-name definition." );
}
}
}
-
+
public Coordination create(final String name, final long timeout)
{
// TODO: check permission
// check arguments
checkName(name);
- if ( timeout < 0 )
+ if ( timeout < 0 )
{
throw new IllegalArgumentException("Timeout must not be negative");
}
-
+
// create coordination
final Coordination c = mgr.create(this, name, timeout);
- synchronized (coordinations)
- {
- coordinations.add(c);
- }
+
return c;
}
@@ -188,7 +160,7 @@
public Coordination begin(final String name, final long timeoutInMillis)
{
// TODO: check permission
- return push(create(name, timeoutInMillis));
+ return push((CoordinationImpl)create(name, timeoutInMillis));
}
public Coordination pop()
@@ -217,19 +189,15 @@
//----------
- Coordination push(Coordination c)
+ Coordination push(final CoordinationImpl c)
{
// TODO: check permission
return mgr.push(c);
}
- void unregister(final CoordinationImpl c)
+ void unregister(final CoordinationImpl c, final boolean removeFromStack)
{
- mgr.unregister(c);
- synchronized (coordinations)
- {
- coordinations.remove(c);
- }
+ mgr.unregister(c, removeFromStack);
}
void schedule(final TimerTask task, final long deadLine)
@@ -252,13 +220,13 @@
return this.owner;
}
- public Coordination getEnclosingCoordination(final CoordinationImpl c)
+ public Coordination getEnclosingCoordination(final CoordinationImpl c)
{
return mgr.getEnclosingCoordination(c);
}
- public void endNestedCoordinations(final CoordinationImpl c)
+ public void endNestedCoordinations(final CoordinationImpl c)
{
- this.mgr.endNestedCoordinations(c);
+ this.mgr.endNestedCoordinations(c);
}
}
diff --git a/coordinator/src/test/java/org/apache/felix/coordinator/impl/CoordinatorImplTest.java b/coordinator/src/test/java/org/apache/felix/coordinator/impl/CoordinatorImplTest.java
index 21433e1..d2bf362 100644
--- a/coordinator/src/test/java/org/apache/felix/coordinator/impl/CoordinatorImplTest.java
+++ b/coordinator/src/test/java/org/apache/felix/coordinator/impl/CoordinatorImplTest.java
@@ -61,12 +61,12 @@
try
{
c1.end();
- fail("Expected CoordinationException.ALREADY_ENDED on end() after fail()");
+ fail("Expected CoordinationException.FAILED on end() after fail()");
}
catch (CoordinationException ce)
{
- // expected already terminated
- assertEquals(CoordinationException.ALREADY_ENDED, ce.getType());
+ // expected failed
+ assertEquals(CoordinationException.FAILED, ce.getType());
}
final Coordination c2 = coordinator.create(name, 0);
@@ -106,7 +106,7 @@
assertEquals(c1, coordinator.pop());
assertNull(coordinator.peek());
- coordinator.push(c1);
+ coordinator.push((CoordinationImpl)c1);
assertEquals(c1, coordinator.peek());
c1.end();
@@ -116,7 +116,14 @@
assertNotNull(c2);
assertEquals(name, c2.getName());
assertEquals(c2, coordinator.peek());
- c2.fail(null);
+ c2.fail(new Exception());
+ assertNotNull(coordinator.peek());
+ try {
+ c2.end();
+ fail("Exception should be thrown");
+ } catch (CoordinationException ce) {
+ // ignore
+ }
assertNull(coordinator.peek());
}
@@ -231,7 +238,7 @@
assertTrue(c1.getParticipants().contains(p1));
assertEquals(1, c1.getParticipants().size());
- c1.fail(null);
+ c1.fail(new Exception());
assertFalse(p1.ended);
assertTrue(p1.failed);
assertEquals(c1, p1.c);
@@ -246,7 +253,7 @@
assertTrue(c2.getParticipants().contains(p22));
assertEquals(2, c2.getParticipants().size());
- c2.fail(null);
+ c2.fail(new Exception());
assertTrue(p21.failed);
assertEquals(c2, p21.c);
assertTrue(p22.failed);
@@ -264,7 +271,7 @@
assertTrue(c3.getParticipants().contains(p32));
assertEquals(2, c3.getParticipants().size());
- c3.fail(null);
+ c3.fail(new Exception());
assertTrue(p31.failed);
assertEquals(c3, p31.c);
assertTrue(p32.failed);