Finalized this test, which is reproducing some problems inside ConfigurationDependencyImpl.
Also, this test seems to reproduce a NPE inside config admin (see FELIX-4385).


git-svn-id: https://svn.apache.org/repos/asf/felix/trunk@1558377 13f79535-47bb-0310-9956-ffa450edef68
diff --git a/dependencymanager/test/src/test/java/org/apache/felix/dm/test/integration/api/ServiceRaceTest.java b/dependencymanager/test/src/test/java/org/apache/felix/dm/test/integration/api/ServiceRaceTest.java
index 96e6dd6..1096b00 100644
--- a/dependencymanager/test/src/test/java/org/apache/felix/dm/test/integration/api/ServiceRaceTest.java
+++ b/dependencymanager/test/src/test/java/org/apache/felix/dm/test/integration/api/ServiceRaceTest.java
@@ -18,6 +18,8 @@
  */
 package org.apache.felix.dm.test.integration.api;
 
+import java.io.IOException;
+import java.util.Dictionary;
 import java.util.Hashtable;
 import java.util.List;
 import java.util.Map;
@@ -38,17 +40,21 @@
 import org.junit.Test;
 import org.junit.runner.RunWith;
 import org.ops4j.pax.exam.junit.PaxExam;
+import org.osgi.framework.ServiceReference;
+import org.osgi.service.cm.Configuration;
 import org.osgi.service.cm.ConfigurationAdmin;
 
 /**
- * FELIX-3910: Another race test for concurrent service registration/unregistration.
+ * Another race test for concurrent service registrations/unregistrations.
+ * Aspects are also depending on some configuration pids, which are also registered/unregistered concurrently.
  */
 @RunWith(PaxExam.class)
 public class ServiceRaceTest extends TestBase {
-    final static int SERVICES = 10;
+    final static int STEP_WAIT = 10000;
+    final static int SERVICES = 3;
     final static int INVOKES = 10;
-    volatile ExecutorService m_execRegister; // used to register/unregister S services
-    volatile ExecutorService m_execInvoke; // Used by Client to invoke S services
+    volatile ExecutorService m_execServices; // used to register/unregister S services
+    volatile ExecutorService m_execAspects; // used to register/unregister Aspects
 
     @Inject
     volatile ConfigurationAdmin m_ca;
@@ -60,87 +66,139 @@
         final DependencyManager dm = new DependencyManager(context);
 
         try {
-            m_execRegister = Executors.newFixedThreadPool(cores);
-            m_execInvoke = Executors.newFixedThreadPool(1);
+            m_execServices = Executors.newFixedThreadPool(cores);
+            m_execAspects = Executors.newFixedThreadPool(cores);
+            int aspectPidCounter = 1;
+            int aspectCounter = 1;
 
-            for (int loop = 0; loop < 10000; loop++) {
-                Ensure e = new Ensure(false);
-                long timeStamp = System.currentTimeMillis();
-                
-                // Create one client depending on 'SERVICES' S services
-                Client client = new Client(e);
-                Component c = dm
-                        .createComponent()
-                        .setImplementation(client);
-                for (int i = 0; i < SERVICES; i ++) {
+            long timeStamp = System.currentTimeMillis();
+            for (int loop = 0; loop < 30000; loop++) {
+                debug("loop#%d -------------------------", (loop + 1));
+
+                final Ensure clientStarted = new Ensure(false);
+                final Ensure clientStopped = new Ensure(false);
+                final Ensure serviceStarted = new Ensure(false);
+                final Ensure serviceStopped = new Ensure(false);
+                final Ensure serviceInvoked = new Ensure(false);
+                final Ensure aspectStarted = new Ensure(false);
+                final Ensure aspectStopped = new Ensure(false);
+                final Ensure aspectUpdated = new Ensure(false);
+                final Ensure aspectInvoked = new Ensure(false);
+
+                // Create one client depending on many S services
+                Client client = new Client(clientStarted, clientStopped);
+                Component c = dm.createComponent().setImplementation(client);
+                for (int i = 0; i < SERVICES; i++) {
                     c.add(dm.createServiceDependency().setService(S.class, "(name=S" + i + ")").setRequired(true).setCallbacks(
-                        "add", "remove"));
+                        "add", null, "remove", "swap"));
                 }
                 dm.add(c);
 
-                // Create all the 'SERVICES' S services concurrently
-                info("registering services concurrently");
-                final Ensure addE = new Ensure(false);
+                // Create S services concurrently
+                info("registering S services concurrently");
                 final List<Component> services = new CopyOnWriteArrayList<Component>();
-                for (int i = 0; i < SERVICES; i ++) {
+                for (int i = 0; i < SERVICES; i++) {
                     final String name = "S" + i;
-                    m_execRegister.execute(new Runnable() {
+                    m_execServices.execute(new Runnable() {
                         public void run() {
                             Hashtable h = new Hashtable();
                             h.put("name", name);
-                            Component sImpl = dm
-                                .createComponent()
-                                .setImplementation(new SImpl())
-                                .setInterface(S.class.getName(), h);
+                            Component sImpl = dm.createComponent().setImplementation(
+                                new SImpl(serviceStarted, serviceStopped, serviceInvoked, name)).setInterface(
+                                S.class.getName(), h);
                             services.add(sImpl);
                             dm.add(sImpl);
-                            addE.step();
                         }
                     });
                 }
-                addE.waitForStep(SERVICES, 5000);
-                
-                // Make sure client is started:
-                e.waitForStep(1, 5000);
-                
-                // Make sure client invoked services SERVICES * INVOKES times
-                e.waitForStep(1 + (SERVICES * INVOKES), 10000);
-                if ((loop+1) % 100 == 0) {
-                    long duration = System.currentTimeMillis() - timeStamp;
-                    warn("Performed %d tests in %d ms.", (loop+1), duration);
-                    timeStamp = System.currentTimeMillis();
+
+                // Create S aspects concurrently
+                info("registering aspects concurrently");
+                final List<Component> aspects = new CopyOnWriteArrayList<Component>();
+                final List<Configuration> aspectPids = new CopyOnWriteArrayList<Configuration>();
+                for (int i = 0; i < SERVICES; i++) {
+                    final String name = "Aspect" + i + "-" + (aspectCounter++);
+                    final String filter = "(name=S" + i + ")";
+                    final String pid = "Aspect" + i + ".pid" + (aspectPidCounter++);
+                    m_execServices.execute(new Runnable() {
+                        public void run() {
+                            SAspect sa = new SAspect(aspectStarted, aspectStopped, aspectUpdated, aspectInvoked, name);
+                            debug("Adding aspect " + sa);
+                            Component aspect = dm.createAspectService(S.class, filter, 1).setImplementation(sa).add(
+                                dm.createConfigurationDependency().setPid(pid));
+                            aspects.add(aspect);
+                            dm.add(aspect);
+                            try {
+                                Configuration aspectConf = m_ca.getConfiguration(pid, null);
+                                aspectConf.update(new Hashtable() {
+                                    {
+                                        put("name", name);
+                                    }
+                                });
+                                aspectPids.add(aspectConf);
+                            }
+                            catch (IOException e) {
+                                error("could not create pid %s for aspect %s", e, pid, name);
+                                return;
+                            }
+                        }
+                    });
                 }
-                
+
+                // Make sure all services and aspects are created
+                clientStarted.waitForStep(1, STEP_WAIT);
+                aspectUpdated.waitForStep(SERVICES, STEP_WAIT);
+                aspectStarted.waitForStep(SERVICES, STEP_WAIT);
+                info("all aspects and services registered");
+
+                // Make sure client invoked services and aspects SERVICES * INVOKES times
+                serviceInvoked.waitForStep(SERVICES * INVOKES, STEP_WAIT);
+                aspectInvoked.waitForStep(SERVICES * INVOKES, STEP_WAIT);
+                info("All aspects and services have been properly invoked");
+
                 // Unregister services concurrently
-                final Ensure removeE = new Ensure(false);
                 info("unregistering services concurrently");
                 for (final Component sImpl : services) {
-                    m_execRegister.execute(new Runnable() {
+                    m_execServices.execute(new Runnable() {
                         public void run() {
                             dm.remove(sImpl);
-                            removeE.step();
                         }
                     });
                 }
-                removeE.waitForStep(SERVICES, 5000);
-                
-                // Make sure Client has been stopped
-                info("waiting for client to be stopped");
-                int nextStep = 1 /* start */ + (SERVICES * INVOKES) + 1 /* stop */;
-                e.waitForStep(nextStep, 5000);
-                
-                info("all services stopped");
-                
-                // Make sure all services are all unbound from our client.
-                nextStep += SERVICES; // Client.removed should have been called for each unbound service.
-                e.waitForStep(nextStep, 5000);
-                
-                // Clear everything before interating on next loop
-                dm.clear();
-                
+
+                // Unregister aspects (and configuration) concurrently
+                info("unregistering aspects concurrently");
+                for (final Component a : aspects) {
+                    m_execAspects.execute(new Runnable() {
+                        public void run() {
+                            debug("removing aspect %s", a);
+                            dm.remove(a);
+                        }
+                    });
+                }
+                info("unregistering aspect configuration concurrently");
+                for (Configuration aspectConf : aspectPids) {
+                    aspectConf.delete();
+                }
+
+                info("removing client from dm");
+                dm.remove(c);
+
+                // Wait until all services/aspects have been stopped
+                serviceStopped.waitForStep(SERVICES, STEP_WAIT);
+                aspectStopped.waitForStep(SERVICES, STEP_WAIT);
+                clientStopped.waitForStep(1, STEP_WAIT);
+
                 if (super.errorsLogged()) {
                     throw new IllegalStateException("Race test interrupted (some error occured, see previous logs)");
                 }
+
+                info("finished one test loop");
+                if ((loop + 1) % 100 == 0) {
+                    long duration = System.currentTimeMillis() - timeStamp;
+                    warn("Performed %d tests in %d ms.", (loop + 1), duration);
+                    timeStamp = System.currentTimeMillis();
+                }
             }
         }
 
@@ -149,8 +207,8 @@
             Assert.fail("Test failed: " + t.getMessage());
         }
         finally {
-            shutdown(m_execRegister);
-            shutdown(m_execInvoke);
+            shutdown(m_execServices);
+            shutdown(m_execAspects);
             dm.clear();
         }
     }
@@ -165,53 +223,170 @@
     }
 
     public interface S {
-        void invoke(Ensure e);
+        void invoke();
     }
 
-    public static class SImpl implements S {
-        public void invoke(Ensure e) {
-            e.step();
+    public class SImpl implements S {
+        final Ensure m_started, m_stopped, m_invoked;
+        final String m_name;
+
+        public SImpl(Ensure started, Ensure stopped, Ensure invoked, String name) {
+            m_name = name;
+            m_started = started;
+            m_stopped = stopped;
+            m_invoked = invoked;
+        }
+
+        public void invoke() {
+            m_invoked.step();
+        }
+
+        public String toString() {
+            return m_name;
+        }
+
+        public void start() {
+            info("started %s", this);
+            m_started.step();
+        }
+
+        public void stop() {
+            info("stopped %s", this);
+            m_stopped.step();
         }
     }
-    
-    public class Client {
-        final Ensure m_e;
+
+    public class Client implements Runnable {
+        final Ensure m_started, m_stopped;
         final Map<String, S> m_services = new ConcurrentHashMap<String, S>();
+        volatile Thread m_thread;
+        volatile Exception m_firstStartStackTrace;
+        volatile boolean m_running;
 
-        Client(Ensure e) {
-            m_e = e;
+        Client(Ensure started, Ensure stopped) {
+            m_started = started;
+            m_stopped = stopped;
         }
-        
-        void add(Map<String, String> props, S s) {
-            info("client.add: %s (name=%s)", s, props.get("name"));
+
+        synchronized void swap(ServiceReference prevRef, S prev, ServiceReference nextRef, S next) {
+            info("client.swap: prev=%s, next=%s", prev, next);
+            m_services.put((String) nextRef.getProperty("name"), next);
+        }
+
+        synchronized void add(Map<String, String> props, S s) {
+            info("client.add: %s", s);
             m_services.put(props.get("name"), s);
         }
-        
-        void remove(Map<String, String> props, S s) {
-            info("client.remove: %s (name=%s)", s, props.get("name"));
+
+        synchronized void remove(Map<String, String> props, S s) {
+            info("client.remove: %s", s);
             m_services.remove(props.get("name"));
-            m_e.step();
         }
-        
-        public void start() {   
+
+        public synchronized void start() {
+            if (m_firstStartStackTrace != null) {
+                error("client already started", new Exception());
+                error("first start was done here:", m_firstStartStackTrace);
+                return;
+            }
+            if (m_running) {
+                error("Client already started");
+                return;
+            }
             if (m_services.size() != SERVICES) {
                 error("Client started with unexpected number of injected services: %s", m_services);
                 return;
             }
-            m_e.step(1);
-            m_execInvoke.execute(new Runnable() {
-                public void run() {
-                    for (int i = 0; i < INVOKES; i ++) {
-                        for (Map.Entry<String, S> e : m_services.entrySet()) {
-                            e.getValue().invoke(m_e);
-                        }
+            m_firstStartStackTrace = new Exception("First start stacktrace");
+            info("client starting");
+
+            m_thread = new Thread(this, "Client");
+            m_thread.setDaemon(true);
+            m_running = true;
+            m_thread.start();
+        }
+
+        public void run() {
+            m_started.step();
+            while (m_running) {
+                for (int i = 0; i < INVOKES; i++) {
+                    for (Map.Entry<String, S> e : m_services.entrySet()) {
+                        e.getValue().invoke();
                     }
                 }
-            });
+            }
         }
-        
+
+        public synchronized void stop() {
+            if (!m_running) {
+                error("client can't be stopped (not running)");
+                return;
+            }
+
+            info("stopping client");
+            m_running = false;
+            try {
+                m_thread.join();
+            }
+            catch (InterruptedException e) {
+                error("interrupted while stopping client", e);
+            }
+            info("client stopped");
+            m_firstStartStackTrace = null;
+            m_stopped.step();
+        }
+    }
+
+    public class SAspect implements S {
+        volatile S m_next;
+        final String m_name;
+        final Ensure m_invoked, m_started, m_stopped, m_updated;
+        volatile Dictionary<String, String> m_conf;
+
+        SAspect(Ensure started, Ensure stopped, Ensure updated, Ensure invoked, String name) {
+            m_started = started;
+            m_stopped = stopped;
+            m_updated = updated;
+            m_invoked = invoked;
+            m_name = name;
+        }
+
+        public void updated(Dictionary<String, String> conf) {
+            if (conf == null) {
+                error("Aspect %s injected with a null configuration", this);
+                return;
+            }
+            debug("Aspect %s injected with configuration: %s", this, conf);
+            m_conf = conf;
+            m_updated.step();
+        }
+
+        public void start() {
+            info("started aspect %s", this);
+            m_started.step();
+        }
+
         public void stop() {
-            m_e.step(1 /* start */ + (SERVICES * INVOKES) + 1);
+            info("stopped aspect %s", this);
+            m_stopped.step();
+        }
+
+        public void invoke() {
+            if (m_conf == null) {
+                error("Aspect: %s has not been injected with its configuration", this);
+                return;
+            }
+
+            if (!m_name.equals(m_conf.get("name"))) {
+                error("Aspect %s has not been injected with expected configuration: %s", this, m_conf);
+                return;
+            }
+            m_invoked.step();
+            m_next.invoke();
+        }
+
+        public String toString() {
+            return m_name;
         }
     }
 }