FELIX-3910: Back ported the new SerialExecutor from the upcoming dependency 4.0 version, and using it in the
ServiceDependencyImpl, as well as in the ComponentImpl class, especially for fixing the FELIX-4002 issue.


git-svn-id: https://svn.apache.org/repos/asf/felix/trunk@1607340 13f79535-47bb-0310-9956-ffa450edef68
diff --git a/dependencymanager/core/src/main/java/org/apache/felix/dm/impl/BlockingSerialExecutor.java b/dependencymanager/core/src/main/java/org/apache/felix/dm/impl/BlockingSerialExecutor.java
deleted file mode 100644
index efcea7a..0000000
--- a/dependencymanager/core/src/main/java/org/apache/felix/dm/impl/BlockingSerialExecutor.java
+++ /dev/null
@@ -1,71 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.felix.dm.impl;
-
-import java.util.LinkedList;
-
-/**
- * This class allows to serialize the execution of tasks on one single/unique thread. 
- * Other threads are blocked until they are elected for execution. 
- * 
- * <p>note I: when one leader thread executes a task, it does not hold any locks
- * while executing the task, and does not execute tasks scheduled by other threads.
- * 
- * <p>note II: this executor is reentrant: when one task executed by a leader thread
- * reschedule another task, then the task is run immediately.
- * 
- * @author <a href="mailto:dev@felix.apache.org">Felix Project Team</a>
- */
-public class BlockingSerialExecutor {
-    private final LinkedList m_tasksQueue = new LinkedList();
-    private Thread m_executingThread = null;
-
-    /**
-     * Executes a task exclusively without holding any locks (other concurrent tasks are blocked until the current task is executed).
-     * @param task a task to be executed serially, without holding any locks.
-     */
-    public void execute(Runnable task) {
-        boolean releaseLock = false;
-        synchronized (this) {
-            if (m_executingThread != Thread.currentThread()) {
-                m_tasksQueue.addLast(task);
-                while (m_tasksQueue.size() > 0 && m_tasksQueue.get(0) != task) {
-                    try {
-                        // TODO it might make sense to use a maxwait time and throw an exception on timeouts.
-                        wait();
-                    } catch (InterruptedException e) {
-                    }
-                }
-                m_executingThread = Thread.currentThread();
-                releaseLock = true;
-            }
-        }
-        try {
-            task.run();
-        } finally {
-            if (releaseLock) {
-                synchronized (this) {
-                    m_tasksQueue.remove(task);
-                    notifyAll();
-                    m_executingThread = null;
-                }
-            }
-        }
-    }
-}
diff --git a/dependencymanager/core/src/main/java/org/apache/felix/dm/impl/SerialExecutor.java b/dependencymanager/core/src/main/java/org/apache/felix/dm/impl/SerialExecutor.java
index 70cf814..ea6fe3a 100644
--- a/dependencymanager/core/src/main/java/org/apache/felix/dm/impl/SerialExecutor.java
+++ b/dependencymanager/core/src/main/java/org/apache/felix/dm/impl/SerialExecutor.java
@@ -18,73 +18,134 @@
  */
 package org.apache.felix.dm.impl;
 
-import java.util.LinkedList;
+import java.util.concurrent.ConcurrentLinkedQueue;
+import java.util.concurrent.Executor;
+import java.util.concurrent.atomic.AtomicReference;
+
+import org.osgi.service.log.LogService;
 
 /**
  * Allows you to enqueue tasks from multiple threads and then execute
  * them on one thread sequentially. It assumes more than one thread will
  * try to execute the tasks and it will make an effort to pick the first
  * task that comes along whilst making sure subsequent tasks return
- * without waiting.
+ * without waiting. <p>
+ * 
+ * This class is lock free and ensures "safe object publication" between scheduling threads and
+ * actual executing thread: if one thread T1 schedules a task, but another thread T2 actually 
+ * executes it, then all the objects from the T1 thread will be "safely published" to the executing T2 thread.
+ * Safe publication is ensured  because we are using a ConcurrentLinkedQueue.
+ * (see [1], chapter 3.5.3 (Safe publication idioms). 
+ * 
+ * [1] Java Concurrency In Practice, Addison Wesley
  * 
  * @author <a href="mailto:dev@felix.apache.org">Felix Project Team</a>
  */
