FELIX-4638 - Less locking on event handler timing. Apply patch from Bob Paulin

git-svn-id: https://svn.apache.org/repos/asf/felix/trunk@1628132 13f79535-47bb-0310-9956-ffa450edef68
diff --git a/eventadmin/impl/changelog.txt b/eventadmin/impl/changelog.txt
index 4a3a50a..b4eae29 100644
--- a/eventadmin/impl/changelog.txt
+++ b/eventadmin/impl/changelog.txt
@@ -2,6 +2,7 @@
 ---------------------------
 ** Improvement
     * [FELIX-4623] - Add test for thread based ordering
+    * [FELIX-4638] - Less locking on event handler timing
 
 
 Changes from 1.4.0 to 1.4.2
diff --git a/eventadmin/impl/src/main/java/org/apache/felix/eventadmin/impl/handler/EventHandlerProxy.java b/eventadmin/impl/src/main/java/org/apache/felix/eventadmin/impl/handler/EventHandlerProxy.java
index 3f00e50..10adfc2 100644
--- a/eventadmin/impl/src/main/java/org/apache/felix/eventadmin/impl/handler/EventHandlerProxy.java
+++ b/eventadmin/impl/src/main/java/org/apache/felix/eventadmin/impl/handler/EventHandlerProxy.java
@@ -431,12 +431,15 @@
      */
     public void blackListHandler()
     {
-        LogWrapper.getLogger().log(
-                        LogWrapper.LOG_WARNING,
-                        "Blacklisting ServiceReference [" + this.reference + " | Bundle("
-                                        + this.reference.getBundle() + ")] due to timeout!");
-        this.blacklisted = true;
-        // we can free the handler now.
-        this.release();
+    	if(!this.blacklisted)
+    	{
+	        LogWrapper.getLogger().log(
+	                        LogWrapper.LOG_WARNING,
+	                        "Blacklisting ServiceReference [" + this.reference + " | Bundle("
+	                                        + this.reference.getBundle() + ")] due to timeout!");
+	        this.blacklisted = true;
+	        // we can free the handler now.
+	        this.release();
+    	}
     }
 }
