FELIX-4866 Fix concurrency issue with Service Registry
Includes a concurrency test from pderop that exposes the issue. Many thanks Pierre for identifying this problem!
git-svn-id: https://svn.apache.org/repos/asf/felix/trunk@1680258 13f79535-47bb-0310-9956-ffa450edef68
diff --git a/framework/src/main/java/org/apache/felix/framework/capabilityset/CapabilitySet.java b/framework/src/main/java/org/apache/felix/framework/capabilityset/CapabilitySet.java
index 60356c7..d929afd 100644
--- a/framework/src/main/java/org/apache/felix/framework/capabilityset/CapabilitySet.java
+++ b/framework/src/main/java/org/apache/felix/framework/capabilityset/CapabilitySet.java
@@ -32,6 +32,7 @@
import java.util.Set;
import java.util.SortedMap;
import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ConcurrentSkipListMap;
import org.apache.felix.framework.util.SecureAction;
@@ -105,7 +106,8 @@
value = convertArrayToList(value);
}
- Map<Object, Set<BundleCapability>> index = entry.getValue();
+ ConcurrentMap<Object, Set<BundleCapability>> index =
+ (ConcurrentMap<Object, Set<BundleCapability>>) entry.getValue();
if (value instanceof Collection)
{
@@ -124,14 +126,12 @@
}
private void indexCapability(
- Map<Object, Set<BundleCapability>> index, BundleCapability cap, Object capValue)
+ ConcurrentMap<Object, Set<BundleCapability>> index, BundleCapability cap, Object capValue)
{
- Set<BundleCapability> caps = index.get(capValue);
- if (caps == null)
- {
- caps = Collections.newSetFromMap(new ConcurrentHashMap<BundleCapability, Boolean>());
- index.put(capValue, caps);
- }
+ Set<BundleCapability> caps = Collections.newSetFromMap(new ConcurrentHashMap<BundleCapability, Boolean>());
+ Set<BundleCapability> prevval = index.putIfAbsent(capValue, caps);
+ if (prevval != null)
+ caps = prevval;
caps.add(cap);
}
diff --git a/framework/src/test/java/org/apache/felix/framework/ConcurrencyTest.java b/framework/src/test/java/org/apache/felix/framework/ConcurrencyTest.java
new file mode 100644
index 0000000..fef1e6e
--- /dev/null
+++ b/framework/src/test/java/org/apache/felix/framework/ConcurrencyTest.java
@@ -0,0 +1,350 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.felix.framework;
+
+import static java.lang.System.err;
+import static java.lang.System.out;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.Hashtable;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.Executor;
+import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
+
+import org.osgi.framework.BundleContext;
+import org.osgi.framework.Constants;
+import org.osgi.framework.Filter;
+import org.osgi.framework.InvalidSyntaxException;
+import org.osgi.framework.ServiceReference;
+import org.osgi.framework.ServiceRegistration;
+import org.osgi.framework.launch.Framework;
+import org.osgi.util.tracker.ServiceTracker;
+import org.osgi.util.tracker.ServiceTrackerCustomizer;
+
+import junit.framework.Assert;
+import junit.framework.TestCase;
+
+/**
+ * This test performs concurrent registration of service components that have dependencies between each other.
+ * There are 10 components. The first one has an optional dependency on the second one, the second on the third , etc ... The last component
+ * does not have any dependencies.
+ * At the beginning of a concurrent test, the nth component creates a service tracker on the nth+1 component and registers in the registry
+ * (and components and their associated Service Trackers are registered/opened concurrently).
+ * At the end of an iteration test, we check that all nth component are properly injected (satisfied) with the nth+1 component (except the
+ * last one which has no dependency).
+ */
+public class ConcurrencyTest extends TestCase
+{
+ public static final int DELAY = 1000;
+ final static int NPROCS = Runtime.getRuntime().availableProcessors();
+ final static int COMPONENTS = (NPROCS == 1) ? 4 : NPROCS;
+ final static int ITERATIONS = 50000;
+
+ /**
+ * Threadpool which can be optionally used by parallel scenarios.
+ */
+ private final static Executor TPOOL = Executors.newFixedThreadPool(COMPONENTS);
+
+ /**
+ * Latch used to ensure that the test has completed propertly.
+ */
+ private final CountDownLatch m_testDone = new CountDownLatch(1);
+
+ /**
+ * Starts a concurrent test.
+ */
+ public void testConcurrentComponents() throws Exception
+ {
+ Map<String, Object> params = new HashMap<String, Object>();
+ params.put(Constants.FRAMEWORK_SYSTEMPACKAGES,
+ "org.osgi.framework; version=1.4.0," + "org.osgi.service.packageadmin; version=1.2.0,"
+ + "org.osgi.service.startlevel; version=1.1.0," + "org.osgi.util.tracker; version=1.3.3,"
+ + "org.osgi.service.url; version=1.0.0");
+ File cacheDir = File.createTempFile("felix-cache", ".dir");
+ cacheDir.delete();
+ cacheDir.mkdirs();
+ String cache = cacheDir.getPath();
+ params.put("felix.cache.profiledir", cache);
+ params.put("felix.cache.dir", cache);
+ params.put(Constants.FRAMEWORK_STORAGE, cache);
+
+ Framework f = new Felix(params);
+ f.init();
+ f.start();
+
+ try
+ {
+ out.println("Starting load test.");
+ Loader loader = new Loader(f.getBundleContext());
+ loader.start();
+
+ Assert.assertTrue(m_testDone.await(60, TimeUnit.SECONDS));
+ loader.stop();
+ }
+ finally
+ {
+ f.stop();
+ Thread.sleep(DELAY);
+ deleteDir(cacheDir);
+ }
+ }
+
+ private static void deleteDir(File root) throws IOException
+ {
+ if (root.isDirectory())
+ {
+ for (File file : root.listFiles())
+ {
+ deleteDir(file);
+ }
+ }
+ assertTrue(root.delete());
+ }
+
+ /**
+ * A simple component, that creates a service tracker on another component and registers in the osgi service registry.
+ */
+ public class Component implements ServiceTrackerCustomizer<Component, Component>
+ {
+ private final BundleContext m_ctx;
+ private final int m_id;
+ private volatile int m_dependsOnId = -1;
+ private volatile ServiceTracker<Component, Component> m_tracker;
+ private volatile boolean m_satisfied;
+ private volatile ServiceRegistration<?> m_registration;
+
+ public Component(BundleContext ctx, int id)
+ {
+ m_ctx = ctx;
+ m_id = id;
+ }
+
+ public void dependsOn(int componentId)
+ {
+ m_dependsOnId = componentId;
+ }
+
+ public void start()
+ {
+ // if we depends on another component, add the dependency.
+ if (m_dependsOnId != -1)
+ {
+ Filter filter;
+ try
+ {
+ filter = m_ctx.createFilter(
+ "(&(objectClass=" + Component.class.getName() + ")(id=" + m_dependsOnId + "))");
+ }
+ catch (InvalidSyntaxException e)
+ {
+ e.printStackTrace();
+ return;
+ }
+ m_tracker = new ServiceTracker<Component, Component>(m_ctx, filter, this);
+ m_tracker.open();
+ }
+ else
+ {
+ // We don't depend on anything, mark our satisfied flag to true
+ m_satisfied = true;
+ }
+ register();
+ }
+
+ public boolean isSatisfied()
+ {
+ return m_satisfied;
+ }
+
+ public void stop()
+ {
+ if (m_tracker != null)
+ {
+ m_tracker.close();
+ }
+ m_registration.unregister();
+ }
+
+ public Component addingService(ServiceReference<Component> reference)
+ {
+ Component service = m_ctx.getService(reference);
+ String id = (String) reference.getProperty("id");
+ if (String.valueOf(m_dependsOnId).equals(id))
+ {
+ m_satisfied = true;
+ }
+ else
+ {
+ System.err.println("Component#" + m_id + " received wrong dependency #" + id);
+ }
+
+ return service;
+ }
+
+ public void modifiedService(ServiceReference<Component> reference, Component service)
+ {
+ }
+
+ public void removedService(ServiceReference<Component> reference, Component service)
+ {
+ try
+ {
+ m_ctx.ungetService(reference);
+ }
+ catch (IllegalStateException e)
+ {
+ e.printStackTrace();
+ }
+ }
+
+ private void register() {
+ Hashtable<String, Object> properties = new Hashtable<String, Object>();
+ properties.put("id", String.valueOf(m_id));
+ m_registration = m_ctx.registerService(Component.class.getName(), this, properties);
+ }
+ }
+
+ public class Loader implements Runnable
+ {
+ final BundleContext m_ctx;
+ private Thread m_thread;
+
+ Loader(BundleContext ctx)
+ {
+ m_ctx = ctx;
+ }
+
+ public void start()
+ {
+ m_thread = new Thread(this);
+ m_thread.start();
+ }
+
+ public void stop()
+ {
+ m_thread.interrupt();
+ }
+
+ public void run()
+ {
+ // Creates all components. Each nth components will depends on the
+ // nth+1 component. The last component does not depend on another one.
+
+ out.println("Starting Concurrency test ...");
+ for (int i = 0; i < ITERATIONS; i++)
+ {
+ try
+ {
+ if (i % 1000 == 0)
+ {
+ out.println(".");
+ }
+ createComponentsConcurrently(i);
+ }
+ catch (Throwable error)
+ {
+ err.println("Test failed");
+ error.printStackTrace(err);
+ return;
+ }
+ }
+
+ out.println("\nTest successful.");
+ m_testDone.countDown();
+ }
+
+ private void createComponentsConcurrently(int iteration) throws Exception
+ {
+ // Create components.
+ final List<Component> components = new ArrayList<Component>();
+ for (int i = 0; i < COMPONENTS; i++)
+ {
+ components.add(createComponents(iteration, i));
+ }
+
+ // Start all components asynchronously.
+ final CountDownLatch latch = new CountDownLatch(components.size());
+ for (int i = 0; i < COMPONENTS; i++)
+ {
+ final Component component = components.get(i);
+ TPOOL.execute(new Runnable()
+ {
+ public void run()
+ {
+ try
+ {
+ component.start();
+ }
+ finally
+ {
+ latch.countDown();
+ }
+
+ }
+ });
+ }
+
+ if (!latch.await(5, TimeUnit.SECONDS))
+ {
+ System.err.println("ThreadPool did not complete timely.");
+ return;
+ }
+
+ // Count the number of satisfied components.
+ long satisfied = 0;
+ for (int i = 0; i < COMPONENTS; i++)
+ {
+ satisfied += (components.get(i).isSatisfied()) ? 1 : 0;
+ }
+
+ // Report an error if we don't have expected satisfied components.
+ if (satisfied != COMPONENTS) {
+ for (int i = 0; i < COMPONENTS; i ++) {
+ if (! components.get(i).isSatisfied()) {
+ out.println("Component #" + i + " unsatisfied.");
+ }
+ }
+ Assert.fail("Found unsatisfied components: " + String.valueOf(COMPONENTS - satisfied));
+ return;
+ }
+
+ // Stop all components
+ for (int i = 0; i < COMPONENTS; i++)
+ {
+ components.get(i).stop();
+ }
+ }
+
+ private Component createComponents(int iteration, int i)
+ {
+ Component component = new Component(m_ctx, iteration + i);
+ if (i < (COMPONENTS - 1))
+ {
+ component.dependsOn(iteration + i + 1);
+ }
+ return component;
+ }
+ }
+}