-public final class SerialExecutor {
-	private static final Runnable DUMMY_RUNNABLE = new Runnable() { public void run() {}; };
-    private final LinkedList m_workQueue = new LinkedList();
-    private Runnable m_active;
-    
+public class SerialExecutor implements Executor {
+    /** 
+     * All tasks scheduled are stored there and only one thread may run them.
+     **/
+    protected final ConcurrentLinkedQueue m_tasks = new ConcurrentLinkedQueue();
+
+    /** 
+     * Thread currently executing the task queue. 
+     **/
+    protected final AtomicReference m_runningThread = new AtomicReference();
+
+    /** 
+     * Logger used when a task execution throws an exception 
+     **/
+    private final Logger m_logger;
+
     /**
-     * Enqueue a new task for later execution. This method is
-     * thread-safe, so multiple threads can contribute tasks.
-     * 
-     * @param runnable the runnable containing the actual task
+     * Makes a new SerialExecutor
+     * @param logger used when a task execution throws an exception. Can be null if no exception should be logger.
      */
-    public synchronized void enqueue(final Runnable runnable) {
-    	m_workQueue.addLast(new Runnable() {
-			public void run() {
-				try {
-					runnable.run();
-				}
-				finally {
-					scheduleNext();
-				}
-			}
-		});
-    }
-    
-    /**
-     * Execute any pending tasks. This method is thread safe,
-     * so multiple threads can try to execute the pending
-     * tasks, but only the first will be used to actually do
-     * so. Other threads will return immediately.
-     */
-    public void execute() {
-    	Runnable active;
-    	synchronized (this) {
-    		active = m_active;
-    		// for now just put some non-null value in there so we can never
-    		// get a race condition when two threads enter this section after
-    		// one another (causing sheduleNext() to be invoked twice below)
-    		m_active = DUMMY_RUNNABLE;
-    	}
-    	if (active == null) {
-    		scheduleNext();
-    	}
+    public SerialExecutor(Logger logger) {
+        m_logger = logger;
     }
 
-    private void scheduleNext() {
-    	Runnable active;
-    	synchronized (this) {
-			if (!m_workQueue.isEmpty()) {
-				m_active = (Runnable) m_workQueue.removeFirst();
-			} else {
-				m_active = null;
-			}
-    		active = m_active;
-    	}
-    	if (active != null) {
-            active.run();
+    /**
+     * Enqueues a task for later execution. You must call {@link #execute()} in order
+     * to trigger the task execution, which may or may not be executed by
+     * your current thread.
+     */
+    public void enqueue(Runnable task) {
+        m_tasks.add(task); // No need to synchronize, m_tasks is a concurrent linked queue.
+    }
+
+    /**
+     * Executes any pending tasks, enqueued using the {@link SerialExecutor#schedule(Runnable)} method. 
+     * This method is thread safe, so multiple threads can try to execute the pending
+     * tasks, but only the first will be used to actually do so. Other threads will return immediately.
+     */
+    public void execute() {
+        Thread currentThread = Thread.currentThread();
+        if (m_runningThread.compareAndSet(null, currentThread)) {
+            runTasks(currentThread);
         }
     }
+
+    /**
+     * Schedules a task for execution, and then attempts to execute it. This method is thread safe, so 
+     * multiple threads can try to execute a task but only the first will be executed, other threads will 
+     * return immediately, and the first thread will execute the tasks scheduled by the other threads.<p>
+     * <p>
+     * This method is reentrant: if the current thread is currently being executed by this executor, then 
+     * the task passed to this method will be executed immediately, from the current invoking thread
+     * (inline execution).
+     */
+    public void execute(Runnable task) {
+        Thread currentThread = Thread.currentThread();
+        if (m_runningThread.get() == currentThread) {
+            runTask(task);
+        } else {
+            enqueue(task);  
+            execute();
+        }
+    }
+    
+    /**
+     * Run all pending tasks
+     * @param currentRunninghread the current executing thread
+     */
+    private void runTasks(Thread currentRunninghread) {
+        do {
+            try {
+                Runnable task;
+                ConcurrentLinkedQueue tasks = m_tasks;
+
+                while ((task = (Runnable) tasks.poll()) != null) {
+                    runTask(task);
+                }
+            }
+            finally {
+                m_runningThread.set(null);
+            }
+        }
+        // We must test again if some tasks have been scheduled after our "while" loop above, but before the
+        // m_runningThread reference has been reset to null.
+        while (!m_tasks.isEmpty() && m_runningThread.compareAndSet(null, currentRunninghread));
+    }
+
+    /**
+     * Run a given task.
+     * @param task the task to execute.
+     */
+    void runTask(Runnable command) {
+        try {
+            command.run();
+        }
+        catch (Throwable t) {
+            if (m_logger != null) {
+                m_logger.log(LogService.LOG_ERROR, "Error processing tasks", t);
+            } else {
+                t.printStackTrace();
+            }
+        }
+    }
+
+    public String toString() {
+        return "[Executor: queue size: " + m_tasks.size() + "]";
+    }
 }
diff --git a/dependencymanager/core/src/main/java/org/apache/felix/dm/impl/dependencies/ServiceDependencyImpl.java b/dependencymanager/core/src/main/java/org/apache/felix/dm/impl/dependencies/ServiceDependencyImpl.java
index 5ec6b37..d74d938 100644
--- a/dependencymanager/core/src/main/java/org/apache/felix/dm/impl/dependencies/ServiceDependencyImpl.java
+++ b/dependencymanager/core/src/main/java/org/apache/felix/dm/impl/dependencies/ServiceDependencyImpl.java
@@ -41,7 +41,7 @@
 import org.apache.felix.dm.InvocationUtil;
 import org.apache.felix.dm.ServiceDependency;
 import org.apache.felix.dm.ServiceUtil;
-import org.apache.felix.dm.impl.BlockingSerialExecutor;
+import org.apache.felix.dm.impl.SerialExecutor;
 import org.apache.felix.dm.impl.DefaultNullObject;
 import org.apache.felix.dm.impl.Logger;
 import org.apache.felix.dm.tracker.ServiceTracker;
@@ -89,7 +89,7 @@
     /**
      * Executor used to ensure proper synchronization without holding locks. 
      */
-    private final BlockingSerialExecutor m_serial = new BlockingSerialExecutor();
+    private final SerialExecutor m_serial;
 
     // ----------------------- Inner classes --------------------------------------------------------------
 
@@ -232,11 +232,13 @@
         super(logger);
         m_context = context;
         m_autoConfig = true;
+        m_serial = new SerialExecutor(logger);
     }
 
     /** Copying constructor that clones an existing instance. */
     public ServiceDependencyImpl(ServiceDependencyImpl prototype) {
         super(prototype);
+        m_serial = new SerialExecutor(m_logger);
         synchronized (prototype) {
             m_context = prototype.m_context;
             m_autoConfig = prototype.m_autoConfig;