diff --git a/eventadmin/impl/src/main/java/org/apache/felix/eventadmin/impl/tasks/BlacklistLatch.java b/eventadmin/impl/src/main/java/org/apache/felix/eventadmin/impl/tasks/BlacklistLatch.java
new file mode 100644
index 0000000..85441db
--- /dev/null
+++ b/eventadmin/impl/src/main/java/org/apache/felix/eventadmin/impl/tasks/BlacklistLatch.java
@@ -0,0 +1,107 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.felix.eventadmin.impl.tasks;
+
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.List;
+import java.util.concurrent.Semaphore;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.felix.eventadmin.impl.util.LogWrapper;
+
+/**
+ *
+ * A latch that checks handlers for blacklisting on an interval.
+ *
+ */
+public class BlacklistLatch {
+
+	private final Semaphore internalSemaphore;
+
+	private final int count;
+
+	private final long timeout;
+
+	private final List<HandlerTask> handlerTasks;
+
+	/**
+	 * @param count Number of handlers that must call countdown
+	 * @param timeout Timeout in Milliseconds to check for blacklisting handlers
+	 */
+	public BlacklistLatch(final int count, final long timeout)
+	{
+		this.handlerTasks = new ArrayList<HandlerTask>(count);
+		this.count = count;
+		this.timeout = timeout;
+		internalSemaphore = new Semaphore(count);
+		internalSemaphore.drainPermits();
+	}
+
+	/**
+	 *
+	 * Count down the number of handlers blocking event completion.
+	 *
+	 */
+	public void countDown()
+	{
+		internalSemaphore.release();
+	}
+
+	/**
+	 *
+	 * Adds a handler task to the timeout based blackout checking.
+	 *
+	 * @param task
+	 */
+	public void addToBlacklistCheck(final HandlerTask task)
+	{
+		this.handlerTasks.add(task);
+	}
+
+	/**
+	 *
+	 * Causes current thread to wait until each handler has called countDown.
+	 * Checks on timeout interval to determine if a handler needs blacklisting.
+	 *
+	 */
+	public void awaitAndBlacklistCheck()
+	{
+		try
+        {
+        	while(!internalSemaphore.tryAcquire(this.count, this.timeout, TimeUnit.MILLISECONDS))
+            {
+            	final Iterator<HandlerTask> handlerTaskIt = handlerTasks.iterator();
+            	while(handlerTaskIt.hasNext())
+            	{
+            		HandlerTask currentTask = handlerTaskIt.next();
+            		currentTask.checkForBlacklist();
+            	}
+            }
+        }
+        catch (final InterruptedException e)
+        {
+        	LogWrapper.getLogger().log(
+                    LogWrapper.LOG_WARNING,
+                    "Event Task Processing Interrupted. Events may not be recieved in proper order.");
+        }
+	}
+
+
+}
diff --git a/eventadmin/impl/src/main/java/org/apache/felix/eventadmin/impl/tasks/HandlerTask.java b/eventadmin/impl/src/main/java/org/apache/felix/eventadmin/impl/tasks/HandlerTask.java
new file mode 100644
index 0000000..f51b747
--- /dev/null
+++ b/eventadmin/impl/src/main/java/org/apache/felix/eventadmin/impl/tasks/HandlerTask.java
@@ -0,0 +1,152 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.felix.eventadmin.impl.tasks;
+
+import java.lang.management.ManagementFactory;
+import java.lang.management.ThreadMXBean;
+
+import org.apache.felix.eventadmin.impl.handler.EventHandlerProxy;
+import org.osgi.service.event.Event;
+
+/**
+ * A task that processes an event handler
+ *
+ */
+public class HandlerTask implements Runnable
+{
+	private final EventHandlerProxy task;
+
+	private final Event event;
+
+	private final long timeout;
+
+	private final BlacklistLatch handlerLatch;
+
+	private volatile long threadId;
+
+	private volatile long startTime;
+
+	private volatile long endTime;
+
+	/**
+	 *
+	 *
+	 * @param task Proxy to the event handler
+	 * @param event The event to send to the handler
+	 * @param timeout Timeout for handler blacklisting
+	 * @param handlerLatch The latch used to ensure events fire in proper order
+	 */
+	public HandlerTask(final EventHandlerProxy task, final Event event, final long timeout, final BlacklistLatch handlerLatch)
+	{
+		this.task = task;
+		this.event = event;
+		this.timeout = timeout;
+		this.handlerLatch = handlerLatch;
+		this.threadId = -1l;
+		this.startTime = -1l;
+		this.endTime = -1l;
+	}
+
+	/**
+	 *
+	 * Perform timing based on thread CPU time with clock time fall back.
+	 *
+	 * @return
+	 */
+	public long getTimeInMillis()
+    {
+        ThreadMXBean bean = ManagementFactory.getThreadMXBean();
+        return bean.isThreadCpuTimeEnabled() ?
+            bean.getThreadCpuTime(threadId)/1000000 : System.currentTimeMillis();
+    }
+
+	/**
+	 * Run Hander Event
+	 */
+    @Override
+    public void run()
+    {
+        try
+        {
+        	threadId = Thread.currentThread().getId();
+            startTime = getTimeInMillis();
+            // execute the task
+            task.sendEvent(event);
+            endTime = getTimeInMillis();
+            checkForBlacklist();
+        }
+        finally
+        {
+        	handlerLatch.countDown();
+        }
+    }
+
+    public void runWithoutBlacklistTiming()
+    {
+    	task.sendEvent(event);
+    	handlerLatch.countDown();
+    }
+
+    /**
+     * This method defines if a timeout handling should be used for the
+     * task.
+     */
+    public boolean useTimeout()
+    {
+        // we only check the proxy if a timeout is configured
+        if ( this.timeout > 0)
+        {
+            return task.useTimeout();
+        }
+        return false;
+    }
+
+    /**
+     * Check to see if we need to blacklist this handler
+     *
+     */
+    public void checkForBlacklist()
+    {
+    	if(useTimeout() && getTaskTime() > this.timeout)
+		{
+			task.blackListHandler();
+		}
+    }
+
+    /**
+     *
+     * Determine the amount of time spent running this task
+     *
+     * @return
+     */
+    public long getTaskTime()
+    {
+    	if(threadId < 0l || startTime < 0l)
+    	{
+    		return 0l;
+    	}
+    	else if(endTime < 0l)
+    	{
+    		return getTimeInMillis() - startTime;
+    	}
+    	return endTime - startTime;
+
+    }
+
+}
diff --git a/eventadmin/impl/src/main/java/org/apache/felix/eventadmin/impl/tasks/Rendezvous.java b/eventadmin/impl/src/main/java/org/apache/felix/eventadmin/impl/tasks/Rendezvous.java
deleted file mode 100644
index 176d5f9..0000000
--- a/eventadmin/impl/src/main/java/org/apache/felix/eventadmin/impl/tasks/Rendezvous.java
+++ /dev/null
@@ -1,92 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.felix.eventadmin.impl.tasks;
-
-import java.util.concurrent.BrokenBarrierException;
-import java.util.concurrent.CyclicBarrier;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.TimeoutException;
-
-
-/**
- * This is a simplified version of the CyclicBarrier implementation.
- * It provides the same methods but internally ignores the exceptions.
- *
- * @author <a href="mailto:dev@felix.apache.org">Felix Project Team</a>
- */
-public class Rendezvous extends CyclicBarrier
-{
-    /** Flag for timeout handling. */
-    private volatile boolean timedout = false;
-
-    /**
-     * Create a Barrier for the indicated number of parties, and the default
-     * Rotator function to run at each barrier point.
-     */
-    public Rendezvous()
-    {
-        super(2);
-    }
-
-    /**
-     * see {@link CyclicBarrier#barrier()}
-     */
-    public void waitForRendezvous()
-    {
-        if ( timedout )
-        {
-            // if we have timed out, we return immediately
-            return;
-        }
-        try
-        {
-            this.await();
-        }
-        catch (BrokenBarrierException ignore1)
-        {
-        }
-        catch (InterruptedException ignore2)
-        {
-        }
-    }
-
-    /**
-     * see {@link CyclicBarrier#attemptBarrier(long)}
-     */
-    public void waitAttemptForRendezvous(final long timeout)
-    throws TimeoutException
-    {
-        try
-        {
-            this.await(timeout, TimeUnit.MILLISECONDS);
-            this.reset();
-        }
-        catch (BrokenBarrierException ignore1)
-        {
-        }
-        catch (TimeoutException te)
-        {
-            timedout = true;
-            throw te;
-        }
-        catch (InterruptedException ignore2)
-        {
-        }
-    }
-}
diff --git a/eventadmin/impl/src/main/java/org/apache/felix/eventadmin/impl/tasks/SyncDeliverTasks.java b/eventadmin/impl/src/main/java/org/apache/felix/eventadmin/impl/tasks/SyncDeliverTasks.java
index c3dbbdc..e0f54fd 100644
--- a/eventadmin/impl/src/main/java/org/apache/felix/eventadmin/impl/tasks/SyncDeliverTasks.java
+++ b/eventadmin/impl/src/main/java/org/apache/felix/eventadmin/impl/tasks/SyncDeliverTasks.java
@@ -20,7 +20,6 @@
 
 import java.util.Collection;
 import java.util.Iterator;
