FELIX-2671 Coordinations which are still active when the Coordinator
service is ungotten must be failed
git-svn-id: https://svn.apache.org/repos/asf/felix/trunk@1027833 13f79535-47bb-0310-9956-ffa450edef68
diff --git a/coordinator/src/main/java/org/apache/felix/coordination/impl/Activator.java b/coordinator/src/main/java/org/apache/felix/coordination/impl/Activator.java
index f20129f..004b1eb 100644
--- a/coordinator/src/main/java/org/apache/felix/coordination/impl/Activator.java
+++ b/coordinator/src/main/java/org/apache/felix/coordination/impl/Activator.java
@@ -91,7 +91,7 @@
public void ungetService(Bundle bundle,
ServiceRegistration registration, Object service) {
- // TODO: FELIX-2671/OSGi Bug 104: Ensure all coordinations of this Coordinator service are terminated
+ ((CoordinatorImpl) service).dispose();
}
}
diff --git a/coordinator/src/main/java/org/apache/felix/coordination/impl/CoordinationImpl.java b/coordinator/src/main/java/org/apache/felix/coordination/impl/CoordinationImpl.java
index 3985486..8db042f 100644
--- a/coordinator/src/main/java/org/apache/felix/coordination/impl/CoordinationImpl.java
+++ b/coordinator/src/main/java/org/apache/felix/coordination/impl/CoordinationImpl.java
@@ -43,7 +43,7 @@
/** Coordination failed */
private static final int FAILED = 4;
- private final CoordinationMgr mgr;
+ private final CoordinatorImpl owner;
private final long id;
@@ -72,9 +72,9 @@
private Thread initiatorThread;
- public CoordinationImpl(final CoordinationMgr mgr, final long id,
+ public CoordinationImpl(final CoordinatorImpl owner, final long id,
final String name, final long defaultTimeOutInMs) {
- this.mgr = mgr;
+ this.owner = owner;
this.id = id;
this.name = name;
this.mustFail = 0;
@@ -202,7 +202,7 @@
// ensure participant only pariticipates on a single coordination
// this blocks until the participant can participate or until
// a timeout occurrs (or a deadlock is detected)
- mgr.lockParticipant(p, this);
+ owner.lockParticipant(p, this);
// synchronize access to the state to prevent it from being changed
// while adding the participant
@@ -248,7 +248,7 @@
private synchronized boolean startTermination() {
if (state == ACTIVE) {
state = TERMINATING;
- mgr.unregister(this);
+ owner.unregister(this);
scheduleTimeout(-1);
return true;
}
@@ -278,7 +278,7 @@
}
// release the participant for other coordinations
- mgr.releaseParticipant(part);
+ owner.releaseParticipant(part);
}
state = TERMINATED;
return reason;
@@ -301,7 +301,7 @@
}
// release the participant for other coordinations
- mgr.releaseParticipant(part);
+ owner.releaseParticipant(part);
}
state = FAILED;
}
@@ -315,7 +315,7 @@
*/
private void scheduleTimeout(final long timeout) {
if (timeoutTask != null) {
- mgr.schedule(timeoutTask, -1);
+ owner.schedule(timeoutTask, -1);
timeoutTask = null;
}
@@ -327,7 +327,7 @@
}
};
- mgr.schedule(timeoutTask, timeout);
+ owner.schedule(timeoutTask, timeout);
}
}
}
diff --git a/coordinator/src/main/java/org/apache/felix/coordination/impl/CoordinationMgr.java b/coordinator/src/main/java/org/apache/felix/coordination/impl/CoordinationMgr.java
index b67c99e..011bf97 100644
--- a/coordinator/src/main/java/org/apache/felix/coordination/impl/CoordinationMgr.java
+++ b/coordinator/src/main/java/org/apache/felix/coordination/impl/CoordinationMgr.java
@@ -87,14 +87,6 @@
coordinationTimer = new Timer("Coordination Timer", true);
}
- void unregister(final CoordinationImpl c) {
- coordinations.remove(c.getId());
- Stack<Coordination> stack = threadStacks.get();
- if (stack != null) {
- stack.remove(c);
- }
- }
-
void cleanUp() {
// terminate coordination timeout timer
coordinationTimer.purge();
@@ -158,16 +150,20 @@
// ---------- Coordinator back end implementation
- Coordination create(String name) {
+ Coordination create(final CoordinatorImpl owner, final String name) {
long id = ctr.incrementAndGet();
- CoordinationImpl c = new CoordinationImpl(this, id, name,
+ CoordinationImpl c = new CoordinationImpl(owner, id, name,
defaultTimeOut);
coordinations.put(id, c);
return c;
}
- Coordination begin(String name) {
- return push(create(name));
+ void unregister(final CoordinationImpl c) {
+ coordinations.remove(c.getId());
+ Stack<Coordination> stack = threadStacks.get();
+ if (stack != null) {
+ stack.remove(c);
+ }
}
Coordination push(Coordination c) {
@@ -195,15 +191,6 @@
return null;
}
- boolean alwaysFail(Throwable reason) {
- CoordinationImpl current = (CoordinationImpl) getCurrentCoordination();
- if (current != null) {
- current.mustFail(reason);
- return true;
- }
- return false;
- }
-
Collection<Coordination> getCoordinations() {
ArrayList<Coordination> result = new ArrayList<Coordination>();
Stack<Coordination> stack = threadStacks.get();
@@ -213,24 +200,6 @@
return result;
}
- boolean participate(Participant participant) throws CoordinationException {
- Coordination current = getCurrentCoordination();
- if (current != null) {
- current.participate(participant);
- return true;
- }
- return false;
- }
-
- Coordination participateOrBegin(Participant ifActive) {
- Coordination current = getCurrentCoordination();
- if (current == null) {
- current = begin("implicit");
- }
- current.participate(ifActive);
- return current;
- }
-
// ---------- CoordinatorMBean interface
public TabularData listCoordinations(String regexFilter) {
diff --git a/coordinator/src/main/java/org/apache/felix/coordination/impl/CoordinatorImpl.java b/coordinator/src/main/java/org/apache/felix/coordination/impl/CoordinatorImpl.java
index b792c1b..d966c1d 100644
--- a/coordinator/src/main/java/org/apache/felix/coordination/impl/CoordinatorImpl.java
+++ b/coordinator/src/main/java/org/apache/felix/coordination/impl/CoordinatorImpl.java
@@ -19,6 +19,9 @@
package org.apache.felix.coordination.impl;
import java.util.Collection;
+import java.util.HashSet;
+import java.util.TimerTask;
+
import org.apache.felix.service.coordination.Coordination;
import org.apache.felix.service.coordination.CoordinationException;
import org.apache.felix.service.coordination.Coordinator;
@@ -32,19 +35,54 @@
private final CoordinationMgr mgr;
+ private final HashSet<Coordination> coordinations;
+
CoordinatorImpl(final Bundle owner, final CoordinationMgr mgr) {
this.owner = owner;
this.mgr = mgr;
+ this.coordinations = new HashSet<Coordination>();
+ }
+
+ /**
+ * Ensure all active Coordinations started by this CoordinatorImpl instance
+ * are terminated before the service is ungotten by the bundle.
+ * <p>
+ * Called by the Coordinator ServiceFactory when this CoordinatorImpl
+ * instance is not used any longer by the owner bundle.
+ *
+ * @see FELIX-2671/OSGi Bug 104
+ */
+ void dispose() {
+ final Coordination[] active;
+ synchronized (coordinations) {
+ if (coordinations.isEmpty()) {
+ active = null;
+ } else {
+ active = coordinations.toArray(new CoordinationImpl[coordinations.size()]);
+ coordinations.clear();
+ }
+ }
+
+ if (active != null) {
+ Throwable reason = new Exception("Coordinator service released");
+ for (int i = 0; i < active.length; i++) {
+ active[i].fail(reason);
+ }
+ }
}
public Coordination create(String name) {
// TODO: check permission
- return mgr.create(name);
+ Coordination c = mgr.create(this, name);
+ synchronized (coordinations) {
+ coordinations.add(c);
+ }
+ return c;
}
public Coordination begin(String name) {
// TODO: check permission
- return mgr.begin(name);
+ return push(create(name));
}
public Coordination push(Coordination c) {
@@ -64,7 +102,12 @@
public boolean alwaysFail(Throwable reason) {
// TODO: check permission
- return mgr.alwaysFail(reason);
+ CoordinationImpl current = (CoordinationImpl) getCurrentCoordination();
+ if (current != null) {
+ current.mustFail(reason);
+ return true;
+ }
+ return false;
}
public Collection<Coordination> getCoordinations() {
@@ -75,7 +118,30 @@
public boolean participate(Participant participant)
throws CoordinationException {
// TODO: check permission
- return mgr.participate(participant);
+ Coordination current = getCurrentCoordination();
+ if (current != null) {
+ current.participate(participant);
+ return true;
+ }
+ return false;
}
+ void unregister(final CoordinationImpl c) {
+ mgr.unregister(c);
+ synchronized (coordinations) {
+ coordinations.remove(c);
+ }
+ }
+
+ void schedule(final TimerTask task, final long delay) {
+ mgr.schedule(task, delay);
+ }
+
+ void lockParticipant(final Participant p, final CoordinationImpl c) {
+ mgr.lockParticipant(p, c);
+ }
+
+ void releaseParticipant(final Participant p) {
+ mgr.releaseParticipant(p);
+ }
}