FELIX-4638 - Less locking on event handler timing. Apply patch from Bob Paulin
git-svn-id: https://svn.apache.org/repos/asf/felix/trunk@1628132 13f79535-47bb-0310-9956-ffa450edef68
diff --git a/eventadmin/impl/changelog.txt b/eventadmin/impl/changelog.txt
index 4a3a50a..b4eae29 100644
--- a/eventadmin/impl/changelog.txt
+++ b/eventadmin/impl/changelog.txt
@@ -2,6 +2,7 @@
---------------------------
** Improvement
* [FELIX-4623] - Add test for thread based ordering
+ * [FELIX-4638] - Less locking on event handler timing
Changes from 1.4.0 to 1.4.2
diff --git a/eventadmin/impl/src/main/java/org/apache/felix/eventadmin/impl/handler/EventHandlerProxy.java b/eventadmin/impl/src/main/java/org/apache/felix/eventadmin/impl/handler/EventHandlerProxy.java
index 3f00e50..10adfc2 100644
--- a/eventadmin/impl/src/main/java/org/apache/felix/eventadmin/impl/handler/EventHandlerProxy.java
+++ b/eventadmin/impl/src/main/java/org/apache/felix/eventadmin/impl/handler/EventHandlerProxy.java
@@ -431,12 +431,15 @@
*/
public void blackListHandler()
{
- LogWrapper.getLogger().log(
- LogWrapper.LOG_WARNING,
- "Blacklisting ServiceReference [" + this.reference + " | Bundle("
- + this.reference.getBundle() + ")] due to timeout!");
- this.blacklisted = true;
- // we can free the handler now.
- this.release();
+ if(!this.blacklisted)
+ {
+ LogWrapper.getLogger().log(
+ LogWrapper.LOG_WARNING,
+ "Blacklisting ServiceReference [" + this.reference + " | Bundle("
+ + this.reference.getBundle() + ")] due to timeout!");
+ this.blacklisted = true;
+ // we can free the handler now.
+ this.release();
+ }
}
}
diff --git a/eventadmin/impl/src/main/java/org/apache/felix/eventadmin/impl/tasks/BlacklistLatch.java b/eventadmin/impl/src/main/java/org/apache/felix/eventadmin/impl/tasks/BlacklistLatch.java
new file mode 100644
index 0000000..85441db
--- /dev/null
+++ b/eventadmin/impl/src/main/java/org/apache/felix/eventadmin/impl/tasks/BlacklistLatch.java
@@ -0,0 +1,107 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.felix.eventadmin.impl.tasks;
+
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.List;
+import java.util.concurrent.Semaphore;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.felix.eventadmin.impl.util.LogWrapper;
+
+/**
+ *
+ * A latch that checks handlers for blacklisting on an interval.
+ *
+ */
+public class BlacklistLatch {
+
+ private final Semaphore internalSemaphore;
+
+ private final int count;
+
+ private final long timeout;
+
+ private final List<HandlerTask> handlerTasks;
+
+ /**
+ * @param count Number of handlers that must call countdown
+ * @param timeout Timeout in Milliseconds to check for blacklisting handlers
+ */
+ public BlacklistLatch(final int count, final long timeout)
+ {
+ this.handlerTasks = new ArrayList<HandlerTask>(count);
+ this.count = count;
+ this.timeout = timeout;
+ internalSemaphore = new Semaphore(count);
+ internalSemaphore.drainPermits();
+ }
+
+ /**
+ *
+ * Count down the number of handlers blocking event completion.
+ *
+ */
+ public void countDown()
+ {
+ internalSemaphore.release();
+ }
+
+ /**
+ *
+ * Adds a handler task to the timeout based blackout checking.
+ *
+ * @param task
+ */
+ public void addToBlacklistCheck(final HandlerTask task)
+ {
+ this.handlerTasks.add(task);
+ }
+
+ /**
+ *
+ * Causes current thread to wait until each handler has called countDown.
+ * Checks on timeout interval to determine if a handler needs blacklisting.
+ *
+ */
+ public void awaitAndBlacklistCheck()
+ {
+ try
+ {
+ while(!internalSemaphore.tryAcquire(this.count, this.timeout, TimeUnit.MILLISECONDS))
+ {
+ final Iterator<HandlerTask> handlerTaskIt = handlerTasks.iterator();
+ while(handlerTaskIt.hasNext())
+ {
+ HandlerTask currentTask = handlerTaskIt.next();
+ currentTask.checkForBlacklist();
+ }
+ }
+ }
+ catch (final InterruptedException e)
+ {
+ LogWrapper.getLogger().log(
+ LogWrapper.LOG_WARNING,
+ "Event Task Processing Interrupted. Events may not be recieved in proper order.");
+ }
+ }
+
+
+}
diff --git a/eventadmin/impl/src/main/java/org/apache/felix/eventadmin/impl/tasks/HandlerTask.java b/eventadmin/impl/src/main/java/org/apache/felix/eventadmin/impl/tasks/HandlerTask.java
new file mode 100644
index 0000000..f51b747
--- /dev/null
+++ b/eventadmin/impl/src/main/java/org/apache/felix/eventadmin/impl/tasks/HandlerTask.java
@@ -0,0 +1,152 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.felix.eventadmin.impl.tasks;
+
+import java.lang.management.ManagementFactory;
+import java.lang.management.ThreadMXBean;
+
+import org.apache.felix.eventadmin.impl.handler.EventHandlerProxy;
+import org.osgi.service.event.Event;
+
+/**
+ * A task that processes an event handler
+ *
+ */
+public class HandlerTask implements Runnable
+{
+ private final EventHandlerProxy task;
+
+ private final Event event;
+
+ private final long timeout;
+
+ private final BlacklistLatch handlerLatch;
+
+ private volatile long threadId;
+
+ private volatile long startTime;
+
+ private volatile long endTime;
+
+ /**
+ *
+ *
+ * @param task Proxy to the event handler
+ * @param event The event to send to the handler
+ * @param timeout Timeout for handler blacklisting
+ * @param handlerLatch The latch used to ensure events fire in proper order
+ */
+ public HandlerTask(final EventHandlerProxy task, final Event event, final long timeout, final BlacklistLatch handlerLatch)
+ {
+ this.task = task;
+ this.event = event;
+ this.timeout = timeout;
+ this.handlerLatch = handlerLatch;
+ this.threadId = -1l;
+ this.startTime = -1l;
+ this.endTime = -1l;
+ }
+
+ /**
+ *
+ * Perform timing based on thread CPU time with clock time fall back.
+ *
+ * @return
+ */
+ public long getTimeInMillis()
+ {
+ ThreadMXBean bean = ManagementFactory.getThreadMXBean();
+ return bean.isThreadCpuTimeEnabled() ?
+ bean.getThreadCpuTime(threadId)/1000000 : System.currentTimeMillis();
+ }
+
+ /**
+ * Run Hander Event
+ */
+ @Override
+ public void run()
+ {
+ try
+ {
+ threadId = Thread.currentThread().getId();
+ startTime = getTimeInMillis();
+ // execute the task
+ task.sendEvent(event);
+ endTime = getTimeInMillis();
+ checkForBlacklist();
+ }
+ finally
+ {
+ handlerLatch.countDown();
+ }
+ }
+
+ public void runWithoutBlacklistTiming()
+ {
+ task.sendEvent(event);
+ handlerLatch.countDown();
+ }
+
+ /**
+ * This method defines if a timeout handling should be used for the
+ * task.
+ */
+ public boolean useTimeout()
+ {
+ // we only check the proxy if a timeout is configured
+ if ( this.timeout > 0)
+ {
+ return task.useTimeout();
+ }
+ return false;
+ }
+
+ /**
+ * Check to see if we need to blacklist this handler
+ *
+ */
+ public void checkForBlacklist()
+ {
+ if(useTimeout() && getTaskTime() > this.timeout)
+ {
+ task.blackListHandler();
+ }
+ }
+
+ /**
+ *
+ * Determine the amount of time spent running this task
+ *
+ * @return
+ */
+ public long getTaskTime()
+ {
+ if(threadId < 0l || startTime < 0l)
+ {
+ return 0l;
+ }
+ else if(endTime < 0l)
+ {
+ return getTimeInMillis() - startTime;
+ }
+ return endTime - startTime;
+
+ }
+
+}
diff --git a/eventadmin/impl/src/main/java/org/apache/felix/eventadmin/impl/tasks/Rendezvous.java b/eventadmin/impl/src/main/java/org/apache/felix/eventadmin/impl/tasks/Rendezvous.java
deleted file mode 100644
index 176d5f9..0000000
--- a/eventadmin/impl/src/main/java/org/apache/felix/eventadmin/impl/tasks/Rendezvous.java
+++ /dev/null
@@ -1,92 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.felix.eventadmin.impl.tasks;
-
-import java.util.concurrent.BrokenBarrierException;
-import java.util.concurrent.CyclicBarrier;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.TimeoutException;
-
-
-/**
- * This is a simplified version of the CyclicBarrier implementation.
- * It provides the same methods but internally ignores the exceptions.
- *
- * @author <a href="mailto:dev@felix.apache.org">Felix Project Team</a>
- */
-public class Rendezvous extends CyclicBarrier
-{
- /** Flag for timeout handling. */
- private volatile boolean timedout = false;
-
- /**
- * Create a Barrier for the indicated number of parties, and the default
- * Rotator function to run at each barrier point.
- */
- public Rendezvous()
- {
- super(2);
- }
-
- /**
- * see {@link CyclicBarrier#barrier()}
- */
- public void waitForRendezvous()
- {
- if ( timedout )
- {
- // if we have timed out, we return immediately
- return;
- }
- try
- {
- this.await();
- }
- catch (BrokenBarrierException ignore1)
- {
- }
- catch (InterruptedException ignore2)
- {
- }
- }
-
- /**
- * see {@link CyclicBarrier#attemptBarrier(long)}
- */
- public void waitAttemptForRendezvous(final long timeout)
- throws TimeoutException
- {
- try
- {
- this.await(timeout, TimeUnit.MILLISECONDS);
- this.reset();
- }
- catch (BrokenBarrierException ignore1)
- {
- }
- catch (TimeoutException te)
- {
- timedout = true;
- throw te;
- }
- catch (InterruptedException ignore2)
- {
- }
- }
-}
diff --git a/eventadmin/impl/src/main/java/org/apache/felix/eventadmin/impl/tasks/SyncDeliverTasks.java b/eventadmin/impl/src/main/java/org/apache/felix/eventadmin/impl/tasks/SyncDeliverTasks.java
index c3dbbdc..e0f54fd 100644
--- a/eventadmin/impl/src/main/java/org/apache/felix/eventadmin/impl/tasks/SyncDeliverTasks.java
+++ b/eventadmin/impl/src/main/java/org/apache/felix/eventadmin/impl/tasks/SyncDeliverTasks.java
@@ -20,7 +20,6 @@
import java.util.Collection;
import java.util.Iterator;
-import java.util.concurrent.TimeoutException;
import org.apache.felix.eventadmin.impl.handler.EventHandlerProxy;
import org.osgi.service.event.Event;
@@ -80,21 +79,6 @@
}
/**
- * This method defines if a timeout handling should be used for the
- * task.
- * @param tasks The event handler dispatch tasks to execute
- */
- private boolean useTimeout(final EventHandlerProxy proxy)
- {
- // we only check the proxy if a timeout is configured
- if ( this.timeout > 0)
- {
- return proxy.useTimeout();
- }
- return false;
- }
-
- /**
* This blocks an unrelated thread used to send a synchronous event until the
* event is send (or a timeout occurs).
*
@@ -107,69 +91,34 @@
final SyncThread syncThread = sleepingThread instanceof SyncThread ? (SyncThread)sleepingThread : null;
final Iterator<EventHandlerProxy> i = tasks.iterator();
+ final BlacklistLatch handlerLatch = new BlacklistLatch(tasks.size(), this.timeout/2);
+
while ( i.hasNext() )
{
final EventHandlerProxy task = i.next();
+ HandlerTask handlerTask = new HandlerTask(task, event, this.timeout, handlerLatch);
// if ( !filterAsyncUnordered || task.isAsyncOrderedDelivery() )
// {
- if ( !useTimeout(task) )
+ if( !handlerTask.useTimeout() )
{
- // no timeout, we can directly execute
- task.sendEvent(event);
+ handlerTask.runWithoutBlacklistTiming();
}
- else if ( syncThread != null )
+ else if ( syncThread != null )
{
// if this is a cascaded event, we directly use this thread
// otherwise we could end up in a starvation
- final long startTime = System.currentTimeMillis();
- task.sendEvent(event);
- if ( System.currentTimeMillis() - startTime > this.timeout )
- {
- task.blackListHandler();
- }
+ handlerTask.run();
}
else
{
- final Rendezvous startBarrier = new Rendezvous();
- final Rendezvous timerBarrier = new Rendezvous();
- this.pool.executeTask(new Runnable()
- {
- @Override
- public void run()
- {
- try
- {
- // notify the outer thread to start the timer
- startBarrier.waitForRendezvous();
- // execute the task
- task.sendEvent(event);
- // stop the timer
- timerBarrier.waitForRendezvous();
- }
- catch (final IllegalStateException ise)
- {
- // this can happen on shutdown, so we ignore it
- }
- }
- });
- // we wait for the inner thread to start
- startBarrier.waitForRendezvous();
-
- // timeout handling
- // we sleep for the sleep time
- // if someone wakes us up it's the finished inner task
- try
- {
- timerBarrier.waitAttemptForRendezvous(this.timeout);
- }
- catch (final TimeoutException ie)
- {
- // if we timed out, we have to blacklist the handler
- task.blackListHandler();
- }
-
+
+ handlerLatch.addToBlacklistCheck(handlerTask);
+ this.pool.executeTask(handlerTask);
}
+
// }
}
+ handlerLatch.awaitAndBlacklistCheck();
+
}
}