[FELIX-4942] Finally introduce some parallelism in the resolver
git-svn-id: https://svn.apache.org/repos/asf/felix/trunk@1690728 13f79535-47bb-0310-9956-ffa450edef68
diff --git a/resolver/src/main/java/org/apache/felix/resolver/ResolverImpl.java b/resolver/src/main/java/org/apache/felix/resolver/ResolverImpl.java
index bd339a6..98f9799 100644
--- a/resolver/src/main/java/org/apache/felix/resolver/ResolverImpl.java
+++ b/resolver/src/main/java/org/apache/felix/resolver/ResolverImpl.java
@@ -29,6 +29,12 @@
import java.util.Map;
import java.util.Map.Entry;
import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.Executor;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.atomic.AtomicInteger;
import org.apache.felix.resolver.util.ArrayMap;
import org.apache.felix.resolver.util.OpenHashMap;
@@ -52,6 +58,8 @@
{
private final Logger m_logger;
+ private final int m_parallelism;
+
// Note this class is not thread safe.
// Only use in the context of a single thread.
class ResolveSession
@@ -70,7 +78,7 @@
// removed the offending capabilities
private Candidates m_multipleCardCandidates = null;
- private final Map<String, List<String>> m_usesCache = new HashMap<String, List<String>>();
+ private final ConcurrentMap<String, List<String>> m_usesCache = new ConcurrentHashMap<String, List<String>>();
ResolveSession(ResolveContext resolveContext)
{
@@ -102,18 +110,44 @@
return m_resolveContext;
}
- public Map<String, List<String>> getUsesCache() {
+ public ConcurrentMap<String, List<String>> getUsesCache() {
return m_usesCache;
}
}
public ResolverImpl(Logger logger)
{
- m_logger = logger;
+ this(logger, Runtime.getRuntime().availableProcessors());
+ }
+
+ public ResolverImpl(Logger logger, int parallelism)
+ {
+ this.m_logger = logger;
+ this.m_parallelism = parallelism;
}
public Map<Resource, List<Wire>> resolve(ResolveContext rc) throws ResolutionException
{
+ if (m_parallelism > 1)
+ {
+ ExecutorService executor = Executors.newFixedThreadPool(m_parallelism);
+ try
+ {
+ return doResolve(rc, executor);
+ }
+ finally
+ {
+ executor.shutdownNow();
+ }
+ }
+ else
+ {
+ return doResolve(rc, new DumbExecutor());
+ }
+ }
+
+ private Map<Resource, List<Wire>> doResolve(ResolveContext rc, Executor executor) throws ResolutionException
+ {
ResolveSession session = new ResolveSession(rc);
Map<Resource, List<Wire>> wireMap =
new HashMap<Resource, List<Wire>>();
@@ -267,7 +301,7 @@
}
Map<Resource, ResolutionError> currentFaultyResources = new HashMap<Resource, ResolutionError>();
- rethrow = checkConsistency(session, allCandidates, currentFaultyResources, hosts, false);
+ rethrow = checkConsistency(executor, session, allCandidates, currentFaultyResources, hosts, false);
if (!currentFaultyResources.isEmpty())
{
@@ -371,6 +405,7 @@
}
private ResolutionError checkConsistency(
+ Executor executor,
ResolveSession session,
Candidates allCandidates,
Map<Resource, ResolutionError> currentFaultyResources,
@@ -379,7 +414,7 @@
{
// Calculate package spaces
Map<Resource, Packages> resourcePkgMap =
- calculatePackageSpaces(session, allCandidates, hosts.values());
+ calculatePackageSpaces(executor, session, allCandidates, hosts.values());
ResolutionError error = null;
// Check package consistency
Map<Resource, Object> resultCache =
@@ -510,6 +545,7 @@
// execute code, so we don't need to check for
// this case like we do for a normal resolve.
rethrow = checkConsistency(
+ new DumbExecutor(),
session, allCandidates,
new OpenHashMap<Resource, ResolutionError>(resourcePkgMap.size()),
Collections.singletonMap(host, allCandidates.getWrappedHost(host)),
@@ -1028,35 +1064,48 @@
}
}
- private static void getWireCandidatesAndRecurse(
- ResolveSession session,
- Candidates allCandidates,
- Map<Resource, List<WireCandidate>> allWireCandidates,
- Resource resource
- )
- {
- List<WireCandidate> wireCandidates = getWireCandidates(session, allCandidates, resource);
- allWireCandidates.put(resource, wireCandidates);
- for (WireCandidate w : wireCandidates)
- {
- Resource r = w.capability.getResource();
- if (!allWireCandidates.containsKey(r))
- {
- getWireCandidatesAndRecurse(session, allCandidates, allWireCandidates, r);
- }
- }
- }
-
private static Map<Resource, Packages> calculatePackageSpaces(
+ final Executor innerExecutor,
final ResolveSession session,
final Candidates allCandidates,
Collection<Resource> hosts)
{
+ final EnhancedExecutor executor = new EnhancedExecutor(innerExecutor);
+
// Parallel compute wire candidates
- final Map<Resource, List<WireCandidate>> allWireCandidates = new OpenHashMap<Resource, List<WireCandidate>>();
- for (Resource resource : hosts)
+ final Map<Resource, List<WireCandidate>> allWireCandidates = new ConcurrentHashMap<Resource, List<WireCandidate>>();
{
- getWireCandidatesAndRecurse(session, allCandidates, allWireCandidates, resource);
+ final ConcurrentMap<Resource, Runnable> tasks = new ConcurrentHashMap<Resource, Runnable>(allCandidates.getNbResources());
+ class Computer implements Runnable
+ {
+ final Resource resource;
+ public Computer(Resource resource)
+ {
+ this.resource = resource;
+ }
+ public void run()
+ {
+ List<WireCandidate> wireCandidates = getWireCandidates(session, allCandidates, resource);
+ allWireCandidates.put(resource, wireCandidates);
+ for (WireCandidate w : wireCandidates)
+ {
+ Resource u = w.capability.getResource();
+ if (!tasks.containsKey(u))
+ {
+ Computer c = new Computer(u);
+ if (tasks.putIfAbsent(u, c) == null)
+ {
+ executor.execute(c);
+ }
+ }
+ }
+ }
+ }
+ for (Resource resource : hosts)
+ {
+ executor.execute(new Computer(resource));
+ }
+ executor.await();
}
// Parallel get all exported packages
@@ -1065,14 +1114,28 @@
{
final Packages packages = new Packages(resource);
allPackages.put(resource, packages);
- calculateExportedPackages(session, allCandidates, resource, packages.m_exportedPkgs);
+ executor.execute(new Runnable()
+ {
+ public void run()
+ {
+ calculateExportedPackages(session, allCandidates, resource, packages.m_exportedPkgs);
+ }
+ });
}
+ executor.await();
// Parallel compute package lists
for (final Resource resource : allWireCandidates.keySet())
{
- getPackages(session, allCandidates, allWireCandidates, allPackages, resource, allPackages.get(resource));
+ executor.execute(new Runnable()
+ {
+ public void run()
+ {
+ getPackages(session, allCandidates, allWireCandidates, allPackages, resource, allPackages.get(resource));
+ }
+ });
}
+ executor.await();
// Sequential compute package sources
// TODO: make that parallel
@@ -1084,8 +1147,15 @@
// Parallel compute uses
for (final Resource resource : allWireCandidates.keySet())
{
- computeUses(session, allWireCandidates, allPackages, resource);
+ executor.execute(new Runnable()
+ {
+ public void run()
+ {
+ computeUses(session, allWireCandidates, allPackages, resource);
+ }
+ });
}
+ executor.await();
return allPackages;
}
@@ -2247,4 +2317,66 @@
}
}
+ private static class EnhancedExecutor
+ {
+ private final Executor executor;
+ private final AtomicInteger count = new AtomicInteger();
+
+ public EnhancedExecutor(Executor executor)
+ {
+ this.executor = executor;
+ }
+
+ public void execute(final Runnable runnable)
+ {
+ count.incrementAndGet();
+ executor.execute(new Runnable()
+ {
+ public void run()
+ {
+ try
+ {
+ runnable.run();
+ }
+ finally
+ {
+ if (count.decrementAndGet() == 0)
+ {
+ synchronized (count)
+ {
+ count.notifyAll();
+ }
+ }
+ }
+ }
+ });
+ }
+
+ public void await()
+ {
+ synchronized (count)
+ {
+ if (count.get() > 0)
+ {
+ try
+ {
+ count.wait();
+ }
+ catch (InterruptedException e)
+ {
+ throw new IllegalStateException(e);
+ }
+ }
+ }
+ }
+ }
+
+ static class DumbExecutor implements Executor
+ {
+ public void execute(Runnable command)
+ {
+ command.run();
+ }
+ }
+
}