Finalized this test, which is reproducing some problems inside ConfigurationDependencyImpl.
Also, this test seems to reproduce a NPE inside config admin (see FELIX-4385).
git-svn-id: https://svn.apache.org/repos/asf/felix/trunk@1558377 13f79535-47bb-0310-9956-ffa450edef68
diff --git a/dependencymanager/test/src/test/java/org/apache/felix/dm/test/integration/api/ServiceRaceTest.java b/dependencymanager/test/src/test/java/org/apache/felix/dm/test/integration/api/ServiceRaceTest.java
index 96e6dd6..1096b00 100644
--- a/dependencymanager/test/src/test/java/org/apache/felix/dm/test/integration/api/ServiceRaceTest.java
+++ b/dependencymanager/test/src/test/java/org/apache/felix/dm/test/integration/api/ServiceRaceTest.java
@@ -18,6 +18,8 @@
*/
package org.apache.felix.dm.test.integration.api;
+import java.io.IOException;
+import java.util.Dictionary;
import java.util.Hashtable;
import java.util.List;
import java.util.Map;
@@ -38,17 +40,21 @@
import org.junit.Test;
import org.junit.runner.RunWith;
import org.ops4j.pax.exam.junit.PaxExam;
+import org.osgi.framework.ServiceReference;
+import org.osgi.service.cm.Configuration;
import org.osgi.service.cm.ConfigurationAdmin;
/**
- * FELIX-3910: Another race test for concurrent service registration/unregistration.
+ * Another race test for concurrent service registrations/unregistrations.
+ * Aspects are also depending on some configuration pids, which are also registered/unregistered concurrently.
*/
@RunWith(PaxExam.class)
public class ServiceRaceTest extends TestBase {
- final static int SERVICES = 10;
+ final static int STEP_WAIT = 10000;
+ final static int SERVICES = 3;
final static int INVOKES = 10;
- volatile ExecutorService m_execRegister; // used to register/unregister S services
- volatile ExecutorService m_execInvoke; // Used by Client to invoke S services
+ volatile ExecutorService m_execServices; // used to register/unregister S services
+ volatile ExecutorService m_execAspects; // used to register/unregister Aspects
@Inject
volatile ConfigurationAdmin m_ca;
@@ -60,87 +66,139 @@
final DependencyManager dm = new DependencyManager(context);
try {
- m_execRegister = Executors.newFixedThreadPool(cores);
- m_execInvoke = Executors.newFixedThreadPool(1);
+ m_execServices = Executors.newFixedThreadPool(cores);
+ m_execAspects = Executors.newFixedThreadPool(cores);
+ int aspectPidCounter = 1;
+ int aspectCounter = 1;
- for (int loop = 0; loop < 10000; loop++) {
- Ensure e = new Ensure(false);
- long timeStamp = System.currentTimeMillis();
-
- // Create one client depending on 'SERVICES' S services
- Client client = new Client(e);
- Component c = dm
- .createComponent()
- .setImplementation(client);
- for (int i = 0; i < SERVICES; i ++) {
+ long timeStamp = System.currentTimeMillis();
+ for (int loop = 0; loop < 30000; loop++) {
+ debug("loop#%d -------------------------", (loop + 1));
+
+ final Ensure clientStarted = new Ensure(false);
+ final Ensure clientStopped = new Ensure(false);
+ final Ensure serviceStarted = new Ensure(false);
+ final Ensure serviceStopped = new Ensure(false);
+ final Ensure serviceInvoked = new Ensure(false);
+ final Ensure aspectStarted = new Ensure(false);
+ final Ensure aspectStopped = new Ensure(false);
+ final Ensure aspectUpdated = new Ensure(false);
+ final Ensure aspectInvoked = new Ensure(false);
+
+ // Create one client depending on many S services
+ Client client = new Client(clientStarted, clientStopped);
+ Component c = dm.createComponent().setImplementation(client);
+ for (int i = 0; i < SERVICES; i++) {
c.add(dm.createServiceDependency().setService(S.class, "(name=S" + i + ")").setRequired(true).setCallbacks(
- "add", "remove"));
+ "add", null, "remove", "swap"));
}
dm.add(c);
- // Create all the 'SERVICES' S services concurrently
- info("registering services concurrently");
- final Ensure addE = new Ensure(false);
+ // Create S services concurrently
+ info("registering S services concurrently");
final List<Component> services = new CopyOnWriteArrayList<Component>();
- for (int i = 0; i < SERVICES; i ++) {
+ for (int i = 0; i < SERVICES; i++) {
final String name = "S" + i;
- m_execRegister.execute(new Runnable() {
+ m_execServices.execute(new Runnable() {
public void run() {
Hashtable h = new Hashtable();
h.put("name", name);
- Component sImpl = dm
- .createComponent()
- .setImplementation(new SImpl())
- .setInterface(S.class.getName(), h);
+ Component sImpl = dm.createComponent().setImplementation(
+ new SImpl(serviceStarted, serviceStopped, serviceInvoked, name)).setInterface(
+ S.class.getName(), h);
services.add(sImpl);
dm.add(sImpl);
- addE.step();
}
});
}
- addE.waitForStep(SERVICES, 5000);
-
- // Make sure client is started:
- e.waitForStep(1, 5000);
-
- // Make sure client invoked services SERVICES * INVOKES times
- e.waitForStep(1 + (SERVICES * INVOKES), 10000);
- if ((loop+1) % 100 == 0) {
- long duration = System.currentTimeMillis() - timeStamp;
- warn("Performed %d tests in %d ms.", (loop+1), duration);
- timeStamp = System.currentTimeMillis();
+
+ // Create S aspects concurrently
+ info("registering aspects concurrently");
+ final List<Component> aspects = new CopyOnWriteArrayList<Component>();
+ final List<Configuration> aspectPids = new CopyOnWriteArrayList<Configuration>();
+ for (int i = 0; i < SERVICES; i++) {
+ final String name = "Aspect" + i + "-" + (aspectCounter++);
+ final String filter = "(name=S" + i + ")";
+ final String pid = "Aspect" + i + ".pid" + (aspectPidCounter++);
+ m_execServices.execute(new Runnable() {
+ public void run() {
+ SAspect sa = new SAspect(aspectStarted, aspectStopped, aspectUpdated, aspectInvoked, name);
+ debug("Adding aspect " + sa);
+ Component aspect = dm.createAspectService(S.class, filter, 1).setImplementation(sa).add(
+ dm.createConfigurationDependency().setPid(pid));
+ aspects.add(aspect);
+ dm.add(aspect);
+ try {
+ Configuration aspectConf = m_ca.getConfiguration(pid, null);
+ aspectConf.update(new Hashtable() {
+ {
+ put("name", name);
+ }
+ });
+ aspectPids.add(aspectConf);
+ }
+ catch (IOException e) {
+ error("could not create pid %s for aspect %s", e, pid, name);
+ return;
+ }
+ }
+ });
}
-
+
+ // Make sure all services and aspects are created
+ clientStarted.waitForStep(1, STEP_WAIT);
+ aspectUpdated.waitForStep(SERVICES, STEP_WAIT);
+ aspectStarted.waitForStep(SERVICES, STEP_WAIT);
+ info("all aspects and services registered");
+
+ // Make sure client invoked services and aspects SERVICES * INVOKES times
+ serviceInvoked.waitForStep(SERVICES * INVOKES, STEP_WAIT);
+ aspectInvoked.waitForStep(SERVICES * INVOKES, STEP_WAIT);
+ info("All aspects and services have been properly invoked");
+
// Unregister services concurrently
- final Ensure removeE = new Ensure(false);
info("unregistering services concurrently");
for (final Component sImpl : services) {
- m_execRegister.execute(new Runnable() {
+ m_execServices.execute(new Runnable() {
public void run() {
dm.remove(sImpl);
- removeE.step();
}
});
}
- removeE.waitForStep(SERVICES, 5000);
-
- // Make sure Client has been stopped
- info("waiting for client to be stopped");
- int nextStep = 1 /* start */ + (SERVICES * INVOKES) + 1 /* stop */;
- e.waitForStep(nextStep, 5000);
-
- info("all services stopped");
-
- // Make sure all services are all unbound from our client.
- nextStep += SERVICES; // Client.removed should have been called for each unbound service.
- e.waitForStep(nextStep, 5000);
-
- // Clear everything before interating on next loop
- dm.clear();
-
+
+ // Unregister aspects (and configuration) concurrently
+ info("unregistering aspects concurrently");
+ for (final Component a : aspects) {
+ m_execAspects.execute(new Runnable() {
+ public void run() {
+ debug("removing aspect %s", a);
+ dm.remove(a);
+ }
+ });
+ }
+ info("unregistering aspect configuration concurrently");
+ for (Configuration aspectConf : aspectPids) {
+ aspectConf.delete();
+ }
+
+ info("removing client from dm");
+ dm.remove(c);
+
+ // Wait until all services/aspects have been stopped
+ serviceStopped.waitForStep(SERVICES, STEP_WAIT);
+ aspectStopped.waitForStep(SERVICES, STEP_WAIT);
+ clientStopped.waitForStep(1, STEP_WAIT);
+
if (super.errorsLogged()) {
throw new IllegalStateException("Race test interrupted (some error occured, see previous logs)");
}
+
+ info("finished one test loop");
+ if ((loop + 1) % 100 == 0) {
+ long duration = System.currentTimeMillis() - timeStamp;
+ warn("Performed %d tests in %d ms.", (loop + 1), duration);
+ timeStamp = System.currentTimeMillis();
+ }
}
}
@@ -149,8 +207,8 @@
Assert.fail("Test failed: " + t.getMessage());
}
finally {
- shutdown(m_execRegister);
- shutdown(m_execInvoke);
+ shutdown(m_execServices);
+ shutdown(m_execAspects);
dm.clear();
}
}
@@ -165,53 +223,170 @@
}
public interface S {
- void invoke(Ensure e);
+ void invoke();
}
- public static class SImpl implements S {
- public void invoke(Ensure e) {
- e.step();
+ public class SImpl implements S {
+ final Ensure m_started, m_stopped, m_invoked;
+ final String m_name;
+
+ public SImpl(Ensure started, Ensure stopped, Ensure invoked, String name) {
+ m_name = name;
+ m_started = started;
+ m_stopped = stopped;
+ m_invoked = invoked;
+ }
+
+ public void invoke() {
+ m_invoked.step();
+ }
+
+ public String toString() {
+ return m_name;
+ }
+
+ public void start() {
+ info("started %s", this);
+ m_started.step();
+ }
+
+ public void stop() {
+ info("stopped %s", this);
+ m_stopped.step();
}
}
-
- public class Client {
- final Ensure m_e;
+
+ public class Client implements Runnable {
+ final Ensure m_started, m_stopped;
final Map<String, S> m_services = new ConcurrentHashMap<String, S>();
+ volatile Thread m_thread;
+ volatile Exception m_firstStartStackTrace;
+ volatile boolean m_running;
- Client(Ensure e) {
- m_e = e;
+ Client(Ensure started, Ensure stopped) {
+ m_started = started;
+ m_stopped = stopped;
}
-
- void add(Map<String, String> props, S s) {
- info("client.add: %s (name=%s)", s, props.get("name"));
+
+ synchronized void swap(ServiceReference prevRef, S prev, ServiceReference nextRef, S next) {
+ info("client.swap: prev=%s, next=%s", prev, next);
+ m_services.put((String) nextRef.getProperty("name"), next);
+ }
+
+ synchronized void add(Map<String, String> props, S s) {
+ info("client.add: %s", s);
m_services.put(props.get("name"), s);
}
-
- void remove(Map<String, String> props, S s) {
- info("client.remove: %s (name=%s)", s, props.get("name"));
+
+ synchronized void remove(Map<String, String> props, S s) {
+ info("client.remove: %s", s);
m_services.remove(props.get("name"));
- m_e.step();
}
-
- public void start() {
+
+ public synchronized void start() {
+ if (m_firstStartStackTrace != null) {
+ error("client already started", new Exception());
+ error("first start was done here:", m_firstStartStackTrace);
+ return;
+ }
+ if (m_running) {
+ error("Client already started");
+ return;
+ }
if (m_services.size() != SERVICES) {
error("Client started with unexpected number of injected services: %s", m_services);
return;
}
- m_e.step(1);
- m_execInvoke.execute(new Runnable() {
- public void run() {
- for (int i = 0; i < INVOKES; i ++) {
- for (Map.Entry<String, S> e : m_services.entrySet()) {
- e.getValue().invoke(m_e);
- }
+ m_firstStartStackTrace = new Exception("First start stacktrace");
+ info("client starting");
+
+ m_thread = new Thread(this, "Client");
+ m_thread.setDaemon(true);
+ m_running = true;
+ m_thread.start();
+ }
+
+ public void run() {
+ m_started.step();
+ while (m_running) {
+ for (int i = 0; i < INVOKES; i++) {
+ for (Map.Entry<String, S> e : m_services.entrySet()) {
+ e.getValue().invoke();
}
}
- });
+ }
}
-
+
+ public synchronized void stop() {
+ if (!m_running) {
+ error("client can't be stopped (not running)");
+ return;
+ }
+
+ info("stopping client");
+ m_running = false;
+ try {
+ m_thread.join();
+ }
+ catch (InterruptedException e) {
+ error("interrupted while stopping client", e);
+ }
+ info("client stopped");
+ m_firstStartStackTrace = null;
+ m_stopped.step();
+ }
+ }
+
+ public class SAspect implements S {
+ volatile S m_next;
+ final String m_name;
+ final Ensure m_invoked, m_started, m_stopped, m_updated;
+ volatile Dictionary<String, String> m_conf;
+
+ SAspect(Ensure started, Ensure stopped, Ensure updated, Ensure invoked, String name) {
+ m_started = started;
+ m_stopped = stopped;
+ m_updated = updated;
+ m_invoked = invoked;
+ m_name = name;
+ }
+
+ public void updated(Dictionary<String, String> conf) {
+ if (conf == null) {
+ error("Aspect %s injected with a null configuration", this);
+ return;
+ }
+ debug("Aspect %s injected with configuration: %s", this, conf);
+ m_conf = conf;
+ m_updated.step();
+ }
+
+ public void start() {
+ info("started aspect %s", this);
+ m_started.step();
+ }
+
public void stop() {
- m_e.step(1 /* start */ + (SERVICES * INVOKES) + 1);
+ info("stopped aspect %s", this);
+ m_stopped.step();
+ }
+
+ public void invoke() {
+ if (m_conf == null) {
+ error("Aspect: %s has not been injected with its configuration", this);
+ return;
+ }
+
+ if (!m_name.equals(m_conf.get("name"))) {
+ error("Aspect %s has not been injected with expected configuration: %s", this, m_conf);
+ return;
+ }
+ m_invoked.step();
+ m_next.invoke();
+ }
+
+ public String toString() {
+ return m_name;
}
}
}