FELIX-3910: Back ported the new SerialExecutor from the upcoming dependency 4.0 version, and using it in the
ServiceDependencyImpl, as well as in the ComponentImpl class, especially for fixing the FELIX-4002 issue.
git-svn-id: https://svn.apache.org/repos/asf/felix/trunk@1607340 13f79535-47bb-0310-9956-ffa450edef68
diff --git a/dependencymanager/core/src/main/java/org/apache/felix/dm/impl/BlockingSerialExecutor.java b/dependencymanager/core/src/main/java/org/apache/felix/dm/impl/BlockingSerialExecutor.java
deleted file mode 100644
index efcea7a..0000000
--- a/dependencymanager/core/src/main/java/org/apache/felix/dm/impl/BlockingSerialExecutor.java
+++ /dev/null
@@ -1,71 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.felix.dm.impl;
-
-import java.util.LinkedList;
-
-/**
- * This class allows to serialize the execution of tasks on one single/unique thread.
- * Other threads are blocked until they are elected for execution.
- *
- * <p>note I: when one leader thread executes a task, it does not hold any locks
- * while executing the task, and does not execute tasks scheduled by other threads.
- *
- * <p>note II: this executor is reentrant: when one task executed by a leader thread
- * reschedule another task, then the task is run immediately.
- *
- * @author <a href="mailto:dev@felix.apache.org">Felix Project Team</a>
- */
-public class BlockingSerialExecutor {
- private final LinkedList m_tasksQueue = new LinkedList();
- private Thread m_executingThread = null;
-
- /**
- * Executes a task exclusively without holding any locks (other concurrent tasks are blocked until the current task is executed).
- * @param task a task to be executed serially, without holding any locks.
- */
- public void execute(Runnable task) {
- boolean releaseLock = false;
- synchronized (this) {
- if (m_executingThread != Thread.currentThread()) {
- m_tasksQueue.addLast(task);
- while (m_tasksQueue.size() > 0 && m_tasksQueue.get(0) != task) {
- try {
- // TODO it might make sense to use a maxwait time and throw an exception on timeouts.
- wait();
- } catch (InterruptedException e) {
- }
- }
- m_executingThread = Thread.currentThread();
- releaseLock = true;
- }
- }
- try {
- task.run();
- } finally {
- if (releaseLock) {
- synchronized (this) {
- m_tasksQueue.remove(task);
- notifyAll();
- m_executingThread = null;
- }
- }
- }
- }
-}
diff --git a/dependencymanager/core/src/main/java/org/apache/felix/dm/impl/SerialExecutor.java b/dependencymanager/core/src/main/java/org/apache/felix/dm/impl/SerialExecutor.java
index 70cf814..ea6fe3a 100644
--- a/dependencymanager/core/src/main/java/org/apache/felix/dm/impl/SerialExecutor.java
+++ b/dependencymanager/core/src/main/java/org/apache/felix/dm/impl/SerialExecutor.java
@@ -18,73 +18,134 @@
*/
package org.apache.felix.dm.impl;
-import java.util.LinkedList;
+import java.util.concurrent.ConcurrentLinkedQueue;
+import java.util.concurrent.Executor;
+import java.util.concurrent.atomic.AtomicReference;
+
+import org.osgi.service.log.LogService;
/**
* Allows you to enqueue tasks from multiple threads and then execute
* them on one thread sequentially. It assumes more than one thread will
* try to execute the tasks and it will make an effort to pick the first
* task that comes along whilst making sure subsequent tasks return
- * without waiting.
+ * without waiting. <p>
+ *
+ * This class is lock free and ensures "safe object publication" between scheduling threads and
+ * actual executing thread: if one thread T1 schedules a task, but another thread T2 actually
+ * executes it, then all the objects from the T1 thread will be "safely published" to the executing T2 thread.
+ * Safe publication is ensured because we are using a ConcurrentLinkedQueue.
+ * (see [1], chapter 3.5.3 (Safe publication idioms).
+ *
+ * [1] Java Concurrency In Practice, Addison Wesley
*
* @author <a href="mailto:dev@felix.apache.org">Felix Project Team</a>
*/
-public final class SerialExecutor {
- private static final Runnable DUMMY_RUNNABLE = new Runnable() { public void run() {}; };
- private final LinkedList m_workQueue = new LinkedList();
- private Runnable m_active;
-
+public class SerialExecutor implements Executor {
+ /**
+ * All tasks scheduled are stored there and only one thread may run them.
+ **/
+ protected final ConcurrentLinkedQueue m_tasks = new ConcurrentLinkedQueue();
+
+ /**
+ * Thread currently executing the task queue.
+ **/
+ protected final AtomicReference m_runningThread = new AtomicReference();
+
+ /**
+ * Logger used when a task execution throws an exception
+ **/
+ private final Logger m_logger;
+
/**
- * Enqueue a new task for later execution. This method is
- * thread-safe, so multiple threads can contribute tasks.
- *
- * @param runnable the runnable containing the actual task
+ * Makes a new SerialExecutor
+ * @param logger used when a task execution throws an exception. Can be null if no exception should be logger.
*/
- public synchronized void enqueue(final Runnable runnable) {
- m_workQueue.addLast(new Runnable() {
- public void run() {
- try {
- runnable.run();
- }
- finally {
- scheduleNext();
- }
- }
- });
- }
-
- /**
- * Execute any pending tasks. This method is thread safe,
- * so multiple threads can try to execute the pending
- * tasks, but only the first will be used to actually do
- * so. Other threads will return immediately.
- */
- public void execute() {
- Runnable active;
- synchronized (this) {
- active = m_active;
- // for now just put some non-null value in there so we can never
- // get a race condition when two threads enter this section after
- // one another (causing sheduleNext() to be invoked twice below)
- m_active = DUMMY_RUNNABLE;
- }
- if (active == null) {
- scheduleNext();
- }
+ public SerialExecutor(Logger logger) {
+ m_logger = logger;
}
- private void scheduleNext() {
- Runnable active;
- synchronized (this) {
- if (!m_workQueue.isEmpty()) {
- m_active = (Runnable) m_workQueue.removeFirst();
- } else {
- m_active = null;
- }
- active = m_active;
- }
- if (active != null) {
- active.run();
+ /**
+ * Enqueues a task for later execution. You must call {@link #execute()} in order
+ * to trigger the task execution, which may or may not be executed by
+ * your current thread.
+ */
+ public void enqueue(Runnable task) {
+ m_tasks.add(task); // No need to synchronize, m_tasks is a concurrent linked queue.
+ }
+
+ /**
+ * Executes any pending tasks, enqueued using the {@link SerialExecutor#schedule(Runnable)} method.
+ * This method is thread safe, so multiple threads can try to execute the pending
+ * tasks, but only the first will be used to actually do so. Other threads will return immediately.
+ */
+ public void execute() {
+ Thread currentThread = Thread.currentThread();
+ if (m_runningThread.compareAndSet(null, currentThread)) {
+ runTasks(currentThread);
}
}
+
+ /**
+ * Schedules a task for execution, and then attempts to execute it. This method is thread safe, so
+ * multiple threads can try to execute a task but only the first will be executed, other threads will
+ * return immediately, and the first thread will execute the tasks scheduled by the other threads.<p>
+ * <p>
+ * This method is reentrant: if the current thread is currently being executed by this executor, then
+ * the task passed to this method will be executed immediately, from the current invoking thread
+ * (inline execution).
+ */
+ public void execute(Runnable task) {
+ Thread currentThread = Thread.currentThread();
+ if (m_runningThread.get() == currentThread) {
+ runTask(task);
+ } else {
+ enqueue(task);
+ execute();
+ }
+ }
+
+ /**
+ * Run all pending tasks
+ * @param currentRunninghread the current executing thread
+ */
+ private void runTasks(Thread currentRunninghread) {
+ do {
+ try {
+ Runnable task;
+ ConcurrentLinkedQueue tasks = m_tasks;
+
+ while ((task = (Runnable) tasks.poll()) != null) {
+ runTask(task);
+ }
+ }
+ finally {
+ m_runningThread.set(null);
+ }
+ }
+ // We must test again if some tasks have been scheduled after our "while" loop above, but before the
+ // m_runningThread reference has been reset to null.
+ while (!m_tasks.isEmpty() && m_runningThread.compareAndSet(null, currentRunninghread));
+ }
+
+ /**
+ * Run a given task.
+ * @param task the task to execute.
+ */
+ void runTask(Runnable command) {
+ try {
+ command.run();
+ }
+ catch (Throwable t) {
+ if (m_logger != null) {
+ m_logger.log(LogService.LOG_ERROR, "Error processing tasks", t);
+ } else {
+ t.printStackTrace();
+ }
+ }
+ }
+
+ public String toString() {
+ return "[Executor: queue size: " + m_tasks.size() + "]";
+ }
}
diff --git a/dependencymanager/core/src/main/java/org/apache/felix/dm/impl/dependencies/ServiceDependencyImpl.java b/dependencymanager/core/src/main/java/org/apache/felix/dm/impl/dependencies/ServiceDependencyImpl.java
index 5ec6b37..d74d938 100644
--- a/dependencymanager/core/src/main/java/org/apache/felix/dm/impl/dependencies/ServiceDependencyImpl.java
+++ b/dependencymanager/core/src/main/java/org/apache/felix/dm/impl/dependencies/ServiceDependencyImpl.java
@@ -41,7 +41,7 @@
import org.apache.felix.dm.InvocationUtil;
import org.apache.felix.dm.ServiceDependency;
import org.apache.felix.dm.ServiceUtil;
-import org.apache.felix.dm.impl.BlockingSerialExecutor;
+import org.apache.felix.dm.impl.SerialExecutor;
import org.apache.felix.dm.impl.DefaultNullObject;
import org.apache.felix.dm.impl.Logger;
import org.apache.felix.dm.tracker.ServiceTracker;
@@ -89,7 +89,7 @@
/**
* Executor used to ensure proper synchronization without holding locks.
*/
- private final BlockingSerialExecutor m_serial = new BlockingSerialExecutor();
+ private final SerialExecutor m_serial;
// ----------------------- Inner classes --------------------------------------------------------------
@@ -232,11 +232,13 @@
super(logger);
m_context = context;
m_autoConfig = true;
+ m_serial = new SerialExecutor(logger);
}
/** Copying constructor that clones an existing instance. */
public ServiceDependencyImpl(ServiceDependencyImpl prototype) {
super(prototype);
+ m_serial = new SerialExecutor(m_logger);
synchronized (prototype) {
m_context = prototype.m_context;
m_autoConfig = prototype.m_autoConfig;