-import java.util.concurrent.TimeoutException;
 
 import org.apache.felix.eventadmin.impl.handler.EventHandlerProxy;
 import org.osgi.service.event.Event;
@@ -80,21 +79,6 @@
     }
 
     /**
-     * This method defines if a timeout handling should be used for the
-     * task.
-     * @param tasks The event handler dispatch tasks to execute
-     */
-    private boolean useTimeout(final EventHandlerProxy proxy)
-    {
-        // we only check the proxy if a timeout is configured
-        if ( this.timeout > 0)
-        {
-            return proxy.useTimeout();
-        }
-        return false;
-    }
-
-    /**
      * This blocks an unrelated thread used to send a synchronous event until the
      * event is send (or a timeout occurs).
      *
@@ -107,69 +91,34 @@
         final SyncThread syncThread = sleepingThread instanceof SyncThread ? (SyncThread)sleepingThread : null;
 
         final Iterator<EventHandlerProxy> i = tasks.iterator();
+        final BlacklistLatch handlerLatch = new BlacklistLatch(tasks.size(), this.timeout/2);
+        
         while ( i.hasNext() )
         {
             final EventHandlerProxy task = i.next();
+            HandlerTask handlerTask = new HandlerTask(task, event, this.timeout, handlerLatch);
 //            if ( !filterAsyncUnordered || task.isAsyncOrderedDelivery() )
 //            {
-                if ( !useTimeout(task) )
+                if( !handlerTask.useTimeout() )
                 {
-                    // no timeout, we can directly execute
-                    task.sendEvent(event);
+                	handlerTask.runWithoutBlacklistTiming();
                 }
-                else if ( syncThread != null )
+            	else if ( syncThread != null  )
                 {
                     // if this is a cascaded event, we directly use this thread
                     // otherwise we could end up in a starvation
-                    final long startTime = System.currentTimeMillis();
-                    task.sendEvent(event);
-                    if ( System.currentTimeMillis() - startTime > this.timeout )
-                    {
-                        task.blackListHandler();
-                    }
+                    handlerTask.run();
                 }
                 else
                 {
-                    final Rendezvous startBarrier = new Rendezvous();
-                    final Rendezvous timerBarrier = new Rendezvous();
-                    this.pool.executeTask(new Runnable()
-                    {
-                        @Override
-                        public void run()
-                        {
-                            try
-                            {
-                                // notify the outer thread to start the timer
-                                startBarrier.waitForRendezvous();
-                                // execute the task
-                                task.sendEvent(event);
-                                // stop the timer
-                                timerBarrier.waitForRendezvous();
-                            }
-                            catch (final IllegalStateException ise)
-                            {
-                                // this can happen on shutdown, so we ignore it
-                            }
-                        }
-                    });
-                    // we wait for the inner thread to start
-                    startBarrier.waitForRendezvous();
-
-                    // timeout handling
-                    // we sleep for the sleep time
-                    // if someone wakes us up it's the finished inner task
-                    try
-                    {
-                        timerBarrier.waitAttemptForRendezvous(this.timeout);
-                    }
-                    catch (final TimeoutException ie)
-                    {
-                        // if we timed out, we have to blacklist the handler
-                        task.blackListHandler();
-                    }
-
+                	
+                	handlerLatch.addToBlacklistCheck(handlerTask);
+                    this.pool.executeTask(handlerTask);
                 }
+               
 //            }
         }
+        handlerLatch.awaitAndBlacklistCheck();
+        
     }
 }